728x90
반응형

Go로 만든 프로젝트에서 kafka 연동하여 로그를 수집하는데 confluent-kafka-go를 사용하고 있었다.

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var kafkaClient *kafka.Producer

func main() {
    kafkaClient, _ = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "broker:9092",
        "acks": 1,
    })

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                m := ev
                if m.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v, queue: %d\n", m.TopicPartition.Error, p.Len())
                }
                return
            default:
                fmt.Printf("Ignored event: %s\n", ev)
            }
        }
    }()
}

func SendMessage(topic string, message string) {
    kafkaClient.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message)}
}

테스트를 했을 때 잘 동작하여 아무 문제가 없었지만 며칠간 실행시켜보니 Local: Queue full 에러가 나면서 더이상 메시지를 전송하지 못했다.

queue.buffering.max.messages 크기를 조절해가면서 테스트를 했지만 해당 에러가 계속 발생해 Flush()로 급한 불을 끌 수 있었다.

librdkafka에서 쓰는 버퍼가 해제되지 않은 문제인 듯 했으나 정확한 원인을 몰라 더 이상의 사용이 불안하여 다른 라이브러리를 찾아봤다.

Shopify에서 만든 sarama가 유명하다고 추천받아 테스트를 해보고 적용하기로 했다.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

var kafkaClient sarama.AsyncProducer

func main() {
    brokers := []string{"broker:9092"}
    kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}

func SendMessage(topic string, message string) {
    kafkaClient.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Partition: -1,
        Value: sarama.StringEncoder(message),
    }
}

테스트는 1000만건의 메시지를 전송하도록 진행했다.

 

confluent-kafka-go와 비교해보니 메모리를 더 적게 사용하면서 성능은 비슷했다.

 

confluent-kafka-go는 메모리를 20MB 정도 기본으로 사용하고 sarma는 최대 3MB를 사용했다.

 

confluent-kafka-go의 경우 Flush를 하지 않으면 GC를 수행해도 메모리가 조금씩 증가했지만

 

sarama는 메모리가 증가하는 것 없이 동작을 잘 하는 것을 확인할 수 있었다.

 

참고 문헌

  1. https://github.com/confluentinc/confluent-kafka-go

  2. https://github.com/Shopify/sarama

반응형
728x90
반응형

golang 프로젝트의 로그를 수집하기 위해 confluent-kafka-go를 이용했다.

 

이를 위해서 librdkafka를 설치해야 하는데

 

confluent-kafka-go 최신 버전의 경우 1.3.0 이상을 사용해야 한다는 에러가 났다.

 

CeontOS 7에서 yum으로 설치하는 경우 0.11.x로만 가능해서 생긴 문제로

 

최신 버전은 바이너리 파일을 제공하지 않아 직접 빌드해야 했다.

 

다음과 같이 저장소를 받아서 태그를 1.3.0으로 변경했다.

 

최신은 1.4.0-RC4 이지만 아직 정식이 아니라 1.3.0으로 했다.

$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ git checkout tags/v1.3.0 -b v1.3.0
$ cd librdkafka

librdkafka에서 packaging/rpm/Makefile을 보면 mock를 이용하여 rpm 파일을 생성한다.

$ sudo yum install -y mock
$ sudo usermod -a -G mock current_user
$ cd packaging/rpm
$ make

pkg-1.3.0-1-default 디렉토리 밑에 rpm 파일이 생긴 것을 확인할 수 있다.

$ sudo yum install -y librdkafka1-1.3.0-1.el7.x86_64.rpm
$ sudo yum install -y librdkafka-devel-1.3.0-1.el7.x86_64.rpm

위와 같이 설치해주면 confluent-kafka-go를 에러없이 사용할 수 있다.

 

참고 문헌

  1. https://github.com/confluentinc/confluent-kafka-go

  2. https://github.com/edenhill/librdkafka

반응형

'Log' 카테고리의 다른 글

[Log] ElasticSearch _default_ mapping 문제 해결  (0) 2020.05.28
[Log] HDFS Web UI Permission denied  (0) 2020.04.02
[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Airflow 설치  (0) 2020.03.12
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
728x90
반응형

Lostash에서 input으로 kafka를 사용하고 있었는데 다음과 같은 오류가 나왔다.

[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]

중복되는 설정들이 많아 합치면서 pipeline을 사용하도록 수정했는데

 

Logstahs가 설정 파일을 각각의 pipeline마다 하나의 파일로 합쳐 사용하여

 

pipeline에 설정된 client_id가 서로 중복되어 발생한 문제였다.

 

이전에는 pipeline을 사용하지 않고 개별로 설정하여 사용했는데 kafka도 개별로 사용하고 있었다.

 

이때 문제가 되지 않았던 것은 Logstash가 설정 파일을 하나의 파일로 만들어서 사용하고

 

client_id가 같아도 자동으로 뒤에 숫자를 붙여 다른 client_id로 동작했기 때문이었다.

 

다음과 같이 client_id를 설정할 때 postfix를 붙여서 pipeline마다 다른 client_id를 갖도록 수정했다.

input {
    kafka {
        bootstrap_servers => "kafka_server:9092"
        client_id => "logstash-postfix"
        ...
    }
}

더이상 해당 오류가 발생하지 않고 kafka로부터 데이터를 잘 받아오는 것을 확인할 수 있었다.

반응형

'Log' 카테고리의 다른 글

[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25
[Log] Fluentd Kafka Logstash 연동  (0) 2020.01.25
728x90
반응형

ELK를 이용하여 로그 수집/분석을 하기 위해 Fluentd와 Kafka를 연동할 필요가 있었다.

 

그래서 Fluentd에서 Kafka로 로그를 전송할 수 있도록 설정을 추가했는데

 

Kafka가 죽어서(원인은 파악하지 못했다.) Fluentd가 설치된 로그 서버가 행이 걸리는 경우가 발생했다.

 

retry_limit의 기본값이 17로 돼있어서 발생한 문제로 Kafka로 전송이 실패해서 버퍼가 계속 쌓인 상태에서

 

해당 버퍼를 Kafka로 전송하려가 I/O가 가득 차 해당 서버가 행이 걸려 다른 서버로 절체된 것이었다.

 

retry_limit을 1로 주고 테스트를 해보니 Kafka 전송에 실패해도 버퍼가 더이상 쌓이지 않는 것을 확인했다.

<match>
  @type copy

  <store>
    @type kafka_buffered

    ...
    retry_limit 1
    ...
  </store>
</match>

Logstash에서 Kafka 연동할 때 토픽별로 인덱스를 설정해 Elastic Search에 저장하기 위해

 

filter와 output에 다음의 내용을 추가하면 된다.

if [@metadata][kafka][topic] == "topic" {
}

위의 설정을 추가하지 않는 경우 filter에서는 오류가 나지 않을 수도 있지만

 

output에서는 가장 먼저 설정된 인덱스만 인식하여 나머지 인덱스에는 저장이 되지 않는다.

 

참고 문헌

  1. https://docs.fluentd.org/v/0.12/output/kafka#retry_limit-disable_retry_limit

반응형

'Log' 카테고리의 다른 글

[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25

+ Recent posts