**** CubicPower OpenStack Study ****
# Copyright 2012 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob.exc
from cinder.api.contrib import services
from cinder.api import extensions
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import timeutils
from cinder import policy
from cinder import test
from cinder.tests.api import fakes
from datetime import datetime
fake_services_list = [{'binary': 'cinder-scheduler',
'host': 'host1',
'availability_zone': 'cinder',
'id': 1,
'disabled': True,
'updated_at': datetime(2012, 10, 29, 13, 42, 2),
'created_at': datetime(2012, 9, 18, 2, 46, 27),
'disabled_reason': 'test1'},
{'binary': 'cinder-volume',
'host': 'host1',
'availability_zone': 'cinder',
'id': 2,
'disabled': True,
'updated_at': datetime(2012, 10, 29, 13, 42, 5),
'created_at': datetime(2012, 9, 18, 2, 46, 27),
'disabled_reason': 'test2'},
{'binary': 'cinder-scheduler',
'host': 'host2',
'availability_zone': 'cinder',
'id': 3,
'disabled': False,
'updated_at': datetime(2012, 9, 19, 6, 55, 34),
'created_at': datetime(2012, 9, 18, 2, 46, 28),
'disabled_reason': ''},
{'binary': 'cinder-volume',
'host': 'host2',
'availability_zone': 'cinder',
'id': 4,
'disabled': True,
'updated_at': datetime(2012, 9, 18, 8, 3, 38),
'created_at': datetime(2012, 9, 18, 2, 46, 28),
'disabled_reason': 'test4'},
]
**** CubicPower OpenStack Study ****
class FakeRequest(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {}
# NOTE(uni): deprecating service request key, binary takes precedence
# Still keeping service key here for API compatibility sake.
**** CubicPower OpenStack Study ****
class FakeRequestWithService(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {"service": "cinder-volume"}
**** CubicPower OpenStack Study ****
class FakeRequestWithBinary(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {"binary": "cinder-volume"}
**** CubicPower OpenStack Study ****
class FakeRequestWithHost(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {"host": "host1"}
# NOTE(uni): deprecating service request key, binary takes precedence
# Still keeping service key here for API compatibility sake.
**** CubicPower OpenStack Study ****
class FakeRequestWithHostService(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {"host": "host1", "service": "cinder-volume"}
**** CubicPower OpenStack Study ****
class FakeRequestWithHostBinary(object):
environ = {"cinder.context": context.get_admin_context()}
GET = {"host": "host1", "binary": "cinder-volume"}
**** CubicPower OpenStack Study ****
def fake_service_get_all(context):
return fake_services_list
**** CubicPower OpenStack Study ****
def fake_service_get_by_host_binary(context, host, binary):
for service in fake_services_list:
if service['host'] == host and service['binary'] == binary:
return service
return None
**** CubicPower OpenStack Study ****
def fake_service_get_by_id(value):
for service in fake_services_list:
if service['id'] == value:
return service
return None
**** CubicPower OpenStack Study ****
def fake_service_update(context, service_id, values):
service = fake_service_get_by_id(service_id)
if service is None:
raise exception.ServiceNotFound(service_id=service_id)
else:
{'host': 'host1', 'service': 'cinder-volume',
'disabled': values['disabled']}
**** CubicPower OpenStack Study ****
def fake_policy_enforce(context, action, target):
pass
**** CubicPower OpenStack Study ****
def fake_utcnow():
return datetime(2012, 10, 29, 13, 42, 11)
**** CubicPower OpenStack Study ****
class ServicesTest(test.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(ServicesTest, self).setUp()
self.stubs.Set(db, "service_get_all", fake_service_get_all)
self.stubs.Set(timeutils, "utcnow", fake_utcnow)
self.stubs.Set(db, "service_get_by_args",
fake_service_get_by_host_binary)
self.stubs.Set(db, "service_update", fake_service_update)
self.stubs.Set(policy, "enforce", fake_policy_enforce)
self.context = context.get_admin_context()
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.controller = services.ServiceController(self.ext_mgr)
**** CubicPower OpenStack Study ****
def test_services_list(self):
req = FakeRequest()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-scheduler',
'host': 'host1', 'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29, 13, 42, 2)},
{'binary': 'cinder-volume',
'host': 'host1', 'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29, 13, 42, 5)},
{'binary': 'cinder-scheduler', 'host': 'host2',
'zone': 'cinder',
'status': 'enabled', 'state': 'down',
'updated_at': datetime(2012, 9, 19, 6, 55, 34)},
{'binary': 'cinder-volume', 'host': 'host2',
'zone': 'cinder',
'status': 'disabled', 'state': 'down',
'updated_at': datetime(2012, 9, 18, 8, 3, 38)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequest()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-scheduler',
'host': 'host1', 'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29, 13, 42, 2),
'disabled_reason': 'test1'},
{'binary': 'cinder-volume',
'host': 'host1', 'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29, 13, 42, 5),
'disabled_reason': 'test2'},
{'binary': 'cinder-scheduler', 'host': 'host2',
'zone': 'cinder',
'status': 'enabled', 'state': 'down',
'updated_at': datetime(2012, 9, 19, 6, 55, 34),
'disabled_reason': ''},
{'binary': 'cinder-volume', 'host': 'host2',
'zone': 'cinder',
'status': 'disabled', 'state': 'down',
'updated_at': datetime(2012, 9, 18, 8, 3, 38),
'disabled_reason': 'test4'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_list_with_host(self):
req = FakeRequestWithHost()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-scheduler',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10,
29, 13, 42, 2)},
{'binary': 'cinder-volume', 'host': 'host1',
'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail_with_host(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequestWithHost()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-scheduler',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10,
29, 13, 42, 2),
'disabled_reason': 'test1'},
{'binary': 'cinder-volume', 'host': 'host1',
'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_list_with_service(self):
req = FakeRequestWithService()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5)},
{'binary': 'cinder-volume',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18,
8, 3, 38)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail_with_service(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequestWithService()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'},
{'binary': 'cinder-volume',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18,
8, 3, 38),
'disabled_reason': 'test4'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_list_with_binary(self):
req = FakeRequestWithBinary()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5)},
{'binary': 'cinder-volume',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18,
8, 3, 38)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail_with_binary(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequestWithBinary()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'},
{'binary': 'cinder-volume',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18,
8, 3, 38),
'disabled_reason': 'test4'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_list_with_host_service(self):
req = FakeRequestWithHostService()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail_with_host_service(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequestWithHostService()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_list_with_host_binary(self):
req = FakeRequestWithHostBinary()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5)}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_detail_with_host_binary(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = FakeRequestWithHostBinary()
res_dict = self.controller.index(req)
response = {'services': [{'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'}]}
self.assertEqual(res_dict, response)
**** CubicPower OpenStack Study ****
def test_services_enable_with_service_key(self):
body = {'host': 'host1', 'service': 'cinder-volume'}
req = fakes.HTTPRequest.blank('/v1/fake/os-services/enable')
res_dict = self.controller.update(req, "enable", body)
self.assertEqual(res_dict['status'], 'enabled')
**** CubicPower OpenStack Study ****
def test_services_enable_with_binary_key(self):
body = {'host': 'host1', 'binary': 'cinder-volume'}
req = fakes.HTTPRequest.blank('/v1/fake/os-services/enable')
res_dict = self.controller.update(req, "enable", body)
self.assertEqual(res_dict['status'], 'enabled')
**** CubicPower OpenStack Study ****
def test_services_disable_with_service_key(self):
req = fakes.HTTPRequest.blank('/v1/fake/os-services/disable')
body = {'host': 'host1', 'service': 'cinder-volume'}
res_dict = self.controller.update(req, "disable", body)
self.assertEqual(res_dict['status'], 'disabled')
**** CubicPower OpenStack Study ****
def test_services_disable_with_binary_key(self):
req = fakes.HTTPRequest.blank('/v1/fake/os-services/disable')
body = {'host': 'host1', 'binary': 'cinder-volume'}
res_dict = self.controller.update(req, "disable", body)
self.assertEqual(res_dict['status'], 'disabled')
**** CubicPower OpenStack Study ****
def test_services_disable_log_reason(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = (
fakes.HTTPRequest.blank('v1/fake/os-services/disable-log-reason'))
body = {'host': 'host1',
'binary': 'cinder-scheduler',
'disabled_reason': 'test-reason',
}
res_dict = self.controller.update(req, "disable-log-reason", body)
self.assertEqual(res_dict['status'], 'disabled')
self.assertEqual(res_dict['disabled_reason'], 'test-reason')
**** CubicPower OpenStack Study ****
def test_services_disable_log_reason_none(self):
self.ext_mgr.extensions['os-extended-services'] = True
self.controller = services.ServiceController(self.ext_mgr)
req = (
fakes.HTTPRequest.blank('v1/fake/os-services/disable-log-reason'))
body = {'host': 'host1',
'binary': 'cinder-scheduler',
'disabled_reason': None,
}
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update,
req, "disable-log-reason", body)
**** CubicPower OpenStack Study ****
def test_invalid_reason_field(self):
reason = ' '
self.assertFalse(self.controller._is_valid_as_reason(reason))
reason = 'a' * 256
self.assertFalse(self.controller._is_valid_as_reason(reason))
reason = 'it\'s a valid reason.'
self.assertTrue(self.controller._is_valid_as_reason(reason))
reason = None
self.assertFalse(self.controller._is_valid_as_reason(reason))