kazoo是一个Python库,旨在使得Python能够轻松、便捷的使用zookeeper。

kazoo的安装

使用pip安装kazoo:

pip install kazoo

因为kazoo使用纯python实现zookeeper的协议,所以不必安装Python zookeeper C的各种依赖。

基本用法

连接处理

使用kazoo,首先需要实例化一个KazooClient对象并建立连接,代码如下:

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

默认情况下,KazooClient会连接本地zookeeper服务,端口号为2181。

当zookeeper服务异常时(服务异常或服务未启动等),zk.start()会不断尝试重新连接,直到连接超时。

连接一旦建立,无论是间歇性连接丢失(网络闪断等)或zookeeper会话过期,KazooClient会不断尝试重新连接。

我们可以通过stop命令显式的中断连接:

zk.stop()

会话状态

Kazoo的客户端在与zookeeper服务会话的过程中,通常会在以下三种状态之间相互切换:CONNECTEDSUSPENDEDLOST

当KazooClient实例第一次被创建时,它的状态为LOST,一旦连接建立成功,状态随即被切换为CONNECTED。

在整个会话的生命周期里,伴随着网络闪断、服务端zookeeper异常或是其他什么原因,导致客户端与服务端出现断开的情况,KazooClient的状态切换成SUSPENDED,与此同时,KazooClient会不断尝试重新连接服务端,一旦连接成功,状态再次回到CONNECTED。

kazoo状态监听

添加状态监听事件,实时监听客户端与服务端的会话状态,使得一旦发生连接中断、连接恢复或会话过期等情景时,我们能及时作出相应的处理。其使用方法如下:

def connection_listener(state):
    if state == "LOST":
        # Register somewhere that the session was lost
        pass
    elif state == "SUSPENDED":
        # Handle being disconnected from Zookeeper
        pass
    else:
        # Handle being connected/reconnected to Zookeeper
        pass
zk = KazooClient(hosts="127.0.0.1:2181")
zk.add_listener(connection_listener)

当使用kazoo.recipe.lock.Lock或创建临时节点时,非常建议大家添加状态监听,以便我们的代码能够正确处理连接中断或Zookeeper会话丢失的情形。

zookeeper的增删改查

创建节点

方法:

  • ensure_path():递归创建节点路径,只能设置权限,不能添加数据。
  • create():创建节点,并同时可以添加数据和监听事件,前提是其父节点必须存在,不能递归创建。

用法:

zk.ensure_path("/china/henan")
zk.create("/china", b"this is china node.")

读取数据

方法:

  • exists():检查节点是否存在
  • get():获取节点数据以及节点状态的详细信息
  • get_children():获取指定节点的所有子节点

更新数据

方法:

  • set():更新指定节点的信息。

删除节点

方法:

  • delete():删除指定节点。

监听器

kazoo可以在节点上添加监听,使得在节点或节点的子节点发生变化时进行触发。

kazoo支持两种类型的添加监听器的方式,一种是zookeeper原生支持的,其用法如下:

def test_watch_data(event):
    print("this is a watcher for node data.")
zk.get_children("/china", watch=test_watch_children)

另一种方式是通过python修饰器的原理实现的,支持该功能的方法有:

  • ChildrenWatch:当子节点发生变化时触发
  • DataWatch:当节点数据发生变化时触发

其用法如下:

@zk.ChildrenWatch("/china")
def watch_china_children(children):
    print("this is watch_china_children %s" % children)
    
@zk.DataWatch("/china")
def watch_china_node(data, state):
    print("china node is %s" % data)

kazoo事务

自v3.4以后,zookeeper支持一次发送多个命令,这些命令作为一个原子进行提交,要么全部执行成功,要么全部失败。

使用方法如下:

transaction = zk.transaction()
transaction.check('/china/hebei', version=3)
transaction.create('/china/shanxi', b"thi is shanxi.")
results = transaction.commit()

扫码关注李苦李公众号

李苦李公众号

标签: zookeeper, Kazoo

已有 2 条评论

  1. add_listener 的方法里怎么获取到 zk 对象呢?谢谢

  2. zk.create("/china", b"this is china node.")第一个参数:节点路径是随便定义的吗

添加新评论