¡@

Home 

OpenStack Study: impl_fake.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2011 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""Fake RPC implementation which calls proxy methods directly with no

queues. Casts will block, but this is very useful for tests.

"""

import inspect

# NOTE(russellb): We specifically want to use json, not our own jsonutils.

# jsonutils has some extra logic to automatically convert objects to primitive

# types so that they can be serialized. We want to catch all cases where

# non-primitive types make it into this code and treat it as an error.

import json

import time

import eventlet

import six

from neutron.openstack.common.rpc import common as rpc_common

# Module-level registry mapping a topic name to the list of Consumer objects
# registered for it. Connection.create_consumer() appends entries and
# Connection.close() removes them; multicall() and fanout_cast() read it.
CONSUMERS = {}

**** CubicPower OpenStack Study ****

class RpcContext(rpc_common.CommonRpcContext):
    """Request context that records replies in memory.

    On top of whatever ``CommonRpcContext`` stores, each instance keeps a
    list of ``(reply, failure)`` pairs handed to :meth:`reply` and a flag
    telling whether the ending reply has been seen.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to the base class and reset the reply state."""
        super(RpcContext, self).__init__(**kwargs)
        # Accumulated (reply, failure) tuples, in the order they were sent.
        self._response = []
        # Becomes True once reply() has been called with ending=True.
        self._done = False

    def deepcopy(self):
        """Return a new context built from this one's ``to_dict()`` values.

        ``_response`` is assigned by reference, so the copy and the original
        share one list and replies appended through either are visible on
        both. ``_done`` is copied by value at the time of the call.
        """
        values = self.to_dict()
        new_inst = self.__class__(**values)
        new_inst._response = self._response
        new_inst._done = self._done
        return new_inst

    def reply(self, reply=None, failure=None, ending=False):
        """Record a ``(reply, failure)`` pair unless replies have ended.

        :param reply: value to record.
        :param failure: stored next to ``reply``; ``Consumer.call`` re-raises
            a truthy ``failure`` as an ``exc_info``-style triple.
        :param ending: when true, mark the context as done. The flag is set
            before the append check, so the pair passed together with
            ``ending=True`` is itself not recorded.
        """
        if ending:
            self._done = True
        if not self._done:
            self._response.append((reply, failure))

**** CubicPower OpenStack Study ****

class Consumer(object):
    """Pairs a topic name with the proxy object that handles its messages."""

    def __init__(self, topic, proxy):
        """Store the topic and the proxy whose ``dispatch`` will be called."""
        self.topic = topic
        self.proxy = proxy

    def call(self, context, version, method, namespace, args, timeout):
        """Run ``proxy.dispatch`` in a green thread and wait for its result.

        :param context: request context; must provide ``to_dict()``.
        :param version: passed through to ``proxy.dispatch``.
        :param method: passed through to ``proxy.dispatch``.
        :param namespace: passed through to ``proxy.dispatch``.
        :param args: mapping expanded into keyword arguments of the dispatch.
        :param timeout: number of seconds to wait; a falsy value waits
            without limit. Elapsed time is checked once per second.
        :returns: list holding any replies recorded on the internal context
            followed by the dispatch result (every yielded item when the
            result is a generator, otherwise the single return value).
        :raises rpc_common.Timeout: when ``timeout`` elapses first; the
            green thread is killed before raising.
        :raises Exception: whatever the dispatch raised, re-raised in the
            caller by ``done.wait()``.
        """
        done = eventlet.event.Event()

        def _inner():
            try:
                # Built inside the try so that a failure here is delivered to
                # the waiting caller instead of leaving ``done`` unsignalled.
                ctxt = RpcContext.from_dict(context.to_dict())
                # NOTE(review): the proxy is handed the caller's ``context``,
                # not ``ctxt``; whether replies can ever land on ``ctxt`` is
                # not visible from this file - confirm before changing.
                rval = self.proxy.dispatch(context, version, method,
                                           namespace, **args)
                res = []
                # Caller might have called ctxt.reply() manually
                for (reply, failure) in ctxt._response:
                    if failure:
                        six.reraise(failure[0], failure[1], failure[2])
                    res.append(reply)
                # if ending not 'sent'...we might have more data to
                # return from the function itself
                if not ctxt._done:
                    if inspect.isgenerator(rval):
                        res.extend(rval)
                    else:
                        res.append(rval)
                done.send(res)
            except rpc_common.ClientException as e:
                # Unwrap: forward the exception instance stored in the
                # wrapper's exc_info rather than the wrapper itself.
                done.send_exception(e._exc_info[1])
            except Exception as e:
                done.send_exception(e)

        thread = eventlet.greenthread.spawn(_inner)

        if timeout:
            start_time = time.time()
            while not done.ready():
                # Yield to the green thread, then re-check the deadline.
                eventlet.greenthread.sleep(1)
                cur_time = time.time()
                if (cur_time - start_time) > timeout:
                    thread.kill()
                    raise rpc_common.Timeout()

        return done.wait()

**** CubicPower OpenStack Study ****

class Connection(object):
    """Connection object.

    Tracks the consumers created through it so that :meth:`close` can
    remove them from the module-level ``CONSUMERS`` registry.
    """

    def __init__(self):
        """Start with no consumers."""
        self.consumers = []

    def create_consumer(self, topic, proxy, fanout=False):
        """Create a ``Consumer`` for ``topic`` and register it.

        :param topic: key under which the consumer is listed in ``CONSUMERS``.
        :param proxy: object handed to the new ``Consumer``.
        :param fanout: accepted but not used by this implementation.
        """
        consumer = Consumer(topic, proxy)
        self.consumers.append(consumer)
        CONSUMERS.setdefault(topic, []).append(consumer)

    def close(self):
        """Remove every consumer of this connection from ``CONSUMERS``."""
        for consumer in self.consumers:
            CONSUMERS[consumer.topic].remove(consumer)
        self.consumers = []

    def consume_in_thread(self):
        """Do nothing; this implementation has no consuming thread to start."""
        pass

def create_connection(conf, new=True):
    """Build and return a new ``Connection``.

    ``conf`` and ``new`` are accepted but not used by this implementation.
    """
    connection = Connection()
    return connection

def check_serialize(msg):
    """Verify that ``msg`` can be encoded as JSON.

    The encoded text is thrown away; the call is made only so that
    ``json.dumps`` raises for content it cannot encode.
    """
    _encoded = json.dumps(msg)

def multicall(conf, context, topic, msg, timeout=None):
    """Dispatch ``msg`` to the first consumer registered for ``topic``.

    :param conf: not used by this implementation.
    :param context: request context forwarded to the consumer.
    :param topic: key looked up in ``CONSUMERS``.
    :param msg: mapping read for 'method', 'args', 'version' and
        'namespace'; it must be JSON-serializable.
    :param timeout: forwarded to ``Consumer.call``.
    :returns: the result of ``Consumer.call``, or ``None`` when ``msg``
        carries no method.
    :raises rpc_common.Timeout: when no consumer is registered for ``topic``.
    """
    check_serialize(msg)

    method_name = msg.get('method')
    if not method_name:
        return

    call_args = msg.get('args', {})
    msg_version = msg.get('version')
    msg_namespace = msg.get('namespace')

    try:
        target = CONSUMERS[topic][0]
    except (KeyError, IndexError):
        # Unknown topic, or a topic whose consumer list has been emptied.
        raise rpc_common.Timeout("No consumers available")

    return target.call(context, msg_version, method_name, msg_namespace,
                       call_args, timeout)

def call(conf, context, topic, msg, timeout=None):
    """Send a message on a topic and wait for a response.

    :param conf: forwarded to ``multicall``.
    :param context: request context forwarded to ``multicall``.
    :param topic: topic to send on.
    :param msg: message mapping forwarded to ``multicall``.
    :param timeout: forwarded to ``multicall``.
    :returns: the last item produced by ``multicall``, or ``None`` when it
        produced nothing.
    """
    results = multicall(conf, context, topic, msg, timeout)
    if results is None:
        # multicall() returns None for a message without a 'method'; feeding
        # that to list() would raise TypeError, so treat it as "no result".
        return None

    # NOTE(vish): return the last result from the multicall
    results = list(results)
    return results[-1] if results else None

def cast(conf, context, topic, msg):
    """Send ``msg`` on ``topic`` and ignore both the result and any failure.

    The serialization check runs before the ``try``, so a message that
    cannot be encoded still raises to the caller.
    """
    check_serialize(msg)
    try:
        call(conf, context, topic, msg)
    except Exception:
        # Any Exception raised while handling the call is dropped here.
        pass

def notify(conf, context, topic, msg, envelope):
    """Check that ``msg`` is serializable; nothing is actually sent.

    ``conf``, ``context``, ``topic`` and ``envelope`` are not used.
    """
    check_serialize(msg)

def cleanup():
    """Do nothing; this implementation performs no cleanup work."""

def fanout_cast(conf, context, topic, msg):
    """Cast to all consumers of a topic.

    ``msg`` must be JSON-serializable. A message without a 'method' is
    ignored. Each consumer is called with no timeout, and any Exception
    raised by one consumer is dropped so the remaining ones still run.
    """
    check_serialize(msg)

    method_name = msg.get('method')
    if not method_name:
        return

    call_args = msg.get('args', {})
    msg_version = msg.get('version')
    msg_namespace = msg.get('namespace')

    for target in CONSUMERS.get(topic, []):
        try:
            target.call(context, msg_version, method_name, msg_namespace,
                        call_args, None)
        except Exception:
            pass

**** CubicPower OpenStack Study ****

def create_connection(conf, new=True):
    """Build and return a new ``Connection``.

    ``conf`` and ``new`` are accepted but not used by this implementation.
    """
    connection = Connection()
    return connection

**** CubicPower OpenStack Study ****

def check_serialize(msg):
    """Verify that ``msg`` can be encoded as JSON.

    The encoded text is thrown away; the call is made only so that
    ``json.dumps`` raises for content it cannot encode.
    """
    _encoded = json.dumps(msg)

**** CubicPower OpenStack Study ****

def multicall(conf, context, topic, msg, timeout=None):
    """Dispatch ``msg`` to the first consumer registered for ``topic``.

    :param conf: not used by this implementation.
    :param context: request context forwarded to the consumer.
    :param topic: key looked up in ``CONSUMERS``.
    :param msg: mapping read for 'method', 'args', 'version' and
        'namespace'; it must be JSON-serializable.
    :param timeout: forwarded to ``Consumer.call``.
    :returns: the result of ``Consumer.call``, or ``None`` when ``msg``
        carries no method.
    :raises rpc_common.Timeout: when no consumer is registered for ``topic``.
    """
    check_serialize(msg)

    method_name = msg.get('method')
    if not method_name:
        return

    call_args = msg.get('args', {})
    msg_version = msg.get('version')
    msg_namespace = msg.get('namespace')

    try:
        target = CONSUMERS[topic][0]
    except (KeyError, IndexError):
        # Unknown topic, or a topic whose consumer list has been emptied.
        raise rpc_common.Timeout("No consumers available")

    return target.call(context, msg_version, method_name, msg_namespace,
                       call_args, timeout)

**** CubicPower OpenStack Study ****

def call(conf, context, topic, msg, timeout=None):
    """Send a message on a topic and wait for a response.

    :param conf: forwarded to ``multicall``.
    :param context: request context forwarded to ``multicall``.
    :param topic: topic to send on.
    :param msg: message mapping forwarded to ``multicall``.
    :param timeout: forwarded to ``multicall``.
    :returns: the last item produced by ``multicall``, or ``None`` when it
        produced nothing.
    """
    results = multicall(conf, context, topic, msg, timeout)
    if results is None:
        # multicall() returns None for a message without a 'method'; feeding
        # that to list() would raise TypeError, so treat it as "no result".
        return None

    # NOTE(vish): return the last result from the multicall
    results = list(results)
    return results[-1] if results else None

**** CubicPower OpenStack Study ****

def cast(conf, context, topic, msg):
    """Send ``msg`` on ``topic`` and ignore both the result and any failure.

    The serialization check runs before the ``try``, so a message that
    cannot be encoded still raises to the caller.
    """
    check_serialize(msg)
    try:
        call(conf, context, topic, msg)
    except Exception:
        # Any Exception raised while handling the call is dropped here.
        pass

**** CubicPower OpenStack Study ****

def notify(conf, context, topic, msg, envelope):
    """Check that ``msg`` is serializable; nothing is actually sent.

    ``conf``, ``context``, ``topic`` and ``envelope`` are not used.
    """
    check_serialize(msg)

**** CubicPower OpenStack Study ****

def cleanup():
    """Do nothing; this implementation performs no cleanup work."""

**** CubicPower OpenStack Study ****

def fanout_cast(conf, context, topic, msg):
    """Cast to all consumers of a topic.

    ``msg`` must be JSON-serializable. A message without a 'method' is
    ignored. Each consumer is called with no timeout, and any Exception
    raised by one consumer is dropped so the remaining ones still run.
    """
    check_serialize(msg)

    method_name = msg.get('method')
    if not method_name:
        return

    call_args = msg.get('args', {})
    msg_version = msg.get('version')
    msg_namespace = msg.get('namespace')

    for target in CONSUMERS.get(topic, []):
        try:
            target.call(context, msg_version, method_name, msg_namespace,
                        call_args, None)
        except Exception:
            pass