728x90
반응형

ElasticSearch의 인덱스를 불러와서 Zeppelin에서 사용하려고 했다.

 

인덱스를 제목-날짜(yyyy.MM.dd) 형식으로 만들어서 날짜를 항상 오늘 날짜로 사용하고 싶었다.

 

이를 위해서 현재 시간 정보를 불러오는 함수가 필요했는데 다음과 같이 사용했다.

val ts = (current_timestamp()).expr.eval().toString.toLong
val dateValue = new java.sql.Timestamp(ts/1000).toLocalDateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy.MM.dd"))

어제 정보도 불러오면 좋을 것 같아 다음과 같이 하여 쉽게 불러올 수 있었다.

val ts = (current_timestamp()).expr.eval().toString.toLong - 86400000L
val dateValue = new java.sql.Timestamp(ts/1000).toLocalDateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy.MM.dd"))

특정 데이터를 기준으로 정렬을 하거나 조건으로 조회하려고 했는데 해당 데이터가 문자열이었다.

 

그래서 정렬을 제대로 하기 위해 형변환이 필요했는데 실수형으로 변환해야 했다.

 

withColumn과 cast로 쉽게 해당 열의 타입을 변환할 수 있었다.

import org.apache.spark.sql.types.FloatType

...
df.withColumn("etc", df("etc") cast FloatType).createOrReplaceTempView("temp")

 

참고 문헌

  1. https://stackoverflow.com/questions/55267527/spark-getting-current-date-in-string/55267586

  2. https://stackoverflow.com/questions/52590524/sparkstream-convert-string-to-float-in-sql-query

반응형
728x90
반응형

Spark에서 하는 일을 주기적으로 수행하기 위해 무엇이 있는지 조사했다.

 

여러 가지 도구들이 있었는데 그 중에서 AirflowLuigi가 좋아보였다.

 

둘 중에 어떤 걸로 정할 지 고민하다 Airflow로 정했다.

 

두 가지 모두 좋아보였지만 AirflowApache에서 관리하고 대시보드가 유려해보였다.

 

Airflow를 설치하는 방법은 Quick Start에 잘 나와 있어 그대로 따라하면 된다.

$ pip install 'apache-airflow[ssh]'

ssh는 원격 서버에 접속하여 spark-submit을 수행하기 위해 설치했다.

$ airflow initdb

~/airflow/airflow.cfg, airflow.db, logs, unittests.cfg 가 만들어진다.

 

Airflow를 구동할 때 airflow webserver와 같이 커맨드로만 가능해서

 

다음과 같이 start-airflow.sh, stop-airflow.sh을 만들어서 실행했다.

#!/bin/sh
nohup airflow webserver > /dev/null 2>&1 &
nohup airflow scheduler > /dev/null 2>&1 &
#!/bin/sh
pkill -f airflow

신규로 DAG를 만들기 위해서 ~/airflowdags 디렉토리를 만들고 파일을 추가한다.

$ cd ~/airflow
$ mkdir dags && cd dags
$ touch my_first_dag.py

my_fist_dag.py에 다음과 같이 작성하여 spark-submit을 주기적으로 수행하도록 했다.

# -- coding: utf-8 --
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='my_dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['spark']
)
templated_bash_command = """
    spark-submit \
    --class my_class \
    --master spark://spark-master:7077 \
    --executor-cores 2 \
    --executor-memory 2g \
    my_first.jar
"""
hook = SSHHook(
    ssh_conn_id='ssh_default',
    remote_host='spark-master',
    username='username',
    key_file='~/.ssh/id_rsa'
)
run_ssh = SSHOperator(
    task_id='spark_submit_task',
    ssh_hook=hook,
    command=templated_bash_command,
    dag=dag
)
run_ssh

schedule_interval='0 0 * * *'은 하루에 한 번 밤에 수행한다고 문서에 적혀있는데

 

UTC 시간대로 동작하여 한국 시간으로는 오전 9시에 수행한다.

 

참고 문헌

  1. http://bytepawn.com/luigi-airflow-pinball.html

  2. https://airflow.apache.org/docs/stable/installation.html

  3. https://airflow.apache.org/docs/stable/scheduler.html#dag-runs

반응형

'Log' 카테고리의 다른 글

[Log] librdkafka 빌드하기  (0) 2020.03.26
[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
728x90
반응형

Zeppelin은 웹 기반으로 다양한 인터프리터를 이용해서 데이터 분석을 도와주는 도구이다.

 

설치를 위해서 다음과 같이 하면 된다.

$ sudo yum install -y java-1.8.0-openjdk.x86_64
$ wget http://mirror.apache-kr.org/zeppelin/zeppelin-0.8.2/zeppelin-0.8.2-bin-all.tgz
$ tar xf zeppelin-0.8.2-bin-all.tgz && cd zeppelin-0.8.2-bin-all

다음과 같이 실행하면 Zeppelin이 구동된다.

$ bin/zeppelin-daemon.sh start

서버에 설치한 경우엔 직접 아이피로 접근해야 하는데 기본 설정으로는 접근할 수 없다.

 

Zeppelin 최신 버전(0.8.2)의 경우 기본 호스트가 0.0.0.0에서 127.0.0.1로 변경됐다.

Upgrading from Zeppelin 0.8.1 (and before) to 0.8.2 (and later)
From 0.8.2, Zeppelin server bind 127.0.0.1 by default instead of 0.0.0.0. Configure zeppelin.server.addr property or ZEPPELIN_ADDR env variable to change.

그래서 zeppelin-env.sh에서 ZEPPELIN_ADDR0.0.0.0으로 변경하고 재시작하면 웹에서 접근할 수 있다.

 

Spark와 연동하기 위해서 Zeppelin이 설치된 서버에 Spark를 설치한다.

 

원격 서버에 설치된 Spark 버전이 2.4.5라면 똑같은 버전을 설치해야 동작한다.

 

그리고 zeppelin-env.sh에 다음과 같이 설정한다.

export SPARK_HOME=~/spark

이제 마지막으로 웹페이지에서 InterpreterSpark를 찾아 설정을 추가한다.

masterspark://remote_addr:7077와 같이 입력하면 된다.

 

Notebook에서 테스트해보면 잘 동작하는 것을 확인할 수 있다.

 

참고 문헌

  1. https://zeppelin.apache.org/docs/0.8.2/setup/operation/upgrading.html#upgrading-from-zeppelin-07-to-08

  2. https://stackoverflow.com/questions/50173371/how-to-solve-java-io-invalidclassexception-local-class-incompatible-with-scala

반응형

'Log' 카테고리의 다른 글

[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Airflow 설치  (0) 2020.03.12
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
728x90
반응형

로그를 ElasticSearch에 쌓아서 실시간으로 보고 있었는데 로그의 양이 많아서 차지하는 용량이 계속 커지고 있었다.

 

파일로도 로그를 남겨두기는 해서 한동안은 80 ~ 90% 정도 되면 인덱스를 지워서 용량을 확보했다.

 

하지만 다년간의 추이를 보기 위해 로그를 분석해서 남겨야할 필요가 있었다.

 

SparkParquet를 조사하면서 요구 사항을 어떻게 적용할 수 있을지를 고심했었다.

 

처음엔 ELK 스택처럼 Kafka로 로그를 받아서 처리하는 방식으로 하려고 시도했었다.

 

Spark 예제에서 Kafka로 로그를 받아서 단어 개수를 세는 프로그램이 있어 이를 활용하면 되겠다 싶었다.

 

하지만 로그를 받는 것은 쉽게 됐지만 Parquet로 저장하는 것이 쉽지 않았다.

 

왜냐하면 Parquet는 열 지향 데이터 스토리지 형식이기 때문에 지속적으로 들어오는 데이터를 쓰는 것이 쉽지 않았다.

 

그래서 중간에 Avro나 다른 DB에 저장했다가 하루가 지나면 Parquet로 저장하면 되겠다고 생각하고 golang으로 대강 완성을 했었다.

 

그러다가 우연히 한 블로그를 발견하고 ElasticSearch에서 Parquet로 변환하여 장기보관하도록 수정했다.

 

저장된 인덱스들로 테스트를 해봤는데 여러 가지 애로 사항을 겪을 수 있었다.

 

우선은 필드명에 점(.)이 들어가 있는 경우 저장하려고 하면 다음과 같은 에러가 발생했다.

org.elasticsearch.hadoop.rest.EsHadoopParsingException: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Position for 'nested.foo.bar' not found in row; typically this is caused by a mapping inconsistency

원인은 Parquet로 저장하려면 csv 처럼 중첩되지 않은 표 형식으로 되어야하는데 열의 개수는 같지만 필드명이 각각 달라서 발생한 문제였다.

 

그래서 해당 필드명만 추출해서 하면 되지 않을까 해서 es.query 설정도 해보고 filter 기능을 사용해서 해봤지만 결과는 같았다.

 

어떻게 하면 가능할지 방법을 찾아보다가 테이블 형태로 추출한 다음에 Paquet로 저장하면 되지 않을까 하고 생각했다.

예제 코드는 다음과 같다.

// es.nodes는 ElasticSearch 주소, es.resource는 인덱스명
val esConf = Map(
    "es.nodes" -> "elasticsearch",
    "es.resource" -> "index"
)

val rdd = sc.esRDD(esConf)
// 필드명에 .이 있는 경우 조회할 때 제대로 안돼서 변경
val result = rdd.map(_._2).map(x => x.map { case (key, value) => key.replace(".", "_") -> value })
val keys = Array(
    "foo_bar_1",
    "foo_bar_2"
)
val rowRDD = result.map( x => {
    val temp = scala.collection.mutable.Map[String, AnyRef]("@timestamp" -> x("@timestamp"))

    // 값이 없는 경우 "0.0"으로 설정
    for (key <- keys if !x.contains(key))
        temp += (key -> "0.0")

    // 값이 있는 경우 문자열로 치환
    for (key <- keys if x.contains(key))
        temp += (key -> x(key).toString)

    // 타입을 명확히 하지 않으면 DataFrame을 만들 때 오류 발생
    Row(
        new java.sql.Timestamp(temp("@timestamp").asInstanceOf[java.util.Date].getTime()),
        temp("foo_bar_1").asInstanceOf[String].toFloat,
        temp("foo_bar_2").asInstanceOf[String].toFloat
    )
})

val schema = StructType(Array(
    StructField("@timestamp", TimestampType, true),
    StructField("foo_bar_1", FloatType, true),
    StructField("foo_bar_2", FloatType, true)
))
val df = spark.createDataFrame(rowRDD, schema)
df.write.parquet("hdfs://hadoop:9000/path/to/save/")

Parquet로 저장할 때 권한이 없다면서 안되는 경우가 있는데 그때에는 다음과 같이 환경 설정에 HADOOP_USER_NAME을 추가하면 된다.

$ export HADOOP_USER_NAME=hadoop

 

참고 문헌

  1. http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html

  2. https://stackoverflow.com/questions/51097818/replace-special-characters-of-column-names-in-spark-dataframe

  3. https://stackoverflow.com/questions/32860831/how-can-i-change-sparkcontext-sparkuser-setting-in-pyspark

반응형

'Log' 카테고리의 다른 글

[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Logstash Kafka 연동 에러 해결  (0) 2020.01.25
[Log] Logstash Pipeline 사용하기  (0) 2020.01.25
[Log] Fluentd 성능 튜닝  (0) 2020.01.25

+ Recent posts