

# OpenStack Study: eswitch_neutron_agent.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# vim: tabstop=4 shiftwidth=4 softtabstop=4


# Copyright 2013 Mellanox Technologies, Ltd


# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at


# http://www.apache.org/licenses/LICENSE-2.0


# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or

# implied.

# See the License for the specific language governing permissions and

# limitations under the License.

import socket

import sys

import time

import eventlet

from oslo.config import cfg

from neutron.agent import rpc as agent_rpc

from neutron.agent import securitygroups_rpc as sg_rpc

from neutron.common import config as logging_config

from neutron.common import constants as q_constants

from neutron.common import topics

from neutron.common import utils as q_utils

from neutron import context

from neutron.openstack.common import log as logging

from neutron.openstack.common import loopingcall

from neutron.openstack.common.rpc import common as rpc_common

from neutron.openstack.common.rpc import dispatcher

from neutron.plugins.common import constants as p_const

from neutron.plugins.mlnx.agent import utils

from neutron.plugins.mlnx.common import config # noqa

from neutron.plugins.mlnx.common import exceptions

LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

class EswitchManager(object):

**** CubicPower OpenStack Study ****

    def __init__(self, interface_mappings, endpoint, timeout):

        self.utils = utils.EswitchUtils(endpoint, timeout)

        self.interface_mappings = interface_mappings

        self.network_map = {}


**** CubicPower OpenStack Study ****

    def get_port_id_by_mac(self, port_mac):

        for network_id, data in self.network_map.iteritems():

            for port in data['ports']:

                if port['port_mac'] == port_mac:

                    return port['port_id']

        err_msg = _("Agent cache inconsistency - port id "

                    "is not stored for %s") % port_mac


        raise exceptions.MlnxException(err_msg)

**** CubicPower OpenStack Study ****

    def get_vnics_mac(self):

        return set(self.utils.get_attached_vnics().keys())

**** CubicPower OpenStack Study ****

    def vnic_port_exists(self, port_mac):

        return port_mac in self.utils.get_attached_vnics()

**** CubicPower OpenStack Study ****

    def remove_network(self, network_id):

        if network_id in self.network_map:

            del self.network_map[network_id]


            LOG.debug(_("Network %s not defined on Agent."), network_id)

**** CubicPower OpenStack Study ****

    def port_down(self, network_id, physical_network, port_mac):

        """Sets port to down.

        Check  internal network map for port data.

        If port exists set port to Down


        for network_id, data in self.network_map.iteritems():

            for port in data['ports']:

                if port['port_mac'] == port_mac:

                    self.utils.port_down(physical_network, port_mac)


        LOG.info(_('Network %s is not available on this agent'), network_id)

**** CubicPower OpenStack Study ****

    def port_up(self, network_id, network_type,

                physical_network, seg_id, port_id, port_mac):

        """Sets port to up.

        Update internal network map with port data.

        -Check if vnic defined

        - configure eswitch vport

        - set port to Up


        LOG.debug(_("Connecting port %s"), port_id)

        if network_id not in self.network_map:

            self.provision_network(port_id, port_mac,

                                   network_id, network_type,

                                   physical_network, seg_id)

        net_map = self.network_map[network_id]

        net_map['ports'].append({'port_id': port_id, 'port_mac': port_mac})

        if network_type == p_const.TYPE_VLAN:

            LOG.info(_('Binding Segmentation ID %(seg_id)s'

                       'to eSwitch for vNIC mac_address %(mac)s'),

                     {'seg_id': seg_id,

                      'mac': port_mac})




            self.utils.port_up(physical_network, port_mac)


            LOG.error(_('Unsupported network type %s'), network_type)

**** CubicPower OpenStack Study ****

    def port_release(self, port_mac):

        """Clear port configuration from eSwitch."""

        for network_id, net_data in self.network_map.iteritems():

            for port in net_data['ports']:

                if port['port_mac'] == port_mac:




        LOG.info(_('Port_mac %s is not available on this agent'), port_mac)

**** CubicPower OpenStack Study ****

    def provision_network(self, port_id, port_mac,

                          network_id, network_type,

                          physical_network, segmentation_id):

        LOG.info(_("Provisioning network %s"), network_id)

        if network_type == p_const.TYPE_VLAN:

            LOG.debug(_("Creating VLAN Network"))


            LOG.error(_("Unknown network type %(network_type)s "

                        "for network %(network_id)s"),

                      {'network_type': network_type,

                       'network_id': network_id})


        data = {

            'physical_network': physical_network,

            'network_type': network_type,

            'ports': [],

            'vlan_id': segmentation_id}

        self.network_map[network_id] = data

**** CubicPower OpenStack Study ****

class MlnxEswitchRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin):

# Set RPC API version to 1.0 by

**** CubicPower OpenStack Study ****

    def __init__(self, context, agent):

        self.context = context

        self.agent = agent

        self.eswitch = agent.eswitch

        self.sg_agent = agent

**** CubicPower OpenStack Study ****

    def network_delete(self, context, **kwargs):

        LOG.debug(_("network_delete received"))

        network_id = kwargs.get('network_id')

        if not network_id:

            LOG.warning(_("Invalid Network ID, cannot remove Network"))


            LOG.debug(_("Delete network %s"), network_id)


**** CubicPower OpenStack Study ****

    def port_update(self, context, **kwargs):

        LOG.debug(_("port_update received"))

        port = kwargs.get('port')

        net_type = kwargs.get('network_type')

        segmentation_id = kwargs.get('segmentation_id')

        if not segmentation_id:

            # compatibility with pre-Havana RPC vlan_id encoding

            segmentation_id = kwargs.get('vlan_id')

        physical_network = kwargs.get('physical_network')

        net_id = port['network_id']

        if self.eswitch.vnic_port_exists(port['mac_address']):

            if 'security_groups' in port:



                if port['admin_state_up']:







                    # update plugin about port status









                    # update plugin about port status






            except rpc_common.Timeout:

                LOG.error(_("RPC timeout while updating port %s"), port['id'])


            LOG.debug(_("No port %s defined on agent."), port['id'])

**** CubicPower OpenStack Study ****

    def create_rpc_dispatcher(self):

        """Get the rpc dispatcher for this manager.

        If a manager would like to set an rpc API version,

        or support more than one class as the target of rpc messages,

        override this method.


        return dispatcher.RpcDispatcher([self])

**** CubicPower OpenStack Study ****

class MlnxEswitchPluginApi(agent_rpc.PluginApi,
                           sg_rpc.SecurityGroupServerRpcApiMixin):
    """RPC API the agent uses to talk to the plugin.

    Combines ``agent_rpc.PluginApi`` with the security group server RPC
    mixin and adds nothing of its own.

    NOTE(review): the class had no body in the reviewed copy of this
    file; confirm that nothing beyond the two bases is expected here.
    """


**** CubicPower OpenStack Study ****

class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):

# Set RPC API version to 1.0 by

**** CubicPower OpenStack Study ****

    def __init__(self, interface_mapping):

        self._polling_interval = cfg.CONF.AGENT.polling_interval


        configurations = {'interface_mappings': interface_mapping}

        self.agent_state = {

            'binary': 'neutron-mlnx-agent',

            'host': cfg.CONF.host,

            'topic': q_constants.L2_AGENT_TOPIC,

            'configurations': configurations,

            'agent_type': q_constants.AGENT_TYPE_MLNX,

            'start_flag': True}



**** CubicPower OpenStack Study ****

    def _setup_eswitches(self, interface_mapping):

        daemon = cfg.CONF.ESWITCH.daemon_endpoint

        timeout = cfg.CONF.ESWITCH.request_timeout

        self.eswitch = EswitchManager(interface_mapping, daemon, timeout)

**** CubicPower OpenStack Study ****

    def _report_state(self):


            devices = len(self.eswitch.get_vnics_mac())

            self.agent_state.get('configurations')['devices'] = devices



            self.agent_state.pop('start_flag', None)

        except Exception:

            LOG.exception(_("Failed reporting state!"))

**** CubicPower OpenStack Study ****

    def _setup_rpc(self):

        self.agent_id = 'mlnx-agent.%s' % socket.gethostname()

        LOG.info(_("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT

        self.plugin_rpc = MlnxEswitchPluginApi(topics.PLUGIN)

        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init

        self.context = context.get_admin_context_without_session()

        # Handle updates from service

        self.callbacks = MlnxEswitchRpcCallbacks(self.context,


        self.dispatcher = self.callbacks.create_rpc_dispatcher()

        # Define the listening consumers for the agent

        consumers = [[topics.PORT, topics.UPDATE],

                     [topics.NETWORK, topics.DELETE],

                     [topics.SECURITY_GROUP, topics.UPDATE]]

        self.connection = agent_rpc.create_consumers(self.dispatcher,



        report_interval = cfg.CONF.AGENT.report_interval

        if report_interval:

            heartbeat = loopingcall.LoopingCall(self._report_state)


**** CubicPower OpenStack Study ****

    def update_ports(self, registered_ports):

        ports = self.eswitch.get_vnics_mac()

        if ports == registered_ports:


        added = ports - registered_ports

        removed = registered_ports - ports

        return {'current': ports,

                'added': added,

                'removed': removed}

**** CubicPower OpenStack Study ****

    def process_network_ports(self, port_info):

        resync_a = False

        resync_b = False

        if port_info.get('added'):

            LOG.debug(_("Ports added!"))

            resync_a = self.treat_devices_added(port_info['added'])

        if port_info.get('removed'):

            LOG.debug(_("Ports removed!"))

            resync_b = self.treat_devices_removed(port_info['removed'])

        # If one of the above opertaions fails => resync with plugin

        return (resync_a | resync_b)

**** CubicPower OpenStack Study ****

    def treat_vif_port(self, port_id, port_mac,

                       network_id, network_type,

                       physical_network, segmentation_id,


        if self.eswitch.vnic_port_exists(port_mac):

            if admin_state_up:








                self.eswitch.port_down(network_id, physical_network, port_mac)


            LOG.debug(_("No port %s defined on agent."), port_id)

**** CubicPower OpenStack Study ****

    def treat_devices_added(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Adding port with mac %s"), device)


                dev_details = self.plugin_rpc.get_device_details(




            except Exception as e:

                LOG.debug(_("Unable to get device dev_details for device "

                          "with mac_address %(device)s: due to %(exc)s"),

                          {'device': device, 'exc': e})

                resync = True


            if 'port_id' in dev_details:

                LOG.info(_("Port %s updated"), device)

                LOG.debug(_("Device details %s"), str(dev_details))








                if dev_details.get('admin_state_up'):





                LOG.debug(_("Device with mac_address %s not defined "

                          "on Neutron Plugin"), device)

        return resync

**** CubicPower OpenStack Study ****

    def treat_devices_removed(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Removing device with mac_address %s"), device)


                port_id = self.eswitch.get_port_id_by_mac(device)

                dev_details = self.plugin_rpc.update_device_down(self.context,




            except Exception as e:

                LOG.debug(_("Removing port failed for device %(device)s "

                          "due to %(exc)s"), {'device': device, 'exc': e})

                resync = True


            if dev_details['exists']:

                LOG.info(_("Port %s updated."), device)


                LOG.debug(_("Device %s not defined on plugin"), device)


        return resync

**** CubicPower OpenStack Study ****

    def daemon_loop(self):

        sync = True

        ports = set()

        LOG.info(_("eSwitch Agent Started!"))

        while True:


                start = time.time()

                if sync:

                    LOG.info(_("Agent out of sync with plugin!"))


                    sync = False

                port_info = self.update_ports(ports)

                # notify plugin about port deltas

                if port_info:

                    LOG.debug(_("Agent loop process devices!"))

                    # If treat devices fails - must resync with plugin

                    sync = self.process_network_ports(port_info)

                    ports = port_info['current']

            except exceptions.RequestTimeout:

                LOG.exception(_("Request timeout in agent event loop "

                                "eSwitchD is not responding - exiting..."))

                raise SystemExit(1)

            except Exception:

                LOG.exception(_("Error in agent event loop"))

                sync = True

            # sleep till end of polling interval

            elapsed = (time.time() - start)

            if (elapsed < self._polling_interval):

                time.sleep(self._polling_interval - elapsed)


                LOG.debug(_("Loop iteration exceeded interval "

                            "(%(polling_interval)s vs. %(elapsed)s)"),

                          {'polling_interval': self._polling_interval,

                           'elapsed': elapsed})

def main():
    """Entry point: configure, build the agent and run its polling loop.

    Exits with status 1 when the interface mappings cannot be parsed or
    the agent cannot be created.

    NOTE(review): the start-up calls, both ``try:`` lines, the argument of
    ``parse_mappings``, the exits and the final loop call were blank in
    the reviewed copy of this file. They are restored from the modules the
    file imports (eventlet, sys, logging_config, q_utils); the option name
    ``physical_interface_mappings`` is taken from the error message below.
    Confirm all of them.
    """
    eventlet.monkey_patch()
    cfg.CONF(project='neutron')
    logging_config.setup_logging(cfg.CONF)

    try:
        interface_mappings = q_utils.parse_mappings(
            cfg.CONF.ESWITCH.physical_interface_mappings)
    except ValueError as e:
        LOG.error(_("Parsing physical_interface_mappings failed: %s."
                    " Agent terminated!"), e)
        sys.exit(1)
    LOG.info(_("Interface mappings: %s"), interface_mappings)

    try:
        agent = MlnxEswitchNeutronAgent(interface_mappings)
    except Exception as e:
        LOG.error(_("Failed on Agent initialisation : %s."
                    " Agent terminated!"), e)
        sys.exit(1)

    # Start everything.
    LOG.info(_("Agent initialised successfully, now running... "))
    agent.daemon_loop()
    sys.exit(0)


if __name__ == '__main__':
    main()


**** CubicPower OpenStack Study ****

# NOTE(review): a second, verbatim copy of main() and of the
# ``if __name__ == '__main__':`` guard stood here. It rebound ``main`` to
# an identical function and repeated the guard, so it was dropped; the
# definition earlier in this file is the one to keep.
