728x90
반응형

기존에 사용 중이던 ElasticSearch 서버가 근래에 자주 다운됐었다.

 

그래서 새로운 방식으로 로그를 수집해보려고 했었다.

 

Hadoop에 JSON으로 저장해서 Zeppelin에서 보려고 하니 불편 사항이 많았다.

 

다른 것들이 더 있을까 하고 찾아봤지만 ElasticSearch와 Kibana 만큼 로그를 검색하고 조회하는 도구가 없었다.

 

결국 두 개를 사용하기로 하고 Logstash 만 다른 걸 이용해보려고 하다가 Apache Flume이 생각나서 적용해보려고 했다.

 

구조는 Source -> Channel -> Sink 로 돼있고 설정이 어려워 보이지 않아 시도해보았다.

 

하지만 java.lang.NoClassDefFoundError: org/elasticsearch/common/io/BytesStream

 

에러가 발생하면서 종료됐다.

 

찾아보니 ElasticSearch jar 파일을 별도로 추가해야한다고 해서 추가했는데도 되지 않았다.

 

무엇인가 이상하여 찾아보니 Flume에서 제공하는 라이브러리에서

 

org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder를 임포트하여 사용하는데

 

해당 패키지는 ElasticSearch 6.x 이상에서는 없어진 패키지였다.

 

Flume을 굳이 사용하고자 한다면 sink 를 직접 만들어서 사용해야 해서 Logstash로 선회했다.

 

(물론 fluentd를 사용해도 되지만 다른 곳에서 사용하여 다른 것을 써보고 싶었다)

참고 문헌

  1. https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
  2. https://github.com/elastic/elasticsearch/tree/5.6/core/src/main/java/org/elasticsearch/common/xcontent
반응형

'Log' 카테고리의 다른 글

[Log] Logstash k8s로 이전하기  (0) 2023.01.26
[Log] Logstash 메모리 문제  (0) 2022.08.25
[Log] Grafana 이전 하기  (0) 2021.01.28
[Log] Kubernetes에 ElasticSearch 올리기  (0) 2021.01.14
[Log] Logstash 메모리 누수?  (0) 2020.12.03
728x90
반응형

Logstash 부하로 인하여 Elastic Search에 업데이트가 안되고 있었다.

 

그래서 부랴부랴 특정 토픽들을 제외하여 급한 로그들을 처리했다.

 

그런데 일부 로그들을 파싱하는 과정에서 시간 정보를 현재 시간으로 쓰는 경우가 있었다.

 

시간 정보를 따로 갖고는 있었는데 해당 필드로 처리하고 있지 않고 있었다.

 

이것을 해결하기 위한 방법을 찾아보니 _update_by_query 를 사용하면 된다고 하여 시도해보았다.

 

처음엔 인덱스 전부를 업데이트해보았으나 너무 오래 걸렸다.

 

그래서 특정 시간대의 로그만 처리하기로 했고 다음과 같이 데이터를 보내 해결할 수 있었다.

{
  "script": {
    "lang": "painless",
    "source": "ctx._source['@timestamp'] = ctx._source.time"
  },
  "query": {
    "bool": {
        "must": [
            {
                "exists": {
                    "field": "time"
                }
            },
            {
                "range": {
                    "@timestamp": {
                        "time_zone": "+09:00",        
                        "gte": "2020-11-20T21:25:00", 
                        "lte": "2020-11-20T21:30:00"                  
                    }
                }
            }
        ]
    }
  }
}

 

참고 문헌

  1. https://stackoverflow.com/questions/55570214/how-do-i-overwrite-the-timestamp-field-with-another-field-in-elasticsearch

  2. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

  3. https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html

  4. https://www.elastic.co/guide/en/elasticsearch/reference/current/compound-queries.html

반응형
728x90
반응형

ElasticSearch의 인덱스를 불러와서 Zeppelin에서 사용하려고 했다.

 

인덱스를 제목-날짜(yyyy.MM.dd) 형식으로 만들어서 날짜를 항상 오늘 날짜로 사용하고 싶었다.

 

이를 위해서 현재 시간 정보를 불러오는 함수가 필요했는데 다음과 같이 사용했다.

val ts = (current_timestamp()).expr.eval().toString.toLong
val dateValue = new java.sql.Timestamp(ts/1000).toLocalDateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy.MM.dd"))

어제 정보도 불러오면 좋을 것 같아 다음과 같이 하여 쉽게 불러올 수 있었다.

val ts = (current_timestamp()).expr.eval().toString.toLong - 86400000L
val dateValue = new java.sql.Timestamp(ts/1000).toLocalDateTime.format(java.time.format.DateTimeFormatter.ofPattern("yyyy.MM.dd"))

특정 데이터를 기준으로 정렬을 하거나 조건으로 조회하려고 했는데 해당 데이터가 문자열이었다.

 

그래서 정렬을 제대로 하기 위해 형변환이 필요했는데 실수형으로 변환해야 했다.

 

withColumn과 cast로 쉽게 해당 열의 타입을 변환할 수 있었다.

import org.apache.spark.sql.types.FloatType

...
df.withColumn("etc", df("etc") cast FloatType).createOrReplaceTempView("temp")

 

참고 문헌

  1. https://stackoverflow.com/questions/55267527/spark-getting-current-date-in-string/55267586

  2. https://stackoverflow.com/questions/52590524/sparkstream-convert-string-to-float-in-sql-query

반응형
728x90
반응형

이번에 프로젝트를 수행하면서 일별로 데이터를 분석할 필요가 있었다.

 

처음엔 수동으로 작업을 했는데 자동으로 구동되면 좋을 것 같아서 찾아보니 역시나 기능이 있었다.

 

$ZEPPELIN_HOME/conf/zeppelin-site.xml 에서 주석으로 돼있는 내용을 풀고 다음과 같이 수정하면 된다.

 

(zeppelin.notebook.cron.folders 에도 값이 있어야 하는데 / 나 특정 디렉토리를 입력했을 때는 버튼이 나오지 않았다.)

<property>
<name>zeppelin.notebook.cron.enable</name>
<value>true</value>
<description>Notebook enable cron scheduler feature</description>
</property>
<property>
<name>zeppelin.notebook.cron.folders</name>
<value>*</value>
<description>Notebook cron folders</description>
</property>

 

이미 설정된 값은 None, 1m, 5m, 1h, 3h, 6h, 12h, 1d로 돼있다.

 

그외에 설정을 하려면 크론 설정은 Quatz에서 하는 것과 같이 하면 된다.

 

10분마다로 설정을 하려며 0 0/10 * * * ? 로 하면 된다.

 

참고 문헌

  1. https://zeppelin.apache.org/docs/0.8.0/usage/other_features/cron_scheduler.html

  2. https://stackoverflow.com/questions/58727773/cant-turn-on-cron-feature-in-apache-zeppelin

  3. https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html#examples

반응형

+ Recent posts