¡@

Home 

OpenStack Study: ofa_neutron_agent.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (C) 2014 VA Linux Systems Japan K.K.

# Based on openvswitch agent.

#

# Copyright 2011 VMware, Inc.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.

import time

from oslo.config import cfg

from ryu.app.ofctl import api as ryu_api

from ryu.base import app_manager

from ryu.lib import hub

from ryu.ofproto import ofproto_v1_3 as ryu_ofp13

from neutron.agent.linux import ip_lib

from neutron.agent.linux import ovs_lib

from neutron.agent.linux import polling

from neutron.agent.linux import utils

from neutron.agent import rpc as agent_rpc

from neutron.agent import securitygroups_rpc as sg_rpc

from neutron.common import constants as n_const

from neutron.common import topics

from neutron.common import utils as n_utils

from neutron import context

from neutron.extensions import securitygroup as ext_sg

from neutron.openstack.common import log as logging

from neutron.openstack.common import loopingcall

from neutron.openstack.common.rpc import common as rpc_common

from neutron.openstack.common.rpc import dispatcher

from neutron.plugins.common import constants as p_const

from neutron.plugins.ofagent.common import config # noqa

from neutron.plugins.openvswitch.common import constants

LOG = logging.getLogger(__name__)

# A placeholder for dead vlans.

DEAD_VLAN_TAG = str(n_const.MAX_VLAN_TAG + 1)

# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'

# attributes set).

**** CubicPower OpenStack Study ****

class LocalVLANMapping:

**** CubicPower OpenStack Study ****

    def __init__(self, vlan, network_type, physical_network, segmentation_id,

                 vif_ports=None):

        if vif_ports is None:

            vif_ports = {}

        self.vlan = vlan

        self.network_type = network_type

        self.physical_network = physical_network

        self.segmentation_id = segmentation_id

        self.vif_ports = vif_ports

        # set of tunnel ports on which packets should be flooded

        self.tun_ofports = set()

**** CubicPower OpenStack Study ****

    def __str__(self):

        return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %

                (self.vlan, self.network_type, self.physical_network,

                 self.segmentation_id))

**** CubicPower OpenStack Study ****

class Port(object):

"""Represents a neutron port.

Class stores port data in an ORM-free way, so attributes are

still available even if a row has been deleted.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, p):

        self.id = p.id

        self.network_id = p.network_id

        self.device_id = p.device_id

        self.admin_state_up = p.admin_state_up

        self.status = p.status

**** CubicPower OpenStack Study ****

    def __eq__(self, other):

        """Compare only fields that will cause us to re-wire."""

        try:

            return (other and self.id == other.id

                    and self.admin_state_up == other.admin_state_up)

        except Exception:

            return False

**** CubicPower OpenStack Study ****

    def __ne__(self, other):

        return not self.__eq__(other)

**** CubicPower OpenStack Study ****

    def __hash__(self):

        return hash(self.id)

**** CubicPower OpenStack Study ****

class OVSBridge(ovs_lib.OVSBridge):

**** CubicPower OpenStack Study ****

    def __init__(self, br_name, root_helper, ryuapp):

        super(OVSBridge, self).__init__(br_name, root_helper)

        self.datapath_id = None

        self.datapath = None

        self.ofparser = None

        self.ryuapp = ryuapp

**** CubicPower OpenStack Study ****

    def find_datapath_id(self):

        self.datapath_id = self.get_datapath_id()

**** CubicPower OpenStack Study ****

    def get_datapath(self, retry_max=cfg.CONF.AGENT.get_datapath_retry_times):

        retry = 0

        while self.datapath is None:

            self.datapath = ryu_api.get_datapath(self.ryuapp,

                                                 int(self.datapath_id, 16))

            retry += 1

            if retry >= retry_max:

                LOG.error(_('Agent terminated!: Failed to get a datapath.'))

                raise SystemExit(1)

            time.sleep(1)

        self.ofparser = self.datapath.ofproto_parser

**** CubicPower OpenStack Study ****

    def setup_ofp(self, controller_names=None,

                  protocols='OpenFlow13',

                  retry_max=cfg.CONF.AGENT.get_datapath_retry_times):

        if not controller_names:

            host = cfg.CONF.ofp_listen_host

            if not host:

                # 127.0.0.1 is a default for agent style of controller

                host = '127.0.0.1'

            controller_names = ["tcp:%s:%d" % (host,

                                               cfg.CONF.ofp_tcp_listen_port)]

        try:

            self.set_protocols(protocols)

            self.set_controller(controller_names)

        except RuntimeError:

            LOG.exception(_("Agent terminated"))

            raise SystemExit(1)

        self.find_datapath_id()

        self.get_datapath(retry_max)

**** CubicPower OpenStack Study ****

class OFAPluginApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin):
    # Combines the agent plugin API with the security group server RPC
    # mixin; adds no behaviour of its own.
    pass

**** CubicPower OpenStack Study ****

class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):

**** CubicPower OpenStack Study ****

    def __init__(self, context, plugin_rpc, root_helper):

        self.context = context

        self.plugin_rpc = plugin_rpc

        self.root_helper = root_helper

        self.init_firewall()

**** CubicPower OpenStack Study ****

class OFANeutronAgentRyuApp(app_manager.RyuApp):

OFP_VERSIONS = [ryu_ofp13.OFP_VERSION]

**** CubicPower OpenStack Study ****

    def start(self):

        super(OFANeutronAgentRyuApp, self).start()

        return hub.spawn(self._agent_main, self)

**** CubicPower OpenStack Study ****

    def _agent_main(self, ryuapp):

        cfg.CONF.register_opts(ip_lib.OPTS)

        n_utils.log_opt_values(LOG)

        try:

            agent_config = create_agent_config_map(cfg.CONF)

        except ValueError:

            LOG.exception(_("Agent failed to create agent config map"))

            raise SystemExit(1)

        is_xen_compute_host = ('rootwrap-xen-dom0' in

                               agent_config['root_helper'])

        if is_xen_compute_host:

            # Force ip_lib to always use the root helper to ensure that ip

            # commands target xen dom0 rather than domU.

            cfg.CONF.set_default('ip_lib_force_root', True)

        agent = OFANeutronAgent(ryuapp, **agent_config)

        # Start everything.

        LOG.info(_("Agent initialized successfully, now running... "))

        agent.daemon_loop()

**** CubicPower OpenStack Study ****

class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):

"""A agent for OpenFlow Agent ML2 mechanism driver.

OFANeutronAgent is an OpenFlow Agent agent for an ML2 plugin.

This runs as a ryu application thread.

- An agent acts as an OpenFlow controller on each compute nodes.

- OpenFlow 1.3 (vendor agnostic unlike OVS extensions).

"""

# history

# 1.0 Initial version

# 1.1 Support Security Group RPC

RPC_API_VERSION = '1.1'

**** CubicPower OpenStack Study ****

    def __init__(self, ryuapp, integ_br, tun_br, local_ip,
                 bridge_mappings, root_helper,
                 polling_interval, tunnel_types=None,
                 veth_mtu=None, l2_population=False,
                 minimize_polling=False,
                 ovsdb_monitor_respawn_interval=(
                     constants.DEFAULT_OVSDBMON_RESPAWN)):
        """Constructor.

        Sets up RPC, the integration bridge, the physical bridges and, when
        tunnel types are given, the tunnel bridge. The order of the setup
        calls below matters; see the inline notes.

        :param ryuapp: object of the ryu app.
        :param integ_br: name of the integration bridge.
        :param tun_br: name of the tunnel bridge.
        :param local_ip: local IP address of this hypervisor.
        :param bridge_mappings: mappings from physical network name to bridge.
        :param root_helper: utility to use when running shell cmds.
        :param polling_interval: interval (secs) to poll DB.
        :param tunnel_types: A list of tunnel types to enable support for in
               the agent. If set, will automatically set enable_tunneling to
               True.
        :param veth_mtu: MTU size for veth interfaces.
        :param l2_population: stored as ``self.l2_pop`` and reported in the
               agent state configurations.
        :param minimize_polling: Optional, whether to minimize polling by
               monitoring ovsdb for interface changes.
        :param ovsdb_monitor_respawn_interval: Optional, when using polling
               minimization, the number of seconds to wait before respawning
               the ovsdb monitor.
        """
        self.ryuapp = ryuapp
        self.veth_mtu = veth_mtu
        self.root_helper = root_helper
        # NOTE: xrange excludes its upper bound, so MAX_VLAN_TAG itself is
        # never handed out as a local VLAN.
        self.available_local_vlans = set(xrange(n_const.MIN_VLAN_TAG,
                                                n_const.MAX_VLAN_TAG))
        self.tunnel_types = tunnel_types or []
        self.l2_pop = l2_population
        # State dict sent by _report_state(); 'start_flag' is removed there
        # after the first successful report.
        self.agent_state = {
            'binary': 'neutron-ofa-agent',
            'host': cfg.CONF.host,
            'topic': n_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': bridge_mappings,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': local_ip,
                               'l2_population': self.l2_pop},
            'agent_type': n_const.AGENT_TYPE_OFA,
            'start_flag': True}

        # Keep track of int_br's device count for use by _report_state()
        self.int_br_device_count = 0

        # int_br must exist before setup_rpc(), which reads its local port
        # MAC to build the agent id.
        self.int_br = OVSBridge(integ_br, self.root_helper, self.ryuapp)
        self.setup_rpc()
        self.setup_integration_br()
        # NOTE(review): presumably populates self.phys_brs and the
        # int/phys ofport maps used by the flow helpers; the method is not
        # visible in this part of the file.
        self.setup_physical_bridges(bridge_mappings)
        self.local_vlan_map = {}
        self.tun_br_ofports = {p_const.TYPE_GRE: {},
                               p_const.TYPE_VXLAN: {}}

        self.polling_interval = polling_interval
        self.minimize_polling = minimize_polling
        self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval

        # Tunneling is enabled exactly when at least one tunnel type is set.
        self.enable_tunneling = bool(self.tunnel_types)
        self.local_ip = local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
        self._check_ovs_version()
        if self.enable_tunneling:
            self.setup_tunnel_br(tun_br)
        # Collect additional bridges to monitor
        # (reads self.enable_tunneling and self.phys_brs, so it must run
        # after the setup calls above).
        self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)

        # Security group agent support
        self.sg_agent = OFASecurityGroupAgent(self.context,
                                              self.plugin_rpc,
                                              self.root_helper)
        # Initialize iteration counter
        self.iter_num = 0

**** CubicPower OpenStack Study ****

    def _check_ovs_version(self):

        if p_const.TYPE_VXLAN in self.tunnel_types:

            try:

                ovs_lib.check_ovs_vxlan_version(self.root_helper)

            except SystemError:

                LOG.exception(_("Agent terminated"))

                raise SystemExit(1)

**** CubicPower OpenStack Study ****

    def _report_state(self):

        # How many devices are likely used by a VM

        self.agent_state.get('configurations')['devices'] = (

            self.int_br_device_count)

        try:

            self.state_rpc.report_state(self.context,

                                        self.agent_state)

            self.agent_state.pop('start_flag', None)

        except Exception:

            LOG.exception(_("Failed reporting state!"))

**** CubicPower OpenStack Study ****

    def ryu_send_msg(self, msg):

        result = ryu_api.send_msg(self.ryuapp, msg)

        LOG.info(_("ryu send_msg() result: %s"), result)

**** CubicPower OpenStack Study ****

    def setup_rpc(self):

        mac = self.int_br.get_local_port_mac()

        self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))

        self.topic = topics.AGENT

        self.plugin_rpc = OFAPluginApi(topics.PLUGIN)

        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init

        self.context = context.get_admin_context_without_session()

        # Handle updates from service

        self.dispatcher = self.create_rpc_dispatcher()

        # Define the listening consumers for the agent

        consumers = [[topics.PORT, topics.UPDATE],

                     [topics.NETWORK, topics.DELETE],

                     [constants.TUNNEL, topics.UPDATE],

                     [topics.SECURITY_GROUP, topics.UPDATE]]

        self.connection = agent_rpc.create_consumers(self.dispatcher,

                                                     self.topic,

                                                     consumers)

        report_interval = cfg.CONF.AGENT.report_interval

        if report_interval:

            heartbeat = loopingcall.FixedIntervalLoopingCall(

                self._report_state)

            heartbeat.start(interval=report_interval)

**** CubicPower OpenStack Study ****

    def get_net_uuid(self, vif_id):

        for network_id, vlan_mapping in self.local_vlan_map.iteritems():

            if vif_id in vlan_mapping.vif_ports:

                return network_id

**** CubicPower OpenStack Study ****

    def network_delete(self, context, **kwargs):

        network_id = kwargs.get('network_id')

        LOG.debug(_("network_delete received network %s"), network_id)

        # The network may not be defined on this agent

        lvm = self.local_vlan_map.get(network_id)

        if lvm:

            self.reclaim_local_vlan(network_id)

        else:

            LOG.debug(_("Network %s not used on agent."), network_id)

**** CubicPower OpenStack Study ****

    def port_update(self, context, **kwargs):

        port = kwargs.get('port')

        LOG.debug(_("port_update received port %s"), port['id'])

        # Validate that port is on OVS

        vif_port = self.int_br.get_vif_port_by_id(port['id'])

        if not vif_port:

            return

        if ext_sg.SECURITYGROUPS in port:

            self.sg_agent.refresh_firewall()

        network_type = kwargs.get('network_type')

        segmentation_id = kwargs.get('segmentation_id')

        physical_network = kwargs.get('physical_network')

        self.treat_vif_port(vif_port, port['id'], port['network_id'],

                            network_type, physical_network,

                            segmentation_id, port['admin_state_up'])

        try:

            if port['admin_state_up']:

                # update plugin about port status

                self.plugin_rpc.update_device_up(self.context, port['id'],

                                                 self.agent_id,

                                                 cfg.CONF.host)

            else:

                # update plugin about port status

                self.plugin_rpc.update_device_down(self.context, port['id'],

                                                   self.agent_id,

                                                   cfg.CONF.host)

        except rpc_common.Timeout:

            LOG.error(_("RPC timeout while updating port %s"), port['id'])

**** CubicPower OpenStack Study ****

    def tunnel_update(self, context, **kwargs):

        LOG.debug(_("tunnel_update received"))

        if not self.enable_tunneling:

            return

        tunnel_ip = kwargs.get('tunnel_ip')

        tunnel_id = kwargs.get('tunnel_id', tunnel_ip)

        tunnel_type = kwargs.get('tunnel_type')

        if not tunnel_type:

            LOG.error(_("No tunnel_type specified, cannot create tunnels"))

            return

        if tunnel_type not in self.tunnel_types:

            LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type)

            return

        if tunnel_ip == self.local_ip:

            return

        tun_name = '%s-%s' % (tunnel_type, tunnel_id)

        self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)

**** CubicPower OpenStack Study ****

    def create_rpc_dispatcher(self):

        """Get the rpc dispatcher for this manager.

        If a manager would like to set an rpc API version, or support more than

        one class as the target of rpc messages, override this method.

        """

        return dispatcher.RpcDispatcher([self])

**** CubicPower OpenStack Study ****

    def _provision_local_vlan_outbound_for_tunnel(self, lvid,

                                                  segmentation_id, ofports):

        br = self.tun_br

        match = br.ofparser.OFPMatch(

            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)

        actions = [br.ofparser.OFPActionPopVlan(),

                   br.ofparser.OFPActionSetField(

                       tunnel_id=int(segmentation_id))]

        for ofport in ofports:

            actions.append(br.ofparser.OFPActionOutput(ofport, 0))

        instructions = [br.ofparser.OFPInstructionActions(

                        ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]

        msg = br.ofparser.OFPFlowMod(

            br.datapath,

            table_id=constants.FLOOD_TO_TUN,

            priority=1,

            match=match, instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,

                                                 segmentation_id):

        br = self.tun_br

        match = br.ofparser.OFPMatch(

            tunnel_id=int(segmentation_id))

        actions = [br.ofparser.OFPActionSetField(

            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)]

        instructions = [

            br.ofparser.OFPInstructionActions(

                ryu_ofp13.OFPIT_APPLY_ACTIONS, actions),

            br.ofparser.OFPInstructionGotoTable(

                table_id=constants.LEARN_FROM_TUN)]

        msg = br.ofparser.OFPFlowMod(

            br.datapath,

            table_id=constants.TUN_TABLE[network_type],

            priority=1,

            match=match,

            instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _local_vlan_for_tunnel(self, lvid, network_type, segmentation_id):

        ofports = [int(ofport) for ofport in

                   self.tun_br_ofports[network_type].values()]

        if ofports:

            self._provision_local_vlan_outbound_for_tunnel(

                lvid, segmentation_id, ofports)

        self._provision_local_vlan_inbound_for_tunnel(lvid, network_type,

                                                      segmentation_id)

**** CubicPower OpenStack Study ****

    def _provision_local_vlan_outbound(self, br, lvid, actions,

                                       physical_network):

        match = br.ofparser.OFPMatch(

            in_port=int(self.phys_ofports[physical_network]),

            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)

        instructions = [br.ofparser.OFPInstructionActions(

            ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     priority=4,

                                     match=match,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _provision_local_vlan_inbound(self, lvid, vlan_vid, physical_network):

        match = self.int_br.ofparser.OFPMatch(

            in_port=int(self.int_ofports[physical_network]),

            vlan_vid=vlan_vid)

        actions = [self.int_br.ofparser.OFPActionSetField(

            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT),

            self.int_br.ofparser.OFPActionOutput(

                ryu_ofp13.OFPP_NORMAL, 0)]

        instructions = [self.int_br.ofparser.OFPInstructionActions(

            ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]

        msg = self.int_br.ofparser.OFPFlowMod(

            self.int_br.datapath,

            priority=3,

            match=match,

            instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _local_vlan_for_flat(self, lvid, physical_network):

        br = self.phys_brs[physical_network]

        actions = [br.ofparser.OFPActionPopVlan(),

                   br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]

        self._provision_local_vlan_outbound(

            br, lvid, actions, physical_network)

        self._provision_local_vlan_inbound(lvid, 0xffff, physical_network)

**** CubicPower OpenStack Study ****

    def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id):

        br = self.phys_brs[physical_network]

        actions = [br.ofparser.OFPActionSetField(

            vlan_vid=int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT),

            br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]

        self._provision_local_vlan_outbound(

            br, lvid, actions, physical_network)

        self._provision_local_vlan_inbound(

            lvid, int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT,

            physical_network)

**** CubicPower OpenStack Study ****

    def provision_local_vlan(self, net_uuid, network_type, physical_network,

                             segmentation_id):

        """Provisions a local VLAN.

        :param net_uuid: the uuid of the network associated with this vlan.

        :param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat',

                                               'local')

        :param physical_network: the physical network for 'vlan' or 'flat'

        :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'

        """

        if not self.available_local_vlans:

            LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)

            return

        lvid = self.available_local_vlans.pop()

        LOG.info(_("Assigning %(vlan_id)s as local vlan for "

                   "net-id=%(net_uuid)s"),

                 {'vlan_id': lvid, 'net_uuid': net_uuid})

        self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,

                                                         physical_network,

                                                         segmentation_id)

        if network_type in constants.TUNNEL_NETWORK_TYPES:

            if self.enable_tunneling:

                self._local_vlan_for_tunnel(lvid, network_type,

                                            segmentation_id)

            else:

                LOG.error(_("Cannot provision %(network_type)s network for "

                          "net-id=%(net_uuid)s - tunneling disabled"),

                          {'network_type': network_type,

                           'net_uuid': net_uuid})

        elif network_type == p_const.TYPE_FLAT:

            if physical_network in self.phys_brs:

                self._local_vlan_for_flat(lvid, physical_network)

            else:

                LOG.error(_("Cannot provision flat network for "

                            "net-id=%(net_uuid)s - no bridge for "

                            "physical_network %(physical_network)s"),

                          {'net_uuid': net_uuid,

                           'physical_network': physical_network})

        elif network_type == p_const.TYPE_VLAN:

            if physical_network in self.phys_brs:

                self._local_vlan_for_vlan(lvid, physical_network,

                                          segmentation_id)

            else:

                LOG.error(_("Cannot provision VLAN network for "

                            "net-id=%(net_uuid)s - no bridge for "

                            "physical_network %(physical_network)s"),

                          {'net_uuid': net_uuid,

                           'physical_network': physical_network})

        elif network_type == p_const.TYPE_LOCAL:

            # no flows needed for local networks

            pass

        else:

            LOG.error(_("Cannot provision unknown network type "

                        "%(network_type)s for net-id=%(net_uuid)s"),

                      {'network_type': network_type,

                       'net_uuid': net_uuid})

**** CubicPower OpenStack Study ****

    def _reclaim_local_vlan_outbound(self, lvm):

        br = self.phys_brs[lvm.physical_network]

        match = br.ofparser.OFPMatch(

            in_port=self.phys_ofports[lvm.physical_network],

            vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=ryu_ofp13.OFPTT_ALL,

                                     command=ryu_ofp13.OFPFC_DELETE,

                                     out_group=ryu_ofp13.OFPG_ANY,

                                     out_port=ryu_ofp13.OFPP_ANY,

                                     match=match)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _reclaim_local_vlan_inbound(self, lvm, vlan_vid):

        br = self.int_br

        match = br.ofparser.OFPMatch(

            in_port=self.int_ofports[lvm.physical_network],

            vlan_vid=vlan_vid)

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=ryu_ofp13.OFPTT_ALL,

                                     command=ryu_ofp13.OFPFC_DELETE,

                                     out_group=ryu_ofp13.OFPG_ANY,

                                     out_port=ryu_ofp13.OFPP_ANY,

                                     match=match)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def reclaim_local_vlan(self, net_uuid):

        """Reclaim a local VLAN.

        :param net_uuid: the network uuid associated with this vlan.

        :param lvm: a LocalVLANMapping object that tracks (vlan, lsw_id,

            vif_ids) mapping.

        """

        lvm = self.local_vlan_map.pop(net_uuid, None)

        if lvm is None:

            LOG.debug(_("Network %s not used on agent."), net_uuid)

            return

        LOG.info(_("Reclaiming vlan = %(vlan_id)s from net-id = %(net_uuid)s"),

                 {'vlan_id': lvm.vlan,

                  'net_uuid': net_uuid})

        if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:

            if self.enable_tunneling:

                match = self.tun_br.ofparser.OFPMatch(

                    tunnel_id=int(lvm.segmentation_id))

                msg = self.tun_br.ofparser.OFPFlowMod(

                    self.tun_br.datapath,

                    table_id=constants.TUN_TABLE[lvm.network_type],

                    command=ryu_ofp13.OFPFC_DELETE,

                    out_group=ryu_ofp13.OFPG_ANY,

                    out_port=ryu_ofp13.OFPP_ANY,

                    match=match)

                self.ryu_send_msg(msg)

                match = self.tun_br.ofparser.OFPMatch(

                    vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)

                msg = self.tun_br.ofparser.OFPFlowMod(

                    self.tun_br.datapath,

                    table_id=ryu_ofp13.OFPTT_ALL,

                    command=ryu_ofp13.OFPFC_DELETE,

                    out_group=ryu_ofp13.OFPG_ANY,

                    out_port=ryu_ofp13.OFPP_ANY,

                    match=match)

                self.ryu_send_msg(msg)

        elif lvm.network_type == p_const.TYPE_FLAT:

            if lvm.physical_network in self.phys_brs:

                self._reclaim_local_vlan_outbound(lvm)

                self._reclaim_local_vlan_inbound(lvm, 0xffff)

        elif lvm.network_type == p_const.TYPE_VLAN:

            if lvm.physical_network in self.phys_brs:

                self._reclaim_local_vlan_outbound(lvm)

                self._reclaim_local_vlan_inbound(

                    lvm, lvm.segmentation_id | ryu_ofp13.OFPVID_PRESENT)

        elif lvm.network_type == p_const.TYPE_LOCAL:

            # no flows needed for local networks

            pass

        else:

            LOG.error(_("Cannot reclaim unknown network type "

                        "%(network_type)s for net-id=%(net_uuid)s"),

                      {'network_type': lvm.network_type,

                       'net_uuid': net_uuid})

        self.available_local_vlans.add(lvm.vlan)

**** CubicPower OpenStack Study ****

    def port_bound(self, port, net_uuid,

                   network_type, physical_network, segmentation_id):

        """Bind port to net_uuid/lsw_id and install flow for inbound traffic

        to vm.

        :param port: a ovs_lib.VifPort object.

        :param net_uuid: the net_uuid this port is to be associated with.

        :param network_type: the network type ('gre', 'vlan', 'flat', 'local')

        :param physical_network: the physical network for 'vlan' or 'flat'

        :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'

        """

        if net_uuid not in self.local_vlan_map:

            self.provision_local_vlan(net_uuid, network_type,

                                      physical_network, segmentation_id)

        lvm = self.local_vlan_map[net_uuid]

        lvm.vif_ports[port.vif_id] = port

        self.int_br.set_db_attribute("Port", port.port_name, "tag",

                                     str(lvm.vlan))

        if int(port.ofport) != -1:

            match = self.int_br.ofparser.OFPMatch(in_port=port.ofport)

            msg = self.int_br.ofparser.OFPFlowMod(

                self.int_br.datapath,

                table_id=ryu_ofp13.OFPTT_ALL,

                command=ryu_ofp13.OFPFC_DELETE,

                out_group=ryu_ofp13.OFPG_ANY,

                out_port=ryu_ofp13.OFPP_ANY,

                match=match)

            self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def port_unbound(self, vif_id, net_uuid=None):

        """Unbind port.

        Removes corresponding local vlan mapping object if this is its last

        VIF.

        :param vif_id: the id of the vif

        :param net_uuid: the net_uuid this port is associated with.

        """

        net_uuid = net_uuid or self.get_net_uuid(vif_id)

        if not self.local_vlan_map.get(net_uuid):

            LOG.info(_('port_unbound() net_uuid %s not in local_vlan_map'),

                     net_uuid)

            return

        lvm = self.local_vlan_map[net_uuid]

        lvm.vif_ports.pop(vif_id, None)

        if not lvm.vif_ports:

            self.reclaim_local_vlan(net_uuid)

**** CubicPower OpenStack Study ****

    def port_dead(self, port):

        """Once a port has no binding, put it on the "dead vlan".

        :param port: a ovs_lib.VifPort object.

        """

        self.int_br.set_db_attribute("Port", port.port_name, "tag",

                                     DEAD_VLAN_TAG)

        match = self.int_br.ofparser.OFPMatch(in_port=int(port.ofport))

        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,

                                              priority=2, match=match)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def setup_integration_br(self):

        """Setup the integration bridge.

        Create patch ports and remove all existing flows.

        :param bridge_name: the name of the integration bridge.

        :returns: the integration bridge

        """

        self.int_br.setup_ofp()

        self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port)

        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,

                                              table_id=ryu_ofp13.OFPTT_ALL,

                                              command=ryu_ofp13.OFPFC_DELETE,

                                              out_group=ryu_ofp13.OFPG_ANY,

                                              out_port=ryu_ofp13.OFPP_ANY)

        self.ryu_send_msg(msg)

        # switch all traffic using L2 learning

        actions = [self.int_br.ofparser.OFPActionOutput(

            ryu_ofp13.OFPP_NORMAL, 0)]

        instructions = [self.int_br.ofparser.OFPInstructionActions(

            ryu_ofp13.OFPIT_APPLY_ACTIONS,

            actions)]

        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,

                                              priority=1,

                                              instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def setup_ancillary_bridges(self, integ_br, tun_br):

        """Setup ancillary bridges - for example br-ex."""

        ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))

        # Remove all known bridges

        ovs_bridges.remove(integ_br)

        if self.enable_tunneling:

            ovs_bridges.remove(tun_br)

        br_names = [self.phys_brs[physical_network].br_name for

                    physical_network in self.phys_brs]

        ovs_bridges.difference_update(br_names)

        # Filter list of bridges to those that have external

        # bridge-id's configured

        br_names = [

            bridge for bridge in ovs_bridges

            if bridge != ovs_lib.get_bridge_external_bridge_id(

                self.root_helper, bridge)

        ]

        ovs_bridges.difference_update(br_names)

        ancillary_bridges = []

        for bridge in ovs_bridges:

            br = OVSBridge(bridge, self.root_helper, self.ryuapp)

            ancillary_bridges.append(br)

        LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges)

        return ancillary_bridges

**** CubicPower OpenStack Study ****

    def _tun_br_sort_incoming_traffic_depend_in_port(self, br):

        match = br.ofparser.OFPMatch(

            in_port=int(self.patch_int_ofport))

        instructions = [br.ofparser.OFPInstructionGotoTable(

            table_id=constants.PATCH_LV_TO_TUN)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     priority=1,

                                     match=match,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

        msg = br.ofparser.OFPFlowMod(br.datapath, priority=0)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_goto_table_ucast_unicast(self, br):

        match = br.ofparser.OFPMatch(eth_dst=('00:00:00:00:00:00',

                                              '01:00:00:00:00:00'))

        instructions = [br.ofparser.OFPInstructionGotoTable(

            table_id=constants.UCAST_TO_TUN)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=constants.PATCH_LV_TO_TUN,

                                     match=match,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_goto_table_flood_broad_multi_cast(self, br):

        match = br.ofparser.OFPMatch(eth_dst=('01:00:00:00:00:00',

                                              '01:00:00:00:00:00'))

        instructions = [br.ofparser.OFPInstructionGotoTable(

            table_id=constants.FLOOD_TO_TUN)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=constants.PATCH_LV_TO_TUN,

                                     match=match,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_set_table_tun_by_tunnel_type(self, br):

        for tunnel_type in constants.TUNNEL_NETWORK_TYPES:

            msg = br.ofparser.OFPFlowMod(

                br.datapath,

                table_id=constants.TUN_TABLE[tunnel_type],

                priority=0)

            self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_output_patch_int(self, br):

        actions = [br.ofparser.OFPActionOutput(

            int(self.patch_int_ofport), 0)]

        instructions = [br.ofparser.OFPInstructionActions(

            ryu_ofp13.OFPIT_APPLY_ACTIONS,

            actions)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=constants.LEARN_FROM_TUN,

                                     priority=1,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_goto_table_flood_unknown_unicast(self, br):

        instructions = [br.ofparser.OFPInstructionGotoTable(

            table_id=constants.FLOOD_TO_TUN)]

        msg = br.ofparser.OFPFlowMod(br.datapath,

                                     table_id=constants.UCAST_TO_TUN,

                                     priority=0,

                                     instructions=instructions)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _tun_br_default_drop(self, br):

        msg = br.ofparser.OFPFlowMod(

            br.datapath,

            table_id=constants.FLOOD_TO_TUN,

            priority=0)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def setup_tunnel_br(self, tun_br):

        """Setup the tunnel bridge.

        Creates tunnel bridge, and links it to the integration bridge

        using a patch port.

        :param tun_br: the name of the tunnel bridge.

        """

        self.tun_br = OVSBridge(tun_br, self.root_helper, self.ryuapp)

        self.tun_br.reset_bridge()

        self.tun_br.setup_ofp()

        self.patch_tun_ofport = self.int_br.add_patch_port(

            cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)

        self.patch_int_ofport = self.tun_br.add_patch_port(

            cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)

        if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:

            LOG.error(_("Failed to create OVS patch port. Cannot have "

                        "tunneling enabled on this agent, since this version "

                        "of OVS does not support tunnels or patch ports. "

                        "Agent terminated!"))

            raise SystemExit(1)

        msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,

                                              table_id=ryu_ofp13.OFPTT_ALL,

                                              command=ryu_ofp13.OFPFC_DELETE,

                                              out_group=ryu_ofp13.OFPG_ANY,

                                              out_port=ryu_ofp13.OFPP_ANY)

        self.ryu_send_msg(msg)

        self._tun_br_sort_incoming_traffic_depend_in_port(self.tun_br)

        self._tun_br_goto_table_ucast_unicast(self.tun_br)

        self._tun_br_goto_table_flood_broad_multi_cast(self.tun_br)

        self._tun_br_set_table_tun_by_tunnel_type(self.tun_br)

        self._tun_br_output_patch_int(self.tun_br)

        self._tun_br_goto_table_flood_unknown_unicast(self.tun_br)

        self._tun_br_default_drop(self.tun_br)

**** CubicPower OpenStack Study ****

    def _phys_br_prepare_create_veth(self, br, int_veth_name, phys_veth_name):

        self.int_br.delete_port(int_veth_name)

        br.delete_port(phys_veth_name)

        if ip_lib.device_exists(int_veth_name, self.root_helper):

            ip_lib.IPDevice(int_veth_name, self.root_helper).link.delete()

            # Give udev a chance to process its rules here, to avoid

            # race conditions between commands launched by udev rules

            # and the subsequent call to ip_wrapper.add_veth

            utils.execute(['/sbin/udevadm', 'settle', '--timeout=10'])

**** CubicPower OpenStack Study ****

    def _phys_br_create_veth(self, br, int_veth_name,

                             phys_veth_name, physical_network, ip_wrapper):

        int_veth, phys_veth = ip_wrapper.add_veth(int_veth_name,

                                                  phys_veth_name)

        self.int_ofports[physical_network] = self.int_br.add_port(int_veth)

        self.phys_ofports[physical_network] = br.add_port(phys_veth)

        return (int_veth, phys_veth)

**** CubicPower OpenStack Study ****

    def _phys_br_block_untranslated_traffic(self, br, physical_network):

        match = br.ofparser.OFPMatch(in_port=int(

            self.int_ofports[physical_network]))

        msg = br.ofparser.OFPFlowMod(self.int_br.datapath,

                                     priority=2, match=match)

        self.ryu_send_msg(msg)

        match = br.ofparser.OFPMatch(in_port=int(

            self.phys_ofports[physical_network]))

        msg = br.ofparser.OFPFlowMod(br.datapath, priority=2, match=match)

        self.ryu_send_msg(msg)

**** CubicPower OpenStack Study ****

    def _phys_br_enable_veth_to_pass_traffic(self, int_veth, phys_veth):

        # enable veth to pass traffic

        int_veth.link.set_up()

        phys_veth.link.set_up()

        if self.veth_mtu:

            # set up mtu size for veth interfaces

            int_veth.link.set_mtu(self.veth_mtu)

            phys_veth.link.set_mtu(self.veth_mtu)

**** CubicPower OpenStack Study ****

    def _phys_br_patch_physical_bridge_with_integration_bridge(

            self, br, physical_network, bridge, ip_wrapper):

        int_veth_name = constants.VETH_INTEGRATION_PREFIX + bridge

        phys_veth_name = constants.VETH_PHYSICAL_PREFIX + bridge

        self._phys_br_prepare_create_veth(br, int_veth_name, phys_veth_name)

        int_veth, phys_veth = self._phys_br_create_veth(br, int_veth_name,

                                                        phys_veth_name,

                                                        physical_network,

                                                        ip_wrapper)

        self._phys_br_block_untranslated_traffic(br, physical_network)

        self._phys_br_enable_veth_to_pass_traffic(int_veth, phys_veth)

**** CubicPower OpenStack Study ****

    def setup_physical_bridges(self, bridge_mappings):

        """Setup the physical network bridges.

        Creates physical network bridges and links them to the

        integration bridge using veths.

        :param bridge_mappings: map physical network names to bridge names.

        """

        self.phys_brs = {}

        self.int_ofports = {}

        self.phys_ofports = {}

        ip_wrapper = ip_lib.IPWrapper(self.root_helper)

        for physical_network, bridge in bridge_mappings.iteritems():

            LOG.info(_("Mapping physical network %(physical_network)s to "

                       "bridge %(bridge)s"),

                     {'physical_network': physical_network,

                      'bridge': bridge})

            # setup physical bridge

            if not ip_lib.device_exists(bridge, self.root_helper):

                LOG.error(_("Bridge %(bridge)s for physical network "

                            "%(physical_network)s does not exist. Agent "

                            "terminated!"),

                          {'physical_network': physical_network,

                           'bridge': bridge})

                raise SystemExit(1)

            br = OVSBridge(bridge, self.root_helper, self.ryuapp)

            br.setup_ofp()

            msg = br.ofparser.OFPFlowMod(br.datapath,

                                         table_id=ryu_ofp13.OFPTT_ALL,

                                         command=ryu_ofp13.OFPFC_DELETE,

                                         out_group=ryu_ofp13.OFPG_ANY,

                                         out_port=ryu_ofp13.OFPP_ANY)

            self.ryu_send_msg(msg)

            actions = [br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]

            instructions = [br.ofparser.OFPInstructionActions(

                ryu_ofp13.OFPIT_APPLY_ACTIONS,

                actions)]

            msg = br.ofparser.OFPFlowMod(br.datapath,

                                         priority=1,

                                         instructions=instructions)

            self.ryu_send_msg(msg)

            self.phys_brs[physical_network] = br

            self._phys_br_patch_physical_bridge_with_integration_bridge(

                br, physical_network, bridge, ip_wrapper)

**** CubicPower OpenStack Study ****

    def update_ports(self, registered_ports):

        ports = self.int_br.get_vif_port_set()

        if ports == registered_ports:

            return

        self.int_br_device_count = len(ports)

        added = ports - registered_ports

        removed = registered_ports - ports

        return {'current': ports,

                'added': added,

                'removed': removed}

**** CubicPower OpenStack Study ****

    def update_ancillary_ports(self, registered_ports):

        ports = set()

        for bridge in self.ancillary_brs:

            ports |= bridge.get_vif_port_set()

        if ports == registered_ports:

            return

        added = ports - registered_ports

        removed = registered_ports - ports

        return {'current': ports,

                'added': added,

                'removed': removed}

**** CubicPower OpenStack Study ****

    def treat_vif_port(self, vif_port, port_id, network_id, network_type,

                       physical_network, segmentation_id, admin_state_up):

        if vif_port:

            if admin_state_up:

                self.port_bound(vif_port, network_id, network_type,

                                physical_network, segmentation_id)

            else:

                self.port_dead(vif_port)

        else:

            LOG.debug(_("No VIF port for port %s defined on agent."), port_id)

**** CubicPower OpenStack Study ****

    def setup_tunnel_port(self, port_name, remote_ip, tunnel_type):

        ofport = self.tun_br.add_tunnel_port(port_name,

                                             remote_ip,

                                             self.local_ip,

                                             tunnel_type,

                                             self.vxlan_udp_port)

        ofport_int = -1

        try:

            ofport_int = int(ofport)

        except (TypeError, ValueError):

            LOG.exception(_("ofport should have a value that can be "

                            "interpreted as an integer"))

        if ofport_int < 0:

            LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"),

                      {'type': tunnel_type, 'ip': remote_ip})

            return 0

        self.tun_br_ofports[tunnel_type][remote_ip] = ofport

        # Add flow in default table to resubmit to the right

        # tunelling table (lvid will be set in the latter)

        match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))

        instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(

            table_id=constants.TUN_TABLE[tunnel_type])]

        msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,

                                              priority=1,

                                              match=match,

                                              instructions=instructions)

        self.ryu_send_msg(msg)

        ofports = [int(p) for p in self.tun_br_ofports[tunnel_type].values()]

        if ofports:

            # Update flooding flows to include the new tunnel

            for network_id, vlan_mapping in self.local_vlan_map.iteritems():

                if vlan_mapping.network_type == tunnel_type:

                    match = self.tun_br.ofparser.OFPMatch(

                        vlan_vid=int(vlan_mapping.vlan) |

                        ryu_ofp13.OFPVID_PRESENT)

                    actions = [

                        self.tun_br.ofparser.OFPActionPopVlan(),

                        self.tun_br.ofparser.OFPActionSetField(

                            tunnel_id=int(vlan_mapping.segmentation_id))]

                    actions.extend(

                        self.tun_br.ofparser.OFPActionOutput(p, 0)

                        for p in ofports

                    )

                    instructions = [

                        self.tun_br.ofparser.OFPInstructionActions(

                            ryu_ofp13.OFPIT_APPLY_ACTIONS,

                            actions)]

                    msg = self.tun_br.ofparser.OFPFlowMod(

                        self.tun_br.datapath,

                        table_id=constants.FLOOD_TO_TUN,

                        priority=1,

                        match=match,

                        instructions=instructions)

                    self.ryu_send_msg(msg)

        return ofport

**** CubicPower OpenStack Study ****

    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):

        # Check if this tunnel port is still used

        for lvm in self.local_vlan_map.values():

            if tun_ofport in lvm.tun_ofports:

                break

        # If not, remove it

        else:

            for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items():

                if ofport == tun_ofport:

                    port_name = '%s-%s' % (tunnel_type, remote_ip)

                    self.tun_br.delete_port(port_name)

                    self.tun_br_ofports[tunnel_type].pop(remote_ip, None)

**** CubicPower OpenStack Study ****

    def treat_devices_added(self, devices):

        resync = False

        self.sg_agent.prepare_devices_filter(devices)

        for device in devices:

            LOG.info(_("Port %s added"), device)

            try:

                details = self.plugin_rpc.get_device_details(self.context,

                                                             device,

                                                             self.agent_id)

            except Exception as e:

                LOG.debug(_("Unable to get port details for "

                            "%(device)s: %(e)s"),

                          {'device': device, 'e': e})

                resync = True

                continue

            port = self.int_br.get_vif_port_by_id(details['device'])

            if 'port_id' in details:

                LOG.info(_("Port %(device)s updated. Details: %(details)s"),

                         {'device': device, 'details': details})

                self.treat_vif_port(port, details['port_id'],

                                    details['network_id'],

                                    details['network_type'],

                                    details['physical_network'],

                                    details['segmentation_id'],

                                    details['admin_state_up'])

                # update plugin about port status

                self.plugin_rpc.update_device_up(self.context,

                                                 device,

                                                 self.agent_id,

                                                 cfg.CONF.host)

            else:

                LOG.debug(_("Device %s not defined on plugin"), device)

                if (port and int(port.ofport) != -1):

                    self.port_dead(port)

        return resync

**** CubicPower OpenStack Study ****

    def treat_ancillary_devices_added(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Ancillary Port %s added"), device)

            try:

                self.plugin_rpc.get_device_details(self.context, device,

                                                   self.agent_id)

            except Exception as e:

                LOG.debug(_("Unable to get port details for "

                            "%(device)s: %(e)s"),

                          {'device': device, 'e': e})

                resync = True

                continue

            # update plugin about port status

            self.plugin_rpc.update_device_up(self.context,

                                             device,

                                             self.agent_id,

                                             cfg.CONF.host)

        return resync

**** CubicPower OpenStack Study ****

    def treat_devices_removed(self, devices):

        resync = False

        self.sg_agent.remove_devices_filter(devices)

        for device in devices:

            LOG.info(_("Attachment %s removed"), device)

            try:

                self.plugin_rpc.update_device_down(self.context,

                                                   device,

                                                   self.agent_id,

                                                   cfg.CONF.host)

            except Exception as e:

                LOG.debug(_("port_removed failed for %(device)s: %(e)s"),

                          {'device': device, 'e': e})

                resync = True

                continue

            self.port_unbound(device)

        return resync

**** CubicPower OpenStack Study ****

    def treat_ancillary_devices_removed(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Attachment %s removed"), device)

            try:

                details = self.plugin_rpc.update_device_down(self.context,

                                                             device,

                                                             self.agent_id,

                                                             cfg.CONF.host)

            except Exception as e:

                LOG.debug(_("port_removed failed for %(device)s: %(e)s"),

                          {'device': device, 'e': e})

                resync = True

                continue

            if details['exists']:

                LOG.info(_("Port %s updated."), device)

                # Nothing to do regarding local networking

            else:

                LOG.debug(_("Device %s not defined on plugin"), device)

        return resync

**** CubicPower OpenStack Study ****

    def process_network_ports(self, port_info):

        resync_add = False

        resync_removed = False

        if 'added' in port_info:

            start = time.time()

            resync_add = self.treat_devices_added(port_info['added'])

            LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "

                        "treat_devices_added completed in %(elapsed).3f"),

                      {'iter_num': self.iter_num,

                       'elapsed': time.time() - start})

        if 'removed' in port_info:

            start = time.time()

            resync_removed = self.treat_devices_removed(port_info['removed'])

            LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "

                        "treat_devices_removed completed in %(elapsed).3f"),

                      {'iter_num': self.iter_num,

                       'elapsed': time.time() - start})

        # If one of the above opertaions fails => resync with plugin

        return (resync_add | resync_removed)

**** CubicPower OpenStack Study ****

    def process_ancillary_network_ports(self, port_info):

        resync_add = False

        resync_removed = False

        if 'added' in port_info:

            start = time.time()

            resync_add = self.treat_ancillary_devices_added(port_info['added'])

            LOG.debug(_("process_ancillary_network_ports - iteration: "

                        "%(iter_num)d - treat_ancillary_devices_added "

                        "completed in %(elapsed).3f"),

                      {'iter_num': self.iter_num,

                       'elapsed': time.time() - start})

        if 'removed' in port_info:

            start = time.time()

            resync_removed = self.treat_ancillary_devices_removed(

                port_info['removed'])

            LOG.debug(_("process_ancillary_network_ports - iteration: "

                        "%(iter_num)d - treat_ancillary_devices_removed "

                        "completed in %(elapsed).3f"),

                      {'iter_num': self.iter_num,

                       'elapsed': time.time() - start})

        # If one of the above opertaions fails => resync with plugin

        return (resync_add | resync_removed)

**** CubicPower OpenStack Study ****

    def tunnel_sync(self):

        resync = False

        try:

            for tunnel_type in self.tunnel_types:

                details = self.plugin_rpc.tunnel_sync(self.context,

                                                      self.local_ip,

                                                      tunnel_type)

                tunnels = details['tunnels']

                for tunnel in tunnels:

                    if self.local_ip != tunnel['ip_address']:

                        tunnel_id = tunnel.get('id', tunnel['ip_address'])

                        tun_name = '%s-%s' % (tunnel_type, tunnel_id)

                        self.setup_tunnel_port(tun_name,

                                               tunnel['ip_address'],

                                               tunnel_type)

        except Exception as e:

            LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),

                      {'local_ip': self.local_ip, 'e': e})

            resync = True

        return resync

**** CubicPower OpenStack Study ****

    def ovsdb_monitor_loop(self, polling_manager=None):
        """Run the agent's main polling loop; this method does not return.

        Each iteration optionally resynchronizes tunnels, polls the
        integration bridge and the ancillary bridges for port changes,
        processes the deltas and then sleeps until the polling interval
        has elapsed.

        :param polling_manager: object deciding when polling is required;
                                polling.AlwaysPoll() is used when omitted.
        """
        if not polling_manager:
            polling_manager = polling.AlwaysPoll()
        # State carried across iterations.  Both sync flags start out True
        # so that the first iteration performs a full (re)synchronization.
        sync = True
        ports = set()
        ancillary_ports = set()
        tunnel_sync = True
        while True:
            try:
                start = time.time()
                # Per-iteration counters, reported in the debug log below.
                port_stats = {'regular': {'added': 0, 'removed': 0},
                              'ancillary': {'added': 0, 'removed': 0}}
                LOG.debug(_("Agent ovsdb_monitor_loop - "
                          "iteration:%d started"),
                          self.iter_num)
                if sync:
                    # Forget every known port so that all ports present on
                    # the bridges are reported as added again.
                    LOG.info(_("Agent out of sync with plugin!"))
                    ports.clear()
                    ancillary_ports.clear()
                    sync = False
                    polling_manager.force_polling()
                # Notify the plugin of tunnel IP
                if self.enable_tunneling and tunnel_sync:
                    LOG.info(_("Agent tunnel out of sync with plugin!"))
                    # tunnel_sync() returns True when it has to be retried.
                    tunnel_sync = self.tunnel_sync()
                if polling_manager.is_polling_required:
                    LOG.debug(_("Agent ovsdb_monitor_loop - "
                                "iteration:%(iter_num)d - "
                                "starting polling. Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # update_ports() returns None when nothing changed.
                    port_info = self.update_ports(ports)
                    LOG.debug(_("Agent ovsdb_monitor_loop - "
                                "iteration:%(iter_num)d - "
                                "port information retrieved. "
                                "Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # notify plugin about port deltas
                    if port_info:
                        LOG.debug(_("Agent loop has new devices!"))
                        # If treat devices fails - must resync with plugin
                        sync = self.process_network_ports(port_info)
                        LOG.debug(_("Agent ovsdb_monitor_loop - "
                                    "iteration:%(iter_num)d - "
                                    "ports processed. Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        # Remember the current port set for the next delta.
                        ports = port_info['current']
                        port_stats['regular']['added'] = (
                            len(port_info.get('added', [])))
                        port_stats['regular']['removed'] = (
                            len(port_info.get('removed', [])))
                    # Treat ancillary devices if they exist
                    if self.ancillary_brs:
                        port_info = self.update_ancillary_ports(
                            ancillary_ports)
                        LOG.debug(_("Agent ovsdb_monitor_loop - "
                                    "iteration:%(iter_num)d - "
                                    "ancillary port info retrieved. "
                                    "Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        if port_info:
                            rc = self.process_ancillary_network_ports(
                                port_info)
                            LOG.debug(_("Agent ovsdb_monitor_loop - "
                                        "iteration:"
                                        "%(iter_num)d - ancillary ports "
                                        "processed. Elapsed:%(elapsed).3f"),
                                      {'iter_num': self.iter_num,
                                       'elapsed': time.time() - start})
                            ancillary_ports = port_info['current']
                            port_stats['ancillary']['added'] = (
                                len(port_info.get('added', [])))
                            port_stats['ancillary']['removed'] = (
                                len(port_info.get('removed', [])))
                            # Keep a resync request from the regular ports.
                            sync = sync | rc
                    polling_manager.polling_completed()
            except Exception:
                # Any failure in the iteration forces a full resync of both
                # ports and tunnels on the next pass.
                LOG.exception(_("Error in agent event loop"))
                sync = True
                tunnel_sync = True
            # sleep till end of polling interval
            elapsed = (time.time() - start)
            LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
                        "completed. Processed ports statistics:"
                        "%(port_stats)s. Elapsed:%(elapsed).3f"),
                      {'iter_num': self.iter_num,
                       'port_stats': port_stats,
                       'elapsed': elapsed})
            if (elapsed < self.polling_interval):
                time.sleep(self.polling_interval - elapsed)
            else:
                LOG.debug(_("Loop iteration exceeded interval "
                            "(%(polling_interval)s vs. %(elapsed)s)!"),
                          {'polling_interval': self.polling_interval,
                           'elapsed': elapsed})
            self.iter_num = self.iter_num + 1

**** CubicPower OpenStack Study ****

    def daemon_loop(self):

        with polling.get_polling_manager(

                self.minimize_polling,

                self.root_helper,

                self.ovsdb_monitor_respawn_interval) as pm:

            self.ovsdb_monitor_loop(polling_manager=pm)

def create_agent_config_map(config):
    """Create a map of agent config parameters.

    :param config: an instance of cfg.CONF
    :returns: a map of agent configuration parameters
    :raises ValueError: when bridge_mappings cannot be parsed, when a
                        tunnel type is not in
                        constants.TUNNEL_NETWORK_TYPES, or when tunnel
                        types are configured without a local_ip.
    """
    try:
        bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
    except ValueError as e:
        raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)

    kwargs = dict(
        integ_br=config.OVS.integration_bridge,
        tun_br=config.OVS.tunnel_bridge,
        local_ip=config.OVS.local_ip,
        bridge_mappings=bridge_mappings,
        root_helper=config.AGENT.root_helper,
        polling_interval=config.AGENT.polling_interval,
        minimize_polling=config.AGENT.minimize_polling,
        tunnel_types=config.AGENT.tunnel_types,
        veth_mtu=config.AGENT.veth_mtu,
        l2_population=False,
        ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
    )

    # If enable_tunneling is TRUE, set tunnel_type to default to GRE
    if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
        kwargs['tunnel_types'] = [p_const.TYPE_GRE]

    # Verify the tunnel_types specified are valid
    for tun in kwargs['tunnel_types']:
        if tun not in constants.TUNNEL_NETWORK_TYPES:
            # Interpolate the offending type into the message; a trailing
            # ", tun" would build a tuple instead of a formatted string.
            msg = _('Invalid tunnel type specificed: %s') % tun
            raise ValueError(msg)
        # Only reached when at least one tunnel type is configured.
        if not kwargs['local_ip']:
            msg = _('Tunneling cannot be enabled without a valid local_ip.')
            raise ValueError(msg)

    return kwargs

**** CubicPower OpenStack Study ****

def create_agent_config_map(config):
    """Create a map of agent config parameters.

    :param config: an instance of cfg.CONF
    :returns: a map of agent configuration parameters
    :raises ValueError: when bridge_mappings cannot be parsed, when a
                        tunnel type is not in
                        constants.TUNNEL_NETWORK_TYPES, or when tunnel
                        types are configured without a local_ip.
    """
    try:
        bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
    except ValueError as e:
        raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)

    kwargs = dict(
        integ_br=config.OVS.integration_bridge,
        tun_br=config.OVS.tunnel_bridge,
        local_ip=config.OVS.local_ip,
        bridge_mappings=bridge_mappings,
        root_helper=config.AGENT.root_helper,
        polling_interval=config.AGENT.polling_interval,
        minimize_polling=config.AGENT.minimize_polling,
        tunnel_types=config.AGENT.tunnel_types,
        veth_mtu=config.AGENT.veth_mtu,
        l2_population=False,
        ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
    )

    # If enable_tunneling is TRUE, set tunnel_type to default to GRE
    if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
        kwargs['tunnel_types'] = [p_const.TYPE_GRE]

    # Verify the tunnel_types specified are valid
    for tun in kwargs['tunnel_types']:
        if tun not in constants.TUNNEL_NETWORK_TYPES:
            # Interpolate the offending type into the message; a trailing
            # ", tun" would build a tuple instead of a formatted string.
            msg = _('Invalid tunnel type specificed: %s') % tun
            raise ValueError(msg)
        # Only reached when at least one tunnel type is configured.
        if not kwargs['local_ip']:
            msg = _('Tunneling cannot be enabled without a valid local_ip.')
            raise ValueError(msg)

    return kwargs