728x90
반응형

Spark에서 하는 일을 주기적으로 수행하기 위해 무엇이 있는지 조사했다.

 

여러 가지 도구들이 있었는데 그 중에서 AirflowLuigi가 좋아보였다.

 

둘 중에 어떤 걸로 정할 지 고민하다 Airflow로 정했다.

 

두 가지 모두 좋아보였지만 AirflowApache에서 관리하고 대시보드가 유려해보였다.

 

Airflow를 설치하는 방법은 Quick Start에 잘 나와 있어 그대로 따라하면 된다.

$ pip install 'apache-airflow[ssh]'

ssh는 원격 서버에 접속하여 spark-submit을 수행하기 위해 설치했다.

$ airflow initdb

~/airflow/airflow.cfg, airflow.db, logs, unittests.cfg 가 만들어진다.

 

Airflow를 구동할 때 airflow webserver와 같이 커맨드로만 가능해서

 

다음과 같이 start-airflow.sh, stop-airflow.sh을 만들어서 실행했다.

#!/bin/sh
nohup airflow webserver > /dev/null 2>&1 &
nohup airflow scheduler > /dev/null 2>&1 &
#!/bin/sh
pkill -f airflow

신규로 DAG를 만들기 위해서 ~/airflowdags 디렉토리를 만들고 파일을 추가한다.

$ cd ~/airflow
$ mkdir dags && cd dags
$ touch my_first_dag.py

my_fist_dag.py에 다음과 같이 작성하여 spark-submit을 주기적으로 수행하도록 했다.

# -- coding: utf-8 --
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='my_dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['spark']
)
templated_bash_command = """
    spark-submit \
    --class my_class \
    --master spark://spark-master:7077 \
    --executor-cores 2 \
    --executor-memory 2g \
    my_first.jar
"""
hook = SSHHook(
    ssh_conn_id='ssh_default',
    remote_host='spark-master',
    username='username',
    key_file='~/.ssh/id_rsa'
)
run_ssh = SSHOperator(
    task_id='spark_submit_task',
    ssh_hook=hook,
    command=templated_bash_command,
    dag=dag
)
run_ssh

schedule_interval='0 0 * * *'은 하루에 한 번 밤에 수행한다고 문서에 적혀있는데

 

UTC 시간대로 동작하여 한국 시간으로는 오전 9시에 수행한다.

 

참고 문헌

  1. http://bytepawn.com/luigi-airflow-pinball.html

  2. https://airflow.apache.org/docs/stable/installation.html

  3. https://airflow.apache.org/docs/stable/scheduler.html#dag-runs

반응형

'Log' 카테고리의 다른 글

[Log] librdkafka 빌드하기  (0) 2020.03.26
[Log] Logstash 403 에러 해결  (0) 2020.03.19
[Log] Zeppelin 설치 및 Spark 연동  (0) 2020.03.05
[Log] NoNodesAvailable 에러 해결  (0) 2020.02.27
[Log] Spark ElasticSearch Parquet  (0) 2020.01.25

+ Recent posts