ZooKeeper: A Distributed Coordination Example
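First, start ZooKeeper in either cluster mode or standalone mode.
Then create three programs (three separate Java processes; note, processes, not threads) to simulate distributed coordination.
POM dependencies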
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
</dependencies>
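Program 1
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;

public class Program1 {
    public static void main(String[] args) throws Exception {
        // Retry a failed ZooKeeper operation up to 3 times, 2 seconds apart.
        RetryPolicy retryPolicy = new RetryNTimes(3, 2 * 1000);
        // Adjust the port to match your server (the other two programs use 2181).
        CuratorFramework cf = CuratorFrameworkFactory.newClient("127.0.0.1:2182", retryPolicy);
        cf.start();
        // All three programs use the same lock path, so they contend for one lock.
        final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(cf, "/locks-new-1");
        for (int i = 0; i < 20; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        // acquire() blocks until the lock is held, so no extra
                        // isAcquiredInThisProcess() check is needed afterwards.
                        interProcessSemaphoreMutex.acquire();
                        try {
                            Thread.sleep(500);
                            System.out.println(Thread.currentThread().getName() + " hello 1 --> OK.");
                        } finally {
                            // Release even if the work above throws.
                            interProcessSemaphoreMutex.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // Keep the process alive for one minute so the workers can finish.
        Thread.sleep(1 * 1000 * 60);
        cf.close();
    }
}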
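Program 2
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;

public class Program2 {
    public static void main(String[] args) throws InterruptedException {
        // Retry a failed ZooKeeper operation up to 3 times, 2 seconds apart.
        RetryPolicy retryPolicy = new RetryNTimes(3, 2 * 1000);
        CuratorFramework cf = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        cf.start();
        // Same lock path as the other programs, so all of them contend for one lock.
        final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(cf, "/locks-new-1");
        for (int i = 0; i < 20; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        // acquire() blocks until the lock is held, so no extra
                        // isAcquiredInThisProcess() check is needed afterwards.
                        interProcessSemaphoreMutex.acquire();
                        try {
                            System.out.println(Thread.currentThread().getName() + " hello 2 --> OK.");
                            Thread.sleep(500);
                        } finally {
                            // Release even if the work above throws.
                            interProcessSemaphoreMutex.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // Keep the process alive for one minute so the workers can finish.
        Thread.sleep(1 * 1000 * 60);
        cf.close();
    }
}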
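Program 3
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.RetryNTimes;

public class Program3 {
    public static void main(String[] args) throws InterruptedException {
        // Retry a failed ZooKeeper operation up to 3 times, 2 seconds apart.
        RetryPolicy retryPolicy = new RetryNTimes(3, 2 * 1000);
        CuratorFramework cf = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        cf.start();
        // Same lock path as the other programs, so all of them contend for one lock.
        final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(cf, "/locks-new-1");
        for (int i = 0; i < 20; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        // acquire() blocks until the lock is held, so no extra
                        // isAcquiredInThisProcess() check is needed afterwards.
                        interProcessSemaphoreMutex.acquire();
                        try {
                            System.out.println(Thread.currentThread().getName() + " hello 3 --> OK.");
                            Thread.sleep(500);
                        } finally {
                            // Release even if the work above throws.
                            interProcessSemaphoreMutex.release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // Keep the process alive for one minute so the workers can finish.
        Thread.sleep(1 * 1000 * 60);
        cf.close();
    }
}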
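Starting the distributed system
All three programs lock the same path, /locks-new-1, so only one thread across the three processes holds the lock at any moment. Waiting threads are served in the order their requests reach ZooKeeper. If you start the programs in the order Program 1 -> Program 2 -> Program 3, and each program's threads have queued before the next program starts, Program 1's output appears first, then Program 2's, then Program 3's.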
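The queueing behaviour described above can be tried without a ZooKeeper server. The following is only a local analogy, not Curator code: a fair java.util.concurrent.Semaphore with a single permit stands in for the lock on /locks-new-1, and groups of threads inside one JVM stand in for the three programs. The class name FairHandOffSketch and the method simulate are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

// Local analogy only: a fair one-permit Semaphore plays the role of the
// ZooKeeper-backed lock; thread groups play the role of the three programs.
final class FairHandOffSketch {

    // Queues workersPerProgram workers for each "program" in start order,
    // then lets the queue drain and returns the labels in completion order.
    static List<String> simulate(int programs, int workersPerProgram) {
        final Semaphore lock = new Semaphore(1, true); // true = FIFO hand-off
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        List<Thread> threads = new ArrayList<Thread>();
        try {
            lock.acquire(); // hold the lock so that every worker has to queue
            for (int p = 1; p <= programs; p++) {
                for (int w = 0; w < workersPerProgram; w++) {
                    final String label = "hello " + p;
                    Thread t = new Thread(new Runnable() {
                        public void run() {
                            try {
                                lock.acquire();
                                try {
                                    log.add(label); // the "critical section"
                                } finally {
                                    lock.release();
                                }
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    });
                    int queuedBefore = lock.getQueueLength();
                    t.start();
                    // Wait until this worker is queued before starting the next,
                    // which mimics starting the programs one after another.
                    while (lock.getQueueLength() <= queuedBefore) {
                        Thread.sleep(1);
                    }
                    threads.add(t);
                }
            }
            lock.release(); // let the queued workers run one at a time
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating", e);
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        for (String line : simulate(3, 2)) {
            System.out.println(line);
        }
    }
}
```

With simulate(3, 2) the returned list is hello 1, hello 1, hello 2, hello 2, hello 3, hello 3, because every worker is queued before the lock is released. The real programs behave this way only to the extent that their lock requests reach ZooKeeper in that order.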