728x90
반응형

Go로 만든 프로젝트에서 kafka 연동하여 로그를 수집하는데 confluent-kafka-go를 사용하고 있었다.

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var kafkaClient *kafka.Producer

func main() {
    kafkaClient, _ = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "broker:9092",
        "acks": 1,
    })

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                m := ev
                if m.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v, queue: %d\n", m.TopicPartition.Error, p.Len())
                }
                return
            default:
                fmt.Printf("Ignored event: %s\n", ev)
            }
        }
    }()
}

func SendMessage(topic string, message string) {
    kafkaClient.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message)}
}

테스트를 했을 때 잘 동작하여 아무 문제가 없었지만 며칠간 실행시켜보니 Local: Queue full 에러가 나면서 더이상 메시지를 전송하지 못했다.

queue.buffering.max.messages 크기를 조절해가면서 테스트를 했지만 해당 에러가 계속 발생해 Flush()로 급한 불을 끌 수 있었다.

librdkafka에서 쓰는 버퍼가 해제되지 않은 문제인 듯 했으나 정확한 원인을 몰라 더 이상의 사용이 불안하여 다른 라이브러리를 찾아봤다.

Shopify에서 만든 sarama가 유명하다고 추천받아 테스트를 해보고 적용하기로 했다.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

var kafkaClient sarama.AsyncProducer

func main() {
    brokers := []string{"broker:9092"}
    kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}

func SendMessage(topic string, message string) {
    kafkaClient.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Partition: -1,
        Value: sarama.StringEncoder(message),
    }
}

테스트는 1000만건의 메시지를 전송하도록 진행했다.

 

confluent-kafka-go와 비교해보니 메모리를 더 적게 사용하면서 성능은 비슷했다.

 

confluent-kafka-go는 메모리를 20MB 정도 기본으로 사용하고 sarma는 최대 3MB를 사용했다.

 

confluent-kafka-go의 경우 Flush를 하지 않으면 GC를 수행해도 메모리가 조금씩 증가했지만

 

sarama는 메모리가 증가하는 것 없이 동작을 잘 하는 것을 확인할 수 있었다.

 

참고 문헌

  1. https://github.com/confluentinc/confluent-kafka-go

  2. https://github.com/Shopify/sarama

반응형

+ Recent posts