**** CubicPower OpenStack Study ****
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from neutron.common import exceptions
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.nsxlib import queue as queuelib
from neutron.tests.unit.vmware.nsxlib import base
**** CubicPower OpenStack Study ****
class TestLogicalQueueLib(base.NsxlibTestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(TestLogicalQueueLib, self).setUp()
self.fake_queue = {
'name': 'fake_queue',
'min': 0, 'max': 256,
'dscp': 0, 'qos_marking': False
}
**** CubicPower OpenStack Study ****
def test_create_and_get_lqueue(self):
queue_id = queuelib.create_lqueue(
self.fake_cluster, self.fake_queue)
queue_res = queuelib.do_request(
'GET',
queuelib._build_uri_path('lqueue', resource_id=queue_id),
cluster=self.fake_cluster)
self.assertEqual(queue_id, queue_res['uuid'])
self.assertEqual('fake_queue', queue_res['display_name'])
**** CubicPower OpenStack Study ****
def test_create_lqueue_nsx_error_raises(self):
def raise_nsx_exc(*args, **kwargs):
raise api_exc.NsxApiException()
with mock.patch.object(queuelib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
exceptions.NeutronException, queuelib.create_lqueue,
self.fake_cluster, self.fake_queue)
**** CubicPower OpenStack Study ****
def raise_nsx_exc(*args, **kwargs):
raise api_exc.NsxApiException()
with mock.patch.object(queuelib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
exceptions.NeutronException, queuelib.create_lqueue,
self.fake_cluster, self.fake_queue)
**** CubicPower OpenStack Study ****
def test_delete_lqueue(self):
queue_id = queuelib.create_lqueue(
self.fake_cluster, self.fake_queue)
queuelib.delete_lqueue(self.fake_cluster, queue_id)
self.assertRaises(exceptions.NotFound,
queuelib.do_request,
'GET',
queuelib._build_uri_path(
'lqueue', resource_id=queue_id),
cluster=self.fake_cluster)
**** CubicPower OpenStack Study ****
def test_delete_non_existing_lqueue_raises(self):
self.assertRaises(exceptions.NeutronException,
queuelib.delete_lqueue,
self.fake_cluster, 'whatever')