FlumeでKafka for HDInsightとデータをやり取りしてみる
Microsoft AzureのHDInsightでKafkaが使えるようになったので、これをデプロイしてFlumeでデータをやり取りしてみます。
HDInsightのKafkaはこのへんのマイクロソフトのブログ記事を見てデプロイしておきます。
Kafka for HDInsight概要
デプロイ方法と使い方blogs.msdn.microsoft.com
今回は試してみるのみということで作業を楽にするためにFlumeバージョン1.7.0をHDInsightのヘッドノードにインストールして使います。
ヘッドノードにSSHログインしてFlumeをセットアップします。
Flumeのセットアップは下記のドキュメントを参照しました。
Flume 1.7.0 User Guide
https://flume.apache.org/FlumeUserGuide.html
Flumeのセットアップ
Flumeを動かすのに必要なJavaをインストールします。
$ yum install java-1.8.0-openjdk.x86_64
Flumeをインストールします。
$ wget http://www-eu.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz $ cd /usr/local $ sudo tar xzvf /path/to/apache-flume-1.7.0-bin.tar.gz $ sudo mv apache-flume-1.7.0-bin flume $ cd flume
JAVA_HOMEをflume-env.shに設定します。
$ sudo cp conf/flume-env.sh.template conf/flume-env.sh $ sudo vim conf/flume-env.sh export JAVA_HOME=/usr/lib/jvm/jre
動作テストします。
$ bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console
こんなのがぺろぺろと出てきたらOKだと思います。
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.start.time == 1484633482741
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.stop.time == 1484633483298
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.accepted == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.received == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append.accepted == 0
Kafkaにデータを送信する
FlumeのKafka Sinkを設定します。
ApacheのログをTailしながらKafkaに送信するように設定してみます。
設定ファイルはconf/flume-conf.properties.templateをconf/flume.confとかにコピーして作成します。
ApacheのログをTailしながらKafkaに送信するように設定してみます。
設定ファイルはconf/flume-conf.properties.templateをconf/flume.confとかにコピーして作成します。
# 各機能の名称定義 agent.sources = s1 agent.channels = c1 agent.sinks = k1 # Exec Sourceでtail -Fコマンドを起動してApacheログをtail agent.sources.s1.type = exec #Flumeの実行ユーザーがこのPATHにアクセスできるように権限を設定する必要があります agent.sources.s1.command = tail -F /var/log/httpd/access_log agent.sources.s1.channels = c1 # Interceptor (ログの各フィールドをparseしたい場合はこれを使うが、headerに入るのでその後工夫が必要) agent.sources.s1.interceptors = i1 agent.sources.s1.interceptors.i1.type = regex_extractor agent.sources.s1.interceptors.i1.regex = ([^ ]*) ([^ ]*) ([^ ]*) \\[(.+)\\] \\"([^ ]*) ([^ ]*) ([^ ]*)\\" ([^ ]*) ([^ ]*) \\"([^ ]*)\\" \\"(.*)\\" agent.sources.s1.interceptors.i1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 agent.sources.s1.interceptors.i1.serializers.s1.name = host agent.sources.s1.interceptors.i1.serializers.s2.name = identity agent.sources.s1.interceptors.i1.serializers.s3.name = user agent.sources.s1.interceptors.i1.serializers.s4.name = time agent.sources.s1.interceptors.i1.serializers.s5.name = method agent.sources.s1.interceptors.i1.serializers.s6.name = request agent.sources.s1.interceptors.i1.serializers.s7.name = protocol agent.sources.s1.interceptors.i1.serializers.s8.name = status agent.sources.s1.interceptors.i1.serializers.s9.name = size agent.sources.s1.interceptors.i1.serializers.s10.name = referer agent.sources.s1.interceptors.i1.serializers.s11.name = agent # Channel (読み取ったデータを送出するまでのバッファ。テスト中はメモリで良いけど、本番ではDiskを使ったほうが安全かも) agent.channels.c1.type = memory # Sink (Kafkaに送るところを定義する) agent.sinks.k1.channel = c1 # Kafka Sinkのクラスを指定 agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # 送り先のトピック名。作成済みであればそこに書くし、なければ勝手に作る仕様っぽいです agent.sinks.k1.kafka.topic = mytopic # Kafka for HDInsightのBrokerをカンマ区切りで記述(サーバー名は環境に合わせて読み替えること) agent.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092 agent.sinks.k1.kafka.flumeBatchSize = 20 agent.sinks.k1.kafka.producer.acks = 1 agent.sinks.k1.kafka.producer.linger.ms = 1 agent.sinks.k1.kafka.producer.compression.type = snappy # Interceptorでログフォーマットをparseした場合はheaderに投入されるので、serializerをheader_and_textに指定 # これをやるとheaders{host=xx.xx.xx.xx, identity=xxx, ….} xx.xx.xx.xx - - みたいな形式でデータが送られる、でもあとでクエリ実行するためにはこのままでは使いにくいかも # Interceptorを書かずにいい感じでやれる方法があったら教えてください・・・m(_ _)m agent.sinks.k1.sink.serializer = header_and_text
Kafkaからデータを受信する
データをKafkaに送ったら、今度は読み取ってみます。
FlumeのKafka Sourceを設定します。
Kafka SourceでHDInsightのKafkaに接続して特定のTopicから取得します。
読んだデータはHDFS Sinkを使ってHDInsightに書き出します。
Kafka SourceでHDInsightのKafkaに接続して特定のTopicから取得します。
読んだデータはHDFS Sinkを使ってHDInsightに書き出します。
agent.sources = s2 agent.channels = c2 agent.sinks = k2 # Source (sourceのtypeプロパティにKafkaSourceクラスを指定) agent.sources.s2.type = org.apache.flume.source.kafka.KafkaSource agent.sources.s2.channels = c2 agent.sources.s2.batchSize = 5000 agent.sources.s2.batchDurationMillis = 2000 agent.sources.s2.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092 agent.sources.s2.kafka.topics = mytopic # Channel agent.channels.c2.type = memory agent.channels.c2.capacity = 100 # Sink (HDFSに書く場合) agent.sinks.k2.type = hdfs agent.sinks.k2.channel = c2 agent.sinks.k2.hdfs.path = /tmp/kafka/%y-%m-%d/%H%M/%S agent.sinks.k2.hdfs.filePrefix = events- # デフォルトではSequensceファイルになってる agent.sinks.k2.hdfs.fileType = DataStream # デフォルトではWritableフォーマットになってる agent.sinks.k2.hdfs.serializer = TEXT agent.sinks.k2.hdfs.round = trued agent.sinks.k2.hdfs.roundValue = 10 agent.sinks.k2.hdfs.roundUnit = minute