Source code for zktools.util

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""Utility functions for Zookeeper"""
import uuid
from functools import wraps
from threading import Thread

import zookeeper


def safe_call(zk, func, *args, **kwargs):
    """Call a method on ``zk`` by name, retrying across connection loss

    The method named ``func`` is looked up on ``zk`` and invoked with the
    given positional and keyword arguments. If the call raises a
    Zookeeper closing, connection-loss or operation-timeout exception,
    this waits on ``zk.connected`` and then tries again, looping until a
    call returns.

    .. note::

        The call is simply repeated; nothing checks whether a failed
        attempt actually took effect on the server. When creating a
        node, this could therefore result in a NodeAlreadyExists
        exception because the create ran twice.

    :param zk: Zookeeper instance
    :param func: Name of the method to call on ``zk``
    :returns: Whatever the underlying method returns

    """
    while True:
        try:
            bound_method = getattr(zk, func)
            return bound_method(*args, **kwargs)
        except (zookeeper.ClosingException,
                zookeeper.ConnectionLossException,
                zookeeper.OperationTimeoutException):
            # Presumably ``connected`` is an Event-like object that is set
            # once the session is re-established -- confirm against the
            # Zookeeper wrapper class. Once it returns, the loop retries.
            zk.connected.wait()
def safe_create_ephemeral_sequence(zk, name, data, acl):
    """Safely creates an ephemeral sequence node using a UUID prefix

    This function handles zookeeper connection-loss style exceptions
    and determines, after waiting for reconnection, whether the node was
    created successfully. It does so by listing the parent's children
    and looking for one that starts with the UUID generated for this
    call; the create is only retried when no such child exists.

    :param zk: Zookeeper instance
    :param name: Absolute path of the node; the last path component will
                 be prefixed by the UUID
    :param data: Data to set on the node
    :param acl: ACL to set for the node
    :returns: Name of the created node
    :raises ValueError: If ``name`` contains no ``/`` separator

    The name will be split so that it is prefixed by the UUID and
    suffixed by the sequence.

    Example:

    .. code-block:: pycon

        >>> safe_create_ephemeral_sequence(zk, '/path/to/node/myname',
        ...                                'data', [ZOO_OPEN_ACL_UNSAFE])
        '/path/to/node/dfad3fa294d745e499d883b0a38bbc93-myname-0001'

    """
    path, separator, base_name = name.rpartition('/')
    if not separator:
        raise ValueError(
            "name must be a node path containing '/': %r" % (name,))

    prefix = uuid.uuid4().hex + '-'
    node_name = '/'.join([path, prefix + base_name + '-'])

    # A node directly under the root leaves ``path`` empty. Joining with
    # the empty string still yields a correct '/child' path, but the
    # parent to list must be '/' since '' is not a valid Zookeeper path.
    parent = path or '/'

    while True:
        try:
            return zk.create(node_name, data, acl,
                             zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        except (zookeeper.ClosingException,
                zookeeper.ConnectionLossException,
                zookeeper.OperationTimeoutException):
            # The create may have succeeded before the failure surfaced,
            # so check the children for a node carrying our UUID prefix.
            children = safe_call(zk, 'get_children', parent)
            created = [x for x in children if x.startswith(prefix)]
            if created:
                return '/'.join([path, created[0]])
            # No matching child: the create did not happen, so the loop
            # retries it.
def threaded(func):
    """Decorator that runs the wrapped function in a new thread

    Each call to the decorated function creates a :class:`Thread`
    targeting ``func`` with the call's arguments, starts it, and returns
    it immediately without waiting for ``func`` to finish.

    :param func: Function to run in the other thread
    :returns: Wrapper which, when called, returns the started
              :class:`Thread`

    Example::

        @threaded
        def task1():
            do_something

        @threaded
        def task2():
            do_something_too

        t1 = task1()
        t2 = task2()
        ...
        t1.join()
        t2.join()

    """
    def run_in_thread(*args, **kwargs):
        worker = Thread(target=func, args=args, kwargs=kwargs)
        worker.start()
        # Hand the thread back so the caller can join() it later.
        return worker

    # Copy func's name/docstring onto the wrapper, as the decorator form
    # of ``wraps`` would.
    return wraps(func)(run_in_thread)

Project Versions

This Page