Go
로 만든 프로젝트에서 kafka
연동하여 로그를 수집하는데 confluent-kafka-go
를 사용하고 있었다.
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
var kafkaClient *kafka.Producer
func main() {
kafkaClient, _ = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "broker:9092",
"acks": 1,
})
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
m := ev
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v, queue: %d\n", m.TopicPartition.Error, p.Len())
}
return
default:
fmt.Printf("Ignored event: %s\n", ev)
}
}
}()
}
func SendMessage(topic string, message string) {
kafkaClient.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message)}
}
테스트를 했을 때 잘 동작하여 아무 문제가 없었지만 며칠간 실행시켜보니 Local: Queue full
에러가 나면서 더이상 메시지를 전송하지 못했다.
queue.buffering.max.messages
크기를 조절해가면서 테스트를 했지만 해당 에러가 계속 발생해 Flush()
로 급한 불을 끌 수 있었다.
librdkafka
에서 쓰는 버퍼가 해제되지 않은 문제인 듯 했으나 정확한 원인을 몰라 더 이상의 사용이 불안하여 다른 라이브러리를 찾아봤다.
Shopify
에서 만든 sarama
가 유명하다고 추천받아 테스트를 해보고 적용하기로 했다.
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
var kafkaClient sarama.AsyncProducer
func main() {
brokers := []string{"broker:9092"}
kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}
func SendMessage(topic string, message string) {
kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Partition: -1,
Value: sarama.StringEncoder(message),
}
}
테스트는 1000만건의 메시지를 전송하도록 진행했다.
confluent-kafka-go
와 비교해보니 메모리를 더 적게 사용하면서 성능은 비슷했다.
confluent-kafka-go
는 메모리를 20MB 정도 기본으로 사용하고 sarma
는 최대 3MB를 사용했다.
confluent-kafka-go
의 경우 Flush
를 하지 않으면 GC를 수행해도 메모리가 조금씩 증가했지만
sarama
는 메모리가 증가하는 것 없이 동작을 잘 하는 것을 확인할 수 있었다.
참고 문헌
'Golang' 카테고리의 다른 글
[Golang] 커맨드 명령 동시 실행 및 디렉토리 제거 (0) | 2020.07.02 |
---|---|
[Golang] Go의 선(The Zen of Go) (0) | 2020.06.04 |
[Golang] Go를 사용하면서 발생했던 문제들 (0) | 2020.04.23 |
[Golang] Java gzip migration (0) | 2020.01.30 |
[Golang] 410 Gone (0) | 2020.01.25 |