OpenStack Study: impl_zmq.py


**** CubicPower OpenStack Study ****

# Copyright 2011 Cloudscaling Group, Inc

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import os

import pprint

import re

import socket

import sys

import types

import uuid

import eventlet

import greenlet

from oslo.config import cfg

import six

from six import moves

from neutron.openstack.common import excutils

from neutron.openstack.common.gettextutils import _, _LE, _LI

from neutron.openstack.common import importutils

from neutron.openstack.common import jsonutils

from neutron.openstack.common.rpc import common as rpc_common

zmq = importutils.try_import('eventlet.green.zmq')

# Module-level aliases, kept for convenience; they are not modified.

pformat = pprint.pformat

Timeout = eventlet.timeout.Timeout

LOG = rpc_common.LOG

RemoteError = rpc_common.RemoteError

RPCException = rpc_common.RPCException

zmq_opts = [
    cfg.StrOpt('rpc_zmq_bind_address', default='*',
               help='ZeroMQ bind address. Should be a wildcard (*), '
                    'an ethernet interface, or IP. '
                    'The "host" option should point or resolve to this '
                    'address.'),

    # The module.Class to use for matchmaking.
    cfg.StrOpt(
        'rpc_zmq_matchmaker',
        default=('neutron.openstack.common.rpc.'
                 'matchmaker.MatchMakerLocalhost'),
        help='MatchMaker driver',
    ),

    # The following port is unassigned by IANA as of 2012-05-21
    cfg.IntOpt('rpc_zmq_port', default=9501,
               help='ZeroMQ receiver listening port'),

    cfg.IntOpt('rpc_zmq_contexts', default=1,
               help='Number of ZeroMQ contexts, defaults to 1'),

    cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
               help='Maximum number of ingress messages to locally buffer '
                    'per topic. Default is unlimited.'),

    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
               help='Directory for holding IPC sockets'),

    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
               help='Name of this node. Must be a valid hostname, FQDN, or '
                    'IP address. Must match "host" option, if running Nova.')
]

CONF = cfg.CONF

CONF.register_opts(zmq_opts)

ZMQ_CTX = None  # ZeroMQ Context, must be global.

matchmaker = None  # memoized matchmaker object
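
Study note: these options are registered on the global CONF at import time, so a deployment can override them before the driver is used. A minimal sketch (the values are illustrative, not recommendations):

    from oslo.config import cfg

    # Hypothetical startup overrides; any rpc_zmq_* option registered
    # above can be set the same way.
    cfg.CONF.set_override('rpc_zmq_port', 9501)
    cfg.CONF.set_override('rpc_zmq_ipc_dir', '/var/run/openstack')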

**** CubicPower OpenStack Study ****

def _serialize(data):

    """Serialization wrapper.

    We prefer using JSON, but it cannot encode all types.

    We log an error and re-raise if a developer passes us bad data.

    """

    try:

        return jsonutils.dumps(data, ensure_ascii=True)

    except TypeError:

        with excutils.save_and_reraise_exception():

            LOG.error(_LE("JSON serialization failed."))

**** CubicPower OpenStack Study ****

def _deserialize(data):

    """Deserialization wrapper."""

    LOG.debug("Deserializing: %s", data)

    return jsonutils.loads(data)
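
Study note: a quick round-trip through the two wrappers, assuming the module is imported as-is:

    payload = {'method': 'echo', 'args': {'value': 42}}
    wire = _serialize(payload)        # ASCII-safe JSON text
    assert _deserialize(wire) == payload
    # _serialize(object()) would log "JSON serialization failed."
    # and re-raise the TypeError.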

**** CubicPower OpenStack Study ****

class ZmqSocket(object):

"""A tiny wrapper around ZeroMQ.

Simplifies the send/recv protocol and connection management.

Can be used as a Context (supports the 'with' statement).

"""

**** CubicPower OpenStack Study ****

    def __init__(self, addr, zmq_type, bind=True, subscribe=None):

        self.sock = _get_ctxt().socket(zmq_type)

        self.addr = addr

        self.type = zmq_type

        self.subscriptions = []

        # Guard against sending/receiving on the wrong socket type.

        self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)

        self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)

        self.can_sub = zmq_type in (zmq.SUB, )

        # Support list, str, & None for subscribe arg (cast to list)

        do_sub = {

            list: subscribe,

            str: [subscribe],

            type(None): []

        }[type(subscribe)]

        for f in do_sub:

            self.subscribe(f)

        str_data = {'addr': addr, 'type': self.socket_s(),

                    'subscribe': subscribe, 'bind': bind}

        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)

        LOG.debug("-> Subscribed to %(subscribe)s", str_data)

        LOG.debug("-> bind: %(bind)s", str_data)

        try:

            if bind:

                self.sock.bind(addr)

            else:

                self.sock.connect(addr)

        except Exception:

            raise RPCException(_("Could not open socket."))

**** CubicPower OpenStack Study ****

    def socket_s(self):

        """Get socket type as string."""

        t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',

                  'DEALER')

        return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]

**** CubicPower OpenStack Study ****

    def subscribe(self, msg_filter):

        """Subscribe."""

        if not self.can_sub:

            raise RPCException("Cannot subscribe on this socket.")

        LOG.debug("Subscribing to %s", msg_filter)

        try:

            self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)

        except Exception:

            return

        self.subscriptions.append(msg_filter)

**** CubicPower OpenStack Study ****

    def unsubscribe(self, msg_filter):

        """Unsubscribe."""

        if msg_filter not in self.subscriptions:

            return

        self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)

        self.subscriptions.remove(msg_filter)

**** CubicPower OpenStack Study ****

    def close(self):

        if self.sock is None or self.sock.closed:

            return

        # We must unsubscribe, or we'll leak descriptors.

        if self.subscriptions:

            for f in self.subscriptions:

                try:

                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)

                except Exception:

                    pass

            self.subscriptions = []

        try:

            # Default is to linger

            self.sock.close()

        except Exception:

            # While this is a bad thing to happen,

            # it would be much worse if some of the code calling this

            # were to fail. For now, lets log, and later evaluate

            # if we can safely raise here.

            LOG.error(_LE("ZeroMQ socket could not be closed."))

        self.sock = None

**** CubicPower OpenStack Study ****

    def recv(self, **kwargs):

        if not self.can_recv:

            raise RPCException(_("You cannot recv on this socket."))

        return self.sock.recv_multipart(**kwargs)

**** CubicPower OpenStack Study ****

    def send(self, data, **kwargs):

        if not self.can_send:

            raise RPCException(_("You cannot send on this socket."))

        self.sock.send_multipart(data, **kwargs)
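
Study note: a minimal sketch of the wrapper in use, assuming a writable IPC path. The receiving side binds and the sender connects, matching the conventions ZmqProxy uses below:

    receiver = ZmqSocket('ipc:///tmp/zmq_study_demo', zmq.PULL, bind=True)
    sender = ZmqSocket('ipc:///tmp/zmq_study_demo', zmq.PUSH, bind=False)

    sender.send([b'0', b'demo-topic', b'cast', _serialize({'hi': 'there'})])
    print(receiver.recv())  # the four frames sent above

    sender.close()
    receiver.close()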

**** CubicPower OpenStack Study ****

class ZmqClient(object):

"""Client for ZMQ sockets."""

**** CubicPower OpenStack Study ****

    def __init__(self, addr):

        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)

**** CubicPower OpenStack Study ****

    def cast(self, msg_id, topic, data, envelope):

        msg_id = msg_id or 0

        if not envelope:

            self.outq.send(map(bytes,

                           (msg_id, topic, 'cast', _serialize(data))))

            return

        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)

        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())

        self.outq.send(map(bytes,

                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))

**** CubicPower OpenStack Study ****

    def close(self):

        self.outq.close()
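
Study note: the two wire formats cast() produces, for reference. Here data is the [marshalled-context, message] pair built by _cast() further down:

    # envelope=False (legacy):
    #   [msg_id, topic, 'cast', _serialize(data)]
    # envelope=True:
    #   [msg_id, topic, 'impl_zmq_v2', data[0], k1, v1, k2, v2, ...]
    # where the trailing frames are the flattened key/value pairs of
    # rpc_common.serialize_msg(data[1], envelope).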

**** CubicPower OpenStack Study ****

class RpcContext(rpc_common.CommonRpcContext):

"""Context that supports replying to a rpc.call."""

**** CubicPower OpenStack Study ****

    def __init__(self, **kwargs):

        self.replies = []

        super(RpcContext, self).__init__(**kwargs)

**** CubicPower OpenStack Study ****

    def deepcopy(self):

        values = self.to_dict()

        values['replies'] = self.replies

        return self.__class__(**values)

**** CubicPower OpenStack Study ****

    def reply(self, reply=None, failure=None, ending=False):

        if ending:

            return

        self.replies.append(reply)

**** CubicPower OpenStack Study ****

    @classmethod
    def marshal(cls, ctx):

        ctx_data = ctx.to_dict()

        return _serialize(ctx_data)

**** CubicPower OpenStack Study ****

    @classmethod
    def unmarshal(cls, data):

        return RpcContext.from_dict(_deserialize(data))
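
Study note: marshal() and unmarshal() round-trip a context through JSON. A sketch, assuming the CommonRpcContext semantics (kwargs are stored and returned by to_dict()):

    ctx = RpcContext(user='demo', tenant='demo')
    wire = RpcContext.marshal(ctx)        # JSON string of ctx.to_dict()
    restored = RpcContext.unmarshal(wire)
    assert restored.to_dict()['user'] == 'demo'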

**** CubicPower OpenStack Study ****

class InternalContext(object):

"""Used by ConsumerBase as a private context for - methods."""

**** CubicPower OpenStack Study ****

    def __init__(self, proxy):

        self.proxy = proxy

        self.msg_waiter = None

**** CubicPower OpenStack Study ****

    def _get_response(self, ctx, proxy, topic, data):

        """Process a curried message and cast the result to topic."""

        LOG.debug("Running func with context: %s", ctx.to_dict())

        data.setdefault('version', None)

        data.setdefault('args', {})

        try:

            result = proxy.dispatch(

                ctx, data['version'], data['method'],

                data.get('namespace'), **data['args'])

            return ConsumerBase.normalize_reply(result, ctx.replies)

        except greenlet.GreenletExit:

            # ignore these since they are just from shutdowns

            pass

        except rpc_common.ClientException as e:

            LOG.debug("Expected exception during message handling (%s)" %

                      e._exc_info[1])

            return {'exc':

                    rpc_common.serialize_remote_exception(e._exc_info,

                                                          log_failure=False)}

        except Exception:

            LOG.error(_LE("Exception during message handling"))

            return {'exc':

                    rpc_common.serialize_remote_exception(sys.exc_info())}

**** CubicPower OpenStack Study ****

    def reply(self, ctx, proxy,

              msg_id=None, context=None, topic=None, msg=None):

        """Reply to a casted call."""

        # NOTE(ewindisch): context kwarg exists for Grizzly compat.

        #                  this may be able to be removed earlier than

        #                  'I' if ConsumerBase.process were refactored.

        if type(msg) is list:

            payload = msg[-1]

        else:

            payload = msg

        response = ConsumerBase.normalize_reply(

            self._get_response(ctx, proxy, topic, payload),

            ctx.replies)

        LOG.debug("Sending reply")

        _multi_send(_cast, ctx, topic, {

            'method': '-process_reply',

            'args': {

                'msg_id': msg_id,  # Include for Folsom compat.

                'response': response

            }

        }, _msg_id=msg_id)

**** CubicPower OpenStack Study ****

class ConsumerBase(object):

"""Base Consumer."""

**** CubicPower OpenStack Study ****

    def __init__(self):

        self.private_ctx = InternalContext(None)

**** CubicPower OpenStack Study ****

    @classmethod
    def normalize_reply(cls, result, replies):

        #TODO(ewindisch): re-evaluate and document this method.

        if isinstance(result, types.GeneratorType):

            return list(result)

        elif replies:

            return replies

        else:

            return [result]

**** CubicPower OpenStack Study ****

    def process(self, proxy, ctx, data):

        data.setdefault('version', None)

        data.setdefault('args', {})

        # Methods starting with '-' are processed internally
        # (not valid public method names).

        method = data.get('method')

        if not method:

            LOG.error(_LE("RPC message did not include method."))

            return

        # Internal method

        # uses internal context for safety.

        if method == '-reply':

            self.private_ctx.reply(ctx, proxy, **data['args'])

            return

        proxy.dispatch(ctx, data['version'],

                       data['method'], data.get('namespace'), **data['args'])
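
Study note: the message shape process() expects, with hypothetical values:

    message = {
        'method': 'create_network',     # required
        'version': '1.1',               # optional, defaults to None
        'namespace': None,              # optional
        'args': {'name': 'net-0'},      # optional, defaults to {}
    }
    # consumer.process(proxy, ctx, message) then calls
    # proxy.dispatch(ctx, '1.1', 'create_network', None, name='net-0')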

**** CubicPower OpenStack Study ****

class ZmqBaseReactor(ConsumerBase):

"""A consumer class implementing a centralized casting broker (PULL-PUSH).

Used for RoundRobin requests.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, conf):

        super(ZmqBaseReactor, self).__init__()

        self.proxies = {}

        self.threads = []

        self.sockets = []

        self.subscribe = {}

        self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)

**** CubicPower OpenStack Study ****

    def register(self, proxy, in_addr, zmq_type_in,

                 in_bind=True, subscribe=None):

        LOG.info(_LI("Registering reactor"))

        if zmq_type_in not in (zmq.PULL, zmq.SUB):

            raise RPCException("Bad input socktype")

        # Items push in.

        inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,

                        subscribe=subscribe)

        self.proxies[inq] = proxy

        self.sockets.append(inq)

        LOG.info(_LI("In reactor registered"))

**** CubicPower OpenStack Study ****

    def consume_in_thread(self):

        @excutils.forever_retry_uncaught_exceptions

        def _consume(sock):

            LOG.info(_LI("Consuming socket"))

            while True:

                self.consume(sock)

        for k in self.proxies.keys():

            self.threads.append(

                self.pool.spawn(_consume, k)

            )

**** CubicPower OpenStack Study ****

    def wait(self):

        for t in self.threads:

            t.wait()

**** CubicPower OpenStack Study ****

    def close(self):

        for s in self.sockets:

            s.close()

        for t in self.threads:

            t.kill()

**** CubicPower OpenStack Study ****

class ZmqProxy(ZmqBaseReactor):

"""A consumer class implementing a topic-based proxy.

Forwards to IPC sockets.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, conf):

        super(ZmqProxy, self).__init__(conf)

        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))

        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))

        self.topic_proxy = {}

**** CubicPower OpenStack Study ****

    def consume(self, sock):

        ipc_dir = CONF.rpc_zmq_ipc_dir

        data = sock.recv(copy=False)

        topic = data[1].bytes

        if topic.startswith('fanout~'):

            sock_type = zmq.PUB

            topic = topic.split('.', 1)[0]

        elif topic.startswith('zmq_replies'):

            sock_type = zmq.PUB

        else:

            sock_type = zmq.PUSH

        if topic not in self.topic_proxy:

            def publisher(waiter):

                LOG.info(_LI("Creating proxy for topic: %s"), topic)

                try:

                    # The topic is received over the network,

                    # don't trust this input.

                    if self.badchars.search(topic) is not None:

                        emsg = _("Topic contained dangerous characters.")

                        LOG.warn(emsg)

                        raise RPCException(emsg)

                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %

                                         (ipc_dir, topic),

                                         sock_type, bind=True)

                except RPCException:

                    waiter.send_exception(*sys.exc_info())

                    return

                self.topic_proxy[topic] = eventlet.queue.LightQueue(

                    CONF.rpc_zmq_topic_backlog)

                self.sockets.append(out_sock)

                # It takes some time for a pub socket to open,

                # before we can have any faith in doing a send() to it.

                if sock_type == zmq.PUB:

                    eventlet.sleep(.5)

                waiter.send(True)

                while True:

                    data = self.topic_proxy[topic].get()

                    out_sock.send(data, copy=False)

            wait_sock_creation = eventlet.event.Event()

            eventlet.spawn(publisher, wait_sock_creation)

            try:

                wait_sock_creation.wait()

            except RPCException:

                LOG.error(_LE("Topic socket file creation failed."))

                return

        try:

            self.topic_proxy[topic].put_nowait(data)

        except eventlet.queue.Full:

            LOG.error(_LE("Local per-topic backlog buffer full for topic "

                          "%(topic)s. Dropping message.") % {'topic': topic})

**** CubicPower OpenStack Study ****

    def consume_in_thread(self):

        """Runs the ZmqProxy service."""

        ipc_dir = CONF.rpc_zmq_ipc_dir

        consume_in = "tcp://%s:%s" % \

            (CONF.rpc_zmq_bind_address,

             CONF.rpc_zmq_port)

        consumption_proxy = InternalContext(None)

        try:

            os.makedirs(ipc_dir)

        except os.error:

            if not os.path.isdir(ipc_dir):

                with excutils.save_and_reraise_exception():

                    LOG.error(_LE("Required IPC directory does not exist at"

                                  " %s") % (ipc_dir, ))

        try:

            self.register(consumption_proxy,

                          consume_in,

                          zmq.PULL)

        except zmq.ZMQError:

            if os.access(ipc_dir, os.X_OK):

                with excutils.save_and_reraise_exception():

                    LOG.error(_LE("Permission denied to IPC directory at"

                                  " %s") % (ipc_dir, ))

            with excutils.save_and_reraise_exception():

                LOG.error(_LE("Could not create ZeroMQ receiver daemon. "

                              "Socket may already be in use."))

        super(ZmqProxy, self).consume_in_thread()

**** CubicPower OpenStack Study ****

def unflatten_envelope(packenv):

    """Unflattens the RPC envelope.

    Takes a list and returns a dictionary.

    e.g. [1, 2, 3, 4] => {1: 2, 3: 4}

    """

    i = iter(packenv)

    h = {}

    try:

        while True:

            k = six.next(i)

            h[k] = six.next(i)

    except StopIteration:

        return h
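
Study note: this is the inverse of the flattening done in ZmqClient.cast(). A round-trip sketch:

    envelope = {'oslo.version': '2.0', 'oslo.message': '{"method": "echo"}'}
    flat = moves.reduce(lambda x, y: x + y, envelope.items())
    assert unflatten_envelope(flat) == envelope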

**** CubicPower OpenStack Study ****

class ZmqReactor(ZmqBaseReactor):

"""A consumer class implementing a consumer for messages.

Can also be used as a 1:1 proxy

"""

**** CubicPower OpenStack Study ****

    def __init__(self, conf):

        super(ZmqReactor, self).__init__(conf)

**** CubicPower OpenStack Study ****

    def consume(self, sock):

        #TODO(ewindisch): use zero-copy (i.e. references, not copying)

        data = sock.recv()

        LOG.debug("CONSUMER RECEIVED DATA: %s", data)

        proxy = self.proxies[sock]

        if data[2] == 'cast':  # Legacy protocol

            packenv = data[3]

            ctx, msg = _deserialize(packenv)

            request = rpc_common.deserialize_msg(msg)

            ctx = RpcContext.unmarshal(ctx)

        elif data[2] == 'impl_zmq_v2':

            packenv = data[4:]

            msg = unflatten_envelope(packenv)

            request = rpc_common.deserialize_msg(msg)

            # Unmarshal only after verifying the message.

            ctx = RpcContext.unmarshal(data[3])

        else:

            LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))

            return

        self.pool.spawn_n(self.process, proxy, ctx, request)

**** CubicPower OpenStack Study ****

class Connection(rpc_common.Connection):

"""Manages connections and threads."""

**** CubicPower OpenStack Study ****

    def __init__(self, conf):

        self.topics = []

        self.reactor = ZmqReactor(conf)

**** CubicPower OpenStack Study ****

    def create_consumer(self, topic, proxy, fanout=False):

        # Register with matchmaker.

        _get_matchmaker().register(topic, CONF.rpc_zmq_host)

        # Subscription scenarios

        if fanout:

            sock_type = zmq.SUB

            subscribe = ('', fanout)[type(fanout) == str]

            topic = 'fanout~' + topic.split('.', 1)[0]

        else:

            sock_type = zmq.PULL

            subscribe = None

            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))

        if topic in self.topics:

            LOG.info(_LI("Skipping topic registration. Already registered."))

            return

        # Receive messages from (local) proxy

        inaddr = "ipc://%s/zmq_topic_%s" % \

            (CONF.rpc_zmq_ipc_dir, topic)

        LOG.debug("Consumer is a zmq.%s",

                  ['PULL', 'SUB'][sock_type == zmq.SUB])

        self.reactor.register(proxy, inaddr, sock_type,

                              subscribe=subscribe, in_bind=False)

        self.topics.append(topic)
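
Study note: the resulting subscriptions for a node whose rpc_zmq_host is 'node-1' (hypothetical):

    # create_consumer('cast', proxy)              -> zmq.PULL on 'cast.node-1'
    # create_consumer('cast.node-1', proxy)       -> zmq.PULL on 'cast.node-1'
    # create_consumer('cast', proxy, fanout=True) -> zmq.SUB  on 'fanout~cast'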

**** CubicPower OpenStack Study ****

    def close(self):

        _get_matchmaker().stop_heartbeat()

        for topic in self.topics:

            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)

        self.reactor.close()

        self.topics = []

**** CubicPower OpenStack Study ****

    def wait(self):

        self.reactor.wait()

**** CubicPower OpenStack Study ****

    def consume_in_thread(self):

        _get_matchmaker().start_heartbeat()

        self.reactor.consume_in_thread()

**** CubicPower OpenStack Study ****

def _cast(addr, context, topic, msg, timeout=None, envelope=False,

          _msg_id=None):

    timeout_cast = timeout or CONF.rpc_cast_timeout

    payload = [RpcContext.marshal(context), msg]

    with Timeout(timeout_cast, exception=rpc_common.Timeout):

        try:

            conn = ZmqClient(addr)

            # assumes cast can't return an exception

            conn.cast(_msg_id, topic, payload, envelope)

        except zmq.ZMQError:

            raise RPCException("Cast failed. ZMQ Socket Exception")

        finally:

            if 'conn' in vars():

                conn.close()

**** CubicPower OpenStack Study ****

def _call(addr, context, topic, msg, timeout=None,

          envelope=False):

    # timeout_response is how long we wait for a response

    timeout = timeout or CONF.rpc_response_timeout

    # The msg_id is used to track replies.

    msg_id = uuid.uuid4().hex

    # Replies always come into the reply service.

    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host

    LOG.debug("Creating payload")

    # Curry the original request into a reply method.

    mcontext = RpcContext.marshal(context)

    payload = {

        'method': '-reply',

        'args': {

            'msg_id': msg_id,

            'topic': reply_topic,

            # TODO(ewindisch): safe to remove mcontext in I.

            'msg': [mcontext, msg]

        }

    }

    LOG.debug("Creating queue socket for reply waiter")

    # Messages arriving async.

    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt

    with Timeout(timeout, exception=rpc_common.Timeout):

        try:

            msg_waiter = ZmqSocket(

                "ipc://%s/zmq_topic_zmq_replies.%s" %

                (CONF.rpc_zmq_ipc_dir,

                 CONF.rpc_zmq_host),

                zmq.SUB, subscribe=msg_id, bind=False

            )

            LOG.debug("Sending cast")

            _cast(addr, context, topic, payload, envelope)

            LOG.debug("Cast sent; Waiting reply")

            # Blocks until receives reply

            msg = msg_waiter.recv()

            LOG.debug("Received message: %s", msg)

            LOG.debug("Unpacking response")

            if msg[2] == 'cast':  # Legacy version

                raw_msg = _deserialize(msg[-1])[-1]

            elif msg[2] == 'impl_zmq_v2':

                rpc_envelope = unflatten_envelope(msg[4:])

                raw_msg = rpc_common.deserialize_msg(rpc_envelope)

            else:

                raise rpc_common.UnsupportedRpcEnvelopeVersion(

                    _("Unsupported or unknown ZMQ envelope returned."))

            responses = raw_msg['args']['response']

        # ZMQError trumps the Timeout error.

        except zmq.ZMQError:

            raise RPCException("ZMQ Socket Error")

        except (IndexError, KeyError):

            raise RPCException(_("RPC Message Invalid."))

        finally:

            if 'msg_waiter' in vars():

                msg_waiter.close()

    # It seems we don't need to do all of the following,

    # but perhaps it would be useful for multicall?

    # One effect of this is that we're checking all

    # responses for Exceptions.

    for resp in responses:

        if isinstance(resp, types.DictType) and 'exc' in resp:

            raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])

    return responses[-1]

**** CubicPower OpenStack Study ****

def _multi_send(method, context, topic, msg, timeout=None,

                envelope=False, _msg_id=None):

    """Wraps the sending of messages.

    Dispatches to the matchmaker and sends message to all relevant hosts.

    """

    conf = CONF

    LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})

    queues = _get_matchmaker().queues(topic)

    LOG.debug("Sending message(s) to: %s", queues)

    # Don't stack if we have no matchmaker results

    if not queues:

        LOG.warn(_("No matchmaker results. Not casting."))

        # While not strictly a timeout, callers know how to handle

        # this exception and a timeout isn't too big a lie.

        raise rpc_common.Timeout(_("No match from matchmaker."))

    # This supports brokerless fanout (addresses > 1)

    for queue in queues:

        (_topic, ip_addr) = queue

        _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)

        if method.__name__ == '_cast':

            eventlet.spawn_n(method, _addr, context,

                             _topic, msg, timeout, envelope,

                             _msg_id)

            return

        return method(_addr, context, _topic, msg, timeout,

                      envelope)
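
Study note: how matchmaker results become socket addresses, with a hypothetical answer and the default port:

    # queues == [('cast.node-1', '10.0.0.1')]
    #   -> _addr == 'tcp://10.0.0.1:9501'
    # A _cast is spawned asynchronously via eventlet.spawn_n; a _call
    # runs inline and returns the reply from the first matched host.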

**** CubicPower OpenStack Study ****

def create_connection(conf, new=True):

    return Connection(conf)

**** CubicPower OpenStack Study ****

def multicall(conf, *args, **kwargs):

    """Multiple calls."""

    return _multi_send(_call, *args, **kwargs)

**** CubicPower OpenStack Study ****

def call(conf, *args, **kwargs):

    """Send a message, expect a response."""

    data = _multi_send(_call, *args, **kwargs)

    return data[-1]

**** CubicPower OpenStack Study ****

def cast(conf, *args, **kwargs):

    """Send a message expecting no reply."""

    _multi_send(_cast, *args, **kwargs)

**** CubicPower OpenStack Study ****

def fanout_cast(conf, context, topic, msg, **kwargs):

    """Send a message to all listening and expect no reply."""

    # NOTE(ewindisch): fanout~ is used because it avoids splitting on '.'

    # and acts as a non-subtle hint to the matchmaker and ZmqProxy.

    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)

**** CubicPower OpenStack Study ****

def notify(conf, context, topic, msg, envelope):

    """Send notification event.

    Notifications are sent to topic-priority.

    This differs from the AMQP drivers which send to topic.priority.

    """

    # NOTE(ewindisch): dot-priority in rpc notifier does not

    # work with our assumptions.

    topic = topic.replace('.', '-')

    cast(conf, context, topic, msg, envelope=envelope)
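
Study note: the rewrite keeps the priority attached to the topic with a separator the driver does not split on:

    assert 'notifications.info'.replace('.', '-') == 'notifications-info'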

**** CubicPower OpenStack Study ****

def cleanup():

    """Clean up resources in use by implementation."""

    global ZMQ_CTX

    if ZMQ_CTX:

        ZMQ_CTX.term()

    ZMQ_CTX = None

    global matchmaker

    matchmaker = None

**** CubicPower OpenStack Study ****

def _get_ctxt():

    if not zmq:

        raise ImportError("Failed to import eventlet.green.zmq")

    global ZMQ_CTX

    if not ZMQ_CTX:

        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)

    return ZMQ_CTX

**** CubicPower OpenStack Study ****

def _get_matchmaker(*args, **kwargs):

    global matchmaker

    if not matchmaker:

        mm = CONF.rpc_zmq_matchmaker

        if mm.endswith('matchmaker.MatchMakerRing'):

            mm = mm.replace('matchmaker', 'matchmaker_ring')

            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'

                       ' %(new)s instead') % dict(

                     orig=CONF.rpc_zmq_matchmaker, new=mm))

        matchmaker = importutils.import_object(mm, *args, **kwargs)

    return matchmaker