OpenStack Study: cinder
OpenStack Index | Previous | Next
\OpenStack\cinder-2014.1\cinder\tests\db\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\declare_conf.py
\OpenStack\cinder-2014.1\cinder\tests\fake_driver.py
class FakeISCSIDriver(lvm.LVMISCSIDriver):
def __init__(self, *args, **kwargs):
def check_for_setup_error(self):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector, **kwargs):
def fake_execute(cmd, *_args, **_kwargs):
class FakeISERDriver(FakeISCSIDriver):
def __init__(self, *args, **kwargs):
def initialize_connection(self, volume, connector):
def fake_execute(cmd, *_args, **_kwargs):
class LoggingVolumeDriver(driver.VolumeDriver):
def check_for_setup_error(self):
def create_volume(self, volume):
def delete_volume(self, volume):
def clear_volume(self, volume):
def local_path(self, volume):
def ensure_export(self, context, volume):
def create_export(self, context, volume):
def remove_export(self, context, volume):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector):
def clear_logs():
def log_action(action, parameters):
def all_logs():
def logs_like(action, **kwargs):
\OpenStack\cinder-2014.1\cinder\tests\fake_notifier.py
def reset():
class FakeNotifier(object):
def __init__(self, transport, publisher_id, serializer=None):
def prepare(self, publisher_id=None):
def _notify(self, priority, ctxt, event_type, payload):
def stub_notifier(stubs):
\OpenStack\cinder-2014.1\cinder\tests\fake_utils.py
def fake_execute_get_log():
def fake_execute_clear_log():
def fake_execute_set_repliers(repliers):
def fake_execute_default_reply_handler(*ignore_args, **ignore_kwargs):
def fake_execute(*cmd_parts, **kwargs):
def stub_out_utils_execute(stubs):
\OpenStack\cinder-2014.1\cinder\tests\glance\stubs.py
class StubGlanceClient(object):
def __init__(self, images=None):
def list(self, filters=None, marker=None, limit=30):
def get(self, image_id):
def data(self, image_id):
def create(self, **metadata):
def update(self, image_id, **metadata):
def delete(self, image_id):
class FakeImage(object):
def __init__(self, metadata):
def __getattr__(self, key):
def __setattr__(self, key, value):
\OpenStack\cinder-2014.1\cinder\tests\glance\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\image\fake.py
class _FakeImageService(object):
def __init__(self):
def detail(self, context, **kwargs):
def download(self, context, image_id, data):
def show(self, context, image_id):
def create(self, context, metadata, data=None):
def update(self, context, image_id, metadata, data=None, purge_props=False):
def delete(self, context, image_id):
def get_location(self, context, image_id):
def FakeImageService():
def FakeImageService_reset():
def stub_out_image_service(stubs):
def fake_get_remote_image_service(context, image_href):
\OpenStack\cinder-2014.1\cinder\tests\image\test_glance.py
class NullWriter(object):
def write(self, *arg, **kwargs):
class TestGlanceSerializer(test.TestCase):
def test_serialize(self):
class TestGlanceImageService(test.TestCase):
def setUp(self):
def _create_image_service(self, client):
def _fake_create_glance_client(context, netloc, use_ssl, version):
def _make_fixture(**kwargs):
def _make_datetime_fixture(self):
def test_create_with_instance_id(self):
def test_create_without_instance_id(self):
def test_create(self):
def test_create_and_show_non_existing_image(self):
def test_detail_private_image(self):
def test_detail_marker(self):
def test_detail_limit(self):
def test_detail_default_limit(self):
def test_detail_marker_and_limit(self):
def test_detail_invalid_marker(self):
def test_update(self):
def test_delete(self):
def test_show_passes_through_to_client(self):
def test_show_raises_when_no_authtoken_in_the_context(self):
def test_detail_passes_through_to_client(self):
def test_show_makes_datetimes(self):
def test_detail_makes_datetimes(self):
def test_download_with_retries(self):
def test_client_forbidden_converts_to_imagenotauthed(self):
def test_client_httpforbidden_converts_to_imagenotauthed(self):
def test_client_notfound_converts_to_imagenotfound(self):
def test_client_httpnotfound_converts_to_imagenotfound(self):
def test_glance_client_image_id(self):
def test_glance_client_image_ref(self):
class TestGlanceClientVersion(test.TestCase):
def setUp(self):
def fake_get_model(self):
def test_glance_version_by_flag(self):
def test_glance_version_by_arg(self):
def get(self, image_id):
class TestGlanceImageServiceClient(test.TestCase):
def setUp(self):
def test_create_glance_client(self):
def test_create_glance_client_auth_strategy_is_not_keystone(self):
def test_create_glance_client_glance_request_default_timeout(self):
def tearDown(self):
\OpenStack\cinder-2014.1\cinder\tests\image\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\integrated\api\client.py
class OpenStackApiException(Exception):
def __init__(self, message=None, response=None):
class OpenStackApiAuthenticationException(OpenStackApiException):
def __init__(self, response=None, message=None):
class OpenStackApiAuthorizationException(OpenStackApiException):
def __init__(self, response=None, message=None):
class OpenStackApiNotFoundException(OpenStackApiException):
def __init__(self, response=None, message=None):
class TestOpenStackClient(object):
def __init__(self, auth_user, auth_key, auth_uri):
def request(self, url, method='GET', body=None, headers=None, ssl_verify=True, stream=False):
def _authenticate(self):
def api_request(self, relative_uri, check_response_status=None, **kwargs):
def _decode_json(self, response):
def api_get(self, relative_uri, **kwargs):
def api_post(self, relative_uri, body, **kwargs):
def api_put(self, relative_uri, body, **kwargs):
def api_delete(self, relative_uri, **kwargs):
def get_volume(self, volume_id):
def get_volumes(self, detail=True):
def post_volume(self, volume):
def delete_volume(self, volume_id):
def put_volume(self, volume_id, volume):
\OpenStack\cinder-2014.1\cinder\tests\integrated\api\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\integrated\integrated_helpers.py
def generate_random_alphanumeric(length):
def generate_random_numeric(length):
def generate_new_element(items, prefix, numeric=False):
class _IntegratedTestBase(test.TestCase):
def setUp(self):
def tearDown(self):
def _start_api_service(self):
def _get_flags(self):
def get_unused_server_name(self):
def get_invalid_image(self):
def _build_minimal_create_server_request(self):
\OpenStack\cinder-2014.1\cinder\tests\integrated\test_extensions.py
class ExtensionsTest(integrated_helpers._IntegratedTestBase):
def _get_flags(self):
def test_get_foxnsocks(self):
\OpenStack\cinder-2014.1\cinder\tests\integrated\test_login.py
class LoginTest(integrated_helpers._IntegratedTestBase):
def test_login(self):
\OpenStack\cinder-2014.1\cinder\tests\integrated\test_volumes.py
class VolumesTest(integrated_helpers._IntegratedTestBase):
def setUp(self):
def _start_api_service(self):
def _get_flags(self):
def test_get_volumes_summary(self):
def test_get_volumes(self):
def _poll_while(self, volume_id, continue_states, max_retries=5):
def test_create_and_delete_volume(self):
def test_create_volume_with_metadata(self):
def test_create_volume_in_availability_zone(self):
def test_create_and_update_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\integrated\test_xml.py
class XmlTests(integrated_helpers._IntegratedTestBase):
def test_namespace_volumes(self):
\OpenStack\cinder-2014.1\cinder\tests\integrated\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\keymgr\fake.py
def fake_api():
\OpenStack\cinder-2014.1\cinder\tests\keymgr\mock_key_mgr.py
class MockKeyManager(key_mgr.KeyManager):
def __init__(self):
def _generate_hex_key(self, **kwargs):
def _generate_key(self, **kwargs):
def create_key(self, ctxt, **kwargs):
def _generate_key_id(self):
def store_key(self, ctxt, key, **kwargs):
def copy_key(self, ctxt, key_id, **kwargs):
def get_key(self, ctxt, key_id, **kwargs):
def delete_key(self, ctxt, key_id, **kwargs):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_conf_key_mgr.py
class ConfKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def __init__(self, *args, **kwargs):
def _create_key_manager(self):
def setUp(self):
def test___init__(self):
def test_create_key(self):
def test_create_null_context(self):
def test_store_key(self):
def test_store_null_context(self):
def test_store_key_invalid(self):
def test_copy_key(self):
def test_copy_null_context(self):
def test_delete_key(self):
def test_delete_null_context(self):
def test_delete_unknown_key(self):
def test_get_key(self):
def test_get_null_context(self):
def test_get_unknown_key(self):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_key.py
class KeyTestCase(test.TestCase):
def _create_key(self):
def setUp(self):
class SymmetricKeyTestCase(KeyTestCase):
def _create_key(self):
def setUp(self):
def test_get_algorithm(self):
def test_get_format(self):
def test_get_encoded(self):
def test___eq__(self):
def test___ne__(self):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_key_mgr.py
class KeyManagerTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
def _create_key_manager(self):
def setUp(self):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_mock_key_mgr.py
class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
def setUp(self):
def test_create_key(self):
def test_create_key_with_length(self):
def test_create_null_context(self):
def test_store_key(self):
def test_store_null_context(self):
def test_copy_key(self):
def test_copy_null_context(self):
def test_get_key(self):
def test_get_null_context(self):
def test_get_unknown_key(self):
def test_delete_key(self):
def test_delete_null_context(self):
def test_delete_unknown_key(self):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_not_implemented_key_mgr.py
class NotImplementedKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
def setUp(self):
def test_create_key(self):
def test_store_key(self):
def test_copy_key(self):
def test_get_key(self):
def test_delete_key(self):
\OpenStack\cinder-2014.1\cinder\tests\keymgr\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\example_a.py
def example_function_a():
class ExampleClassA():
def example_method(self):
def example_method_add(self, arg1, arg2):
\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\example_b.py
def example_function_b():
class ExampleClassB():
def example_method(self):
def example_method_add(self, arg1, arg2):
\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\__init__.py
def example_decorator(name, function):
\OpenStack\cinder-2014.1\cinder\tests\runtime_conf.py
\OpenStack\cinder-2014.1\cinder\tests\scheduler\fakes.py
class FakeFilterScheduler(filter_scheduler.FilterScheduler):
def __init__(self, *args, **kwargs):
class FakeHostManager(host_manager.HostManager):
def __init__(self):
class FakeHostState(host_manager.HostState):
def __init__(self, host, attribute_dict):
def mock_host_manager_db_calls(mock_obj):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_allocated_capacity_weigher.py
class AllocatedCapacityWeigherTestCase(test.TestCase):
def setUp(self):
def _get_weighed_host(self, hosts, weight_properties=None):
def _get_all_hosts(self, _mock_service_get_all_by_topic):
def test_default_of_spreading_first(self):
def test_capacity_weight_multiplier1(self):
def test_capacity_weight_multiplier2(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_capacity_weigher.py
class CapacityWeigherTestCase(test.TestCase):
def setUp(self):
def _get_weighed_host(self, hosts, weight_properties=None):
def _get_all_hosts(self, _mock_service_get_all_by_topic):
def test_default_of_spreading_first(self):
def test_capacity_weight_multiplier1(self):
def test_capacity_weight_multiplier2(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_chance_weigher.py
class ChanceWeigherTestCase(test.TestCase):
def setUp(self):
def fake_random(self, reset=False):
def test_chance_weigher(self, _mock_random):
def test_host_manager_choosing_chance_weigher(self):
def test_use_of_chance_weigher_via_host_manager(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_filter_scheduler.py
class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
def test_create_volume_no_hosts(self):
def test_create_volume_non_admin(self, _mock_get_all_host_states):
def fake_get(ctxt):
def test_schedule_happy_day(self, _mock_service_get_all_by_topic):
def test_max_attempts(self):
def test_invalid_max_attempts(self):
def test_retry_disabled(self):
def test_retry_attempt_one(self):
def test_retry_attempt_two(self):
def test_retry_exceeded_max_attempts(self):
def test_add_retry_host(self):
def test_post_select_populate(self):
def _host_passes_filters_setup(self, mock_obj):
def test_host_passes_filters_happy_day(self, _mock_service_get_topic):
def test_host_passes_filters_no_capacity(self, _mock_service_get_topic):
def test_retype_policy_never_migrate_pass(self, _mock_service_get_topic):
def test_retype_policy_never_migrate_fail(self, _mock_service_get_topic):
def test_retype_policy_demand_migrate_pass(self, _mock_service_get_topic):
def test_retype_policy_demand_migrate_fail(self, _mock_service_get_topic):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_host_filters.py
class HostFiltersTestCase(test.TestCase):
def setUp(self):
def test_capacity_filter_passes(self, _mock_serv_is_up):
def test_capacity_filter_fails(self, _mock_serv_is_up):
def test_capacity_filter_passes_infinite(self, _mock_serv_is_up):
def test_capacity_filter_passes_unknown(self, _mock_serv_is_up):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_host_manager.py
class FakeFilterClass1(filters.BaseHostFilter):
def host_passes(self, host_state, filter_properties):
class FakeFilterClass2(filters.BaseHostFilter):
def host_passes(self, host_state, filter_properties):
class HostManagerTestCase(test.TestCase):
def setUp(self):
def test_choose_host_filters_not_found(self):
def test_choose_host_filters(self):
def test_get_filtered_hosts(self, _mock_choose_host_filters):
def test_update_service_capabilities(self, _mock_utcnow):
def test_get_all_host_states(self, _mock_service_is_up, _mock_service_get_all_by_topic):
class HostStateTestCase(test.TestCase):
def test_update_from_volume_capability(self):
def test_update_from_volume_infinite_capability(self):
def test_update_from_volume_unknown_capability(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_rpcapi.py
class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _test_scheduler_api(self, method, rpc_method, fanout=False, **kwargs):
def _fake_prepare_method(*args, **kwds):
def _fake_rpc_method(*args, **kwargs):
def test_update_service_capabilities(self):
def test_create_volume(self):
def test_migrate_volume_to_host(self):
def test_retype(self):
def test_manage_existing(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_scheduler.py
class SchedulerManagerTestCase(test.TestCase):
def setUp(self):
def test_1_correct_init(self):
def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
def test_update_service_capabilities_correct(self, _mock_update_cap):
def test_create_volume_exception_puts_volume_in_error_state( self, _mock_volume_update, _mock_sched_create):
def test_migrate_volume_exception_returns_volume_state( self, _mock_volume_update, _mock_host_passes):
def test_chance_simple_scheduler_mocked(self):
def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get, _mock_vol_update):
class SchedulerTestCase(test.TestCase):
def setUp(self):
def test_update_service_capabilities(self, _mock_update_cap):
class SchedulerDriverBaseTestCase(SchedulerTestCase):
def test_unimplemented_schedule(self):
class SchedulerDriverModuleTestCase(test.TestCase):
def setUp(self):
def test_volume_host_update_db(self, _mock_utcnow, _mock_vol_update):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_scheduler_options.py
class FakeSchedulerOptions(scheduler_options.SchedulerOptions):
def __init__(self, last_checked, now, file_old, file_now, data, filedata):
def _get_file_timestamp(self, filename):
def _get_file_handle(self, filename):
def _get_time_now(self):
class SchedulerOptionsTestCase(test.TestCase):
def test_get_configuration_first_time_no_flag(self):
def test_get_configuration_first_time_empty_file(self):
def test_get_configuration_first_time_happy_day(self):
def test_get_configuration_second_time_no_change(self):
def test_get_configuration_second_time_too_fast(self):
def test_get_configuration_second_time_change(self):
\OpenStack\cinder-2014.1\cinder\tests\scheduler\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\test_api.py
class FakeHttplibSocket(object):
def __init__(self, response_string):
def makefile(self, _mode, _other):
class FakeHttplibConnection(object):
def __init__(self, app, host, is_secure=False):
def request(self, method, path, data, headers):
def getresponse(self):
def getresponsebody(self):
def close(self):
\OpenStack\cinder-2014.1\cinder\tests\test_api_urlmap.py
class TestParseFunctions(test.TestCase):
def test_unquote_header_value_without_quotes(self):
def test_unquote_header_value_with_quotes(self):
def test_parse_list_header(self):
def test_parse_options_header(self):
def test_parse_options_header_without_value(self):
class TestAccept(test.TestCase):
def test_best_match_ValueError(self):
def test_best_match(self):
def test_match_mask_one_asterisk(self):
def test_match_mask_two_asterisk(self):
def test_match_mask_no_asterisk(self):
def test_content_type_params(self):
def test_content_type_params_wrong_content_type(self):
class TestUrlMapFactory(test.TestCase):
def setUp(self):
def test_not_found_app_in_local_conf(self):
def test_not_found_app_not_in_local_conf(self):
def test_not_found_app_is_none(self):
class TestURLMap(test.TestCase):
def setUp(self):
def test_match_with_applications(self):
def test_match_without_applications(self):
def test_match_path_info_equals_app_url(self):
def test_match_path_info_equals_app_url_many_app(self):
def test_set_script_name(self):
def test_munge_path(self):
def test_content_type_strategy_without_version(self):
def test_content_type_strategy_with_version(self):
def test_path_strategy_wrong_path_info(self):
def test_path_strategy_mime_type_only(self):
def test_path_strategy(self):
def test_path_strategy_wrong_mime_type(self):
def test_accept_strategy_version_not_in_params(self):
def test_accept_strategy_version(self):
\OpenStack\cinder-2014.1\cinder\tests\test_backup.py
class FakeBackupException(Exception):
class BackupTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _create_backup_db_entry(self, volume_id=1, display_name='test_backup', display_description='this is a test backup', container='volumebackups', status='creating', size=1, object_count=0, project_id='fake'):
def _create_volume_db_entry(self, display_name='test_volume', display_description='this is a test volume', status='backing-up', size=1):
def _create_exported_record_entry(self, vol_size=1):
def _create_export_record_db_entry(self, volume_id='0000', status='creating', project_id='fake'):
def test_init_host(self):
def test_create_backup_with_bad_volume_status(self):
def test_create_backup_with_bad_backup_status(self):
def test_create_backup_with_error(self, _mock_volume_backup):
def test_create_backup(self, _mock_volume_backup):
def test_restore_backup_with_bad_volume_status(self):
def test_restore_backup_with_bad_backup_status(self):
def test_restore_backup_with_driver_error(self, _mock_volume_restore):
def test_restore_backup_with_bad_service(self):
def test_restore_backup(self, _mock_volume_restore):
def test_delete_backup_with_bad_backup_status(self):
def test_delete_backup_with_error(self):
def test_delete_backup_with_bad_service(self):
def test_delete_backup_with_no_service(self):
def test_delete_backup(self):
def test_list_backup(self):
def test_backup_get_all_by_project_with_deleted(self):
def test_backup_get_all_by_host_with_deleted(self):
def test_backup_manager_driver_name(self):
def test_export_record_with_bad_service(self):
def test_export_record_with_bad_backup_status(self):
def test_export_record(self):
def test_import_record_with_verify_not_implemented(self):
def test_import_record_with_verify(self):
def test_import_record_with_bad_service(self):
def test_import_record_with_invalid_backup(self):
def test_import_record_with_verify_invalid_backup(self):
\OpenStack\cinder-2014.1\cinder\tests\test_backup_ceph.py
class MockException(Exception):
def __init__(self, *args, **kwargs):
class MockImageNotFoundException(MockException):
class MockImageBusyException(MockException):
class MockObjectNotFoundException(MockException):
def _common_inner_inner1(inst, *args, **kwargs):
def _common_inner_inner2(mock_rados, mock_rbd, mock_time, mock_sleep, mock_popen):
class BackupCephTestCase(test.TestCase):
def _create_volume_db_entry(self, id, size):
def _create_backup_db_entry(self, backupid, volid, size):
def time_inc(self):
def _get_wrapped_rbd_io(self, rbd_image):
def _setup_mock_popen(self, mock_popen, retval=None, p1hook=None, p2hook=None):
def setUp(self):
def tearDown(self):
def test_get_rbd_support(self):
def test_get_most_recent_snap(self):
def test_get_backup_snap_name(self):
def get_backup_snaps(inst, *args):
def test_get_backup_snaps(self):
def test_transfer_data_from_rbd_to_file(self):
def test_transfer_data_from_rbd_to_rbd(self):
def mock_write_data(data, offset):
def test_transfer_data_from_file_to_rbd(self):
def mock_write_data(data, offset):
def test_transfer_data_from_file_to_file(self):
def test_backup_volume_from_file(self):
def mock_write_data(data, offset):
def test_get_backup_base_name(self):
def test_backup_volume_from_rbd(self, mock_popen, mock_fnctl):
def mock_write_data():
def mock_read_data():
def test_backup_vol_length_0(self):
def test_restore(self):
def mock_read_data(offset, length):
def test_discard_bytes(self):
def test_delete_backup_snapshot(self):
def test_try_delete_base_image_diff_format(self, mock_meta_backup):
def test_try_delete_base_image(self, mock_meta_backup):
def test_try_delete_base_image_busy(self):
def test_delete(self, mock_meta_backup):
def test_delete_image_not_found(self, mock_meta_backup):
def test_diff_restore_allowed(self):
def test_piped_execute(self, mock_popen, mock_fcntl):
def test_restore_metdata(self):
def mock_read(*args):
def test_backup_metata_already_exists(self, mock_meta_backup):
def mock_set(json_meta):
def test_backup_metata_error(self):
def test_restore_invalid_metadata_version(self):
def mock_read(*args):
def _common_inner_inner1(inst, *args, **kwargs):
def _common_inner_inner2(mock_rados, mock_rbd):
class VolumeMetadataBackupTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_name(self):
def test_exists(self):
def test_set(self):
def mock_read(*args):
def _mock_write(data):
def test_get(self):
def remove_if_exists(self):
\OpenStack\cinder-2014.1\cinder\tests\test_backup_driver_base.py
class BackupBaseDriverTestCase(test.TestCase):
def _create_volume_db_entry(self, id, size):
def _create_backup_db_entry(self, backupid, volid, size):
def setUp(self):
def test_backup(self):
def test_restore(self):
def test_delete(self):
def test_get_metadata(self):
def test_put_metadata(self):
def test_get_put_metadata(self):
def test_export_record(self):
def test_import_record(self):
def test_verify(self):
def tearDown(self):
class BackupMetadataAPITestCase(test.TestCase):
def _create_volume_db_entry(self, id, size):
def setUp(self):
def _add_metadata(self, vol_meta=False, vol_glance_meta=False):
def test_get(self):
def test_put(self):
def test_put_invalid_version(self):
def test_v1_restore_factory(self):
def test_restore_vol_glance_meta(self):
def test_restore_vol_meta(self):
def test_restore_vol_base_meta(self):
def test_filter(self):
def test_save_vol_glance_meta(self):
def test_save_vol_meta(self):
def test_save_vol_base_meta(self):
def test_is_serializable(self):
def test_is_not_serializable(self):
def tearDown(self):
\OpenStack\cinder-2014.1\cinder\tests\test_backup_swift.py
def fake_md5(arg):
class BackupSwiftTestCase(test.TestCase):
def _create_volume_db_entry(self):
def _create_backup_db_entry(self, container='test-container'):
def setUp(self):
def tearDown(self):
def test_backup_uncompressed(self):
def test_backup_bz2(self):
def test_backup_zlib(self):
def test_backup_default_container(self):
def test_backup_custom_container(self):
def test_create_backup_put_object_wraps_socket_error(self):
def test_restore(self):
def test_restore_wraps_socket_error(self):
def test_restore_unsupported_version(self):
def test_delete(self):
def test_delete_wraps_socket_error(self):
def test_get_compressor(self):
\OpenStack\cinder-2014.1\cinder\tests\test_backup_tsm.py
class TSMBackupSimulator:
def __init__(self):
def _cmd_backup(self, **kwargs):
def _backup_exists(self, path):
def _cmd_restore(self, **kwargs):
def _cmd_delete(self, **kwargs):
def _cmd_to_dict(self, arg_list):
def _exec_dsmc_cmd(self, cmd):
def exec_cmd(self, cmd):
def error_injection(self, cmd, error):
def fake_exec(*cmd, **kwargs):
def fake_stat_image(path):
def fake_stat_file(path):
def fake_stat_illegal(path):
class BackupTSMTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _create_volume_db_entry(self, volume_id):
def _create_backup_db_entry(self, backup_id, mode):
def test_backup_image(self):
def test_backup_file(self):
def test_backup_invalid_mode(self):
\OpenStack\cinder-2014.1\cinder\tests\test_block_device.py
class TestBlockDeviceDriver(cinder.test.TestCase):
def setUp(self):
def test_initialize_connection(self):
def test_initialize_connection_different_hosts(self):
def test_delete_not_volume_provider_location(self):
def test_delete_volume_path_exist(self):
def test_delete_path_is_not_in_list_of_available_devices(self):
def test_create_volume(self):
def test_update_volume_stats(self):
def test_create_cloned_volume(self):
def test_copy_image_to_volume(self):
def test_copy_volume_to_image(self):
def test_get_used_devices(self):
def test_get_device_size(self):
def test_devices_sizes(self):
def test_find_appropriate_size_device_no_free_disks(self):
def test_find_appropriate_size_device_not_big_enough_disk(self):
def test_find_appropriate_size_device(self):
\OpenStack\cinder-2014.1\cinder\tests\test_conf.py
class ConfigTestCase(test.TestCase):
def setUp(self):
def test_declare(self):
def test_runtime_and_unknown_conf(self):
def test_long_vs_short_conf(self):
def test_conf_leak_left(self):
def test_conf_leak_right(self):
def test_conf_overrides(self):
\OpenStack\cinder-2014.1\cinder\tests\test_context.py
class ContextTestCase(test.TestCase):
def test_request_context_sets_is_admin(self):
def test_request_context_sets_is_admin_upcase(self):
def test_request_context_read_deleted(self):
def test_request_context_read_deleted_invalid(self):
def test_extra_args_to_context_get_logged(self):
def fake_warn(log_msg):
def test_service_catalog_nova_only(self):
def test_user_identity(self):
\OpenStack\cinder-2014.1\cinder\tests\test_coraid.py
def to_coraid_kb(gb):
def coraid_volume_size(gb):
def compare(a, b):
def pack_data(request):
class FakeRpcBadRequest(Exception):
class FakeRpcIsNotCalled(Exception):
def __init__(self, handle, url_params, data):
def __str__(self):
class FakeRpcHandle(object):
def __init__(self, handle, url_params, data, result):
def set_called(self):
def __call__(self, handle, url_params, data, allow_empty_response=False):
class FakeRpc(object):
def __init__(self):
def handle(self, handle, url_params, data, result):
def __call__(self, handle_name, url_params, data, allow_empty_response=False):
class CoraidDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def mock_volume_types(self, repositories=[]):
class CoraidDriverLoginSuccessTestCase(CoraidDriverTestCase):
def setUp(self):
class CoraidDriverApplianceTestCase(CoraidDriverLoginSuccessTestCase):
def test_resize_volume(self):
class CoraidDriverIntegrationalTestCase(CoraidDriverLoginSuccessTestCase):
def setUp(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_ping_ok(self):
def test_ping_failed(self):
def rpc(handle, url_params, data, allow_empty_response=True):
def test_delete_not_existing_lun(self):
def test_delete_not_existing_volumeappliance_is_ok(self):
def delete_lun(volume_name):
def ping():
def test_delete_not_existing_volume_sleepingappliance(self):
def delete_lun(volume_name):
def ping():
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_initialize_connection(self):
def test_get_repository_capabilities(self):
def test_create_cloned_volume(self):
def test_create_cloned_volume_with_resize(self):
def test_create_cloned_volume_in_different_repository(self):
def test_extend_volume(self):
class AutoReloginCoraidTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _test_auto_relogin_fail(self, state):
def test_auto_relogin_fail_admin(self):
def test_auto_relogin_fail_inactivity(self):
def test_auto_relogin_fail_absolute(self):
def test_auto_relogin_success(self):
class CoraidDriverImageTestCases(CoraidDriverTestCase):
def setUp(self):
def test_copy_volume_to_image(self):
def test_copy_image_to_volume(self):
class CoraidResetConnectionTestCase(CoraidDriverTestCase):
def test_create_new_appliance_for_every_request(self):
\OpenStack\cinder-2014.1\cinder\tests\test_create_volume_flow.py
class fake_scheduler_rpc_api(object):
def __init__(self, expected_spec, test_inst):
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None):
class fake_volume_api(object):
def __init__(self, expected_spec, test_inst):
def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None):
class fake_db(object):
def volume_get(self, *args, **kwargs):
def volume_update(self, *args, **kwargs):
def snapshot_get(self, *args, **kwargs):
class CreateVolumeFlowTestCase(test.TestCase):
def time_inc(self):
def setUp(self):
def test_cast_create_volume(self):
def tearDown(self):
\OpenStack\cinder-2014.1\cinder\tests\test_db_api.py
def _quota_reserve(context, project_id):
def get_sync(resource, usage):
class ModelsObjectComparatorMixin(object):
def _dict_from_object(self, obj, ignored_keys):
def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
class BaseTest(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
class DBAPIServiceTestCase(BaseTest):
def _get_base_values(self):
def _create_service(self, values):
def test_service_create(self):
def test_service_destroy(self):
def test_service_update(self):
def test_service_update_not_found_exception(self):
def test_service_get(self):
def test_service_get_not_found_exception(self):
def test_service_get_by_host_and_topic(self):
def test_service_get_all(self):
def test_service_get_all_by_topic(self):
def test_service_get_all_by_host(self):
def test_service_get_by_args(self):
def test_service_get_by_args_not_found_exception(self):
def test_service_get_all_volume_sorted(self):
class DBAPIVolumeTestCase(BaseTest):
def test_volume_create(self):
def test_volume_allocate_iscsi_target_no_more_targets(self):
def test_volume_allocate_iscsi_target(self):
def test_volume_attached_invalid_uuid(self):
def test_volume_attached_to_instance(self):
def test_volume_attached_to_host(self):
def test_volume_data_get_for_host(self):
def test_volume_data_get_for_project(self):
def test_volume_detached_from_instance(self):
def test_volume_detached_from_host(self):
def test_volume_get(self):
def test_volume_destroy(self):
def test_volume_get_all(self):
def test_volume_get_all_marker_passed(self):
def test_volume_get_all_by_host(self):
def test_volume_get_all_by_instance_uuid(self):
def test_volume_get_all_by_instance_uuid_empty(self):
def test_volume_get_all_by_project(self):
def test_volume_get_by_name(self):
def test_volume_list_by_status(self):
def _assertEqualsVolumeOrderResult(self, correct_order, limit=None, sort_key='created_at', sort_dir='asc', filters=None, project_id=None, match_keys=['id', 'display_name', 'volume_metadata', 'created_at']):
def test_volume_get_by_filter(self):
def test_volume_get_all_filters_limit(self):
def test_volume_get_no_migration_targets(self):
def test_volume_get_iscsi_target_num(self):
def test_volume_get_iscsi_target_num_nonexistent(self):
def test_volume_update(self):
def test_volume_update_nonexistent(self):
def test_volume_metadata_get(self):
def test_volume_metadata_update(self):
def test_volume_metadata_update_delete(self):
def test_volume_metadata_delete(self):
class DBAPISnapshotTestCase(BaseTest):
def test_snapshot_data_get_for_project(self):
def test_snapshot_get_all(self):
def test_snapshot_metadata_get(self):
def test_snapshot_metadata_update(self):
def test_snapshot_metadata_update_delete(self):
def test_snapshot_metadata_delete(self):
class DBAPIVolumeTypeTestCase(BaseTest):
def setUp(self):
def test_volume_type_create_exists(self):
class DBAPIEncryptionTestCase(BaseTest):
def setUp(self):
def _get_values(self, one=False, updated=False):
def compose(val, step):
def test_volume_type_encryption_create(self):
def test_volume_type_encryption_update(self):
def test_volume_type_encryption_get(self):
def test_volume_type_update_with_no_create(self):
def test_volume_type_encryption_delete(self):
class DBAPIReservationTestCase(BaseTest):
def setUp(self):
def test_reservation_create(self):
def test_reservation_get(self):
def test_reservation_get_nonexistent(self):
def test_reservation_commit(self):
def test_reservation_rollback(self):
def test_reservation_get_all_by_project(self):
def test_reservation_expire(self):
def test_reservation_destroy(self):
class DBAPIQuotaClassTestCase(BaseTest):
def setUp(self):
def test_quota_class_get(self):
def test_quota_class_destroy(self):
def test_quota_class_get_not_found(self):
def test_quota_class_get_all_by_name(self):
def test_quota_class_update(self):
def test_quota_class_destroy_all_by_name(self):
class DBAPIQuotaTestCase(BaseTest):
def test_quota_create(self):
def test_quota_get(self):
def test_quota_get_all_by_project(self):
def test_quota_update(self):
def test_quota_update_nonexistent(self):
def test_quota_get_nonexistent(self):
def test_quota_reserve(self):
def test_quota_destroy(self):
def test_quota_destroy_all_by_project(self):
def test_quota_usage_get_nonexistent(self):
def test_quota_usage_get(self):
def test_quota_usage_get_all_by_project(self):
class DBAPIIscsiTargetTestCase(BaseTest):
def _get_base_values(self):
def test_iscsi_target_create_safe(self):
def test_iscsi_target_count_by_host(self):
def test_integrity_error(self):
class DBAPIBackupTestCase(BaseTest):
def setUp(self):
def _get_values(self, one=False):
def compose(val, step):
def test_backup_create(self):
def test_backup_get(self):
def tests_backup_get_all(self):
def test_backup_get_all_by_host(self):
def test_backup_get_all_by_project(self):
def test_backup_update_nonexistent(self):
def test_backup_update(self):
def test_backup_destroy(self):
def test_backup_not_found(self):
\OpenStack\cinder-2014.1\cinder\tests\test_drivers_compatibility.py
class VolumeDriverCompatibility(test.TestCase):
def fake_update_cluster_status(self):
def setUp(self):
def tearDown(self):
def _load_driver(self, driver):
def _driver_module_name(self):
def test_nexenta_old(self):
def test_nexenta_new(self):
def test_solidfire_old(self):
def test_solidfire_old2(self):
def test_solidfire_new(self):
def test_storwize_svc_old(self):
def test_storwize_svc_old2(self):
def test_storwize_svc_new(self):
def test_windows_old(self):
def test_windows_new(self):
def test_xiv_old(self):
def test_xiv_ds8k_new(self):
def test_netapp_7m_iscsi_old(self):
def test_netapp_cm_iscsi_old(self):
def test_netapp_7m_nfs_old(self):
def test_netapp_cm_nfs_old(self):
def test_hp_lefthand_rest_old(self):
def test_hp_lefthand_rest_new(self):
def test_gpfs_old(self):
def test_gpfs_new(self):
\OpenStack\cinder-2014.1\cinder\tests\test_emc_smis.py
class EMC_StorageVolume(dict):
class SE_ConcreteJob(dict):
class SE_StorageHardwareID(dict):
class FakeCIMInstanceName(dict):
def fake_getinstancename(self, classname, bindings):
class FakeDB():
def volume_update(self, context, volume_id, model_update):
def snapshot_update(self, context, snapshot_id, model_update):
def volume_get(self, context, volume_id):
class EMCSMISCommonData():
class FakeEcomConnection():
def __init__(self, *args, **kwargs):
def InvokeMethod(self, MethodName, Service, ElementName=None, InPool=None, ElementType=None, Size=None, SyncType=None, SourceElement=None, Operation=None, Synchronization=None, TheElements=None, TheElement=None, LUNames=None, InitiatorPortIDs=None, DeviceAccesses=None, ProtocolControllers=None, MaskingGroup=None, Members=None, HardwareId=None):
def EnumerateInstanceNames(self, name):
def EnumerateInstances(self, name):
def GetInstance(self, objectpath, LocalOnly=False):
def Associators(self, objectpath, resultClass='EMC_StorageHardwareID'):
def AssociatorNames(self, objectpath, resultClass='EMC_LunMaskingSCSIProtocolController'):
def ReferenceNames(self, objectpath, ResultClass='CIM_ProtocolControllerForUnit'):
def _ref_unitnames(self):
def _default_ref(self, objectpath):
def _assoc_hdwid(self):
def _assoc_endpoint(self):
OpenStack Index | Previous | Next