728x90
반응형

Zeppelin은 웹 기반으로 다양한 인터프리터를 이용해서 데이터 분석을 도와주는 도구이다.

 

설치를 위해서 다음과 같이 하면 된다.

$ sudo yum install -y java-1.8.0-openjdk.x86_64
$ wget http://mirror.apache-kr.org/zeppelin/zeppelin-0.8.2/zeppelin-0.8.2-bin-all.tgz
$ tar xf zeppelin-0.8.2-bin-all.tgz && cd zeppelin-0.8.2-bin-all

다음과 같이 실행하면 Zeppelin이 구동된다.

$ bin/zeppelin-daemon.sh start

서버에 설치한 경우엔 직접 아이피로 접근해야 하는데 기본 설정으로는 접근할 수 없다.

 

Zeppelin 최신 버전(0.8.2)의 경우 기본 호스트가 0.0.0.0에서 127.0.0.1로 변경됐다.

Upgrading from Zeppelin 0.8.1 (and before) to 0.8.2 (and later)
From 0.8.2, Zeppelin server bind 127.0.0.1 by default instead of 0.0.0.0. Configure zeppelin.server.addr property or ZEPPELIN_ADDR env variable to change.

그래서 zeppelin-env.sh에서 ZEPPELIN_ADDR0.0.0.0으로 변경하고 재시작하면 웹에서 접근할 수 있다.

 

Spark와 연동하기 위해서 Zeppelin이 설치된 서버에 Spark를 설치한다.

 

원격 서버에 설치된 Spark 버전이 2.4.5라면 똑같은 버전을 설치해야 동작한다.

 

그리고 zeppelin-env.sh에 다음과 같이 설정한다.

export SPARK_HOME=~/spark

이제 마지막으로 웹페이지에서 InterpreterSpark를 찾아 설정을 추가한다.

masterspark://remote_addr:7077와 같이 입력하면 된다.

 

Notebook에서 테스트해보면 잘 동작하는 것을 확인할 수 있다.

 

참고 문헌

  1. https://zeppelin.apache.org/docs/0.8.2/setup/operation/upgrading.html#upgrading-from-zeppelin-07-to-08

  2. https://stackoverflow.com/questions/50173371/how-to-solve-java-io-invalidclassexception-local-class-incompatible-with-scala

반응형

'Log' 카테고리의 다른 글

[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Airflow 설치  (0) 2020.03.12
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
728x90
반응형

다음과 같이 각 서버에 Fluentd를 설치해 수집하는 서버로 로그를 전송하고

 

Kafka를 일종의 버퍼로 하여 ELK 스택이나 다른 것들을 이용할 수 있도록 구성했다.

각 서버에서 tail로 로그 파일을 읽어서 forward로 송/수신하고 있었는데

 

이 과정에서 다음과 같은 에러가 발생하며 누락 혹은 지연되는 경우가 발생했다.

[warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=... chunk="..." error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"

worker를 늘려 보기도 하고 버퍼 설정을 바꿔보기도 했지만 위의 에러는 계속 나왔다.

 

원인이 무엇인지 한동안 못찾다가 UDP 방화벽을 열어줘야한다는 글을 봤다.

 

그래서 문서를 보니 forward 통신을 할 때 heartbeat을 보내는 설정이 있었다.

 

transport가 기본값이라 tcp로 바꿔보니 해당 에러가 더이상 나지 않았다.

 

(아마도 기본값인 경우 udp로도 전송하는데 막혀있어 에러가 난 것으로 생각된다.)

 

하지만 1초(기본값)마다 heartbeat을 보내서 로그 전송 속도가 눈에 띄게 줄었다.

 

heartbeat을 굳이 필요없어 none으로 하니 속도가 빨라진 것을 확인할 수 있었다.

 

참고 문헌

  1. https://qiita.com/kentokento/items/60b6654d4eea1dad5f42

  2. https://docs.fluentd.org/output/forward#heartbeat_type

반응형

'Log' 카테고리의 다른 글

[Log] Airflow 설치  (0) 2020.03.12
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
728x90
반응형

로그를 ElasticSearch에 쌓아서 실시간으로 보고 있었는데 로그의 양이 많아서 차지하는 용량이 계속 커지고 있었다.

 

파일로도 로그를 남겨두기는 해서 한동안은 80 ~ 90% 정도 되면 인덱스를 지워서 용량을 확보했다.

 

하지만 다년간의 추이를 보기 위해 로그를 분석해서 남겨야할 필요가 있었다.

 

SparkParquet를 조사하면서 요구 사항을 어떻게 적용할 수 있을지를 고심했었다.

 

처음엔 ELK 스택처럼 Kafka로 로그를 받아서 처리하는 방식으로 하려고 시도했었다.

 

Spark 예제에서 Kafka로 로그를 받아서 단어 개수를 세는 프로그램이 있어 이를 활용하면 되겠다 싶었다.

 

하지만 로그를 받는 것은 쉽게 됐지만 Parquet로 저장하는 것이 쉽지 않았다.

 

왜냐하면 Parquet는 열 지향 데이터 스토리지 형식이기 때문에 지속적으로 들어오는 데이터를 쓰는 것이 쉽지 않았다.

 

그래서 중간에 Avro나 다른 DB에 저장했다가 하루가 지나면 Parquet로 저장하면 되겠다고 생각하고 golang으로 대강 완성을 했었다.

 

그러다가 우연히 한 블로그를 발견하고 ElasticSearch에서 Parquet로 변환하여 장기보관하도록 수정했다.

 

저장된 인덱스들로 테스트를 해봤는데 여러 가지 애로 사항을 겪을 수 있었다.

 

우선은 필드명에 점(.)이 들어가 있는 경우 저장하려고 하면 다음과 같은 에러가 발생했다.

org.elasticsearch.hadoop.rest.EsHadoopParsingException: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Position for 'nested.foo.bar' not found in row; typically this is caused by a mapping inconsistency

원인은 Parquet로 저장하려면 csv 처럼 중첩되지 않은 표 형식으로 되어야하는데 열의 개수는 같지만 필드명이 각각 달라서 발생한 문제였다.

 

그래서 해당 필드명만 추출해서 하면 되지 않을까 해서 es.query 설정도 해보고 filter 기능을 사용해서 해봤지만 결과는 같았다.

 

어떻게 하면 가능할지 방법을 찾아보다가 테이블 형태로 추출한 다음에 Paquet로 저장하면 되지 않을까 하고 생각했다.

예제 코드는 다음과 같다.

// es.nodes는 ElasticSearch 주소, es.resource는 인덱스명
val esConf = Map(
    "es.nodes" -> "elasticsearch",
    "es.resource" -> "index"
)

val rdd = sc.esRDD(esConf)
// 필드명에 .이 있는 경우 조회할 때 제대로 안돼서 변경
val result = rdd.map(_._2).map(x => x.map { case (key, value) => key.replace(".", "_") -> value })
val keys = Array(
    "foo_bar_1",
    "foo_bar_2"
)
val rowRDD = result.map( x => {
    val temp = scala.collection.mutable.Map[String, AnyRef]("@timestamp" -> x("@timestamp"))

    // 값이 없는 경우 "0.0"으로 설정
    for (key <- keys if !x.contains(key))
        temp += (key -> "0.0")

    // 값이 있는 경우 문자열로 치환
    for (key <- keys if x.contains(key))
        temp += (key -> x(key).toString)

    // 타입을 명확히 하지 않으면 DataFrame을 만들 때 오류 발생
    Row(
        new java.sql.Timestamp(temp("@timestamp").asInstanceOf[java.util.Date].getTime()),
        temp("foo_bar_1").asInstanceOf[String].toFloat,
        temp("foo_bar_2").asInstanceOf[String].toFloat
    )
})

val schema = StructType(Array(
    StructField("@timestamp", TimestampType, true),
    StructField("foo_bar_1", FloatType, true),
    StructField("foo_bar_2", FloatType, true)
))
val df = spark.createDataFrame(rowRDD, schema)
df.write.parquet("hdfs://hadoop:9000/path/to/save/")

Parquet로 저장할 때 권한이 없다면서 안되는 경우가 있는데 그때에는 다음과 같이 환경 설정에 HADOOP_USER_NAME을 추가하면 된다.

$ export HADOOP_USER_NAME=hadoop

 

참고 문헌

  1. http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html

  2. https://stackoverflow.com/questions/51097818/replace-special-characters-of-column-names-in-spark-dataframe

  3. https://stackoverflow.com/questions/32860831/how-can-i-change-sparkcontext-sparkuser-setting-in-pyspark

반응형

'Log' 카테고리의 다른 글

[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25
728x90
반응형

Lostash에서 input으로 kafka를 사용하고 있었는데 다음과 같은 오류가 나왔다.

[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]

중복되는 설정들이 많아 합치면서 pipeline을 사용하도록 수정했는데

 

Logstahs가 설정 파일을 각각의 pipeline마다 하나의 파일로 합쳐 사용하여

 

pipeline에 설정된 client_id가 서로 중복되어 발생한 문제였다.

 

이전에는 pipeline을 사용하지 않고 개별로 설정하여 사용했는데 kafka도 개별로 사용하고 있었다.

 

이때 문제가 되지 않았던 것은 Logstash가 설정 파일을 하나의 파일로 만들어서 사용하고

 

client_id가 같아도 자동으로 뒤에 숫자를 붙여 다른 client_id로 동작했기 때문이었다.

 

다음과 같이 client_id를 설정할 때 postfix를 붙여서 pipeline마다 다른 client_id를 갖도록 수정했다.

input {
    kafka {
        bootstrap_servers => "kafka_server:9092"
        client_id => "logstash-postfix"
        ...
    }
}

더이상 해당 오류가 발생하지 않고 kafka로부터 데이터를 잘 받아오는 것을 확인할 수 있었다.

반응형

'Log' 카테고리의 다른 글

[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25
[Log] Fluentd Kafka Logstash 연동  (0) 2020.01.25

+ Recent posts