FlumeでKafka for HDInsightとデータをやり取りしてみる

Microsoft AzureのHDInsightでKafkaが使えるようになったので、これをデプロイしてFlumeでデータをやり取りしてみます。
HDInsightのKafkaはこのへんのマイクロソフトのブログ記事を見てデプロイしておきます。

Kafka for HDInsight概要

blogs.msdn.microsoft.com

デプロイ方法と使い方blogs.msdn.microsoft.com

 

今回は試してみるのみということで作業を楽にするためにFlumeバージョン1.7.0をHDInsightのヘッドノードにインストールして使います。
ヘッドノードにSSHログインしてFlumeをセットアップします。
Flumeのセットアップは下記のドキュメントを参照しました。

Flume 1.7.0  User Guide

https://flume.apache.org/FlumeUserGuide.html

 

Flumeのセットアップ

Flumeを動かすのに必要なJavaをインストールします。

$ yum install java-1.8.0-openjdk.x86_64
 
Flumeをインストールします。
$ wget http://www-eu.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
$ cd /usr/local
$ sudo tar xzvf /path/to/apache-flume-1.7.0-bin.tar.gz
$ sudo mv apache-flume-1.7.0-bin flume
$ cd flume
 
JAVA_HOMEをflume-env.shに設定します。
$ sudo cp conf/flume-env.sh.template conf/flume-env.sh
$ sudo vim conf/flume-env.sh
export JAVA_HOME=/usr/lib/jvm/jre
 
動作テストします。
$ bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console

 

こんなのがぺろぺろと出てきたらOKだと思います。

2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.start.time == 1484633482741
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.stop.time == 1484633483298
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.accepted == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.received == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append.accepted == 0

 

Kafkaにデータを送信する

FlumeのKafka Sinkを設定します。
ApacheのログをTailしながらKafkaに送信するように設定してみます。
設定ファイルはconf/flume-conf.properties.templateをconf/flume.confとかにコピーして作成します。
# 各機能の名称定義
agent.sources = s1
agent.channels = c1
agent.sinks = k1
# Exec Sourceでtail -Fコマンドを起動してApacheログをtail
agent.sources.s1.type = exec
#Flumeの実行ユーザーがこのPATHにアクセスできるように権限を設定する必要があります
agent.sources.s1.command = tail -F /var/log/httpd/access_log
agent.sources.s1.channels = c1
# Interceptor (ログの各フィールドをparseしたい場合はこれを使うが、headerに入るのでその後工夫が必要)
agent.sources.s1.interceptors = i1
agent.sources.s1.interceptors.i1.type = regex_extractor
agent.sources.s1.interceptors.i1.regex = ([^ ]*) ([^ ]*) ([^ ]*) \\[(.+)\\] \\"([^ ]*) ([^ ]*) ([^ ]*)\\" ([^ ]*) ([^ ]*) \\"([^ ]*)\\" \\"(.*)\\"
agent.sources.s1.interceptors.i1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11
agent.sources.s1.interceptors.i1.serializers.s1.name = host
agent.sources.s1.interceptors.i1.serializers.s2.name = identity
agent.sources.s1.interceptors.i1.serializers.s3.name = user
agent.sources.s1.interceptors.i1.serializers.s4.name = time
agent.sources.s1.interceptors.i1.serializers.s5.name = method
agent.sources.s1.interceptors.i1.serializers.s6.name = request
agent.sources.s1.interceptors.i1.serializers.s7.name = protocol
agent.sources.s1.interceptors.i1.serializers.s8.name = status
agent.sources.s1.interceptors.i1.serializers.s9.name = size
agent.sources.s1.interceptors.i1.serializers.s10.name = referer
agent.sources.s1.interceptors.i1.serializers.s11.name = agent
# Channel (読み取ったデータを送出するまでのバッファ。テスト中はメモリで良いけど、本番ではDiskを使ったほうが安全かも)
agent.channels.c1.type = memory
# Sink (Kafkaに送るところを定義する)
agent.sinks.k1.channel = c1
# Kafka Sinkのクラスを指定
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 送り先のトピック名。作成済みであればそこに書くし、なければ勝手に作る仕様っぽいです
agent.sinks.k1.kafka.topic = mytopic
# Kafka for HDInsightのBrokerをカンマ区切りで記述(サーバー名は環境に合わせて読み替えること)
agent.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
agent.sinks.k1.kafka.flumeBatchSize = 20
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.producer.linger.ms = 1
agent.sinks.k1.kafka.producer.compression.type = snappy
# Interceptorでログフォーマットをparseした場合はheaderに投入されるので、serializerをheader_and_textに指定
# これをやるとheaders{host=xx.xx.xx.xx, identity=xxx, ….} xx.xx.xx.xx - - みたいな形式でデータが送られる、でもあとでクエリ実行するためにはこのままでは使いにくいかも
# Interceptorを書かずにいい感じでやれる方法があったら教えてください・・・m(_ _)m
agent.sinks.k1.sink.serializer = header_and_text
 

Kafkaからデータを受信する 

データをKafkaに送ったら、今度は読み取ってみます。
FlumeのKafka Sourceを設定します。
Kafka SourceでHDInsightのKafkaに接続して特定のTopicから取得します。
読んだデータはHDFS Sinkを使ってHDInsightに書き出します。
agent.sources = s2
agent.channels = c2
agent.sinks = k2
# Source (sourceのtypeプロパティにKafkaSourceクラスを指定)
agent.sources.s2.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s2.channels = c2
agent.sources.s2.batchSize = 5000
agent.sources.s2.batchDurationMillis = 2000
agent.sources.s2.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
agent.sources.s2.kafka.topics = mytopic
# Channel
agent.channels.c2.type = memory
agent.channels.c2.capacity = 100
# Sink (HDFSに書く場合)
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.path = /tmp/kafka/%y-%m-%d/%H%M/%S
agent.sinks.k2.hdfs.filePrefix = events-
# デフォルトではSequensceファイルになってる
agent.sinks.k2.hdfs.fileType = DataStream
# デフォルトではWritableフォーマットになってる
agent.sinks.k2.hdfs.serializer = TEXT
agent.sinks.k2.hdfs.round = trued
agent.sinks.k2.hdfs.roundValue = 10
agent.sinks.k2.hdfs.roundUnit = minute
 
HDInsightだとBlobに書かれます。agent.sinks.k2.hdfs.pathに「hdfs://~~~~~」と明示的に指定するとワーカーノードのお腹のディスクで構成されているHDFSに書かれます。
気が向いたら別のツール(fluentdやlogstashなど)も試してみようかと思います。