ひとりZooKeeperコードリーディング(1) クライアント編

さて準備ができたので読み始めることにする。この手のシステムは大体の機能が分かっているならAPIを見るのが手っ取り早い。もろもろの名前でなんとなく雰囲気と使い方が分かるので下手にドキュメントだけを読み込むよりもよい場合もある。Javadocだとクラス毎になってしまうし*1慣れているのもあってC APIというかヘッダにまずは目を通す。基本的には

  • znodeのCRUD
  • それのsync, async, w系のAPI
  • initialize / finalize
  • multiops (3.4+

などである。特徴的なのはZOO_SEQUENCEとかZOO_EPHEMERAL。Chubbyの論文にはEphemeral fileというのが出てくるから後者は分かりやすいけど、前者はZK特有。ACLなんかは気になるけどとりあえず本質的ではないので後回し。

ZooKeeper.java

というわけでExampleを見るとZooKeeperオブジェクトをnewしていろいろ取り回すようだ。ZooKeeper.javaの冒頭のコメントは

  • ZooKeeperのサービスを使うにはこのインスタンスのメソッドをいろいろ呼ぶ
  • 特に何も書かれてないのなら各メソッドはスレッドセーフ
  • サーバーとセッションがつながるとIDが割り当てられてハートビートする
  • セッションIDが有効な限りZooKeeper APIを呼べる
  • ハートビートを送れないとサーバーでセッションが無効になって、そのオブジェクトはもう使えなくなる
  • セッションを再接続するにはnewしてネ
  • セッションが切れる前に怪しくなったときに、クライアントは他のサーバーに再接続しようとする。成功すればそのセッションを使い続けることができる
  • APIは同期・非同期それぞれある。非同期APIはコールバックオブジェクトを仕掛けて結果を得る
  • サーバー上のznodeに"watch"とかいうものを仕掛けることができる
  • 全てのwatchはただ一度だけ(only once)実行されるが、サーバーとの接続が怪しくなって再接続が起きると消えることがある(そりゃそうだわな
  • undelivered eventsを察知するために、コネクションが切れたときのためのwatchを仕掛けることができる

といった感じ。ほー分かりやすい。ソースなんてわざわざ読まなくてもいいんじゃないか。※ここでのセッションはTCPのセッションとは異なる、ZooKeeperのセッションのことを指す。どうも英語でTCPのセッションを指すときはコネクションと言うらしい。紛らわしいので要注意。

ZooKeeperクラスのコンストラクタは2つある。シグネチャから察するにACLの認証してくれる版としてくれない版だ。認証が必要ないAPIをわざわざ残しているということは、ACLなしでのサーバーセットアップが可能か、もしくは特定の部分だけ認証なしにACLを設定できるかのどちらかだろう。大して変わらないけどここでは後者を追っていくことにする。で、何をやっているかというとClientCnxnをnewしてstart()している。
あ、その前にConnectStringParserというのをnewしている。ZooKeeperのホストは "192.168.0.1:2181,192.168.0.2:2181,192.168.1.1:2181" といった風に指定するのだが、どうも '/' を渡すとそこにChrootして接続してくれるみたいだ。なんというキモい機能。Cクライアントでもついてたら嬉しいな(要確認)。
次にnewされているStaticHostProviderはパーズして配列に入れてシャッフル。これで負荷分散をしているようだ。getClientCnxnSocket()で特に何も設定していなければClientCnxnSocketNIOというのを持ってくる。あれ、しかしこのクラスはnewInstance()というメソッドがないみたいだ。そもそもJavaでネットワークIOするときのお作法が分かってないのが問題か。依存ライブラリにnettyが入ってるからnetty使ってると思うのだが。
ああ、ちなみにコンストラクタの引数のsessionTimeoutの単位はミリ秒なので注意すること。注意すること…orz watcherにはセッションまわりで引っ掛けたいのをまとめたオブジェクトを渡す。
ClientCnxnはZooKeeperクラスが持ってるものをそのままもらってくる。メンバにsessionIdというのがあるのだけど初期値は0。長さはlong。

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

おお、これは…。おそらく、上から順にTCP接続のタイムアウト。台数で割ってるから、例えば5台構成で1000msにすると一台あたり200msしかないがいいのか。readTimeoutというのは、想像だが、read系のAPIのタイムアウト時間だろう。2/3とかどういう意味があるんだろう。で、おそらく送信用のスレッドとwatcherのイベント処理用のスレッドをひとつずつ立ち上げている。まそんなもんか。あれ、readのスレッドはなくていいのかな。SendThreadはClientCnxnの内部クラスでStatusをCONNECTINGに変更してる。
で、ClientCnxnをstartして終わっている。あれ、connect(2)は‥? というわけで次。

dm = new DataMonitor(zk, znode, null, this);

DataMonitor.javaをみると、WatcherとStatCallbackを継承している。コンストラクタの中で

        zk.exists(znode, true, this, null);

している。これはノードの存在を確認して、それとアトミックにwatcherを仕掛けるAPI。どうしてこうなっているかというと、existsとset_watcherの二段設定になっていると、existsとset_watcherの間にイベントがトリガーされたら届かないので困るのを防ぐことができる。
…うーんペースが遅いのでもうちょっと巻いていこう。

ZooKeeper#exists

直接呼んでるところからもうちょっと入ったところにある。ざっくり何をしているかというと

  • WatchRegistrationのインスタンスを作ってMap>を持っておく
  • RequestHeaderというクラスを作ってOpCode.existsをセット
  • ExistsRequestのインスタンスを作って
    • serverPathを設定
    • setWatch(bool)
  • cnxnにリクエストをqueueする。リクエストにわたすのは
    • RequestHandler, new ReplyHeader, request, new SetDataResponse, callback, clientPath, serverPath, ctx, WatchRegistration
    • つまりリクエストヘッダ、リプライを受け取るオブジェクト、リクエスト本体、レスポンス、etc.
    • StatCallbackとWatcherの違いがよう分からんね。DataMonitorはどっちもimplementしているので分けているのはどうしてか…と思ったが簡単か。セッション状態が変わったら多分StatCallbackでWatcherはznodeの状態が変わったら。

と思って調べてみたらStatCallbackは

public void processResult(int rc, String path, Object ctx, Stat stat);

というメソッドが必要なinterfaceなのだが、この引数のStatが曲者でorg.apache.zookeeper.data.Statらしいのだがソースをgrepしてみてもそんなものはどこにもない。というかorg.apache.zookeeper.dataというパッケージがない。ぐむむ。Watcherの方はもういいよね。processというabstract methodを持っていて、イベントが起きると引数でそれがWatchedEventで渡されてくるというもの。
とりあえずここまで一瞬たりともsynchronizedした形跡がないのだがそれでいいのか。

ClientCnxn#queuePacket

さてやっと送信キューのところまできた。想像するにパケットをキューに積んでくれる。TCP上での送信自体は別の送信スレッドがいるか、もしくはロックを使って毎回送ってみるかのどちらかだろう。
おおsynchronizedがないと心配していたらやっと出てきた。outgoingQueueはLinkedListという型を持っているので大体わかりますね。えっFIFOキューじゃなくてリストですか?という(コードを読み進めると分かるが、優先パケットは先頭に突っ込むとかやっているので、まあ妥当か)。
まず最初にPacketにxidというのを設定する。これはクライアント側でつけるRPCのIDだろう。インクリメントしていることから、サーバー側で欠送に使うのかもしれないが、実は単調増加なだけでもいいのかも。ちょっと先に進んでみないと分からない。
で、当然だけどPacketをnewしている。引数はRequestHeader, ReplyHeader, Record req, Record res, watchRegistrationの6つ。RecordというのはピンとこないがBodyだと思えばいいのか。単なるSerializableみたいなものだ(ちょっと違うけど)。で、引数で渡されたものを全てPacketに押し込んでいる。基本的にはセッションが生きていたらoutgoingQueueにaddする。それでsendThread.getClientCnxnSocket().wakeupCnxn()とかやっている。クラスローダーの壁に阻まれてEclipseでは追えなかったがそれで起こされて送信でもするのだろうか。近くにdoTransportとかいうメソッドがあるからそれで起きるのだろう。

ClientCnxnSocketNIO.java

ClientCnxnSocketを継承しているので、おそらくデフォルトではこのクラスが送信に使われるのだろう。wakeupCnxn()ではjava.nio.channels.Selector#wakeup()を呼んでいる。
ついでにdoTransportも見てみるがネットワークIO周りは未知だな。名前からしてselectorがselect(2)するのだろう。で、SelectionKeyというのがファイルディスクリプタみたいなものか?SelectionKey#selectしておいて、readableだかwritableだかしているところから#selectedKeys()で取り出してくる。
で、そこから取り出せた全てのSelectionKeyについてfor文をまわして*2、それぞれでConnectedになっていたらSocketChannelとかいうのでreadyだったらなんかいろいろゴニョゴニョする。READABLEかWRITABLEになっていたらdoIOメソッドに飛ぶ。
doIOは

  • sendThread.readResponse(incomingBuffer);
  • synchronized (outgoingQueue) {
    • ByteBuffer pbb = outgoingQueue.getFirst().bb; sock.write(pbb);

辺りがキモ。特にreadResponseは受信まわりなのだがちょっと力尽きた。想像するにreadして何かあったらイベントのコールバック呼ぶとか、待ってるスレッドに返事送るとかそんなん(TODO)。
ちょっとまだこの辺は自分にはレベル高い。というかもう眠い。疲れた。

SendThread#run

そういえばThreadなんだからそのスレッドが何してるのか見ればいいんだった。ざっくり流れを。whileで

  • startConnect()
    • 初回接続じゃなかったら1~1000msくらいをランダムに選んでSleep
    • 並べてあるIPアドレスから選ぶ(SASLでの接続もできるらしい
    • 全員に聞いてしまってそれでもいなかったら1000ms待つ(hostProvider.next(1000)っぽい
    • ClientCnxnSocket#connectでふつうに接続
  • connectなり初回readがタイムアウトしていたらSessionTimeoutExceptionを投げる
  • pingを送る
                   if (state.isConnected()) {
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                            clientCnxnSocket.enableWrite();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
    • ここは大事なのでメモ。timeToNextPingは、最後に何か送ってからの経過時間がreadTimeoutの半分を超えていたらPingを送る。そうじゃなかったらまあtoを延長。toってなんだ…と思ったら、最後に何か受け取ってから経過した時間だ。ここはシーケンス図を書いた方がよいだろうな。
  • // If we are in read-only mode, seek for read/write server
    • おそらくフェイルオーバーか何かの異常が起きたときかな?
    • if (state == States.CONNECTEDREADONLY) {
    • pingRwServer()して、"rw"なサーバーに接続できたら今持ってるのをcloseしてそっちに乗り換える、と書いてあるように読める
  • clientCnxnSocket.doTransport(...);
  • と、ここまでが壮大なtry clauseの中
  • whileループを何かのきっかけで抜けたらEvent.KeeperState.DisconnectedをeventThreadにenqueueしたりするなど。

まとめ

クライアントのexistsの流れをざっくり追った。宿題というか未解決なのは

  • ClientCnxnSocketの実装は結局何なのよ?
  • シーケンス図を書いてタイムアウト設計を考える
  • フェイルオーバーするときの流れも
  • ClientCnxnSocketNIO#doIOの中はもうちょっと追うか
  • org.apache.zookeeper.data.Statが見つからない
  • どこでSessionIDもらってるの?!
  • doIOの中身
  • プロトコルは把握しておきたい

*1:というかそもそも公式から辿れるリンクが404とかいう

*2:mapになっていない。もっというとliftMapになってない。これだからJavaは…