U-SQL 入門④ ~ FileSets ~
FileSets を使って複数のファイルを EXTRACT 文で読み込むことができます。
基本
以下のように 5 ファイルがある場合、
/input/2018-01-01.log /input/2018-02-01.log /input/2018-03-01.log /input/2018-04-01.log /input/2018-05-01.log
以下の EXTRACT 文で 5 ファイルをすべて読み込むことができます。また、{suffix} にマッチした部分を suffix 列に入れることができます。
@rs = EXTRACT user string, id string, suffix string FROM "/input/{suffix}.log" USING Extractors.Csv();
日付形式の対応
以下のように 5 ファイルがある場合、
/input/2018-01-01.log /input/2018-02-01.log /input/2018-03-01.log /input/2018-04-01.log /input/2018-05-01.log
以下の EXTRACT 文ではファイル名の日付情報を、DateTime 型の値として取得することができます。
@rs = EXTRACT user string, id string, date DateTime FROM "/input/{date:yyyy}-{date:MM}-{date:dd}.log" USING Extractors.Csv();
以下のようなフォルダ構成になっている場合、
/input/2017/01/01/data.txt /input/2017/02/01/data.txt /input/2017/03/01/data.txt /input/2017/04/01/data.txt /input/2017/05/01/data.txt
以下の EXTRACT 文でフォルダ構造から日付情報を、DateTime 型の値として取得することができます。
@rs = EXTRACT user string, id string, date DateTime FROM "/input/{date:yyyy}/{date:MM}/{date:dd}/data.txt" USING Extractors.Csv();
このデータに SELECT を実行する際に以下のように WHERE 句で DateTime 型の範囲でフィルターをかけることができます。
@rs = SELECT * FROM @rs WHERE date >= System.DateTime.Parse("2018/1/1") AND date < System.DateTime.Parse("2018/2/28");
フィルターの効能
以下のようなファイルを読み込むとき、
/input/ja_data-01.log /input/ja_data-02.log /input/en_data-01.log /input/en_data-02.log /input/cn_data-01.log
以下の EXTRACT 文でファイル名の先頭にある文字列を {Market} に格納できます。
@rs = EXTRACT user string, id string, Market string FROM "/input/{Market}_{*}" USING Extractors.Csv();
SELECT 文を実行する際に以下のように Market 列でフィルターをかけると、/input/ja_data-01.log、/input/ja_data-02.log の 2 つのファイルのみが読み取られ、その他のファイルにはアクセスされないため、I/O 効率が良くなります。
@us = SELECT * FROM @rs WHERE Market == “ja" ;
U-SQL 入門③ ~ ファイルのアウトプット ~
U-SQL でのファイルのアウトプットは OUTPUT 文のTO 句 で PATH を指定して、 USING 句で Outputter を指定します。
OUTPUT @rows TO “/data.csv” USING Outputters.Csv();
Outputtersで書き込むファイルのフォーマットを指定します。
// ヘッダ付きの CSV 形式で出力する OUTPUT @rows TO “/data.csv” USING Outputters.Csv(outputHeader: true); // ダブルクォート無しで TSV 形式で出力する OUTPUT @rows TO “/data.tsv” USING Outputters.Tsv(quoting: false); // "|" で区切られたファイル形式で出力する OUTPUT @rows TO “/data.txt” USING Outputters.Text(delimiter: '|');
Outputter 共通のパラメータ
パラメータ | デフォルト値 | 指定できる値 | 説明 |
---|---|---|---|
dateTimeFormat | "o" | datetime側のカラムがあった場合、このパラメータで出力フォーマットを指定できる。指定できるフォーマットはこちらに書いてある。 https://msdn.microsoft.com/library/az4se3k1(v=vs.110).aspx |
|
encoding | Encoding.UTF8 | Encoding.[ASCII] Encoding.BigEndianUnicode Encoding.Unicode Encoding.UTF7 Encoding.UTF8 Encoding.UTF32 |
ファイルのエンコード指定 |
escapeCharacter | null | エスケープ用の文字を指定 | |
nullEscape | null | null 値と解釈する文字を指定 | |
quoting | true | true false |
アウトプットするデータのフィールドを " (ダブルクォート) でくくるかどうか |
rowDelimiter | \r\n | 行デリミタを指定。デフォルトではCRLF | |
outputHeader | false | true |
ヘッダにフィールド名を付けるかどうか |
U-SQL 入門② ~ ファイルのインプット ~
※ストレージは ADLA のデータソースとして登録しておく必要があります。
ファイルインプットの例
// デフォルトの ADLS の PATH を指定する場合 @rows = EXTRACT name string, id int FROM “/…/data.csv” USING Extractors.Csv(); // ADLS を指定する絶対パスの場合 @rows = EXTRACT name string, id int FROM “adl://…/data.csv” USING Extractors.Csv(); // Blob ストレージを指定する場合 @rows = EXTRACT name string, id int FROM “wasb://…/data.csv” USING Extractors.Csv();
USING 句で Extractor を指定して、読み込むファイルのフォーマットを指定します。
ビルトインの Extractor は以下の 3 つがあります。
- Extractors.Text() : ファイルを指定したデリミタで区切って読み取る
- Extractors.Csv() : CSV ファイルを読み取る
- Extractors.Tsv() : TSV ファイルを読み取る
// "|" で区切られたテキストファイルを読み込む。最初の 1 行目はスキップする @rows = EXTRACT name string, id int, FROM "/file.text" USING Extractors.text(delimiter: "|", skipFirstNRows: 1) // CSV ファイルを読み込む。カラム数が合わない場合やデータ型が合わない場合はその行をスキップする @rows = EXTRACT name string, id int, FROM "/file.csv" USING Extractors.csv(silent: true) // TSV ファイルを読み込む。ASCII エンコードを指定 @rows = EXTRACT name string, id int, FROM "/file.tsv" USING Extractors.csv(encoding: Encoding.[ASCII])
Extractor の共通パラメータ
パラメータ | デフォルト値 | 指定できる値 | 説明 |
---|---|---|---|
encoding | Engoding.UTF8 | Encoding.[ASCII] Encoding.BigEndianUnicode Encoding.Unicode Encoding.UTF7Encoding.UTF8 Encoding.UTF32 |
ファイルのエンコード指定 |
escapeCharacter | null | エスケープ用の文字を指定 | |
nullExcape | null | null 値と解釈する文字を指定 | |
quoting | true | true false |
カラムのフィールドの " (ダブルクォート) を考慮にいれるかどうか現在は " (ダブルクォート) 以外には対応していない |
rowDelimiter | "\r\n" "\r" "\n" |
行のセパレータを何にするか。デフォルトでは改行コードが指定される | |
silent | false | true false |
指定したカラム数と違うカラム数の行が見つかった場合や指定したものと異なるデータ型があった場合に、スキップして読み込むか、それともエラーを吐いて処理を止めるか |
skipFirstNRows | 0 | 読み込むときに先頭から指定した行数だけスキップする |
U-SQL 入門① ~ U-SQL の基本形 ~
Azure Data Lake Analytics の U-SQL というクエリ言語についてメモ用に基本的な事をまとめていきます。
Azure Data Lake そのものについてや構築方法等はこちらのブログ (Data Platform Tech Sales Team Blog | Azure Data Lake)を見るとわかりやすいと思います。
U-SQL とは?
U-SQL の基本形
@searchlog =
EXTRACT UserId int,
Start DateTime,
Region string,
Query string,
Duration int?,
Urls string,
ClickedUrls string
FROM "/Samples/Data/SearchLog.tsv"
USING Extractors.Tsv();
@rs1 =
SELECT Start, Region, Duration
FROM @searchlog
WHERE Region == "en-gb";
OUTPUT @rs1
TO "/output/SearchLog-transform-rowsets.csv"
USING Outputters.Csv();
それぞれの個所で以下のような動作をする。
EXRACT の個所
- /Samples/Data/SearchLog.tsv ファイルを読み込む
- Extractors.Tsv() でタブで区切る
- UserId, Start, ... と列名を付与し、データ型も定義
- @searchlog という変数にセット
SELECT の個所
- @searchlog に対して SELECT を実行
- Region 列の値が en-gb のデータにフィルタ
- Start, Region, Duration 列のみを選択
- @rs1 変数にセット
OUTPUT の個所
- @rs1 にセットされた SELECT の結果を /output/SearchLog-transform-rowsets.csv というファイルに書き出す
- Outputters.Csv() によって CSV フォーマットに変換される
Azure Data Lake Analytics
FlumeでKafka for HDInsightとデータをやり取りしてみる
Microsoft AzureのHDInsightでKafkaが使えるようになったので、これをデプロイしてFlumeでデータをやり取りしてみます。
HDInsightのKafkaはこのへんのマイクロソフトのブログ記事を見てデプロイしておきます。
Kafka for HDInsight概要
デプロイ方法と使い方blogs.msdn.microsoft.com
今回は試してみるのみということで作業を楽にするためにFlumeバージョン1.7.0をHDInsightのヘッドノードにインストールして使います。
ヘッドノードにSSHログインしてFlumeをセットアップします。
Flumeのセットアップは下記のドキュメントを参照しました。
Flume 1.7.0 User Guide
https://flume.apache.org/FlumeUserGuide.html
Flumeのセットアップ
Flumeを動かすのに必要なJavaをインストールします。
$ yum install java-1.8.0-openjdk.x86_64
$ wget http://www-eu.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz $ cd /usr/local $ sudo tar xzvf /path/to/apache-flume-1.7.0-bin.tar.gz $ sudo mv apache-flume-1.7.0-bin flume $ cd flume
$ sudo cp conf/flume-env.sh.template conf/flume-env.sh $ sudo vim conf/flume-env.sh export JAVA_HOME=/usr/lib/jvm/jre
$ bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console
こんなのがぺろぺろと出てきたらOKだと思います。
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.start.time == 1484633482741
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.stop.time == 1484633483298
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.accepted == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.received == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append.accepted == 0
Kafkaにデータを送信する
ApacheのログをTailしながらKafkaに送信するように設定してみます。
設定ファイルはconf/flume-conf.properties.templateをconf/flume.confとかにコピーして作成します。
# 各機能の名称定義 agent.sources = s1 agent.channels = c1 agent.sinks = k1 # Exec Sourceでtail -Fコマンドを起動してApacheログをtail agent.sources.s1.type = exec #Flumeの実行ユーザーがこのPATHにアクセスできるように権限を設定する必要があります agent.sources.s1.command = tail -F /var/log/httpd/access_log agent.sources.s1.channels = c1 # Interceptor (ログの各フィールドをparseしたい場合はこれを使うが、headerに入るのでその後工夫が必要) agent.sources.s1.interceptors = i1 agent.sources.s1.interceptors.i1.type = regex_extractor agent.sources.s1.interceptors.i1.regex = ([^ ]*) ([^ ]*) ([^ ]*) \\[(.+)\\] \\"([^ ]*) ([^ ]*) ([^ ]*)\\" ([^ ]*) ([^ ]*) \\"([^ ]*)\\" \\"(.*)\\" agent.sources.s1.interceptors.i1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 agent.sources.s1.interceptors.i1.serializers.s1.name = host agent.sources.s1.interceptors.i1.serializers.s2.name = identity agent.sources.s1.interceptors.i1.serializers.s3.name = user agent.sources.s1.interceptors.i1.serializers.s4.name = time agent.sources.s1.interceptors.i1.serializers.s5.name = method agent.sources.s1.interceptors.i1.serializers.s6.name = request agent.sources.s1.interceptors.i1.serializers.s7.name = protocol agent.sources.s1.interceptors.i1.serializers.s8.name = status agent.sources.s1.interceptors.i1.serializers.s9.name = size agent.sources.s1.interceptors.i1.serializers.s10.name = referer agent.sources.s1.interceptors.i1.serializers.s11.name = agent # Channel (読み取ったデータを送出するまでのバッファ。テスト中はメモリで良いけど、本番ではDiskを使ったほうが安全かも) agent.channels.c1.type = memory # Sink (Kafkaに送るところを定義する) agent.sinks.k1.channel = c1 # Kafka Sinkのクラスを指定 agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # 送り先のトピック名。作成済みであればそこに書くし、なければ勝手に作る仕様っぽいです agent.sinks.k1.kafka.topic = mytopic # Kafka for HDInsightのBrokerをカンマ区切りで記述(サーバー名は環境に合わせて読み替えること) agent.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092 agent.sinks.k1.kafka.flumeBatchSize = 20 agent.sinks.k1.kafka.producer.acks = 1 agent.sinks.k1.kafka.producer.linger.ms = 1 agent.sinks.k1.kafka.producer.compression.type = snappy # Interceptorでログフォーマットをparseした場合はheaderに投入されるので、serializerをheader_and_textに指定 # これをやるとheaders{host=xx.xx.xx.xx, identity=xxx, ….} xx.xx.xx.xx - - みたいな形式でデータが送られる、でもあとでクエリ実行するためにはこのままでは使いにくいかも # Interceptorを書かずにいい感じでやれる方法があったら教えてください・・・m(_ _)m agent.sinks.k1.sink.serializer = header_and_text
Kafkaからデータを受信する
Kafka SourceでHDInsightのKafkaに接続して特定のTopicから取得します。
読んだデータはHDFS Sinkを使ってHDInsightに書き出します。
agent.sources = s2 agent.channels = c2 agent.sinks = k2 # Source (sourceのtypeプロパティにKafkaSourceクラスを指定) agent.sources.s2.type = org.apache.flume.source.kafka.KafkaSource agent.sources.s2.channels = c2 agent.sources.s2.batchSize = 5000 agent.sources.s2.batchDurationMillis = 2000 agent.sources.s2.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092 agent.sources.s2.kafka.topics = mytopic # Channel agent.channels.c2.type = memory agent.channels.c2.capacity = 100 # Sink (HDFSに書く場合) agent.sinks.k2.type = hdfs agent.sinks.k2.channel = c2 agent.sinks.k2.hdfs.path = /tmp/kafka/%y-%m-%d/%H%M/%S agent.sinks.k2.hdfs.filePrefix = events- # デフォルトではSequensceファイルになってる agent.sinks.k2.hdfs.fileType = DataStream # デフォルトではWritableフォーマットになってる agent.sinks.k2.hdfs.serializer = TEXT agent.sinks.k2.hdfs.round = trued agent.sinks.k2.hdfs.roundValue = 10 agent.sinks.k2.hdfs.roundUnit = minute