**** CubicPower OpenStack Study ****
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for linuxbridge rpc
"""
import fixtures
from oslo.config import cfg
from neutron.agent import rpc as agent_rpc
from neutron.common import topics
from neutron.openstack.common import context
from neutron.plugins.linuxbridge import lb_neutron_plugin as plb
from neutron.tests import base
**** CubicPower OpenStack Study ****
class rpcApiTestCase(base.BaseTestCase):
**** CubicPower OpenStack Study ****
def _test_lb_api(self, rpcapi, topic, method, rpc_method,
expected_msg=None, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(expected_arg, arg)
**** CubicPower OpenStack Study ****
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(expected_arg, arg)
**** CubicPower OpenStack Study ****
def test_delete_network(self):
rpcapi = plb.AgentNotifierApi(topics.AGENT)
self._test_lb_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
**** CubicPower OpenStack Study ****
def test_port_update(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
rpcapi = plb.AgentNotifierApi(topics.AGENT)
expected_msg = rpcapi.make_msg('port_update',
port='fake_port',
network_type='vlan',
physical_network='fake_net',
segmentation_id='fake_vlan_id')
self._test_lb_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
expected_msg=expected_msg,
port='fake_port',
physical_network='fake_net',
vlan_id='fake_vlan_id')
**** CubicPower OpenStack Study ****
def test_port_update_old_agent(self):
cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
rpcapi = plb.AgentNotifierApi(topics.AGENT)
expected_msg = rpcapi.make_msg('port_update',
port='fake_port',
network_type='vlan',
physical_network='fake_net',
segmentation_id='fake_vlan_id',
vlan_id='fake_vlan_id')
self._test_lb_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
expected_msg=expected_msg,
port='fake_port',
physical_network='fake_net',
vlan_id='fake_vlan_id')
**** CubicPower OpenStack Study ****
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
**** CubicPower OpenStack Study ****
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
host='fake_host')
**** CubicPower OpenStack Study ****
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
host='fake_host')