OpenStack Study: cinder
OpenStack Index | Previous | Next
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit_and_rollback(context, reservations, project_id=None):
def test_create_delete_volume(self):
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit(context, reservations, project_id=None):
def fake_rollback(context, reservations, project_id=None):
def test_create_delete_volume_with_metadata(self):
def test_create_volume_with_invalid_metadata(self):
def test_create_volume_uses_default_availability_zone(self):
def fake_list_availability_zones():
def test_create_volume_with_volume_type(self):
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit(context, reservations, project_id=None):
def fake_rollback(context, reservations, project_id=None):
def test_create_volume_with_encrypted_volume_type(self):
def test_create_delete_volume_with_encrypted_volume_type(self):
def test_extra_capabilities(self):
def test_extra_capabilities_fail(self):
def test_delete_busy_volume(self):
def test_delete_volume_in_error_extending(self):
def test_create_volume_from_snapshot(self):
def test_create_volume_from_snapshot_with_types(self, _get_flow):
def test_create_volume_from_source_with_types(self, _get_flow):
def test_create_snapshot_driver_not_initialized(self):
def _mock_synchronized(self, name, *s_args, **s_kwargs):
def inner_sync1(f):
def test_create_volume_from_snapshot_check_locks(self):
def mock_flow_run(*args, **kwargs):
def test_create_volume_from_volume_check_locks(self):
def mock_flow_run(*args, **kwargs):
def test_create_volume_from_volume_delete_lock_taken(self):
def mock_elevated(*args, **kwargs):
def test_create_volume_from_snapshot_delete_lock_taken(self):
def mock_elevated(*args, **kwargs):
def test_create_volume_from_snapshot_with_encryption(self):
def test_create_volume_from_encrypted_volume(self):
def test_create_volume_from_snapshot_fail_bad_size(self):
def test_create_volume_from_snapshot_fail_wrong_az(self):
def fake_list_availability_zones():
def test_create_volume_with_invalid_exclusive_options(self):
def test_initialize_connection_fetchqos(self, _mock_volume_update, _mock_volume_get, _mock_volume_admin_metadata_get):
def test_run_attach_detach_volume_for_instance(self):
def test_run_attach_detach_volume_for_host(self):
def test_run_attach_detach_volume_with_attach_mode(self):
def test_run_manager_attach_detach_volume_with_wrong_attach_mode(self):
def test_run_api_attach_detach_volume_with_wrong_attach_mode(self):
def test_reserve_volume_success(self, volume_get, volume_update):
def test_reserve_volume_bad_status(self):
def test_unreserve_volume_success(self):
def test_concurrent_volumes_get_different_targets(self):
def _check(volume_id):
def test_multi_node(self):
def _create_snapshot(volume_id, size='0', metadata=None):
def test_create_delete_snapshot(self):
def test_create_delete_snapshot_with_metadata(self):
def test_cannot_delete_volume_in_use(self):
def test_force_delete_volume(self):
def test_cannot_force_delete_attached_volume(self):
def test_cannot_delete_volume_with_snapshots(self):
def test_can_delete_errored_snapshot(self):
def test_create_snapshot_force(self):
def test_delete_busy_snapshot(self):
def test_delete_no_dev_fails(self):
def _create_volume_from_image(self, fakeout_copy_image_to_volume=False, fakeout_clone_image=False):
def fake_local_path(volume):
def fake_copy_image_to_volume(context, volume, image_service, image_id):
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, size=None):
def fake_clone_image(volume_ref, image_location, image_id, image_meta):
def test_create_volume_from_image_cloned_status_available(self):
def test_create_volume_from_image_not_cloned_status_available(self):
def test_create_volume_from_image_exception(self):
def test_create_volume_from_exact_sized_image(self):
def test_create_volume_from_oversized_image(self):
def test_create_volume_with_mindisk_error(self):
def _do_test_create_volume_with_size(self, size):
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit(context, reservations, project_id=None):
def fake_rollback(context, reservations, project_id=None):
def test_create_volume_int_size(self):
def test_create_volume_string_size(self):
def test_create_volume_with_bad_size(self):
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit(context, reservations, project_id=None):
def fake_rollback(context, reservations, project_id=None):
def test_begin_roll_detaching_volume(self):
def test_volume_api_update(self):
def test_volume_api_update_snapshot(self):
def test_extend_volume(self, reserve):
def test_extend_volume_driver_not_initialized(self):
def test_extend_volume_manager(self):
def fake_extend(volume, new_size):
def test_create_volume_from_unelevated_context(self):
def fake_create_volume(*args, **kwargs):
def test_create_volume_from_sourcevol(self):
def fake_create_cloned_volume(volume, src_vref):
def test_create_volume_from_sourcevol_fail_wrong_az(self):
def fake_list_availability_zones():
def test_create_volume_from_sourcevol_with_glance_metadata(self):
def fake_create_cloned_volume(volume, src_vref):
def test_create_volume_from_sourcevol_failed_clone(self):
def fake_error_create_cloned_volume(volume, src_vref):
def test_list_availability_zones_enabled_service(self):
def stub_service_get_all_by_topic(*args, **kwargs):
def test_migrate_volume_driver(self):
def test_migrate_volume_generic(self):
def fake_migr(vol, host):
def fake_delete_volume_rpc(self, ctxt, vol_id):
def fake_create_volume(self, ctxt, volume, host, req_spec, filters, allow_reschedule=True):
def _retype_volume_exec(self, driver, snap=False, policy='on-demand', migrate_exc=False, exc=None, diff_equal=False):
def test_retype_volume_driver_success(self):
def test_retype_volume_migration_bad_policy(self):
def test_retype_volume_migration_with_snaps(self):
def test_retype_volume_migration_failed(self):
def test_retype_volume_migration_success(self):
def test_retype_volume_migration_equal_types(self):
def test_migrate_driver_not_initialized(self):
def test_update_volume_readonly_flag(self):
class CopyVolumeToImageTestCase(BaseVolumeTestCase):
def fake_local_path(self, volume):
def setUp(self):
def tearDown(self):
def test_copy_volume_to_image_status_available(self):
def test_copy_volume_to_image_status_use(self):
def test_copy_volume_to_image_exception(self):
class GetActiveByWindowTestCase(BaseVolumeTestCase):
def setUp(self):
def test_volume_get_active_by_window(self):
def test_snapshot_get_active_by_window(self):
class DriverTestCase(test.TestCase):
def setUp(self):
def _fake_execute(_command, *_args, **_kwargs):
def tearDown(self):
def fake_get_target(obj, iqn):
def _attach_volume(self):
def _detach_volume(self, volume_id_list):
class GenericVolumeDriverTestCase(DriverTestCase):
def test_backup_volume(self):
def test_restore_backup(self):
class LVMISCSIVolumeDriverTestCase(DriverTestCase):
def test_delete_busy_volume(self):
def test_lvm_migrate_volume_no_loc_info(self):
def test_lvm_migrate_volume_bad_loc_info(self):
def test_lvm_migrate_volume_diff_driver(self):
def test_lvm_migrate_volume_diff_host(self):
def test_lvm_migrate_volume_in_use(self):
def test_lvm_volume_group_missing(self):
def get_all_volume_groups():
def test_lvm_migrate_volume_proceed(self):
def fake_execute(*args, **kwargs):
def get_all_volume_groups():
def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
def _get_manage_existing_lvs(name):
def _setup_stubs_for_manage_existing(self):
def test_lvm_manage_existing(self):
def _rename_volume(old_name, new_name):
def test_lvm_manage_existing_bad_size(self):
def test_lvm_manage_existing_bad_ref(self):
class LVMVolumeDriverTestCase(DriverTestCase):
def test_delete_volume_invalid_parameter(self):
def test_delete_volume_bad_path(self):
def test_delete_volume_thinlvm_snap(self):
class ISCSITestCase(DriverTestCase):
def setUp(self):
def _attach_volume(self):
def test_do_iscsi_discovery(self):
def test_get_iscsi_properties(self):
def test_get_volume_stats(self):
def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
def test_validate_connector(self):
class ISERTestCase(ISCSITestCase):
def setUp(self):
def test_get_volume_stats(self):
def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
def test_get_volume_stats2(self):
class FibreChannelTestCase(DriverTestCase):
def test_initialize_connection(self):
class VolumePolicyTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _set_rules(self, rules):
def test_check_policy(self):
def test_check_policy_with_target(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_configuration.py
class VolumeConfigurationTest(test.TestCase):
def setUp(self):
def tearDown(self):
def test_group_grafts_opts(self):
def test_opts_no_group(self):
def test_grafting_multiple_opts(self):
def test_safe_get(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_glance_metadata.py
class VolumeGlanceMetadataTestCase(test.TestCase):
def setUp(self):
def test_vol_glance_metadata_bad_vol_id(self):
def test_vol_update_glance_metadata(self):
def test_vols_get_glance_metadata(self):
def _assert_metadata_equals(self, volume_id, key, value, observed):
def test_vol_delete_glance_metadata(self):
def test_vol_glance_metadata_copy_to_snapshot(self):
def test_vol_glance_metadata_copy_from_volume_to_volume(self):
def test_volume_glance_metadata_copy_to_volume(self):
def test_volume_snapshot_glance_metadata_get_nonexistent(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_rpcapi.py
class VolumeRpcAPITestCase(test.TestCase):
def setUp(self):
def test_serialized_volume_has_id(self):
def _test_volume_api(self, method, rpc_method, **kwargs):
def _fake_prepare_method(*args, **kwds):
def _fake_rpc_method(*args, **kwargs):
def test_create_volume(self):
def test_create_volume_serialization(self):
def test_delete_volume(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_attach_volume_to_instance(self):
def test_attach_volume_to_host(self):
def test_detach_volume(self):
def test_copy_volume_to_image(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_accept_transfer(self):
def test_extend_volume(self):
def test_migrate_volume(self):
def test_migrate_volume_completion(self):
def test_retype(self):
def test_manage_existing(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_transfer.py
class VolumeTransferTestCase(test.TestCase):
def setUp(self):
def test_transfer_volume_create_delete(self):
def test_transfer_invalid_volume(self):
def test_transfer_accept(self):
def test_transfer_get(self):
def test_delete_transfer_with_deleted_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_types.py
class VolumeTypeTestCase(test.TestCase):
def setUp(self):
def test_volume_type_create_then_destroy(self):
def test_get_all_volume_types(self):
def test_get_default_volume_type(self):
def test_default_volume_type_missing_in_db(self):
def test_non_existent_vol_type_shouldnt_delete(self):
def test_volume_type_with_volumes_shouldnt_delete(self):
def test_repeated_vol_types_shouldnt_raise(self):
def test_invalid_volume_types_params(self):
def test_volume_type_get_by_id_and_name(self):
def test_volume_type_search_by_extra_spec(self):
def test_volume_type_search_by_extra_spec_multiple(self):
def test_is_encrypted(self):
def test_get_volume_type_qos_specs(self):
def test_volume_types_diff(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_types_extra_specs.py
class VolumeTypeExtraSpecsTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_volume_type_specs_get(self):
def test_volume_type_extra_specs_delete(self):
def test_volume_type_extra_specs_update(self):
def test_volume_type_extra_specs_create(self):
def test_volume_type_get_with_extra_specs(self):
def test_volume_type_get_by_name_with_extra_specs(self):
def test_volume_type_get_all(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume_utils.py
class UsageInfoTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _create_volume(self, params={}):
class LVMVolumeDriverTestCase(test.TestCase):
def test_convert_blocksize_option(self):
class ClearVolumeTestCase(test.TestCase):
def test_clear_volume(self):
def test_clear_volume_zero(self):
def test_clear_volume_ionice(self):
def test_clear_volume_zero_ionice(self):
def test_clear_volume_shred(self):
def test_clear_volume_shred_not_clear_size(self):
def test_clear_volume_invalid_opt(self):
def test_clear_volume_lvm_snap(self):
def fake_copy_volume(srcstr, deststr, size, blocksize, **kwargs):
\OpenStack\cinder-2014.1\cinder\tests\test_windows.py
class TestWindowsDriver(test.TestCase):
def __init__(self, method):
def setUp(self):
def tearDown(self):
def _setup_stubs(self):
def fake_wutils__init__(self):
def fake_local_path(self, volume):
def test_check_for_setup_errors(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_create_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_delete_snapshot(self):
def test_create_export(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_ensure_export(self):
def test_remove_export(self):
def test_copy_image_to_volume(self):
def test_copy_volume_to_image(self):
def test_create_cloned_volume(self):
def test_extend_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\test_wsgi.py
class TestLoaderNothingExists(test.TestCase):
def setUp(self):
def test_config_not_found(self):
class TestLoaderNormalFilesystem(test.TestCase):
def setUp(self):
def test_config_found(self):
def test_app_not_found(self):
def test_app_found(self):
class TestWSGIServer(test.TestCase):
def _ipv6_configured():
def test_no_app(self):
def test_start_random_port(self):
def test_start_random_port_with_ipv6(self):
def test_app(self):
def hello_world(env, start_response):
def test_app_using_ssl(self):
def hello_world(req):
def test_app_using_ipv6_and_ssl(self):
def hello_world(req):
class ExceptionTest(test.TestCase):
def _wsgi_app(self, inner_app):
def _do_test_exception_safety_reflected_in_faults(self, expose):
def fail(req):
def test_safe_exceptions_are_described_in_faults(self):
def test_unsafe_exceptions_are_not_described_in_faults(self):
def _do_test_exception_mapping(self, exception_type, msg):
def fail(req):
def test_quota_error_mapping(self):
def test_non_cinder_notfound_exception_mapping(self):
def test_non_cinder_exception_mapping(self):
def test_exception_with_none_code_throws_500(self):
def fail(req):
def test_cinder_exception_with_localized_explanation(self, mock_t9n):
def fail(req):
def mock_get_non_localized_message(msgid, locale):
def mock_translate(msgid, locale):
\OpenStack\cinder-2014.1\cinder\tests\test_xenapi_sm.py
class MockContext(object):
def __init__(ctxt, auth_token):
def simple_context(value):
def get_configured_driver(server='ignore_server', path='ignore_path'):
class DriverTestCase(test.TestCase):
def assert_flag(self, flagname):
def test_config_options(self):
def test_do_setup(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_create_export_does_not_raise_exception(self):
def test_remove_export_does_not_raise_exception(self):
def test_initialize_connection(self):
def test_initialize_connection_null_values(self):
def _setup_mock_driver(self, server, serverpath, sr_base_path="_srbp"):
def test_create_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_delete_snapshot(self):
def test_copy_volume_to_image_xenserver_case(self):
def test_copy_volume_to_image_non_xenserver_case(self):
def test_use_image_utils_to_upload_volume(self):
def test_use_glance_plugin_to_upload_volume(self):
def test_copy_image_to_volume_xenserver_case(self):
def test_copy_image_to_volume_non_xenserver_case(self):
def test_use_image_utils_to_pipe_bytes_to_volume(self):
def test_use_glance_plugin_to_copy_image_to_volume_success(self):
def test_use_glance_plugin_to_copy_image_to_volume_fail(self):
def test_get_volume_stats_reports_required_keys(self):
def test_get_volume_stats_reports_unknown_cap(self):
def test_reported_driver_type(self):
class ToolsTest(test.TestCase):
def test_get_this_vm_uuid(self, mock_read_first_line):
def test_stripped_first_line_of(self):
\OpenStack\cinder-2014.1\cinder\tests\test_zadara.py
class FakeRequest(object):
def __init__(self, method, url, body):
def read(self):
def _compare_url(self, url, template_url):
def _get_parameters(self, data):
def _get_counter(self):
def _login(self):
def _incorrect_access_key(self, params):
def _create_volume(self):
def _create_server(self):
def _attach(self):
def _detach(self):
def _expand(self):
def _create_snapshot(self):
def _delete_snapshot(self):
def _create_clone(self):
def _delete(self):
def _generate_list_resp(self, header, footer, body, lst, vol):
def _list_volumes(self):
def _list_controllers(self):
def _list_pools(self):
def _list_servers(self):
def _get_server_obj(self, name):
def _list_vol_attachments(self):
def _list_vol_snapshots(self):
class FakeHTTPConnection(object):
def __init__(self, host, port, use_ssl=False):
def request(self, method, url, body):
def getresponse(self):
def close(self):
class FakeHTTPSConnection(FakeHTTPConnection):
def __init__(self, host, port):
class ZadaraVPSADriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_create_destroy(self):
def test_create_destroy_multiple(self):
def test_destroy_non_existent(self):
def test_empty_apis(self):
def test_volume_attach_detach(self):
def test_volume_attach_multiple_detach(self):
def test_wrong_attach_params(self):
def test_wrong_detach_params(self):
def test_wrong_login_reply(self):
def test_ssl_use(self):
def test_bad_http_response(self):
def test_delete_without_detach(self):
def test_no_active_ctrl(self):
def test_create_destroy_snapshot(self):
def test_expand_volume(self):
def test_create_destroy_clones(self):
def test_get_volume_stats(self):
\OpenStack\cinder-2014.1\cinder\tests\utils.py
def get_test_admin_context():
def create_volume(ctxt, host='test_host', display_name='test_volume', display_description='this is a test volume', status='available', migration_status=None, size=1, availability_zone='fake_az', volume_type_id=None, **kwargs):
def create_snapshot(ctxt, volume_id, display_name='test_snapshot', display_description='this is a test snapshot', status='creating'):
\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\test_iscsi.py
class NetAppDirectISCSIDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_create_lun(self):
def test_create_lun_with_qos_policy_group(self):
class NetAppiSCSICModeTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_clone_lun_multiple_zapi_calls(self):
def test_clone_lun_zero_block_count(self):
class NetAppiSCSI7ModeTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_clone_lun_multiple_zapi_calls(self):
def test_clone_lun_zero_block_count(self):
\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\volume\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\windows\db_fakes.py
def get_fake_volume_info():
def get_fake_volume_info_cloned():
def get_fake_image_meta():
def get_fake_snapshot_info():
def get_fake_connector_info():
\OpenStack\cinder-2014.1\cinder\tests\windows\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\xenapi\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_san_lookup_service.py
class TestBrcdFCSanLookupService(brcd_lookup.BrcdFCSanLookupService,
test.TestCase):
def setUp(self):
def __init__(self, *args, **kwargs):
def create_configuration(self):
def test_get_device_mapping_from_network(self, get_nameserver_info_mock):
def test_get_nameserver_info(self, get_switch_data_mock):
def test__get_switch_data(self):
def test__parse_ns_output(self):
def test_get_formatted_wwn(self):
class Channel(object):
def recv_exit_status(self):
class Stream(object):
def __init__(self, buffer=''):
def readlines(self):
def close(self):
def flush(self):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_client_cli.py
class TestBrcdFCZoneClientCLI(BrcdFCZoneClientCLI, test.TestCase):
def setUp(self):
def __init__(self, *args, **kwargs):
def test_get_active_zone_set(self, get_switch_info_mock):
def test_get_active_zone_set_ssh_error(self, run_ssh_mock):
def test_add_zones_new_zone_no_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):
def test_add_zones_new_zone_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zoneset_mock):
def test_activate_zoneset(self, ssh_execute_mock):
def test_deactivate_zoneset(self, ssh_execute_mock):
def test_delete_zones_activate_false(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):
def test_delete_zones_activate_true(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zs_mock):
def test_get_nameserver_info(self, get_switch_info_mock):
def test_get_nameserver_info_ssh_error(self, run_ssh_mock):
def test__cfg_save(self, ssh_execute_mock):
def test__zone_delete(self, apply_zone_change_mock):
def test__cfg_trans_abort(self, apply_zone_change_mock):
def test__is_trans_abortable_true(self, run_ssh_mock):
def test__is_trans_abortable_ssh_error(self, run_ssh_mock):
def test__is_trans_abortable_false(self, run_ssh_mock):
def test_apply_zone_change(self, run_ssh_mock):
def test__get_switch_info(self, run_ssh_mock):
def test__parse_ns_output(self):
def test_is_supported_firmware(self, exec_shell_cmd_mock):
def test_is_supported_firmware_invalid(self, exec_shell_cmd_mock):
def test_is_supported_firmware_no_ssh_response(self, exec_shell_cmd_mock):
def test_is_supported_firmware_ssh_error(self, exec_shell_cmd_mock):
class Channel(object):
def recv_exit_status(self):
class Stream(object):
def __init__(self, buffer=''):
def readlines(self):
def splitlines(self):
def close(self):
def flush(self):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_driver.py
class BrcdFcZoneDriverBaseTest(object):
def setup_config(self, is_normal, mode):
class TestBrcdFcZoneDriver(BrcdFcZoneDriverBaseTest, test.TestCase):
def setUp(self):
def setup_driver(self, config):
def fake_get_active_zone_set(self, fabric_ip, fabric_user, fabric_pwd):
def fake_get_san_context(self, target_wwn_list):
def test_add_connection(self):
def test_delete_connection(self):
def test_add_connection_for_initiator_mode(self):
def test_delete_connection_for_initiator_mode(self):
def test_add_connection_for_invalid_fabric(self):
def test_delete_connection_for_invalid_fabric(self):
class FakeBrcdFCZoneClientCLI(object):
def __init__(self, ipaddress, username, password, port):
def get_active_zone_set(self):
def add_zones(self, zones, isActivate):
def delete_zones(self, zone_names, isActivate):
def is_supported_firmware(self):
def get_nameserver_info(self):
def close_connection(self):
def cleanup(self):
class FakeBrcdFCSanLookupService(object):
def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):
class GlobalVars(object):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_lookup_service.py
class TestFCSanLookupService(FCSanLookupService, test.TestCase):
def setUp(self):
def __init__(self, *args, **kwargs):
def setup_config(self):
def test_get_device_mapping_from_network(self):
def test_get_device_mapping_from_network_for_invalid_config(self):
class FakeBrcdFCSanLookupService(object):
def __init__(self, **kwargs):
def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):
class GlobalParams(object):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_fc_zone_manager.py
class TestFCZoneManager(ZoneManager, test.TestCase):
def setUp(self):
def __init__(self, *args, **kwargs):
def test_add_connection(self):
def test_add_connection_error(self):
def test_delete_connection(self):
def test_delete_connection_error(self):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_volume_manager_fc.py
class TestVolumeManager(manager.VolumeManager, test.TestCase):
def setUp(self):
def tearDown(self):
def __init__(self, *args, **kwargs):
def test_initialize_connection_voltype_fc_mode_fabric(self, utils_mock):
def test_initialize_connection_voltype_fc_mode_none(self, utils_mock):
def test_terminate_connection_exception(self):
def test_terminate_connection_voltype_fc_mode_fabric(self, utils_mock):
def test_terminate_connection_mode_none(self, utils_mock):
def test_terminate_connection_conn_info_none(self, utils_mock):
def test__add_or_delete_connection_add(self, add_connection_mock):
def test__add_or_delete_connection_delete(self, delete_connection_mock):
def test__add_or_delete_connection_no_init_target_map(self, del_conn_mock):
\OpenStack\cinder-2014.1\cinder\tests\zonemanager\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\__init__.py
\OpenStack\cinder-2014.1\cinder\transfer\api.py
class API(base.Base):
def __init__(self, db_driver=None):
def get(self, context, transfer_id):
def delete(self, context, transfer_id):
def get_all(self, context, filters={}):
def _get_random_string(self, length):
def _get_crypt_hash(self, salt, auth_key):
def create(self, context, volume_id, display_name):
def accept(self, context, transfer_id, auth_key):
\OpenStack\cinder-2014.1\cinder\transfer\__init__.py
\OpenStack\cinder-2014.1\cinder\units.py
\OpenStack\cinder-2014.1\cinder\utils.py
def find_config(config_path):
def as_int(obj, quiet=True):
def check_exclusive_options(**kwargs):
def execute(*cmd, **kwargs):
def check_ssh_injection(cmd_list):
def create_channel(client, width, height):
class SSHPool(pools.Pool):
def __init__(self, ip, port, conn_timeout, login, password=None, privatekey=None, *args, **kwargs):
def create(self):
def get(self):
def remove(self, ssh):
def cinderdir():
def last_completed_audit_period(unit=None):
def generate_password(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
def generate_username(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
class LazyPluggable(object):
def __init__(self, pivot, **backends):
def __get_backend(self):
def __getattr__(self, key):
class ProtectedExpatParser(expatreader.ExpatParser):
def __init__(self, forbid_dtd=True, forbid_entities=True, *args, **kwargs):
def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
def entity_decl(self, entityName, is_parameter_entity, value, base, systemId, publicId, notationName):
def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
def reset(self):
def safe_minidom_parse_string(xml_string):
def xhtml_escape(value):
def get_from_path(items, path):
def is_valid_boolstr(val):
def monkey_patch():
def generate_glance_url():
def make_dev_path(dev, partition=None, base='/dev'):
def total_seconds(td):
def sanitize_hostname(hostname):
def read_cached_file(filename, cache_info, reload_func=None):
def hash_file(file_like_object):
def service_is_up(service):
def read_file_as_root(file_path):
def temporary_chown(path, owner_uid=None):
def tempdir(**kwargs):
def walk_class_hierarchy(clazz, encountered=None):
def get_root_helper():
def brick_get_connector_properties():
def brick_get_connector(protocol, driver=None, execute=processutils.execute, use_multipath=False, device_scan_attempts=3, *args, **kwargs):
def require_driver_initialized(driver):
def get_file_mode(path):
def get_file_gid(path):
def check_string_length(value, name, min_length=0, max_length=None):
def add_visible_admin_metadata(context, volume, volume_api):
\OpenStack\cinder-2014.1\cinder\version.py
\OpenStack\cinder-2014.1\cinder\volume\api.py
def wrap_check_policy(func):
def wrapped(self, context, target_obj, *args, **kwargs):
def check_policy(context, action, target_obj=None):
class API(base.Base):
def __init__(self, db_driver=None, image_service=None):
def _valid_availability_zone(self, availability_zone):
def list_availability_zones(self):
def create(self, context, size, name, description, snapshot=None, image_id=None, volume_type=None, metadata=None, availability_zone=None, source_volume=None, scheduler_hints=None, backup_source_volume=None):
def check_volume_az_zone(availability_zone):
def delete(self, context, volume, force=False, unmanage_only=False):
def update(self, context, volume, fields):
def get(self, context, volume_id):
def get_all(self, context, marker=None, limit=None, sort_key='created_at', sort_dir='desc', filters=None):
def get_snapshot(self, context, snapshot_id):
def get_volume(self, context, volume_id):
def get_all_snapshots(self, context, search_opts=None):
def check_attach(self, volume):
def check_detach(self, volume):
def reserve_volume(self, context, volume):
def unreserve_volume(self, context, volume):
def begin_detaching(self, context, volume):
def roll_detaching(self, context, volume):
def attach(self, context, volume, instance_uuid, host_name, mountpoint, mode):
def detach(self, context, volume):
def initialize_connection(self, context, volume, connector):
def terminate_connection(self, context, volume, connector, force=False):
def accept_transfer(self, context, volume, new_user, new_project):
def _create_snapshot(self, context, volume, name, description, force=False, metadata=None):
def create_snapshot(self, context, volume, name, description, metadata=None):
def create_snapshot_force(self, context, volume, name, description, metadata=None):
def delete_snapshot(self, context, snapshot, force=False):
def update_snapshot(self, context, snapshot, fields):
def get_volume_metadata(self, context, volume):
def delete_volume_metadata(self, context, volume, key):
def _check_metadata_properties(self, metadata=None):
def update_volume_metadata(self, context, volume, metadata, delete=False):
def get_volume_metadata_value(self, volume, key):
def get_volume_admin_metadata(self, context, volume):
def delete_volume_admin_metadata(self, context, volume, key):
def update_volume_admin_metadata(self, context, volume, metadata, delete=False):
def get_snapshot_metadata(self, context, snapshot):
def delete_snapshot_metadata(self, context, snapshot, key):
def update_snapshot_metadata(self, context, snapshot, metadata, delete=False):
def get_snapshot_metadata_value(self, snapshot, key):
def get_volumes_image_metadata(self, context):
def get_volume_image_metadata(self, context, volume):
def _check_volume_availability(self, volume, force):
def copy_volume_to_image(self, context, volume, metadata, force):
def extend(self, context, volume, new_size):
def migrate_volume(self, context, volume, host, force_host_copy):
def migrate_volume_completion(self, context, volume, new_volume, error):
def update_readonly_flag(self, context, volume, flag):
def retype(self, context, volume, new_type, migration_policy=None):
def manage_existing(self, context, host, ref, name=None, description=None, volume_type=None, metadata=None, availability_zone=None):
class HostAPI(base.Base):
def __init__(self):
def set_host_enabled(self, context, host, enabled):
def get_host_uptime(self, context, host):
def host_power_action(self, context, host, action):
def set_host_maintenance(self, context, host, mode):
\OpenStack\cinder-2014.1\cinder\volume\configuration.py
class Configuration(object):
def __init__(self, volume_opts, config_group=None):
def _ensure_config_values(self, volume_opts):
def append_config_values(self, volume_opts):
def safe_get(self, value):
def __getattr__(self, value):
\OpenStack\cinder-2014.1\cinder\volume\driver.py
class VolumeDriver(object):
def __init__(self, execute=utils.execute, *args, **kwargs):
def set_execute(self, execute):
def set_initialized(self):
def initialized(self):
def get_version(self):
def _is_non_recoverable(self, err, non_recoverable_list):
def _try_execute(self, *command, **kwargs):
def check_for_setup_error(self):
def create_volume(self, volume):
def create_volume_from_snapshot(self, volume, snapshot):
def create_cloned_volume(self, volume, src_vref):
def delete_volume(self, volume):
def create_snapshot(self, snapshot):
def delete_snapshot(self, snapshot):
def local_path(self, volume):
def ensure_export(self, context, volume):
def create_export(self, context, volume):
def remove_export(self, context, volume):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector, **kwargs):
def attach_volume(self, context, volume, instance_uuid, host_name, mountpoint):
def detach_volume(self, context, volume):
def get_volume_stats(self, refresh=False):
def do_setup(self, context):
def validate_connector(self, connector):
def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
def copy_image_to_volume(self, context, volume, image_service, image_id):
def copy_volume_to_image(self, context, volume, image_service, image_meta):
def _attach_volume(self, context, volume, properties, remote=False):
def _detach_volume(self, context, attach_info, volume, properties, force=False, remote=False):
def clone_image(self, volume, image_location, image_id, image_meta):
def backup_volume(self, context, backup, backup_service):
def restore_backup(self, context, backup, volume, backup_service):
def clear_download(self, context, volume):
def extend_volume(self, volume, new_size):
def migrate_volume(self, context, volume, host):
def retype(self, context, volume, new_type, diff, host):
def accept_transfer(self, context, volume, new_user, new_project):
def manage_existing(self, volume, existing_ref):
def manage_existing_get_size(self, volume, existing_ref):
def unmanage(self, volume):
class ISCSIDriver(VolumeDriver):
def __init__(self, *args, **kwargs):
def _do_iscsi_discovery(self, volume):
def _get_iscsi_properties(self, volume):
def _run_iscsiadm(self, iscsi_properties, iscsi_command, **kwargs):
def _run_iscsiadm_bare(self, iscsi_command, **kwargs):
def _iscsiadm_update(self, iscsi_properties, property_key, property_value, **kwargs):
def initialize_connection(self, volume, connector):
def validate_connector(self, connector):
def terminate_connection(self, volume, connector, **kwargs):
def get_volume_stats(self, refresh=False):
def _update_volume_stats(self):
def get_target_helper(self, db):
class FakeISCSIDriver(ISCSIDriver):
def __init__(self, *args, **kwargs):
def create_volume(self, volume):
def check_for_setup_error(self):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector, **kwargs):
def fake_execute(cmd, *_args, **_kwargs):
class ISERDriver(ISCSIDriver):
def __init__(self, *args, **kwargs):
def initialize_connection(self, volume, connector):
def _update_volume_stats(self):
def get_target_helper(self, db):
class FakeISERDriver(FakeISCSIDriver):
def __init__(self, *args, **kwargs):
def initialize_connection(self, volume, connector):
def fake_execute(cmd, *_args, **_kwargs):
class FibreChannelDriver(VolumeDriver):
def __init__(self, *args, **kwargs):
def initialize_connection(self, volume, connector):
\OpenStack\cinder-2014.1\cinder\volume\drivers\block_device.py
class BlockDeviceDriver(driver.ISCSIDriver):
def __init__(self, *args, **kwargs):
def set_execute(self, execute):
def check_for_setup_error(self):
def create_volume(self, volume):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector, **kwargs):
def create_export(self, context, volume):
def remove_export(self, context, volume):
def ensure_export(self, context, volume):
def delete_volume(self, volume):
def local_path(self, volume):
def copy_image_to_volume(self, context, volume, image_service, image_id):
def copy_volume_to_image(self, context, volume, image_service, image_meta):
def create_cloned_volume(self, volume, src_vref):
def get_volume_stats(self, refresh=False):
def _update_volume_stats(self):
def _get_used_devices(self):
def _get_device_size(self, dev_path):
def _devices_sizes(self):
def find_appropriate_size_device(self, size):
\OpenStack\cinder-2014.1\cinder\volume\drivers\coraid.py
class CoraidRESTClient(object):
def __init__(self, esm_url):
def _check_esm_url(self, esm_url):
def rpc(self, handle, url_params, data, allow_empty_response=False):
def _rpc(self, handle, url_params, data, allow_empty_response):
def to_coraid_kb(gb):
def coraid_volume_size(gb):
class CoraidAppliance(object):
def __init__(self, rest_client, username, password, group):
def _login(self):
def _set_effective_group(self, groups_map, group):
def _ensure_session(self):
def _relogin(self):
def rpc(self, handle, url_params, data, allow_empty_response=False):
def _is_session_expired(self, reply):
def _is_bad_config_state(self, reply):
def configure(self, json_request):
def esm_command(self, request):
def get_volume_info(self, volume_name):
def get_volume_repository(self, volume_name):
def get_all_repos(self):
def ping(self):
def create_lun(self, repository_name, volume_name, volume_size_in_gb):
def delete_lun(self, volume_name):
def resize_volume(self, volume_name, new_volume_size_in_gb):
def create_snapshot(self, volume_name, snapshot_name):
def delete_snapshot(self, snapshot_name):
def create_volume_from_snapshot(self, snapshot_name, volume_name, dest_repository_name):
def clone_volume(self, src_volume_name, dst_volume_name, dst_repository_name):
class CoraidDriver(driver.VolumeDriver):
def __init__(self, *args, **kwargs):
def appliance(self):
def check_for_setup_error(self):
def _get_repository(self, volume_type):
def create_volume(self, volume):
def create_cloned_volume(self, volume, src_vref):
def delete_volume(self, volume):
def create_snapshot(self, snapshot):
def delete_snapshot(self, snapshot):
def create_volume_from_snapshot(self, volume, snapshot):
def extend_volume(self, volume, new_size):
def initialize_connection(self, volume, connector):
def _get_repository_capabilities(self):
def update_volume_stats(self):
def get_volume_stats(self, refresh=False):
def local_path(self, volume):
def create_export(self, context, volume):
def remove_export(self, context, volume):
def terminate_connection(self, volume, connector, **kwargs):
def ensure_export(self, context, volume):
\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_cli_iscsi.py
class EMCCLIISCSIDriver(driver.ISCSIDriver):
def __init__(self, *args, **kwargs):
def check_for_setup_error(self):
def create_volume(self, volume):
def create_volume_from_snapshot(self, volume, snapshot):
def create_cloned_volume(self, volume, src_vref):
def delete_volume(self, volume):
def create_snapshot(self, snapshot):
def delete_snapshot(self, snapshot):
def ensure_export(self, context, volume):
def create_export(self, context, volume):
def remove_export(self, context, volume):
def check_for_export(self, context, volume_id):
def extend_volume(self, volume, new_size):
def initialize_connection(self, volume, connector):
def do_initialize_connection():
def _do_iscsi_discovery(self, volume):
def vnx_get_iscsi_properties(self, volume, connector):
def terminate_connection(self, volume, connector, **kwargs):
def do_terminate_connection():
def get_volume_stats(self, refresh=False):
def update_volume_stats(self):
\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_common.py
class EMCSMISCommon():
def __init__(self, prtcl, configuration=None):
def create_volume(self, volume):
def create_volume_from_snapshot(self, volume, snapshot):
def create_cloned_volume(self, volume, src_vref):
def delete_volume(self, volume):
def create_snapshot(self, snapshot, volume):
def delete_snapshot(self, snapshot, volume):
def _expose_paths(self, configservice, vol_instance, connector):
def _hide_paths(self, configservice, vol_instance, connector):
def _add_members(self, configservice, vol_instance):
def _remove_members(self, configservice, vol_instance):
def _map_lun(self, volume, connector):
def _unmap_lun(self, volume, connector):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector):
def extend_volume(self, volume, new_size):
def update_volume_stats(self):
def _get_storage_type(self, volume, filename=None):
def _get_storage_type_conffile(self, filename=None):
def _get_masking_view(self, filename=None):
def _get_timeout(self, filename=None):
def _get_ecom_cred(self, filename=None):
def _get_ecom_server(self, filename=None):
def _get_ecom_connection(self, filename=None):
def _find_replication_service(self, storage_system):
def _find_storage_configuration_service(self, storage_system):
def _find_controller_configuration_service(self, storage_system):
def _find_storage_hardwareid_service(self, storage_system):
def _find_pool(self, storage_type, details=False):
def _parse_pool_instance_id(self, instanceid):
def _find_lun(self, volume):
def _find_storage_sync_sv_sv(self, snapshot, volume, waitforsync=True):
def _find_initiator_names(self, connector):
def _wait_for_job_complete(self, job):
def _find_lunmasking_scsi_protocol_controller(self, storage_system, connector):
def _find_lunmasking_scsi_protocol_controller_for_vol(self, vol_instance, connector):
def get_num_volumes_mapped(self, volume, connector):
def _find_avail_device_number(self, storage_system):
def find_device_number(self, volume, connector):
def _find_device_masking_group(self):
def _find_storage_processor_system(self, owningsp, storage_system):
def _find_iscsi_protocol_endpoints(self, owningsp, storage_system):
def _getnum(self, num, datatype):
def _getinstancename(self, classname, bindings):
def get_target_wwns(self, storage_system, connector):
def _find_storage_hardwareids(self, connector):
def _get_volumetype_extraspecs(self, volume):
def _get_provisioning(self, storage_type):
\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_fc.py
class EMCSMISFCDriver(driver.FibreChannelDriver):
def __init__(self, *args, **kwargs):
def check_for_setup_error(self):
def create_volume(self, volume):
def create_volume_from_snapshot(self, volume, snapshot):
def create_cloned_volume(self, volume, src_vref):
def delete_volume(self, volume):
OpenStack IndexPreviousNext
|