ソフトウェアデザイン2月号はHadoopのはなしです

Riak CSは以前にも紹介したとおり、S3とほぼ同じAPIを持っているのでMapReduceに入っているjets3tがそのまま使えるので、HDFSの代わりにRiak CS使ってみようよという試みです。以前ぼくもHDFSにデータを出し入れしようとしたら結構たいへんだったので、Riak CSが使える場面もまあまああるかもしれない。一方でMapReduceは、分散処理においては他に選択肢がない。最近でこそいろいろ出てきたがやはり鉄板でして…

他に選択肢がないとは、どういうことかというと、自分たちの環境やユースケースに合わず多少使いにくくても我慢して使わなければならないということ *1。特にHDFSバッチ処理のための分散ファイルシステムとしては非常に優れており、高性能で安定していたが、ネームノードを特別扱いする運用が複雑になったり、ユーザー管理の仕組みがあとづけであったり、インターフェースが独自のものであったりと、通常のファイルシステムには、使いやすさは及ばない(当たり前なのだけど)。特に、比較的小さなファイルを大量に管理したい場合にはネームノードのメモリが不足しがちになる。これに対して、市場ではいくつかの分散ファイルシステムが登場した。GlusterFSはその筆頭でしょう。また、AmazonのS3もサービスであるが、これは技術的には分散ファイルシステムで構成される(と想像される)。実際にHadoopが必要な場面で、HDFSである必要がない場合もあります。HDFSが特に有効なのはI/O分散が可能だからで、I/Oインテンシブなワークロードに対して絶対的な優位性を今でも持っている。

実はMapReduceが動くのでHiveも動くと想像されるのだけど、Hive力が足りずなかなか手を動かせていない状態。黄色い象蜂が得意な人誰か試してみてくれないかなあ。

追記

同僚がErlang/OTPの入門記事を書いているので、そちらもどうぞ。まあ言語というよりは処理系の宣伝になってしまっていますが…

追記2: あわせて読みたい

Hadoop 第3版

Hadoop 第3版

*1:もちろん、自分が便利なようにパッチを送る、別のものを自分で作る等の選択肢はあ るし、Hadoopコミュニティは多様なニーズに適応しようと努力している。しかし、これではあまりに時間と手間がかかる場合がある

poolcatというライブラリを公開した

以前gen_queueというライブラリを公開したが、またライブラリ公開のお知らせだ。今回作ったのはpoolcatという、いかにもpoolboyからパクったような名前であるが、poolboyはネーミングの参考になった。

これも大したものではないのだが、ワーカープール+タスクキューのようなものをやるライブラリを見つけられず、どうも同じものを何度も再実装していたのでgen_queueを使ってライブラリ化してみたら思いの他便利で、同じレポジトリの3箇所で既存のコードを大幅に削ることができた。デバッグの手間も省けるというものだ。改めてOTPの勉強になったという…ちょっと調べた限りだとRabbitMQの中にも似たような機能があるし、Riak CSの中にもある。たしかRiakの中にも似たようなものを作ってるはずで…と、まあ再発明よくないですよね。という結論に。

poolboyじゃダメなん?

poolboyはコネクションプーリングなど、「使われる」リソースの側をリミットをつけつつプールしておくためのものなので、今回のように「使う」側にリミットをつけるのが目的なので、poolboyだとそれなりにワークアラウンドが必要になる(というか、どうやってやるんだ?)ので、まあどうせ小さなコードだろうし素振りも兼ねて作ってみたのである。

なぜタスクキューが必要なのか?

Erlangは軽量プロセスが売りなのであって、タスクを非同期に処理したければ軽量プロセスをspawnしておけばあとはrunqueueがよしなにスケジューリングしてくれる…と思っていた時期が私にもあった。実際にやってみると、Erlang VMのスケジューリングは(当たり前だが)まあまあフェアなので、例えば10万個のタスクを同時にspawnすると、起動した軽量プロセスの中身によっては例えば10万個のファイルディスクリプタやTCPコネクション、それなりのメモリを確保しようとプロセスが動くわけで(ちなみに一番困るのはエフェメラルポートを使われるケース)、トータルでの性能は劇的に下がってしまう。これを防ぐためにはいくつか方法があると思うのだけど、ちょろちょろっと試行錯誤した結果やはり伝統的なこのモデルが便利だなあということに今更ながら気づいたのであった。それじゃErlang使ってる意味ねーじゃん!というツッコミに関していうと、軽量プロセスはすごいものでたとえばConsumerになる軽量プロセスそのものは100万くらいになってもおそらくErlang VMそのものはビクともしないのでやっぱりいつか便利になる時がくるだろうと信じている。

アイディア自体はここ1週間くらいなのだけど、実装は子供の相手をしたりしなかったりしながら、土曜日の1日でできてしまった。こういうしょうもないライブラリを作るのとかは得意だし、それでも1日でできるようになったというのは腕が上がったなあとか、転職した甲斐があったものだと思う。これを使ってちょっとしたサービスを作ろうと思っているのだけど、さすがにまだ無理かなあ。

2014年になりました

この手の話は遡ってみると2012年の正月に書いたもの以来ということになるようだ。この2年何があったかというとまあ転職して仕事でErlang/OTPを書くようになったとか、転職して仕事の考え方が大きく変わった(正確にいえば、なんとなく感じていたものが確信と実践になった)くらい。
そういえば書き納めと書き初めはOCamlでした。みなさまに置かれましては昨年と変わらぬご愛顧のほどをいただければと思います。

時間をとってくる関数

Trouble with timestampsというのを見ていろいろ思い出したので。

clock_gettime(2) もあるし、 gettimeofday(2) もあるのだけど後者はカーネルの内部でロックをとっていたように思う。mktime(3) はダメなの。中でファイルIOにロックをとっているから。僕が書いたわけではないのだけど、今年一番のヒットだったパッチを調べてるときに、カーネルとかlibcの中をいろいろ見ていると、そのプロセスのロケールが書いてあるファイルを開くためにロックが必要だったという話で、まあ誰も使っていないのだけど、中で mktime(3)を使っている httpd_util:rfc1123_date/1 とか erlang:localtime_to_universaltime/2 を使ってはいけないということをメモしておこう。

あとはErlang使いなら必須の知識として、erlang:now() ではなく os:timestamp() を使いましょうというのも、いつも忘れてググるのに時間がかかるのでここにメモしておく。 

あとは gettimeofday(2) というのあったのだけど、これがなんでダメになったのかは思い出せない…monotonic clockを使う場合にはよかったのだけどロックをとってるんだかmonotonicじゃないだかでやめた気がするのだが詳しいエロい人おしえてください。

終わってからなんだけど、ずっと下書きになってたこれをRiakのアドベントカレンダーに持っていけばよかったわ。
 

追記 2014/8/15

httpd_util:rfc1123_date/0 もダメだった。 https://gist.github.com/kuenishi/c9867a6f219343344f51


gist:c9867a6f219343344f51

Riak 2.0のCRDTで遊ぶ

今日もRiakアドベントカレンダーの記事だよ! GLEE見てると"Call Me Maybe"という曲があって、それがなんとなくお気に入りな昨日今日です。今日はWebDB Forum 2013で話してきたCRDTの話。

CRDTって何よ?という人多いと思うので当日僕が話したスライドを見てほしい。

まとめると、絶対にデータをなくしたくないし可用性を下げたくないための可換データ型をRiakにネイティブで実装したよという話です*1。で、ちょっと使ってみようと。

Riak 2.0を落としてきて動かす

$ git clone git://github.com/basho/riak
$ cd riak
$ git checkout riak-2.0.0pre7 -b crdttest
$ make rel
$ ulimit -n 4096
$ rel/riak/bin/riak start

これで多分起動したはず。ここまでは簡単... riak-admin は riak と同じパスにあるので適当に読み替えて。

$ riak-admin bucket-type list
$ riak-admin bucket-type create stests '{"props":{"datatype":"set"}}'
stests created
$ riak-admin bucket-type activate stests
stests has been activated
$ riak-admin bucket-type list
stests (active)

これで or-set が使えるようになった。

HTTPで叩いてみる

$ curl -X POST http://localhost:8098/types/stests/buckets/s/datatypes/k \
    -H 'content-type: application/json' -d '{"add":"foobar"}'
$ http localhost:8098/types/stests/buckets/s/datatypes/k                               
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 121
Content-Type: application/json
Date: Sun, 22 Dec 2013 12:34:56 GMT
Server: MochiWeb/1.1 WebMachine/1.10.5 (jokes are better explained)
Vary: Accept-Encoding

{
    "context": "SwEIGINsAAAAAWgCbQAAAAgjCf75UrbcDWEBagAAAAcBZm9vYmFyDYNsAAAAAWgCYQFhAWo=", 
    "type": "set", 
    "value": [
        "foobar"
    ]
}

これがすごいのは、多少ネットワークが切れようがサーバーが落ちようが成功したWriteとDeleteは絶対になくさないところ。Vector Clocks + Sibling Resolutionだとプログラミングが面倒なのだけど、今度は複数の値が読めるということもないので、ちょっと特性を理解しておけば今までのコードがほぼそのまま流用できるはずなので、みんなぜひ試してみてね!

参考

Riak SCR 14回のネタをほぼ流用しているので興味ある人はそちらもどうぞ。Riakのリビジョンはちょっと古いので注意。

*1:世の中にはWriteがなくなるデータベースというもの沢山ありますね^^;

Riak CSのデータをHiveに取り込むなど

みんなでやるRiakアドベントカレンダーの記事。昨年はそういえば辛かったのだけど、今年は沢山の人が参加してくれてとても嬉しいのである。…といいつつ準備するネタがなくてエントリーするのは躊躇していたのだけども。

プログラミング Hive

プログラミング Hive

目的

Riak CSはサーバーを集めてなんとなくクラスタ化するとREST APIでアクセスできるという謎のソフトウェアストレージだ。近い製品や似た製品、ライバル製品はいろいろあるが混乱を呼ぶのでここでは書かない。とはいえデータストアだけあってもこのクラウド時代にデータ貯めるだけじゃあつまらない。プログラムは処理系とデータが集まってこそ使える…ということで、安直な分散処理システムということでHadoopとくっつけて使うのは次のソフトウェアデザイン2014年2月号で出てしまうので、今度はHiveとくっつけて使ってみようという目論見である。

誰得なのか

Hiveといえばアドホッククエリとかバッチ処理をHiveQLというクエリ言語の処理系である。一番よく表に出てくる使い方はログ解析の処理系に使ってアクセス解析とかサイト改善とか、BIツールの代わりに使う場合もある。MapReduceにHDFSを使ってI/O分散をするケースじゃない場合とかだとまあ別にHDFSじゃなくてもいいよねということはある。…と思っていたら、Riak Meetup Tokyo #3mixiの事例 を出してもらえることになったけど、まあそういう使い方ができる。

Riak CSのインストールとか動作確認

Gitから落としてきてソースコードから入れる 方法もあるが、RPMやYumのパッケージから入れる方がよい。理由はErlang/OTPのバージョンにRiakが追随できていないので、古いR15B0xを使わないといけなくてハマったりするからだ。まあ特にハマるところはないという前提で。いずれにせよ、ダウンロードページから落としてきて適当に入れて設定してもらえばよい。健闘を祈る。

このとき、 cs_root_host とかポート番号はデフォルトのままにしておくとよい。

$ s3cmd mb s3://hive
Bucket 's3://hive/' created
$ s3cmd ls 
2013-12-21 13:17  s3://hive

Hive のインストールとか動作確認

これは本当に大変だった。

Hadoop のインストールと用意

これは必要かどうかはちょっとわかっていない。ぼくの環境がそうなっているだけ、ということなので。Hadoop 1.2.1を理研のミラーから落としてきて入れてある。解凍したものは普通にHadoopをセットアップする要領で入れていこう。ただし、JobTrackerやTaskTrackerを上げておく必要はない(ふつうにHiveがローカルモードで動作してくれるっぽい、よくわからない)。

S3にアクセスするので jets3t.properties を conf ディレクトリに入れておく必要がある。よくわからないけど紆余曲折の末こういう感じになった:

storage-service.internal-error-retry-max=5
storage-service.disable-live-md5=false
threaded-service.max-thread-count=20
threaded-service.admin-max-thread-count=20
s3service.max-thread-count=20
s3service.admin-max-thread-count=20
s3service.https-only=false
httpclient.proxy-host=localhost
httpclient.proxy-port=8080

ローカルで動かすので SSL はデフォルトで使わないようにしたりしてある。他のものは特に根拠はなので、なくても動くだろう。$HADOOP_HOMEなどが正しく設定されていないとHiveも多分うごかない。アクセスキーなどをhadoop-site.xmlにS3の場合と同様に書いておく必要がある。うまく設定できていると

$ hadoop jar mapreduce-examples.jar wordcount s3n://foobar/bar /tmp/output

とかそういうのが動いてくれる。このあたりの方法はソフトウェアデザインの来年2月号にもう少しだけ詳しく書いたので、そちらを見てもらいたい。ぜひとも買ってもらいたい。

Hiveのインストールと用意

Hiveも理研のミラーから 0.12.0 を落としてきて解凍。途中までふつうに設定しておく。まずはローカルで動かすのにかなり苦労したのだけど(MySQLなんてやるんじゃなかった)…HIVE_HOMEとかmetastoreとかは、普通にそこまではできたとして。 conf にも念のため jets3t.properties を同じものを入れておく。hive-site.xml はこんな感じ。

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/home/kuenishi/hadoop/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

元データは @ITのHive記事 の data.tar.gz を使う。この中に perf.csv というのがあって、都道府県の番号と都道府県名が並んでいるだけだ。これをあとでロードするが、とりあえずローカルのファイルからテーブルを作ってみる。 data.tar.gz を解凍すると localfiles というディレクトリができて驚いた。

まずは外部テーブルとして登録してみる。

$ bin/hive
> create database postal;
OK

>CREATE TABLE pref (id INT, name STRING)                         
    >ROW FORMAT DELIMITED                                            
    >FIELDS TERMINATED BY ','                                        
    >LINES TERMINATED BY '\n'                                        
    > STORED AS TEXTFILE
    > LOCATION '/home/kuenishi/hadoop/localfiles'
    > ;

クエリをかける

> select * from pref where id = 5;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Execution log at: /tmp/kuenishi/.log
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-12-21 22:25:50,183 null map = 50%,  reduce = 0%
Ended Job = job_local1599025941_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
5       秋田県
Time taken: 7.819 seconds, Fetched: 1 row(s)

一行引いてくるだけなのに大袈裟w 無事にHiveはとりあえず動いた。注意しておきたいのは、 warehouse のディレクトリはローカルにあるので大きなデータを置かないようにしないといけないという点。

Riak CS を叩こうとしてみる

まずは同じデータをRiak CS上に置く。

$ s3cmd put pref.csv s3://hive/ext/
$ s3cmd ls -r s3://hive/ext/
2013-12-21 13:27       611   s3://hive/ext/pref/pref.csv

これを外部テーブルにする

    >CREATE TABLE pref2 (id INT, name STRING)                        
    > ROW FORMAT DELIMITED                        
    > FIELDS TERMINATED BY ','                    
    > LINES TERMINATED BY '\n'                    
    > STORED AS TEXTFILE                          
    > LOCATION 's3n://hive/ext/pref'              
    > ;
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).)

おおダメか、と、 hive-site.xml の設定に失敗しているとこれが出るのでhive-site.xmlに追加しておく。

<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>J_PJJS7NEHXJSQQboom!</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==</value>
</property>

これを書いて一旦Hiveをログアウトして、もういちどコマンドを起動する。

hive (default)> use postal;
OK
Time taken: 0.016 seconds
hive (postal)> CREATE TABLE pref2 (id INT, name STRING)
             > ROW FORMAT DELIMITED
             > FIELDS TERMINATED BY ','
             > LINES TERMINATED BY '\n'
             > STORED AS TEXTFILE
             > LOCATION 's3n://hive/ext/pref'
             > ;
OK
Time taken: 0.686 seconds

キタ━(゚∀゚)━! 今度はクエリを投げる

hive (postal)> select * from pref2 where id=30;        
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Execution log at: /tmp/kuenishi/.log
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-12-21 22:36:26,998 null map = 0%,  reduce = 0%
Ended Job = job_local1077531583_0001
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
30      和歌山県
Time taken: 10.698 seconds, Fetched: 1 row(s)
hive (postal)> 

ここまで、思ったより簡単に動いてしまった…

まとめ

  • Riak CS上のデータをHiveの入力にするのは思ったより簡単だった
  • warehouse置き場はできればHDFSがよい(データ入力バウンドな処理なら)。
  • CPUバウンドとかReduceバウンドなクエリが多いならwarehouseを置いてもよいかも…
  • 必要に応じてインポートするべし

なんとか日付変わる前にできた。

番外

warehouse をRiak CS に置いても大丈夫そう…か?

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>s3n://hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

だけど、データのインポートを完全に試したわけじゃないのであまりオススメはできない。

ソフトウェアデザイン1月号(2014)もRiak CSとRiakの話

先月も宣伝したが、今月も引き続きRiak CSとRiakの話。分散システムでどうやって削除を実用的に構築するかという非常にマニアックな話と、Riakの上で時系列データをどう扱えばいいかの2本立て。MapReduceの実例もあるので非常によいと思うよ!

はい、非常に混みいった話になってしまい反省しております…

msgpack-erlang 0.2.7 をリリースして新仕様に対応しました

Kyle Academy new extension

MessagePackハッカソン #3が開催され、Bashoで場所を提供したので僕も久しぶりになんかやることにして、そういえばmsgpack-erlang新仕様の実装が途中だったのでやることにした。で、できた。というわけで、さまざまな事件を乗り越えて策定された新仕様をErlangのサンプルコードつきで解説しよう。新仕様の目玉はふたつあるので、それぞれ解説しておきたい。

文字列型

これまでMessagePackは「ほぼJSON互換」だったのだが、その正体は、Cらしくバイナリ型(raw)を <<101XXXXX>> とか 0xDA, 0xDB で表現していた。一方JSONにはバイナリ型はなくて、文字列はUnicodeでなければならないので、生のバイナリをそのままJSONに持っていくことはできなかった。で、新しい仕様ではこれまでバイナリ型に割り当てられていたところを含めて、新しく文字列型をつくった( <<101XXXXX>>, 0xD9, 0xDA, 0xDB)。新仕様では、バイナリ型には新しく 0xC4, 0xC5, 0xC6 が割り当てられた。

で、ライブラリを作る立場としては、まあ最初はオプショナルでこれを実装しようということになる。使い方は簡単で、 enable_str というオプションを一緒に渡してやるだけ(公式のドキュメントは間違っている。私がまだ修正していない)。

  {ok, Bin} = msgpack:pack("埼玉", [{enable_str=true}]),
  {ok, "埼玉"} = msgpack:unpack(Bin, [{enable_str=true}]).

ちなみに、デフォルトではこれは false になっていて、引数を省略したらどちらもこれまでと同様の動作をする。また、これを true にした場合は変換を試みるので自動的に Unicode のバリデーションは走ることになる。失敗するとコケるはず。確か。

拡張型

MessagePackがサポートするのは配列とかMap型とか整数とかそういったプリミティブな型ばかりだが、使っているうちにもうちょっとリッチな型というものがほしくなる。たとえば日付、時刻、その他オレにとっては便利な○○…文字列も初めはそうだったのだが。基本方針は以前と同様「ユーザー側でMessagePack上で好きなように規約やライブラリを作ってやってよー(バイナリレベルではやらない)」だったのだけど、やはり効率重視なのでシリアライザの層でちょっとは効率的にできるようになってほしい。というわけで、ユーザー定義の拡張型を仕様として用意することにした。ライブラリはまだ全然できてないので注意。で、Erlangではユーザー定義型のシリアライザとデシリアライザを関数として渡してやる。

Packer = fun({ref, Ref}, Opt) when is_ref(Ref) -> {ok, {12, term_to_binary(Ref)}} end,
Unpacker = fun(12, Bin) -> {ok, {ref, binary_to_term(Ref)}},
Ref = make_ref(),
Opt = [{ext,{Packer,Unpacker}}],
{ok, {ref, Ref}} = msgpack:unpack(msgpack:pack({ref, Ref}, Opt), Opt).

上の例だと、Erlangではよく使う参照(UUIDのようなもの)はこれまでのMessagePackではシリアライズできなかったのだが、拡張型を使ってコード(ここでは12)を割り当てることができるようになる。これで、すべてのErlang組み込み型がシリアライズできるようになるはず(もう少ししたら便利ライブラリを付け足すはず…)で、BERTやt2bでは多言語とのやりとりが大変だったけど、MessagePackと使い分けるのは面倒だったごく一部の人には朗報かと思います。

あと、拡張型を扱うためのbehaviour (msgpack_ext) も作った。ユーザーの好きなモジュールで、 pack_ext と unpack_ext を実装してbehaviourを定義すればatomを渡すだけで使えるようになる。

-behaviour(native_example).

pack_ext({native, Term}, _) when is_pid(Term) orelse
                                 is_reference(Term) orelse
                                 is_port(Term) orelse
                                 is_tuple(Term) orelse
                                 is_function(Term) ->
    {ok, {42, term_to_binary(Term)}}.

unpack_ext(42, Bin) ->
    {ok, {native, binary_to_term(Bin)}}.

とかしておけば、

  {ok, Bin} = msgpack:pack({native, make_ref()}, [{ext, native_example}]),

とすれば拡張型を使えるようになる。もちろん、既存のリストやMap型に入れ込んでも普通にデコードされる(はず…)。まだproperで網羅的なテストは書けていないので、有志のヘルプ求む。というか、この記事は普通にどっかに英語で書くべきだろうな…

Let's Do MessagePack!!