#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo.config import cfg
from neutron.agent.common import config
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
# Short alias for the uuidutils generator used by the fixtures and tests.
_uuid = uuidutils.generate_uuid

# Identifiers referenced both by the router fixture and by test assertions.
TENANT_ID = _uuid()
LABEL_ID = _uuid()

# One router owned by TENANT_ID, carrying a single metering label (LABEL_ID)
# whose rule list is empty.
ROUTERS = [
    {
        'status': 'ACTIVE',
        'name': 'router1',
        'gw_port_id': None,
        'admin_state_up': True,
        'tenant_id': TENANT_ID,
        '_metering_labels': [
            {
                'rules': [],
                'id': LABEL_ID,
            },
        ],
        'id': _uuid(),
    },
]
# **** CubicPower OpenStack Study ****
class TestMeteringOperations(base.BaseTestCase):
    """Check that MeteringAgent operations reach the metering driver.

    The agent is created while the configured driver class is patched with
    an autospec mock, so every test can inspect the calls recorded on
    ``self.driver`` (the agent's ``metering_driver`` attribute).
    """

    def setUp(self):
        """Set config overrides, start the patches and build the agent."""
        super(TestMeteringOperations, self).setUp()
        conf = cfg.CONF
        conf.register_opts(metering_agent.MeteringAgent.Opts)
        config.register_root_helper(conf)

        self.noop_driver = (
            'neutron.services.metering.drivers.noop.'
            'noop_driver.NoopMeteringDriver')
        overrides = (
            ('driver', self.noop_driver),
            ('measure_interval', 0),
            ('report_interval', 0),
        )
        for option, value in overrides:
            conf.set_override(option, value)

        self.setup_notification_driver()

        # NOTE(review): none of the patches below is stopped explicitly in
        # this class; presumably the base test case takes care of cleanup --
        # confirm against base.BaseTestCase.

        # The plugin RPC sync call is patched to return an empty list.
        self.metering_rpc_patch = mock.patch(
            'neutron.services.metering.agents.metering_agent.'
            'MeteringPluginRpc._get_sync_data_metering',
            return_value=[])
        self.metering_rpc_patch.start()

        # The driver class named in the config is swapped for an autospec
        # mock before the agent is built.
        self.driver_patch = mock.patch(self.noop_driver, autospec=True)
        self.driver_patch.start()

        # The looping call class is replaced by a mock as well.
        looping_patcher = mock.patch(
            'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
        looping_patcher.start()

        self.agent = metering_agent.MeteringAgent('my agent', conf)
        self.driver = self.agent.metering_driver

    def test_add_metering_label(self):
        """Adding a label calls the driver's add_metering_label once."""
        self.agent.add_metering_label(None, ROUTERS)
        call_count = self.driver.add_metering_label.call_count
        self.assertEqual(call_count, 1)

    def test_remove_metering_label(self):
        """Removing a label calls the driver's remove_metering_label once."""
        self.agent.remove_metering_label(None, ROUTERS)
        call_count = self.driver.remove_metering_label.call_count
        self.assertEqual(call_count, 1)

    def test_update_metering_label_rule(self):
        """Updating rules calls update_metering_label_rules once."""
        self.agent.update_metering_label_rules(None, ROUTERS)
        call_count = self.driver.update_metering_label_rules.call_count
        self.assertEqual(call_count, 1)

    def test_routers_updated(self):
        """A routers update calls the driver's update_routers once."""
        self.agent.routers_updated(None, ROUTERS)
        call_count = self.driver.update_routers.call_count
        self.assertEqual(call_count, 1)

    def test_get_traffic_counters(self):
        """Fetching counters calls the driver's get_traffic_counters once."""
        self.agent._get_traffic_counters(None, ROUTERS)
        call_count = self.driver.get_traffic_counters.call_count
        self.assertEqual(call_count, 1)

    def test_notification_report(self):
        """The metering loop produces an 'l3.meter' notification."""
        self.agent.routers_updated(None, ROUTERS)
        counters = {LABEL_ID: {'pkts': 88, 'bytes': 444}}
        self.driver.get_traffic_counters.return_value = counters

        self.agent._metering_loop()

        notifications = test_notifier.NOTIFICATIONS
        self.assertNotEqual(len(notifications), 0)

        # Stop on the first metering event.  If there is none, the loop
        # leaves the last notification bound and the assertion below fails.
        for event in notifications:
            if event['event_type'] == 'l3.meter':
                break
        self.assertEqual(event['event_type'], 'l3.meter')

        payload = event['payload']
        expected_fields = (
            ('tenant_id', TENANT_ID),
            ('label_id', LABEL_ID),
            ('pkts', 88),
            ('bytes', 444),
        )
        for field, value in expected_fields:
            self.assertEqual(payload[field], value)

    def test_router_deleted(self):
        """Deleting a router records its counters and removes it once."""
        label_id = _uuid()
        counters = {label_id: {'pkts': 44, 'bytes': 222}}
        self.driver.get_traffic_counters = mock.MagicMock(
            return_value=counters)
        # Stub the agent's bookkeeping method so its arguments can be
        # inspected.
        self.agent._add_metering_info = mock.MagicMock()

        self.agent.routers_updated(None, ROUTERS)
        router_id = ROUTERS[0]['id']
        self.agent.router_deleted(None, router_id)

        add_info = self.agent._add_metering_info
        self.assertEqual(add_info.call_count, 1)
        self.assertEqual(self.driver.remove_router.call_count, 1)
        add_info.assert_called_with(label_id, 44, 222)
# **** CubicPower OpenStack Study ****
class TestMeteringDriver(base.BaseTestCase):
    """Check that MeteringAgent logs an exception when its driver fails.

    The agent's ``metering_driver`` is replaced after construction by a
    plain ``mock.Mock`` that each test then breaks in a different way.
    """

    def setUp(self):
        """Build the agent and replace its metering driver with a Mock."""
        super(TestMeteringDriver, self).setUp()
        conf = cfg.CONF
        conf.register_opts(metering_agent.MeteringAgent.Opts)
        config.register_root_helper(conf)

        self.noop_driver = (
            'neutron.services.metering.drivers.noop.'
            'noop_driver.NoopMeteringDriver')
        conf.set_override('driver', self.noop_driver)

        self.agent = metering_agent.MeteringAgent('my agent', conf)
        self.driver = self.agent.metering_driver = mock.Mock()

    def test_add_metering_label_with_bad_driver_impl(self):
        """A driver without add_metering_label triggers LOG.exception."""
        # Deleting an attribute from a Mock makes later access to it raise
        # AttributeError, which imitates a driver missing the method.
        del self.driver.add_metering_label
        expected_args = {
            'driver': self.noop_driver,
            'func': 'add_metering_label',
        }

        with mock.patch.object(metering_agent, 'LOG') as log:
            self.agent.add_metering_label(None, ROUTERS)
            log.exception.assert_called_with(mock.ANY, expected_args)

    def test_add_metering_label_runtime_error(self):
        """A RuntimeError raised by the driver triggers LOG.exception."""
        self.driver.add_metering_label.side_effect = RuntimeError
        expected_args = {
            'driver': self.noop_driver,
            'func': 'add_metering_label',
        }

        with mock.patch.object(metering_agent, 'LOG') as log:
            self.agent.add_metering_label(None, ROUTERS)
            log.exception.assert_called_with(mock.ANY, expected_args)