目录
- 简介
- 基于Curator的Zookeeper基本用法
- 监听器
- 分布式锁
- Leader选举
简介
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理
- 提供了一套Fluent风格的操作API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
Curator主要从以下几个方面降低了zk使用的复杂性:
- 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
- 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
- zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
- 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
基于Curator的Zookeeper基本用法
public class CuratorBase { private final int SESSION_TIMEOUT = 30 * 1000; private final int CONNECTION_TIMEOUT = 3 * 1000; private static final String CONNECT_ADDR = "192.168.1.1:2100,192.168.1.1:2101,192.168.1.:2102"; private CuratorFramework client = null;
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR).connectionTimeoutMs(CONNECTION_TIMEOUT) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy)
.build(); cf.start(); System.out.println(States.CONNECTED); System.out.println(cf.getState());
client.create().forPath("/curator","/curator data".getBytes()); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes()); client.create().withMode(CreateMode.EPHEMERAL) .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes()); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes()); client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path2","/curator/ephemeral_path2 data".getBytes()); Stat stat1 = client.checkExists().forPath("/curator"); Stat stat2 = client.checkExists().forPath("/curator2"); System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false)); System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));
System.out.println(client.getChildren().forPath("/")); System.out.println(new String(client.getData().forPath("/curator"))); client.setData().forPath("/curator","/curator modified data".getBytes());
client.create().orSetData().creatingParentContainersIfNeeded() .forPath("/curator/del_key1","/curator/del_key1 data".getBytes()); client.create().orSetData().creatingParentContainersIfNeeded() .forPath("/curator/del_key2","/curator/del_key2 data".getBytes()); client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes()); client.delete().forPath("/curator/del_key1"); client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2"); } }
|
orSetData()
方法:如果节点存在则Curator将会使用给出的数据设置这个节点的值,相当于 setData() 方法
creatingParentContainersIfNeeded()
方法:如果指定节点的父节点不存在,则Curator将会自动级联创建父节点
guaranteed()
方法:如果服务端可能删除成功,但是client没有接收到删除成功的提示,Curator将会在后台持续尝试删除该节点
deletingChildrenIfNeeded()
方法:如果待删除节点存在子节点,则Curator将会级联删除该节点的子节点
事务管理:
* 事务管理:碰到异常,事务会回滚 * @throws Exception */ @Test public void testTransaction() throws Exception{ CuratorOp createOp = client.transactionOp().create() .forPath("/curator/one_path","some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData() .forPath("/curator","other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete() .forPath("/curator"); List<CuratorTransactionResult> results = client.transaction() .forOperations(createOp,setDataOp,deleteOp); for(CuratorTransactionResult result : results){ System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType()); } }
|
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
- Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
- Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
- Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
ExecutorService pool = Executors.newFixedThreadPool(2);
final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); } }, pool );
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }, pool ); client.setData().forPath("/zk-huey/cnode", "world".getBytes()); Thread.sleep(10 * 1000); pool.shutdown(); client.close();
|
分布式锁
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[c64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
public class CuratorDistrLockTest {
private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException { CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!");
Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2");
t1.start(); t2.start(); }
private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
|
Leader选举
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath;
public class CuratorLeaderTest {
private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); }
@Override public void stateChanged(CuratorFramework client, ConnectionState state) { } };
new Thread(() -> { registerListener(listener); }).start();
new Thread(() -> { registerListener(listener); }).start();
new Thread(() -> { registerListener(listener); }).start();
Thread.sleep(Integer.MAX_VALUE); }
private static void registerListener(LeaderSelectorListener listener) { CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start();
try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); }
LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }
|
转载:https://www.cnblogs.com/erbing/p/9799098.html