¡@

Home 

OpenStack Study: dispatcher.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2014 Embrane, Inc.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

#

# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com

from eventlet import greenthread

from eventlet import queue

from heleosapi import exceptions as h_exc

from neutron.openstack.common import log as logging

from neutron.plugins.embrane.common import contexts as ctx

from neutron.services.loadbalancer.drivers.embrane.agent import lb_operations

from neutron.services.loadbalancer.drivers.embrane import constants as econ

LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

class Dispatcher(object):

**** CubicPower OpenStack Study ****

    def __init__(self, driver, async=True):

        self._async = async

        self._driver = driver

        self.sync_items = dict()

        self.handlers = lb_operations.handlers

**** CubicPower OpenStack Study ****

    def dispatch_lb(self, d_context, *args, **kwargs):

        item = d_context.item

        event = d_context.event

        n_context = d_context.n_context

        chain = d_context.chain

        item_id = item["id"]

        if event in self.handlers:

            for f in self.handlers[event]:

                first_run = False

                if item_id not in self.sync_items:

                    self.sync_items[item_id] = [queue.Queue()]

                    first_run = True

                self.sync_items[item_id][0].put(

                    ctx.OperationContext(event, n_context, item, chain, f,

                                         args, kwargs))

                if first_run:

                    t = greenthread.spawn(self._consume_lb,

                                          item_id,

                                          self.sync_items[item_id][0],

                                          self._driver,

                                          self._async)

                    self.sync_items[item_id].append(t)

                if not self._async:

                    t = self.sync_items[item_id][1]

                    t.wait()

**** CubicPower OpenStack Study ****

    def _consume_lb(self, sync_item, sync_queue, driver, a_sync):

        current_state = None

        while True:

            try:

                if current_state == econ.DELETED:

                    del self.sync_items[sync_item]

                    return

                try:

                    operation_context = sync_queue.get(

                        block=a_sync,

                        timeout=econ.QUEUE_TIMEOUT)

                except queue.Empty:

                    del self.sync_items[sync_item]

                    return

                (operation_context.chain and

                 operation_context.chain.execute_all())

                transient_state = None

                try:

                    transient_state = operation_context.function(

                        driver, operation_context.n_context,

                        operation_context.item, *operation_context.args,

                        **operation_context.kwargs)

                except (h_exc.PendingDva, h_exc.DvaNotFound,

                        h_exc.BrokenInterface, h_exc.DvaCreationFailed,

                        h_exc.BrokenDva, h_exc.ConfigurationFailed) as ex:

                    LOG.warning(econ.error_map[type(ex)], ex.message)

                except h_exc.DvaDeleteFailed as ex:

                    LOG.warning(econ.error_map[type(ex)], ex.message)

                    transient_state = econ.DELETED

                finally:

                    # if the returned transient state is None, no operations

                    # are required on the DVA status

                    if transient_state == econ.DELETED:

                        current_state = driver._delete_vip(

                            operation_context.n_context,

                            operation_context.item)

                        # Error state cannot be reverted

                    else:

                        driver._update_vip_graph_state(

                            operation_context.n_context,

                            operation_context.item)

            except Exception:

                LOG.exception(_('Unhandled exception occurred'))