pubisher
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new RetryNTimes(3, 2 * 1000);
CuratorFramework cf = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
cf.start();
Stat stat = cf.checkExists().forPath("/topic");
if (stat == null) {
cf.create().forPath("/topic");
}
cf.setData().forPath("/topic", "hello new data".getBytes());
cf.close();
}
subscriber
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new RetryNTimes(3, 2 * 1000);
CuratorFramework cf = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
cf.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("root -->" + event);
}
});
cf.start();
TreeCache treeCache = new TreeCache(cf, "/topic");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("event -->" + event);
System.out.println("change data -> " + new String(event.getData().getData()));
}
});
treeCache.start();
Thread.sleep(60 * 1000 * 5);
System.out.println("Exit");
}
参考资料