U-SQL 入門④ ~ FileSets ~

FileSets を使って複数のファイルを EXTRACT 文で読み込むことができます。

基本

以下のように 5 ファイルがある場合、

/input/2018-01-01.log
/input/2018-02-01.log
/input/2018-03-01.log
/input/2018-04-01.log
/input/2018-05-01.log

以下の EXTRACT 文で 5 ファイルをすべて読み込むことができます。また、{suffix} にマッチした部分を suffix 列に入れることができます。

@rs =
	EXTRACT
		user string,
		id string,
		suffix string
	FROM "/input/{suffix}.log"
	USING Extractors.Csv();

日付形式の対応

以下のように 5 ファイルがある場合、

/input/2018-01-01.log
/input/2018-02-01.log
/input/2018-03-01.log
/input/2018-04-01.log
/input/2018-05-01.log

以下の EXTRACT 文ではファイル名の日付情報を、DateTime 型の値として取得することができます。

@rs =
	EXTRACT
		user string,
		id string,
		date DateTime
	FROM "/input/{date:yyyy}-{date:MM}-{date:dd}.log"
	USING Extractors.Csv();

以下のようなフォルダ構成になっている場合、

/input/2017/01/01/data.txt
/input/2017/02/01/data.txt
/input/2017/03/01/data.txt
/input/2017/04/01/data.txt
/input/2017/05/01/data.txt

以下の EXTRACT 文でフォルダ構造から日付情報を、DateTime 型の値として取得することができます。

@rs =
	EXTRACT
		user string,
		id string,
		date DateTime
	FROM "/input/{date:yyyy}/{date:MM}/{date:dd}/data.txt"
	USING Extractors.Csv();

このデータに SELECT を実行する際に以下のように WHERE 句で DateTime 型の範囲でフィルターをかけることができます。

@rs =
	SELECT * FROM @rs
	WHERE 
		date >= System.DateTime.Parse("2018/1/1") AND
		date < System.DateTime.Parse("2018/2/28");

フィルターの効能

以下のようなファイルを読み込むとき、

/input/ja_data-01.log
/input/ja_data-02.log
/input/en_data-01.log
/input/en_data-02.log
/input/cn_data-01.log

以下の EXTRACT 文でファイル名の先頭にある文字列を {Market} に格納できます。

@rs =
	EXTRACT
		user string,
		id string,
		Market string
	FROM "/input/{Market}_{*}"
	USING Extractors.Csv();

SELECT 文を実行する際に以下のように Market 列でフィルターをかけると、/input/ja_data-01.log、/input/ja_data-02.log の 2 つのファイルのみが読み取られ、その他のファイルにはアクセスされないため、I/O 効率が良くなります。

@us =
	SELECT * FROM @rs
	WHERE Market == “ja" ;

U-SQL 入門③ ~ ファイルのアウトプット ~

U-SQL でのファイルのアウトプットは OUTPUT 文のTO 句 で PATH を指定して、 USING 句で Outputter を指定します。

OUTPUT @rows
    TO “/data.csv”
    USING Outputters.Csv();

Outputtersで書き込むファイルのフォーマットを指定します。

  • Outputters.Text(): フィールドのデリミタを指定して出力
  • Outputters.Csv(): CSV 形式を出力
  • Outputters.Tsv(): TSV 形式を出力
// ヘッダ付きの CSV 形式で出力する
OUTPUT @rows
    TO “/data.csv”
    USING Outputters.Csv(outputHeader: true);

// ダブルクォート無しで TSV 形式で出力する
OUTPUT @rows
    TO “/data.tsv”
    USING Outputters.Tsv(quoting: false);

// "|" で区切られたファイル形式で出力する
OUTPUT @rows
    TO “/data.txt”
    USING Outputters.Text(delimiter: '|');

Outputter 共通のパラメータ

パラメータ デフォルト値 指定できる値 説明
dateTimeFormat "o" datetime側のカラムがあった場合、このパラメータで出力フォーマットを指定できる。指定できるフォーマットはこちらに書いてある。
https://msdn.microsoft.com/library/az4se3k1(v=vs.110).aspx
encoding Encoding.UTF8 Encoding.[ASCII]
Encoding.BigEndianUnicode
Encoding.Unicode
Encoding.UTF7
Encoding.UTF8
Encoding.UTF32
ファイルのエンコード指定
escapeCharacter null エスケープ用の文字を指定
nullEscape null null 値と解釈する文字を指定
quoting true true
false
アウトプットするデータのフィールドを " (ダブルクォート) でくくるかどうか
rowDelimiter \r\n 行デリミタを指定。デフォルトではCRLF
outputHeader false true
ヘッダにフィールド名を付けるかどうか

U-SQL Built-in Outputters | Microsoft Docs

U-SQL 入門② ~ ファイルのインプット ~


U-SQL でのファイルのインプットは FROM 句で PATH を指定します。
※ストレージは ADLA のデータソースとして登録しておく必要があります。

ファイルインプットの例

// デフォルトの ADLS の PATH を指定する場合
@rows = 
     EXTRACT name string, id int
     FROM “/…/data.csv”
     USING Extractors.Csv();

// ADLS を指定する絶対パスの場合
@rows = 
     EXTRACT name string, id int
     FROM “adl://…/data.csv”
     USING Extractors.Csv();

// Blob ストレージを指定する場合
@rows = 
    EXTRACT name string, id int
    FROM  “wasb://…/data.csv”
    USING Extractors.Csv();

USING 句で Extractor を指定して、読み込むファイルのフォーマットを指定します。
ビルトインの Extractor は以下の 3 つがあります。

  • Extractors.Text() : ファイルを指定したデリミタで区切って読み取る
  • Extractors.Csv() : CSV ファイルを読み取る
  • Extractors.Tsv() : TSV ファイルを読み取る
// "|" で区切られたテキストファイルを読み込む。最初の 1 行目はスキップする
@rows =
	EXTRACT name string, id int,
	FROM "/file.text"
	USING Extractors.text(delimiter: "|", skipFirstNRows: 1)

// CSV ファイルを読み込む。カラム数が合わない場合やデータ型が合わない場合はその行をスキップする
@rows =
	EXTRACT name string, id int,
	FROM "/file.csv"
	USING Extractors.csv(silent: true)

// TSV ファイルを読み込む。ASCII エンコードを指定
@rows =
	EXTRACT name string, id int,
	FROM "/file.tsv"
USING Extractors.csv(encoding: Encoding.[ASCII])

Extractor の共通パラメータ

パラメータ デフォルト値 指定できる値 説明
encoding Engoding.UTF8 Encoding.[ASCII]
Encoding.BigEndianUnicode
Encoding.Unicode
Encoding.UTF7Encoding.UTF8
Encoding.UTF32
ファイルのエンコード指定
escapeCharacter null エスケープ用の文字を指定
nullExcape null null 値と解釈する文字を指定
quoting true true
false
カラムのフィールドの " (ダブルクォート) を考慮にいれるかどうか現在は " (ダブルクォート) 以外には対応していない
rowDelimiter "\r\n"
"\r"
"\n"
行のセパレータを何にするか。デフォルトでは改行コードが指定される
silent false true
false
指定したカラム数と違うカラム数の行が見つかった場合や指定したものと異なるデータ型があった場合に、スキップして読み込むか、それともエラーを吐いて処理を止めるか
skipFirstNRows 0 読み込むときに先頭から指定した行数だけスキップする


U-SQL Built-in Extractors | Microsoft Docs

U-SQL 入門① ~ U-SQL の基本形 ~

Azure Data Lake Analytics の U-SQL というクエリ言語についてメモ用に基本的な事をまとめていきます。

Azure Data Lake そのものについてや構築方法等はこちらのブログ (Data Platform Tech Sales Team Blog | Azure Data Lake)を見るとわかりやすいと思います。

 

U-SQL とは?

  • SQLC# の拡張を備えたクエリ言語
  • 主にデータの変換や分析のために使われる
  • 構造/非構造化データを取り扱える
  • 分散処理されるため大容量のデータセットに対してのクエリが可能

U-SQL の基本形

@searchlog =
   EXTRACT UserId          int,
          
Start           DateTime,
           Region          string,
           Query           string,
           Duration        int?,
           Urls            string,
           ClickedUrls     string
          
FROM "/Samples/Data/SearchLog.tsv"
  
USING Extractors.Tsv();
 
@rs1 =
  
SELECT Start, Region, Duration
    
FROM @searchlog
  
WHERE Region == "en-gb";
 
OUTPUT @rs1  
  
TO "/output/SearchLog-transform-rowsets.csv"
  
USING Outputters.Csv();

それぞれの個所で以下のような動作をする。

EXRACT の個所

  • /Samples/Data/SearchLog.tsv ファイルを読み込む
  • Extractors.Tsv() でタブで区切る
  • UserId, Start, ... と列名を付与し、データ型も定義
  • @searchlog という変数にセット

SELECT の個所

  • @searchlog に対して SELECT を実行
  • Region 列の値が en-gb のデータにフィルタ
  • Start, Region, Duration 列のみを選択
  • @rs1 変数にセット

OUTPUT の個所

  • @rs1 にセットされた SELECT の結果を /output/SearchLog-transform-rowsets.csv というファイルに書き出す
  • Outputters.Csv() によって CSV フォーマットに変換される

 

Azure Data Lake Analytics

azure.microsoft.com

FlumeでKafka for HDInsightとデータをやり取りしてみる

Microsoft AzureのHDInsightでKafkaが使えるようになったので、これをデプロイしてFlumeでデータをやり取りしてみます。
HDInsightのKafkaはこのへんのマイクロソフトのブログ記事を見てデプロイしておきます。

Kafka for HDInsight概要

blogs.msdn.microsoft.com

デプロイ方法と使い方blogs.msdn.microsoft.com

 

今回は試してみるのみということで作業を楽にするためにFlumeバージョン1.7.0をHDInsightのヘッドノードにインストールして使います。
ヘッドノードにSSHログインしてFlumeをセットアップします。
Flumeのセットアップは下記のドキュメントを参照しました。

Flume 1.7.0  User Guide

https://flume.apache.org/FlumeUserGuide.html

 

Flumeのセットアップ

Flumeを動かすのに必要なJavaをインストールします。

$ yum install java-1.8.0-openjdk.x86_64
 
Flumeをインストールします。
$ wget http://www-eu.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
$ cd /usr/local
$ sudo tar xzvf /path/to/apache-flume-1.7.0-bin.tar.gz
$ sudo mv apache-flume-1.7.0-bin flume
$ cd flume
 
JAVA_HOMEをflume-env.shに設定します。
$ sudo cp conf/flume-env.sh.template conf/flume-env.sh
$ sudo vim conf/flume-env.sh
export JAVA_HOME=/usr/lib/jvm/jre
 
動作テストします。
$ bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console

 

こんなのがぺろぺろと出てきたらOKだと思います。

2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.start.time == 1484633482741
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: SOURCE, name: seqGenSrc. source.stop.time == 1484633483298
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.accepted == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append-batch.received == 0
2017-01-17 06:11:23,298 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: SOURCE, name: seqGenSrc. src.append.accepted == 0

 

Kafkaにデータを送信する

FlumeのKafka Sinkを設定します。
ApacheのログをTailしながらKafkaに送信するように設定してみます。
設定ファイルはconf/flume-conf.properties.templateをconf/flume.confとかにコピーして作成します。
# 各機能の名称定義
agent.sources = s1
agent.channels = c1
agent.sinks = k1
# Exec Sourceでtail -Fコマンドを起動してApacheログをtail
agent.sources.s1.type = exec
#Flumeの実行ユーザーがこのPATHにアクセスできるように権限を設定する必要があります
agent.sources.s1.command = tail -F /var/log/httpd/access_log
agent.sources.s1.channels = c1
# Interceptor (ログの各フィールドをparseしたい場合はこれを使うが、headerに入るのでその後工夫が必要)
agent.sources.s1.interceptors = i1
agent.sources.s1.interceptors.i1.type = regex_extractor
agent.sources.s1.interceptors.i1.regex = ([^ ]*) ([^ ]*) ([^ ]*) \\[(.+)\\] \\"([^ ]*) ([^ ]*) ([^ ]*)\\" ([^ ]*) ([^ ]*) \\"([^ ]*)\\" \\"(.*)\\"
agent.sources.s1.interceptors.i1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11
agent.sources.s1.interceptors.i1.serializers.s1.name = host
agent.sources.s1.interceptors.i1.serializers.s2.name = identity
agent.sources.s1.interceptors.i1.serializers.s3.name = user
agent.sources.s1.interceptors.i1.serializers.s4.name = time
agent.sources.s1.interceptors.i1.serializers.s5.name = method
agent.sources.s1.interceptors.i1.serializers.s6.name = request
agent.sources.s1.interceptors.i1.serializers.s7.name = protocol
agent.sources.s1.interceptors.i1.serializers.s8.name = status
agent.sources.s1.interceptors.i1.serializers.s9.name = size
agent.sources.s1.interceptors.i1.serializers.s10.name = referer
agent.sources.s1.interceptors.i1.serializers.s11.name = agent
# Channel (読み取ったデータを送出するまでのバッファ。テスト中はメモリで良いけど、本番ではDiskを使ったほうが安全かも)
agent.channels.c1.type = memory
# Sink (Kafkaに送るところを定義する)
agent.sinks.k1.channel = c1
# Kafka Sinkのクラスを指定
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 送り先のトピック名。作成済みであればそこに書くし、なければ勝手に作る仕様っぽいです
agent.sinks.k1.kafka.topic = mytopic
# Kafka for HDInsightのBrokerをカンマ区切りで記述(サーバー名は環境に合わせて読み替えること)
agent.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
agent.sinks.k1.kafka.flumeBatchSize = 20
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.producer.linger.ms = 1
agent.sinks.k1.kafka.producer.compression.type = snappy
# Interceptorでログフォーマットをparseした場合はheaderに投入されるので、serializerをheader_and_textに指定
# これをやるとheaders{host=xx.xx.xx.xx, identity=xxx, ….} xx.xx.xx.xx - - みたいな形式でデータが送られる、でもあとでクエリ実行するためにはこのままでは使いにくいかも
# Interceptorを書かずにいい感じでやれる方法があったら教えてください・・・m(_ _)m
agent.sinks.k1.sink.serializer = header_and_text
 

Kafkaからデータを受信する 

データをKafkaに送ったら、今度は読み取ってみます。
FlumeのKafka Sourceを設定します。
Kafka SourceでHDInsightのKafkaに接続して特定のTopicから取得します。
読んだデータはHDFS Sinkを使ってHDInsightに書き出します。
agent.sources = s2
agent.channels = c2
agent.sinks = k2
# Source (sourceのtypeプロパティにKafkaSourceクラスを指定)
agent.sources.s2.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s2.channels = c2
agent.sources.s2.batchSize = 5000
agent.sources.s2.batchDurationMillis = 2000
agent.sources.s2.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
agent.sources.s2.kafka.topics = mytopic
# Channel
agent.channels.c2.type = memory
agent.channels.c2.capacity = 100
# Sink (HDFSに書く場合)
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.path = /tmp/kafka/%y-%m-%d/%H%M/%S
agent.sinks.k2.hdfs.filePrefix = events-
# デフォルトではSequensceファイルになってる
agent.sinks.k2.hdfs.fileType = DataStream
# デフォルトではWritableフォーマットになってる
agent.sinks.k2.hdfs.serializer = TEXT
agent.sinks.k2.hdfs.round = trued
agent.sinks.k2.hdfs.roundValue = 10
agent.sinks.k2.hdfs.roundUnit = minute
 
HDInsightだとBlobに書かれます。agent.sinks.k2.hdfs.pathに「hdfs://~~~~~」と明示的に指定するとワーカーノードのお腹のディスクで構成されているHDFSに書かれます。
気が向いたら別のツール(fluentdやlogstashなど)も試してみようかと思います。