ZooKeeper API
ZooKeeperはJavaとC向けの公式APIを提供しており、ZooKeeperコミュニティでは.NET、Pythonなど複数言語向けの非公式APIも提供している。
ZooKeeper API概要
ZnodeはZooKeeperアンサンブルの中核コンポーネントであり、ZooKeeper APIはZooKeeperアンサンブルとznodeを細かく操作できる簡単なメソッドを提供する。
ZooKeeperアンサンブルに接続すると、ZooKeeperアンサンブルはクライアントへ一つのSession IDを発行する。 クライアントは定期的にサーバーへheartbeatsを送信する。そうしないと、ZooKeeperアンサンブルはSession IDを期限切れにし、クライアントは再接続しなければならない。 znodeのGet/Setはsession IDが存在する限り有効である。 すべての作業が完了したら、ZooKeeperアンサンブルとの接続を終了する。クライアントが長時間非アクティブになると、ZooKeeperアンサンブルは自動的にクライアントとの接続を終了する。
ZooKeeper APIライブラリを追加する
まず、ZooKeeper APIを使用するために次のライブラリを追加する。
dependencies {
implementation 'org.apache.zookeeper:zookeeper:3.8.1'
}
Javaバインディング
ZooKeeper APIの中心的な部分はZooKeeperクラスである。これはZooKeeperアンサンブルとの接続のため、クラスのコンストラクタとメソッドを通じてオプションを提供する。
- connect: ZooKeeperアンサンブルに接続
- create: Znodeを作成
- exists: Znodeが存在するか、その情報を確認
- getData: 特定Znodeからデータを取得
- setData: 特定Znodeにデータを設定
- getChildren: 特定Znodeで使用可能なすべての子ノードを取得
- delete: 特定znodeとすべての子を削除
- close: 接続を切断
connect: ZooKeeperアンサンブルに接続する
ZooKeeperクラスは、コンストラクタを通じて接続できる機能を提供する。
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
connectionString: ZooKeeperアンサンブルhostsessionTimeout: セッションタイムアウト(milliseconds)watcher:Watcherインターフェースを実装している一つのオブジェクト。ZooKeeperアンサンブルはwatcherオブジェクトを通じて接続状態を返す。
新しいヘルパークラスZooKeeperConnectionを作成し、connectメソッドを追加してみよう。connectメソッドは一つのZooKeeperオブジェクトを作成し、ZooKeeperアンサンブルへ接続した後、そのオブジェクトを返す。
ここではCountDownLatchが、クライアントが接続されるまでメインプロセスを待機させるためにawaitを呼び出す。
ZooKeeperアンサンブルはWatcher callbackを通じて接続状態を受け取る。Watcher callbackは、クライアントがZooKeeperアンサンブルへ接続されたときに一度呼び出され、メインプロセスのawait lockを解放するためにCountDownLatchのcountDownメソッドを呼び出す。
package com.devkuma.zookeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ZooKeeperConnection {
private ZooKeeper zoo;
final CountDownLatch connectedSignal = new CountDownLatch(1);
public ZooKeeper connect(String host) throws IOException, InterruptedException {
zoo = new ZooKeeper(host, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
return zoo;
}
public void close() throws InterruptedException {
zoo.close();
}
}
create: Znodeを作成する
ZooKeeperクラスは、ZooKeeperアンサンブルに新しいznodeを作成するためのcreateメソッドを提供する。
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
path: Znodeのパス。- たとえば、
/myapp1、/myapp2、/myapp1/mydata1、myapp2/mydata1/myanothersubdata
- たとえば、
data: 特定znodeパスに保存されたデータacl: 作成されたノードのアクセス制御リスト。ZooKeeper APIはデフォルトACLリストを取得するためのZooDefs.Idsstaticインターフェースを提供する。- たとえば、
ZooDefs.Ids.OPEN_ACL_UNSAFEは開かれたznode用のACLリストを返す。
- たとえば、
createMode: nodeのタイプである。ephemeral、sequential、またはその両方になり得る。これはenum値である。
createメソッドの機能を確認するため、新しいJavaアプリケーションを作成してみる。mainメソッドではZooKeeperConnection型のオブジェクトを一つ作成し、connectメソッドでZooKeeperアンサンブルへ接続する。
connectメソッドはZooKeeperオブジェクトであるzkを返す。custom pathとデータとともに、zkオブジェクトのcreateメソッドを呼び出す。
package com.devkuma.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class ZKCreate {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static void create(String path, byte[] data) throws KeeperException, InterruptedException {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
byte[] data = "My first zookeeper app".getBytes();
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
create(path, data);
conn.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
exists: Znodeの存在確認
ZooKeeperクラスは、Znodeの存在有無をチェックするexistsメソッドを提供する。探しているznodeが存在すると、そのznodeのメタデータを返す。
exists(String path, boolean watcher)
path: Znodeパスwatcher: 指定されたznodeを監視するかどうかを指定するboolean値
package com.devkuma.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZKExist {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static Stat znode_exists(String path) throws KeeperException, InterruptedException {
return zk.exists(path, true);
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path);
if (stat != null) {
System.out.println("Node exists and the node version is " + stat.getVersion());
} else {
System.out.println("Node does not exists.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
getData: Znodeデータ照会
ZooKeeperクラスは、指定されたznode内に保存されたデータとstatus情報を取得するためのgetDataメソッドを提供する。
getData(String path, Watcher watcher, Stat stat)
path: znodeのパス。watcher: Watcher型のコールバック関数。ZooKeeperアンサンブルは、指定されたznodeのデータが変更されると、このWatcherコールバックを通じて通知する。stat: znodeのメタデータを返す。
package com.devkuma.zookeeper;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZKGetData {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static Stat znode_exists(String path) throws KeeperException, InterruptedException {
return zk.exists(path, true);
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
final CountDownLatch connectedSignal = new CountDownLatch(1);
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path);
if (stat != null) {
byte[] b = zk.getData(path, event -> {
if (event.getType() == Watcher.Event.EventType.None) {
switch (event.getState()) {
case Expired:
connectedSignal.countDown();
break;
}
} else {
String path1 = "/MyFirstZnode";
try {
byte[] bn = zk.getData(path1, false, null);
String data = new String(bn, "UTF-8");
System.out.println(data);
connectedSignal.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, null);
String data = new String(b, "UTF-8");
System.out.println(data);
connectedSignal.await();
} else {
System.out.println("Node does not exists.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
アプリケーションを実行すると、現在設定されているdataがコンソールに出力され、await状態で待機する。Watcherに対するコールバック関数も指定しているため、zkCliを使用してデータをsetすると、Watcherコールバック関数が実行される。
bin/zkCli.sh
>>> set /MyFirstZnode Hello
するとコンソールにコールバック関数が呼び出されてHelloが出力され、countDownメソッドによりawait状態のlockが解放されるため、アプリケーションが終了する。
Hello
setData: Znodeデータ設定
ZooKeeperクラスは、指定したznodeにdataを変更して追加するためのsetDataメソッドを提供する。
setData(String path, byte[] data, int version)
path: znodeのパス。data: 指定されたznodeパスに保存されるデータ。version: znodeの現在バージョン。ZooKeeperはdataに変更があるとznodeのバージョン番号を更新する。
package com.devkuma.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class ZKSetData {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static void update(String path, byte[] data) throws KeeperException, InterruptedException {
zk.setData(path, data, zk.exists(path, true).getVersion());
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
byte[] data = "Success".getBytes();
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
update(path, data);
System.out.println("Done.");
} catch (Exception e) {
e.printStackTrace();
}
}
}
アプリケーションが実行されると、指定されたznodeのデータが変更される。ZooKeeper CLIであるzkCli.shを使用して確認できる。
cd /path/to/zookeeper
bin/zkCli.sh
>>> get /MyFirstZnode
getChildren: 子ノードデータ設定
ZooKeeperクラスは、特定znodeのすべての子ノードを取得するgetChildrenメソッドを提供する。
getChildren(String path, Watcher watcher)
path: znodeのパス。watcher: Watcher型のコールバック関数。ZooKeeperアンサンブルは、指定されたznodeのデータが削除されたり、znode配下の子が作成/削除されたりすると、このWatcherコールバックを通じて通知する。
package com.devkuma.zookeeper;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZKGetChildren {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static Stat znode_exists(String path) throws KeeperException, InterruptedException {
return zk.exists(path, true);
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
Stat stat = znode_exists(path);
if (stat != null) {
List<String> children = zk.getChildren(path, false);
children.forEach(System.out::println);
} else {
System.out.println("Node does not exists.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
子ノードが存在しない場合は、zkCliを使用して追加する。
bin/zkCli.sh
>>> create /MyFirstZnode/myfirstsubnode Hi
>>> create /MyFirstZnode/mysecondsubnode Hi
delete: Znode削除
ZooKeeperクラスは、指定されたznodeを削除するためのdeleteメソッドを提供する。
delete(String path, int version)
path: znodeのパス。version: znodeの現在バージョン。
package com.devkuma.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class ZKDelete {
private static ZooKeeper zk;
private static ZooKeeperConnection conn;
public static void delete(String path) throws KeeperException, InterruptedException {
zk.delete(path, zk.exists(path, true).getVersion());
}
public static void main(String[] args) {
String path = "/MyFirstZnode";
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
delete(path);
System.out.println("Done.");
} catch (Exception e) {
e.printStackTrace();
}
}
}
このとき、指定されたノード内に子ノードが存在するとExceptionが発生する。そのため、子ノードをすべて削除してからdeleteを実行する必要がある。
org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /MyFirstZnode
at org.apache.zookeeper.KeeperException.create(KeeperException.java:125)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at ZKDelete.delete(ZKDelete.java:11)
at ZKDelete.main(ZKDelete.java:20)