728x90
반응형

로그를 ElasticSearch에 쌓아서 실시간으로 보고 있었는데 로그의 양이 많아서 차지하는 용량이 계속 커지고 있었다.

 

파일로도 로그를 남겨두기는 해서 한동안은 80 ~ 90% 정도 되면 인덱스를 지워서 용량을 확보했다.

 

하지만 다년간의 추이를 보기 위해 로그를 분석해서 남겨야할 필요가 있었다.

 

SparkParquet를 조사하면서 요구 사항을 어떻게 적용할 수 있을지를 고심했었다.

 

처음엔 ELK 스택처럼 Kafka로 로그를 받아서 처리하는 방식으로 하려고 시도했었다.

 

Spark 예제에서 Kafka로 로그를 받아서 단어 개수를 세는 프로그램이 있어 이를 활용하면 되겠다 싶었다.

 

하지만 로그를 받는 것은 쉽게 됐지만 Parquet로 저장하는 것이 쉽지 않았다.

 

왜냐하면 Parquet는 열 지향 데이터 스토리지 형식이기 때문에 지속적으로 들어오는 데이터를 쓰는 것이 쉽지 않았다.

 

그래서 중간에 Avro나 다른 DB에 저장했다가 하루가 지나면 Parquet로 저장하면 되겠다고 생각하고 golang으로 대강 완성을 했었다.

 

그러다가 우연히 한 블로그를 발견하고 ElasticSearch에서 Parquet로 변환하여 장기보관하도록 수정했다.

 

저장된 인덱스들로 테스트를 해봤는데 여러 가지 애로 사항을 겪을 수 있었다.

 

우선은 필드명에 점(.)이 들어가 있는 경우 저장하려고 하면 다음과 같은 에러가 발생했다.

org.elasticsearch.hadoop.rest.EsHadoopParsingException: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Position for 'nested.foo.bar' not found in row; typically this is caused by a mapping inconsistency

원인은 Parquet로 저장하려면 csv 처럼 중첩되지 않은 표 형식으로 되어야하는데 열의 개수는 같지만 필드명이 각각 달라서 발생한 문제였다.

 

그래서 해당 필드명만 추출해서 하면 되지 않을까 해서 es.query 설정도 해보고 filter 기능을 사용해서 해봤지만 결과는 같았다.

 

어떻게 하면 가능할지 방법을 찾아보다가 테이블 형태로 추출한 다음에 Paquet로 저장하면 되지 않을까 하고 생각했다.

예제 코드는 다음과 같다.

// es.nodes는 ElasticSearch 주소, es.resource는 인덱스명
val esConf = Map(
    "es.nodes" -> "elasticsearch",
    "es.resource" -> "index"
)

val rdd = sc.esRDD(esConf)
// 필드명에 .이 있는 경우 조회할 때 제대로 안돼서 변경
val result = rdd.map(_._2).map(x => x.map { case (key, value) => key.replace(".", "_") -> value })
val keys = Array(
    "foo_bar_1",
    "foo_bar_2"
)
val rowRDD = result.map( x => {
    val temp = scala.collection.mutable.Map[String, AnyRef]("@timestamp" -> x("@timestamp"))

    // 값이 없는 경우 "0.0"으로 설정
    for (key <- keys if !x.contains(key))
        temp += (key -> "0.0")

    // 값이 있는 경우 문자열로 치환
    for (key <- keys if x.contains(key))
        temp += (key -> x(key).toString)

    // 타입을 명확히 하지 않으면 DataFrame을 만들 때 오류 발생
    Row(
        new java.sql.Timestamp(temp("@timestamp").asInstanceOf[java.util.Date].getTime()),
        temp("foo_bar_1").asInstanceOf[String].toFloat,
        temp("foo_bar_2").asInstanceOf[String].toFloat
    )
})

val schema = StructType(Array(
    StructField("@timestamp", TimestampType, true),
    StructField("foo_bar_1", FloatType, true),
    StructField("foo_bar_2", FloatType, true)
))
val df = spark.createDataFrame(rowRDD, schema)
df.write.parquet("hdfs://hadoop:9000/path/to/save/")

Parquet로 저장할 때 권한이 없다면서 안되는 경우가 있는데 그때에는 다음과 같이 환경 설정에 HADOOP_USER_NAME을 추가하면 된다.

$ export HADOOP_USER_NAME=hadoop

 

참고 문헌

  1. http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html

  2. https://stackoverflow.com/questions/51097818/replace-special-characters-of-column-names-in-spark-dataframe

  3. https://stackoverflow.com/questions/32860831/how-can-i-change-sparkcontext-sparkuser-setting-in-pyspark

반응형

'Log' 카테고리의 다른 글

[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25

+ Recent posts