728x90
반응형

Spark에서 하는 일을 주기적으로 수행하기 위해 무엇이 있는지 조사했다.

 

여러 가지 도구들이 있었는데 그 중에서 AirflowLuigi가 좋아보였다.

 

둘 중에 어떤 걸로 정할 지 고민하다 Airflow로 정했다.

 

두 가지 모두 좋아보였지만 AirflowApache에서 관리하고 대시보드가 유려해보였다.

 

Airflow를 설치하는 방법은 Quick Start에 잘 나와 있어 그대로 따라하면 된다.

$ pip install 'apache-airflow[ssh]'

ssh는 원격 서버에 접속하여 spark-submit을 수행하기 위해 설치했다.

$ airflow initdb

~/airflow/airflow.cfg, airflow.db, logs, unittests.cfg 가 만들어진다.

 

Airflow를 구동할 때 airflow webserver와 같이 커맨드로만 가능해서

 

다음과 같이 start-airflow.sh, stop-airflow.sh을 만들어서 실행했다.

#!/bin/sh
nohup airflow webserver > /dev/null 2>&1 &
nohup airflow scheduler > /dev/null 2>&1 &
#!/bin/sh
pkill -f airflow

신규로 DAG를 만들기 위해서 ~/airflowdags 디렉토리를 만들고 파일을 추가한다.

$ cd ~/airflow
$ mkdir dags && cd dags
$ touch my_first_dag.py

my_fist_dag.py에 다음과 같이 작성하여 spark-submit을 주기적으로 수행하도록 했다.

# -- coding: utf-8 --
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='my_dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['spark']
)
templated_bash_command = """
    spark-submit \
    --class my_class \
    --master spark://spark-master:7077 \
    --executor-cores 2 \
    --executor-memory 2g \
    my_first.jar
"""
hook = SSHHook(
    ssh_conn_id='ssh_default',
    remote_host='spark-master',
    username='username',
    key_file='~/.ssh/id_rsa'
)
run_ssh = SSHOperator(
    task_id='spark_submit_task',
    ssh_hook=hook,
    command=templated_bash_command,
    dag=dag
)
run_ssh

schedule_interval='0 0 * * *'은 하루에 한 번 밤에 수행한다고 문서에 적혀있는데

 

UTC 시간대로 동작하여 한국 시간으로는 오전 9시에 수행한다.

 

참고 문헌

  1. http://bytepawn.com/luigi-airflow-pinball.html

  2. https://airflow.apache.org/docs/stable/installation.html

  3. https://airflow.apache.org/docs/stable/scheduler.html#dag-runs

반응형

'Log' 카테고리의 다른 글

[Log] librdkafka 빌드하기  (0) 2020.03.26
[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
728x90
반응형

Zeppelin은 웹 기반으로 다양한 인터프리터를 이용해서 데이터 분석을 도와주는 도구이다.

 

설치를 위해서 다음과 같이 하면 된다.

$ sudo yum install -y java-1.8.0-openjdk.x86_64
$ wget http://mirror.apache-kr.org/zeppelin/zeppelin-0.8.2/zeppelin-0.8.2-bin-all.tgz
$ tar xf zeppelin-0.8.2-bin-all.tgz && cd zeppelin-0.8.2-bin-all

다음과 같이 실행하면 Zeppelin이 구동된다.

$ bin/zeppelin-daemon.sh start

서버에 설치한 경우엔 직접 아이피로 접근해야 하는데 기본 설정으로는 접근할 수 없다.

 

Zeppelin 최신 버전(0.8.2)의 경우 기본 호스트가 0.0.0.0에서 127.0.0.1로 변경됐다.

Upgrading from Zeppelin 0.8.1 (and before) to 0.8.2 (and later)
From 0.8.2, Zeppelin server bind 127.0.0.1 by default instead of 0.0.0.0. Configure zeppelin.server.addr property or ZEPPELIN_ADDR env variable to change.

그래서 zeppelin-env.sh에서 ZEPPELIN_ADDR0.0.0.0으로 변경하고 재시작하면 웹에서 접근할 수 있다.

 

Spark와 연동하기 위해서 Zeppelin이 설치된 서버에 Spark를 설치한다.

 

원격 서버에 설치된 Spark 버전이 2.4.5라면 똑같은 버전을 설치해야 동작한다.

 

그리고 zeppelin-env.sh에 다음과 같이 설정한다.

export SPARK_HOME=~/spark

이제 마지막으로 웹페이지에서 InterpreterSpark를 찾아 설정을 추가한다.

masterspark://remote_addr:7077와 같이 입력하면 된다.

 

Notebook에서 테스트해보면 잘 동작하는 것을 확인할 수 있다.

 

참고 문헌

  1. https://zeppelin.apache.org/docs/0.8.2/setup/operation/upgrading.html#upgrading-from-zeppelin-07-to-08

  2. https://stackoverflow.com/questions/50173371/how-to-solve-java-io-invalidclassexception-local-class-incompatible-with-scala

반응형

'Log' 카테고리의 다른 글

[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Airflow 설치  (0) 2020.03.12
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
728x90
반응형

다음과 같이 각 서버에 Fluentd를 설치해 수집하는 서버로 로그를 전송하고

 

Kafka를 일종의 버퍼로 하여 ELK 스택이나 다른 것들을 이용할 수 있도록 구성했다.

각 서버에서 tail로 로그 파일을 읽어서 forward로 송/수신하고 있었는데

 

이 과정에서 다음과 같은 에러가 발생하며 누락 혹은 지연되는 경우가 발생했다.

[warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=... chunk="..." error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"

worker를 늘려 보기도 하고 버퍼 설정을 바꿔보기도 했지만 위의 에러는 계속 나왔다.

 

원인이 무엇인지 한동안 못찾다가 UDP 방화벽을 열어줘야한다는 글을 봤다.

 

그래서 문서를 보니 forward 통신을 할 때 heartbeat을 보내는 설정이 있었다.

 

transport가 기본값이라 tcp로 바꿔보니 해당 에러가 더이상 나지 않았다.

 

(아마도 기본값인 경우 udp로도 전송하는데 막혀있어 에러가 난 것으로 생각된다.)

 

하지만 1초(기본값)마다 heartbeat을 보내서 로그 전송 속도가 눈에 띄게 줄었다.

 

heartbeat을 굳이 필요없어 none으로 하니 속도가 빨라진 것을 확인할 수 있었다.

 

참고 문헌

  1. https://qiita.com/kentokento/items/60b6654d4eea1dad5f42

  2. https://docs.fluentd.org/output/forward#heartbeat_type

반응형

'Log' 카테고리의 다른 글

[Log] Airflow 설치  (0) 2020.03.12
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
728x90
반응형

로그를 ElasticSearch에 쌓아서 실시간으로 보고 있었는데 로그의 양이 많아서 차지하는 용량이 계속 커지고 있었다.

 

파일로도 로그를 남겨두기는 해서 한동안은 80 ~ 90% 정도 되면 인덱스를 지워서 용량을 확보했다.

 

하지만 다년간의 추이를 보기 위해 로그를 분석해서 남겨야할 필요가 있었다.

 

SparkParquet를 조사하면서 요구 사항을 어떻게 적용할 수 있을지를 고심했었다.

 

처음엔 ELK 스택처럼 Kafka로 로그를 받아서 처리하는 방식으로 하려고 시도했었다.

 

Spark 예제에서 Kafka로 로그를 받아서 단어 개수를 세는 프로그램이 있어 이를 활용하면 되겠다 싶었다.

 

하지만 로그를 받는 것은 쉽게 됐지만 Parquet로 저장하는 것이 쉽지 않았다.

 

왜냐하면 Parquet는 열 지향 데이터 스토리지 형식이기 때문에 지속적으로 들어오는 데이터를 쓰는 것이 쉽지 않았다.

 

그래서 중간에 Avro나 다른 DB에 저장했다가 하루가 지나면 Parquet로 저장하면 되겠다고 생각하고 golang으로 대강 완성을 했었다.

 

그러다가 우연히 한 블로그를 발견하고 ElasticSearch에서 Parquet로 변환하여 장기보관하도록 수정했다.

 

저장된 인덱스들로 테스트를 해봤는데 여러 가지 애로 사항을 겪을 수 있었다.

 

우선은 필드명에 점(.)이 들어가 있는 경우 저장하려고 하면 다음과 같은 에러가 발생했다.

org.elasticsearch.hadoop.rest.EsHadoopParsingException: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Position for 'nested.foo.bar' not found in row; typically this is caused by a mapping inconsistency

원인은 Parquet로 저장하려면 csv 처럼 중첩되지 않은 표 형식으로 되어야하는데 열의 개수는 같지만 필드명이 각각 달라서 발생한 문제였다.

 

그래서 해당 필드명만 추출해서 하면 되지 않을까 해서 es.query 설정도 해보고 filter 기능을 사용해서 해봤지만 결과는 같았다.

 

어떻게 하면 가능할지 방법을 찾아보다가 테이블 형태로 추출한 다음에 Paquet로 저장하면 되지 않을까 하고 생각했다.

예제 코드는 다음과 같다.

// es.nodes는 ElasticSearch 주소, es.resource는 인덱스명
val esConf = Map(
    "es.nodes" -> "elasticsearch",
    "es.resource" -> "index"
)

val rdd = sc.esRDD(esConf)
// 필드명에 .이 있는 경우 조회할 때 제대로 안돼서 변경
val result = rdd.map(_._2).map(x => x.map { case (key, value) => key.replace(".", "_") -> value })
val keys = Array(
    "foo_bar_1",
    "foo_bar_2"
)
val rowRDD = result.map( x => {
    val temp = scala.collection.mutable.Map[String, AnyRef]("@timestamp" -> x("@timestamp"))

    // 값이 없는 경우 "0.0"으로 설정
    for (key <- keys if !x.contains(key))
        temp += (key -> "0.0")

    // 값이 있는 경우 문자열로 치환
    for (key <- keys if x.contains(key))
        temp += (key -> x(key).toString)

    // 타입을 명확히 하지 않으면 DataFrame을 만들 때 오류 발생
    Row(
        new java.sql.Timestamp(temp("@timestamp").asInstanceOf[java.util.Date].getTime()),
        temp("foo_bar_1").asInstanceOf[String].toFloat,
        temp("foo_bar_2").asInstanceOf[String].toFloat
    )
})

val schema = StructType(Array(
    StructField("@timestamp", TimestampType, true),
    StructField("foo_bar_1", FloatType, true),
    StructField("foo_bar_2", FloatType, true)
))
val df = spark.createDataFrame(rowRDD, schema)
df.write.parquet("hdfs://hadoop:9000/path/to/save/")

Parquet로 저장할 때 권한이 없다면서 안되는 경우가 있는데 그때에는 다음과 같이 환경 설정에 HADOOP_USER_NAME을 추가하면 된다.

$ export HADOOP_USER_NAME=hadoop

 

참고 문헌

  1. http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html

  2. https://stackoverflow.com/questions/51097818/replace-special-characters-of-column-names-in-spark-dataframe

  3. https://stackoverflow.com/questions/32860831/how-can-i-change-sparkcontext-sparkuser-setting-in-pyspark

반응형

'Log' 카테고리의 다른 글

[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25

+ Recent posts