OpenStack Study: cinder
OpenStack Index | Previous | Next
def _assoc_storagevolume(self, objectpath):
def _default_assoc(self, objectpath):
def _assocnames_lunmaskctrl(self):
def _default_assocnames(self, objectpath):
def _getinstance_storagevolume(self, objectpath):
def _getinstance_syncsvsv(self, objectpath):
def _getinstance_lunmask(self):
def _getinstance_unit(self, objectpath):
def _getinstance_job(self, jobpath):
def _default_getinstance(self, objectpath):
def _enum_replicationservices(self):
def _enum_stconfsvcs(self):
def _enum_ctrlconfsvcs(self):
def _enum_pools(self):
def _enum_pool_details(self):
def _enum_storagevolumes(self):
def _enum_syncsvsvs(self):
def _create_sync(self, objpath1, objpath2, percentsynced):
def _enum_unitnames(self):
def _enum_lunmaskctrls(self):
def _enum_processors(self):
def _enum_hdwidmgmts(self):
def _enum_storhdwids(self):
def _default_enum(self):
class EMCSMISISCSIDriverTestCase(test.TestCase):
def setUp(self):
def create_fake_config_file(self):
def fake_ecom_connection(self):
def fake_do_iscsi_discovery(self, volume):
def fake_sleep(self, seconds):
def test_get_volume_stats(self):
def test_create_destroy(self):
def test_create_volume_snapshot_destroy(self):
def test_map_unmap(self):
def test_create_volume_failed(self):
def test_create_volume_snapshot_unsupported(self):
def test_create_volume_snapshot_replica_failed(self):
def test_create_volume_snapshot_sync_failed(self):
def test_create_volume_clone_replica_failed(self):
def test_create_volume_clone_sync_failed(self):
def test_delete_volume_notfound(self):
def test_delete_volume_failed(self):
def test_extend_volume(self):
def _cleanup(self):
def tearDown(self):
class EMCSMISFCDriverTestCase(test.TestCase):
def setUp(self):
def create_fake_config_file(self):
def fake_ecom_connection(self):
def fake_sleep(self, seconds):
def test_get_volume_stats(self):
def test_create_destroy(self):
def test_create_volume_snapshot_destroy(self):
def test_map_unmap(self):
def test_create_volume_failed(self):
def test_create_volume_snapshot_unsupported(self):
def test_create_volume_snapshot_replica_failed(self):
def test_create_volume_snapshot_sync_failed(self):
def test_create_volume_clone_replica_failed(self):
def test_create_volume_clone_sync_failed(self):
def test_delete_volume_notfound(self):
def test_delete_volume_failed(self):
def test_extend_volume(self):
def test_create_volume_with_volume_type(self, _mock_volume_type):
def _cleanup(self):
def tearDown(self):
\OpenStack\cinder-2014.1\cinder\tests\test_emc_vnxdirect.py
class EMCVNXCLIDriverTestData():
class EMCVNXCLIDriverISCSITestCase(test.TestCase):
def _fake_cli_executor(self, *cmd, **kwargv):
def setUp(self):
def _restore(self, back_os_path_exists):
def test_create_destroy_volume_withoutExtraSpec(self):
def test_create_destroy_volume_withExtraSpec(self):
def test_get_volume_stats(self):
def test_create_destroy_volume_snapshot(self):
def test_initialize_connection(self, _mock_iscsi_discovery):
def test_terminate_connection(self):
def test_create_volume_failed(self):
def test_create_volume_snapshot_failed(self):
def test_create_volume_from_snapshot(self):
def test_create_volume_from_snapshot_sync_failed(self):
def test_create_cloned_volume(self):
def test_create_volume_clone_sync_failed(self):
def test_delete_volume_failed(self):
def test_extend_volume(self):
def test_extend_volume_has_snapshot(self):
def test_extend_volume_failed(self):
def test_create_remove_export(self):
\OpenStack\cinder-2014.1\cinder\tests\test_eqlx.py
class DellEQLSanISCSIDriverTestCase(test.TestCase):
def setUp(self):
def _fake_get_iscsi_properties(self, volume):
def test_create_volume(self):
def test_delete_volume(self):
def test_delete_absent_volume(self):
def test_ensure_export(self):
def test_create_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_create_cloned_volume(self):
def test_delete_snapshot(self):
def test_extend_volume(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_do_setup(self):
def test_update_volume_stats(self):
def test_get_volume_stats(self):
def test_get_space_in_gb(self):
def test_get_output(self):
def _fake_recv(ignore_arg):
def test_get_prefixed_value(self):
def test_ssh_execute(self):
def test_ssh_execute_error(self):
def test_with_timeout(self):
def no_timeout(cmd, *args, **kwargs):
def w_timeout(cmd, *args, **kwargs):
def test_local_path(self):
\OpenStack\cinder-2014.1\cinder\tests\test_exception.py
class FakeNotifier(object):
def __init__(self):
def notify(self, context, publisher, event, priority, payload):
def good_function():
def bad_function_error():
def bad_function_exception():
class CinderExceptionTestCase(test.TestCase):
def test_default_error_msg(self):
def test_error_msg(self):
def test_default_error_msg_with_kwargs(self):
def test_error_msg_exception_with_kwargs(self):
def test_default_error_code(self):
def test_error_code_from_kwarg(self):
\OpenStack\cinder-2014.1\cinder\tests\test_glusterfs.py
class DumbVolume(object):
def __setitem__(self, key, value):
def __getitem__(self, item):
class FakeDb(object):
def volume_get(self, *a, **kw):
def snapshot_get_all_for_volume(self, *a, **kw):
class GlusterFsDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def stub_out_not_replaying(self, obj, attr_name):
def test_set_execute(self):
def my_execute(*a, **k):
def test_local_path(self):
def test_mount_glusterfs_should_mount_correctly(self):
def test_mount_glusterfs_should_suppress_already_mounted_error(self):
def test_mount_glusterfs_should_reraise_already_mounted_error(self):
def test_mount_glusterfs_should_create_mountpoint_if_not_yet(self):
def test_get_hash_str(self):
def test_get_mount_point_for_share(self):
def test_get_available_capacity_with_df(self):
def test_load_shares_config(self):
def test_ensure_share_mounted(self):
def test_ensure_shares_mounted_should_save_mounting_successfully(self):
def test_ensure_shares_mounted_should_not_save_mounting_with_error(self):
def test_setup_should_throw_error_if_shares_config_not_configured(self):
def test_setup_should_throw_exception_if_client_is_not_installed(self):
def _fake_load_shares_config(self, conf):
def _fake_NamedTemporaryFile(self, prefix=None, dir=None):
def test_setup_set_share_permissions(self):
def test_find_share_should_throw_error_if_there_is_no_mounted_shares(self):
def test_find_share(self):
def test_find_share_should_throw_error_if_there_is_no_enough_place(self):
def _simple_volume(self, id=None):
def test_create_sparsed_volume(self):
def test_create_nonsparsed_volume(self):
def test_create_qcow2_volume(self):
def test_create_volume_should_ensure_glusterfs_mounted(self):
def test_create_volume_should_return_provider_location(self):
def test_create_cloned_volume(self):
def test_delete_volume(self):
def test_delete_should_ensure_share_mounted(self):
def test_delete_should_not_delete_if_provider_location_not_provided(self):
def test_delete_volume_with_info_file(self, mock_path_exists, mock_remove):
def test_create_snapshot(self):
def test_delete_snapshot_bottom(self):
def test_delete_snapshot_middle(self):
def test_delete_snapshot_not_in_info(self):
def test_read_info_file(self):
def test_extend_volume(self):
def test_create_snapshot_online(self):
def test_create_snapshot_online_novafailure(self):
def test_delete_snapshot_online_1(self):
def test_delete_snapshot_online_2(self):
def test_delete_snapshot_online_novafailure(self):
def test_get_backing_chain_for_path(self):
def test_copy_volume_from_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_initialize_connection(self):
def test_get_mount_point_base(self):
def test_backup_volume(self):
def test_backup_volume_previous_snap(self):
def test_backup_snap_failure_1(self):
def test_backup_snap_failure_2(self):
def test_backup_failure_unsupported_format(self):
\OpenStack\cinder-2014.1\cinder\tests\test_gpfs.py
class FakeQemuImgInfo(object):
def __init__(self):
class GPFSDriverTestCase(test.TestCase):
def _execute_wrapper(self, cmd, *args, **kwargs):
def setUp(self):
def tearDown(self):
def test_different(self):
def test_sizestr(self):
def test_get_gpfs_state_ok(self, mock_exec):
def test_get_gpfs_state_fail_mmgetstate(self, mock_exec):
def test_check_gpfs_state_ok(self, mock_get_gpfs_state):
def test_check_gpfs_state_fail_not_active(self, mock_get_gpfs_state):
def test_get_fs_from_path_ok(self, mock_exec):
def test_get_fs_from_path_fail_path(self, mock_exec):
def test_get_fs_from_path_fail_raise(self, mock_exec):
def test_get_gpfs_cluster_id_ok(self, mock_exec):
def test_get_gpfs_cluster_id_fail_id(self, mock_exec):
def test_get_gpfs_cluster_id_fail_raise(self, mock_exec):
def test_get_fileset_from_path_ok(self, mock_exec):
def test_get_fileset_from_path_fail_mmlsattr(self, mock_exec):
def test_get_fileset_from_path_fail_find_fileset(self, mock_exec):
def test_verify_gpfs_pool_ok(self, mock_exec):
def test_verify_gpfs_pool_fail_pool(self, mock_exec):
def test_verify_gpfs_pool_fail_raise(self, mock_exec):
def test_update_volume_storage_pool_ok(self, mock_exec, mock_verify_pool):
def test_update_volume_storage_pool_ok_pool_none(self, mock_exec, mock_verify_pool):
def test_update_volume_storage_pool_fail_pool(self, mock_exec, mock_verify_pool):
def test_update_volume_storage_pool_fail_mmchattr(self, mock_exec, mock_verify_pool):
def test_get_gpfs_fs_release_level_ok(self, mock_exec, mock_fs_from_path):
def test_get_gpfs_fs_release_level_fail_mmlsfs(self, mock_exec, mock_fs_from_path):
def test_get_gpfs_cluster_release_level_ok(self, mock_exec):
def test_get_gpfs_cluster_release_level_fail_mmlsconfig(self, mock_exec):
def test_is_gpfs_path_fail_mmlsattr(self, mock_exec):
def test_is_same_fileset_ok(self, mock_exec, mock_get_fileset_from_path):
def test_same_cluster_ok(self, mock_exec, mock_avail_capacity):
def test_set_rw_permission(self, mock_exec):
def test_can_migrate_locally(self, mock_exec):
def test_do_setup_ok(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):
def test_do_setup_fail_get_cluster_id(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):
def test_do_setup_fail_get_fs_from_path(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_fs_from_path, mock_verify_gpfs_pool):
def test_do_setup_fail_volume(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):
def test_check_for_setup_error_fail_conf(self, mock_get_gpfs_fs_rel_lev, mock_is_gpfs_path, mock_check_gpfs_state):
def test_create_sparse_file(self, mock_exec):
def test_allocate_file_blocks(self, mock_exec):
def test_gpfs_change_attributes(self, mock_exec):
def test_set_volume_attributes(self, mock_change_attributes):
def test_set_volume_attributes_no_attributes(self, mock_change_attributes):
def test_set_volume_attributes_no_options(self, mock_change_attributes):
def test_create_volume(self, mock_gpfs_path_state, mock_local_path, mock_sparse_file, mock_rw_permission, mock_set_volume_attributes, mock_allocate_file_blocks, mock_exec):
def test_create_volume_no_sparse_volume(self, mock_gpfs_path_state, mock_local_path, mock_sparse_file, mock_rw_permission, mock_set_volume_attributes, mock_allocate_file_blocks, mock_exec):
def test_create_volume_from_snapshot(self, mock_local_path, mock_create_gpfs_copy, mock_rw_permission, mock_gpfs_redirect, mock_resize_volume_file):
def test_create_cloned_volume(self, mock_local_path, mock_create_gpfs_clone, mock_rw_permission, mock_resize_volume_file):
def test_delete_gpfs_file_ok(self, mock_exec):
def test_delete_gpfs_file_ok_parent(self, mock_exec, mock_path_exists):
def test_delete_volume(self, mock_verify_gpfs_path_state, mock_local_path, mock_delete_gpfs_file):
def test_gpfs_redirect_ok(self, mock_exec):
def test_gpfs_redirect_fail_depth(self, mock_exec):
def test_gpfs_redirect_fail_match(self, mock_exec):
def test_create_gpfs_clone(self, mock_exec, mock_redirect, mock_cr_gpfs_cp, mock_cr_gpfs_snap):
def test_create_gpfs_copy(self, mock_exec):
def test_create_gpfs_snap(self, mock_exec):
def test_is_gpfs_parent_file_ok(self, mock_exec):
def test_create_snapshot(self, mock_local_path, mock_create_gpfs_snap, mock_set_rw_permission, mock_gpfs_redirect):
def test_delete_snapshot(self, mock_local_path, mock_exec):
def test_ensure_export(self):
def test_create_export(self):
def test_remove_export(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_get_volume_stats(self):
def test_get_volume_stats_none_stats(self, mock_upd_vol_stats):
def test_clone_image_pub(self, mock_exec):
def test_is_cloneable_ok(self, mock_is_gpfs_path):
def test_is_cloneable_fail_config(self, mock_is_gpfs_path):
def test_is_cloneable_fail_path(self, mock_is_gpfs_path):
def test_clone_image(self, mock_verify_gpfs_path_state, mock_is_cloneable, mock_local_path, mock_is_gpfs_parent_file, mock_create_gpfs_snap, mock_qemu_img_info, mock_create_gpfs_copy, mock_conv_image, mock_set_rw_permission, mock_resize_volume_file):
def test_clone_image_not_cloneable(self, mock_verify_gpfs_path_state, mock_is_cloneable):
def test_clone_image_format(self, mock_verify_gpfs_path_state, mock_is_cloneable, mock_local_path, mock_is_gpfs_parent_file, mock_create_gpfs_snap, mock_qemu_img_info, mock_create_gpfs_copy, mock_copyfile, mock_conv_image, mock_set_rw_permission, mock_resize_volume_file):
def test_copy_image_to_volume(self, mock_verify_gpfs_path_state, mock_fetch_to_raw, mock_local_path, mock_resize_volume_file):
def test_resize_volume_file_ok(self, mock_local_path, mock_resize_image, mock_qemu_img_info):
def test_resize_volume_file_fail(self, mock_local_path, mock_resize_image, mock_qemu_img_info):
def test_extend_volume(self, mock_resize_volume_file):
def test_copy_volume_to_image(self, mock_upload_volume, mock_local_path):
def test_backup_volume(self, mock_local_path, mock_create_gpfs_clone, mock_gpfs_redirect, mock_temp_chown, mock_file_open, mock_delete_gpfs_file):
def test_restore_backup(self, mock_local_path, mock_temp_chown, mock_file_open):
def test_migrate_volume_ok(self, mock_local, mock_exec):
def test_migrate_volume_fail_dest_path(self, mock_local, mock_exec):
def test_migrate_volume_fail_mpb(self, mock_local, mock_exec):
def test_migrate_volume_fail_mv(self, mock_local, mock_exec):
def test_migrate_volume_ok_pub(self, mock_migrate_volume):
def test_retype_ok(self, mock_different, mock_strg_pool, mock_migrate_vol):
def test_retype_diff_backend(self, mock_different, mock_strg_pool, mock_migrate_vol):
def test_retype_diff_pools_migrated(self, mock_different, mock_strg_pool, mock_migrate_vol):
def test_retype_diff_pools(self, mock_different, mock_strg_pool, mock_migrate_vol):
def test_retype_no_diff_hit(self, mock_different, mock_strg_pool, mock_migrate_vol):
def test_mkfs_ok(self, mock_exec):
def test_mkfs_fail_mk(self, mock_exec):
def test_get_available_capacity_ok(self, mock_exec):
def test_get_available_capacity_fail_mounted(self, mock_exec, mock_path_state):
def test_verify_gpfs_path_state_ok(self, mock_is_gpfs_path):
def test_verify_gpfs_path_state_fail_path(self, mock_is_gpfs_path):
def _fake_qemu_qcow2_image_info(self, path):
def _fake_qemu_raw_image_info(self, path):
def _fake_retype_arguments(self):
\OpenStack\cinder-2014.1\cinder\tests\test_hds.py
class SimulatedHusBackend:
def __init__(self):
def get_version(self, cmd, ver, ip0, ip1, user, pw):
def get_iscsi_info(self, cmd, ver, ip0, ip1, user, pw):
def get_hdp_info(self, cmd, ver, ip0, ip1, user, pw):
def create_lu(self, cmd, ver, ip0, ip1, user, pw, id, hdp, start, end, size):
def extend_vol(self, cmd, ver, ip0, ip1, user, pw, id, lu, size):
def delete_lu(self, cmd, ver, ip0, ip1, user, pw, id, lun):
def create_dup(self, cmd, ver, ip0, ip1, user, pw, id, src_lun, hdp, start, end, size):
def add_iscsi_conn(self, cmd, ver, ip0, ip1, user, pw, id, lun, ctl, port, iqn, initiator):
def del_iscsi_conn(self, cmd, ver, ip0, ip1, user, pw, id, lun, ctl, port, iqn, initiator):
class HUSiSCSIDriverTest(test.TestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def tearDown(self):
def test_get_volume_stats(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_extend_volume(self):
def test_create_snapshot(self):
def test_create_clone(self):
def test_delete_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
\OpenStack\cinder-2014.1\cinder\tests\test_hp3par.py
class HP3PARBaseDriver(object):
def setup_configuration(self):
def setup_mock_client(self, _m_client, driver, conf=None, m_conf=None):
def test_create_volume(self):
def test_create_volume_qos(self, _mock_volume_types):
def test_delete_volume(self):
def test_create_cloned_volume(self):
def test_migrate_volume(self):
def test_migrate_volume_diff_host(self):
def test_migrate_volume_diff_domain(self):
def test_migrate_volume_attached(self):
def test_attach_volume(self):
def test_detach_volume(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_delete_snapshot_in_use(self):
def test_delete_snapshot_not_found(self):
def test_create_volume_from_snapshot(self):
def test_create_volume_from_snapshot_and_extend(self):
def test_create_volume_from_snapshot_and_extend_copy_fail(self):
def test_create_volume_from_snapshot_qos(self, _mock_volume_types):
def test_terminate_connection(self):
def test_update_volume_key_value_pair(self):
def test_clear_volume_key_value_pair(self):
def test_extend_volume(self):
def test_extend_volume_non_base(self):
def test_extend_volume_non_base_failure(self):
def test_get_ports(self):
def test_get_by_qos_spec_with_scoping(self):
def test_get_by_qos_spec(self):
def test_get_by_qos_by_type_only(self):
class TestHP3PARFCDriver(HP3PARBaseDriver, test.TestCase):
def setUp(self):
def tearDown(self):
def setup_driver(self, config=None, mock_conf=None):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_get_volume_stats(self):
def test_create_host(self):
def test_create_invalid_host(self):
def test_create_modify_host(self):
def test_modify_host_with_new_wwn(self):
def test_modify_host_with_unknown_wwn_and_new_wwn(self):
class TestHP3PARISCSIDriver(HP3PARBaseDriver, test.TestCase):
def setUp(self):
def tearDown(self):
def setup_driver(self, config=None, mock_conf=None):
def test_initialize_connection(self):
def test_get_volume_stats(self):
def test_create_host(self):
def test_create_invalid_host(self):
def test_create_modify_host(self):
def test_get_least_used_nsp_for_host_single(self):
def test_get_least_used_nsp_for_host_new(self):
def test_get_least_used_nsp_for_host_reuse(self):
def test_get_least_used_nps_for_host_fc(self):
def test_invalid_iscsi_ip(self):
def test_get_least_used_nsp(self):
\OpenStack\cinder-2014.1\cinder\tests\test_hplefthand.py
class HPLeftHandBaseDriver():
class TestHPLeftHandCLIQISCSIDriver(HPLeftHandBaseDriver, test.TestCase):
def _fake_cliq_run(self, verb, cliq_args, check_exit_code=True):
def create_volume(cliq_args):
def delete_volume(cliq_args):
def extend_volume(cliq_args):
def assign_volume(cliq_args):
def unassign_volume(cliq_args):
def create_snapshot(cliq_args):
def delete_snapshot(cliq_args):
def create_volume_from_snapshot(cliq_args):
def get_cluster_info(cliq_args):
def get_volume_info(cliq_args):
def get_snapshot_info(cliq_args):
def get_server_info(cliq_args):
def create_server(cliq_args):
def test_error(cliq_args):
def setUp(self):
def tearDown(self):
def default_mock_conf(self):
def setup_driver(self, config=None):
def test_create_volume(self):
def test_delete_volume(self):
def test_extend_volume(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_get_volume_stats(self):
class TestHPLeftHandRESTISCSIDriver(HPLeftHandBaseDriver, test.TestCase):
def setUp(self):
def tearDown(self):
def default_mock_conf(self):
def setup_driver(self, _mock_client, config=None):
def test_create_volume(self):
def test_create_volume_with_es(self, _mock_volume_type):
def test_delete_volume(self):
def test_extend_volume(self):
def test_initialize_connection(self):
def test_initialize_connection_with_chaps(self):
def test_terminate_connection(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_create_cloned_volume(self):
def test_extra_spec_mapping(self, _mock_get_volume_type):
def test_extra_spec_mapping_invalid_value(self, _mock_get_volume_type):
def test_retype_with_no_LH_extra_specs(self):
def test_retype_with_only_LH_extra_specs(self):
def test_retype_with_both_extra_specs(self):
def test_retype_same_extra_specs(self):
def test_migrate_no_location(self):
def test_migrate_incorrect_vip(self):
def test_migrate_with_location(self):
def test_migrate_with_Snapshots(self):
def test_create_volume_with_ao_true(self, _mock_volume_type):
def test_create_volume_with_ao_false(self, _mock_volume_type):
\OpenStack\cinder-2014.1\cinder\tests\test_hp_msa.py
class TestHPMSAClient(test.TestCase):
def setUp(self):
def test_login(self, mock_url_open):
def test_build_request_url(self):
def test_request(self, mock_url_open):
def test_assert_response_ok(self):
def test_vdisk_exists(self, mock_request):
def test_vdisk_stats(self, mock_request):
def test_get_lun(self, mock_request):
def test_get_ports(self, mock_request):
def test_get_fc_ports(self, mock_request):
class FakeConfiguration(object):
def safe_get(self, key):
class TestHPMSACommon(test.TestCase):
def setUp(self):
def test_do_setup(self, mock_login, mock_logout, mock_vdisk_exists):
def test_vol_name(self):
def test_check_flags(self):
def test_assert_connector_ok(self):
def test_update_volume_stats(self, mock_stats):
def test_create_volume(self, mock_create):
def test_delete_volume(self, mock_delete):
def test_create_cloned_volume(self, mock_stats, mock_copy):
def test_create_volume_from_snapshot(self, mock_stats, mock_copy):
def test_extend_volume(self, mock_extend):
def test_create_snapshot(self, mock_create):
def test_delete_snapshot(self, mock_delete):
def test_map_volume(self, mock_map):
def test_unmap_volume(self, mock_unmap):
class TestHPMSAFC(test.TestCase):
def setUp(self, mock_setup):
def fake_init(self, *args, **kwargs):
def _test_with_mock(self, mock, method, args, expected=None):
def test_create_volume(self, mock_create):
def test_create_cloned_volume(self, mock_create):
def test_create_volume_from_snapshot(self, mock_create):
def test_delete_volume(self, mock_delete):
def test_create_snapshot(self, mock_create):
def test_delete_snapshot(self, mock_delete):
def test_extend_volume(self, mock_extend):
def test_initialize_connection(self, mock_login, mock_map, mock_ports, mock_logout):
def test_terminate_connection(self, mock_login, mock_unmap, mock_logout):
def test_get_volume_stats(self, mock_login, mock_stats, mock_logout):
\OpenStack\cinder-2014.1\cinder\tests\test_huawei_hvs.py
def Fake_sleep(time):
class FakeHVSCommon(rest_common.HVSCommon):
def __init__(self, configuration):
def _parse_volume_type(self, volume):
def _change_file_mode(self, filepath):
def call(self, url=False, data=None, method=None):
class FakeHVSiSCSIStorage(huawei_hvs.HuaweiHVSISCSIDriver):
def __init__(self, configuration):
def do_setup(self, context):
class FakeHVSFCStorage(huawei_hvs.HuaweiHVSFCDriver):
def __init__(self, configuration):
def do_setup(self, context):
class HVSRESTiSCSIDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_log_in_success(self):
def test_log_out_success(self):
def test_create_volume_success(self):
def test_extend_volume_success(self):
def test_create_snapshot_success(self):
def test_delete_volume_success(self):
def test_delete_snapshot_success(self):
def test_colone_volume_success(self):
def test_create_volume_from_snapshot_success(self):
def test_initialize_connection_success(self):
def test_terminate_connection_success(self):
def test_initialize_connection_no_view_success(self):
def test_terminate_connectio_no_view_success(self):
def test_get_volume_stats(self):
def test_create_snapshot_fail(self):
def test_create_volume_fail(self):
def test_delete_volume_fail(self):
def test_delete_snapshot_fail(self):
def test_initialize_connection_fail(self):
def create_fake_conf_file(self):
class HVSRESTFCDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_log_in_Success(self):
def test_create_volume_success(self):
def test_extend_volume_success(self):
def test_create_snapshot_success(self):
def test_delete_volume_success(self):
def test_delete_snapshot_success(self):
def test_colone_volume_success(self):
def test_create_volume_from_snapshot_success(self):
def test_initialize_connection_success(self):
def test_terminate_connection_success(self):
def test_initialize_connection_no_view_success(self):
def test_terminate_connection_no_viewn_success(self):
def test_get_volume_stats(self):
def test_create_snapshot_fail(self):
def test_create_volume_fail(self):
def test_delete_volume_fail(self):
def test_delete_snapshot_fail(self):
def create_fake_conf_file(self):
\OpenStack\cinder-2014.1\cinder\tests\test_huawei_t_dorado.py
class FakeChannel():
def __init__(self):
def resize_pty(self, width=80, height=24):
def settimeout(self, time):
def send(self, s):
def recv(self, nbytes):
def close(self):
class FakeSSHClient():
def invoke_shell(self):
def get_transport(self):
def close(self):
class FakeSSHPool():
def __init__(self, ip, port, conn_timeout, login, password=None, *args, **kwargs):
def create(self):
def get(self):
def put(self, ssh):
def remove(self, ssh):
def Fake_sleep(time):
def Fake_change_file_mode(obj, filepath):
def create_fake_conf_file(filename):
def modify_conf(conf, item, val, attrib=None):
def set_error_flg(cmd):
def reset_error_flg(cmd):
class HuaweiTCLIResSimulator():
def _paras_name(self, params):
def cli_showsys(self, params):
def cli_createlun(self, params):
def cli_showlun(self, params):
def cli_dellun(self, params):
def cli_showrg(self, params):
def cli_showpool(self, params):
def cli_createluncopy(self, params):
def cli_chgluncopystatus(self, params):
def cli_showluncopy(self, params):
def cli_delluncopy(self, params):
def cli_createsnapshot(self, params):
def cli_showsnapshot(self, params):
def cli_actvsnapshot(self, params):
def cli_disablesnapshot(self, params):
def cli_delsnapshot(self, params):
def cli_showrespool(self, params):
def cli_showiscsitgtname(self, params):
def cli_showiscsiip(self, params):
def cli_showhostgroup(self, params):
def cli_createhostgroup(self, params):
def cli_showhost(self, params):
def cli_addhost(self, params):
def cli_delhost(self, params):
def cli_showiscsiini(self, params):
def cli_addiscsiini(self, params):
def cli_deliscsiini(self, params):
def cli_showhostport(self, params):
def cli_addhostport(self, params):
def cli_delhostport(self, params):
def cli_showhostmap(self, params):
def cli_addhostmap(self, params):
def cli_delhostmap(self, params):
def cli_showfreeport(self, params):
def cli_showhostpath(self, params):
def cli_showfcmode(self, params):
def cli_chglun(self, params):
def cli_addluntoextlun(self, params):
def cli_rmlunfromextlun(self, patams):
class HuaweiDorado5100CLIResSimulator(HuaweiTCLIResSimulator):
def cli_showsys(self, params):
def cli_showlun(self, params):
class HuaweiDorado2100G2CLIResSimulator(HuaweiTCLIResSimulator):
def cli_showsys(self, params):
def cli_createlun(self, params):
def cli_showlun(self, params):
class HuaweiTISCSIDriverTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def tearDown(self):
def test_conf_invalid(self):
def test_volume_type(self):
def test_create_delete_volume(self):
def test_create_delete_cloned_volume(self):
def test_extend_volume(self):
def test_create_delete_snapshot(self):
def test_create_delete_snapshot_volume(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_get_volume_stats(self):
class HuaweiTFCDriverTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def tearDown(self):
def test_validate_connector_failed(self):
def test_create_delete_volume(self):
def test_create_delete_snapshot(self):
def test_create_cloned_volume(self):
def test_create_snapshot_volume(self):
def test_initialize_terminitat_connection(self):
def _test_get_volume_stats(self):
class HuaweiDorado5100FCDriverTestCase(HuaweiTFCDriverTestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def test_create_cloned_volume(self):
def test_create_snapshot_volume(self):
class HuaweiDorado2100G2FCDriverTestCase(HuaweiTFCDriverTestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def test_create_cloned_volume(self):
def test_create_delete_snapshot(self):
def test_create_snapshot_volume(self):
def test_extend_volume(self):
class HuaweiDorado5100ISCSIDriverTestCase(HuaweiTISCSIDriverTestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def test_create_delete_cloned_volume(self):
def test_create_delete_snapshot_volume(self):
def test_volume_type(self):
class HuaweiDorado2100G2ISCSIDriverTestCase(HuaweiTISCSIDriverTestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def _init_driver(self):
def test_conf_invalid(self):
def test_create_delete_cloned_volume(self):
def test_create_delete_snapshot(self):
def test_create_delete_snapshot_volume(self):
def test_initialize_connection(self):
def test_extend_volume(self):
class SSHMethodTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def tearDown(self):
def test_reach_max_connection_limit(self):
def test_socket_timeout(self):
def _fake_recv1(self, nbytes):
def _fake_recv2(self, nBytes):
class HuaweiUtilsTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
def setUp(self):
def tearDown(self):
def test_parse_xml_file_ioerror(self):
def test_is_xml_item_exist(self):
def test_is_xml_item_valid(self):
def test_get_conf_host_os_type(self):
\OpenStack\cinder-2014.1\cinder\tests\test_ibmnas.py
class FakeEnv(object):
def __setitem__(self, key, value):
def __getitem__(self, item):
class IBMNASDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _set_flag(self, flag, value):
def _reset_flags(self):
def test_check_for_setup_error(self):
def test_get_provider_location(self):
def test_get_export_path(self):
def test_create_ibmnas_snap_mount_point_provided(self):
def test_create_ibmnas_snap_no_mount_point_provided(self):
def test_create_ibmnas_copy(self):
def test_resize_volume_file(self):
def test_extend_volume(self):
def test_delete_snapfiles(self):
def test_delete_volume_no_provider_location(self):
def test_delete_volume(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_create_cloned_volume(self):
def test_create_volume_from_snapshot(self):
\OpenStack\cinder-2014.1\cinder\tests\test_ibm_xiv_ds8k.py
class XIVDS8KFakeProxyDriver(object):
def __init__(self, xiv_ds8k_info, logger, expt, driver=None):
def setup(self, context):
def create_volume(self, volume):
def volume_exists(self, volume):
def delete_volume(self, volume):
def initialize_connection(self, volume, connector):
def terminate_connection(self, volume, connector):
def is_volume_attached(self, volume, connector):
class XIVDS8KVolumeDriverTest(test.TestCase):
def setUp(self):
def test_initialized_should_set_xiv_ds8k_info(self):
def test_setup_should_fail_if_credentials_are_invalid(self):
def test_setup_should_fail_if_connection_is_invalid(self):
def test_create_volume(self):
def test_volume_exists(self):
def test_delete_volume(self):
def test_delete_volume_should_fail_for_not_existing_volume(self):
def test_create_volume_should_fail_if_no_pool_space_left(self):
def test_initialize_connection(self):
def test_initialize_connection_should_fail_for_non_existing_volume(self):
def test_terminate_connection(self):
def test_terminate_connection_should_fail_on_non_existing_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\test_image_utils.py
class FakeImageService:
def __init__(self):
def download(self, context, image_id, data):
def show(self, context, image_id):
def update(self, context, image_id, metadata, path):
class TestUtils(test.TestCase):
def setUp(self):
def test_resize_image(self):
def test_convert_image(self):
def test_qemu_img_info(self):
def test_qemu_img_info_alt(self):
def _test_fetch_to_raw(self, has_qemu=True, src_inf=None, dest_inf=None):
def test_fetch_to_raw(self):
def test_fetch_to_raw_no_qemu_img(self):
def test_fetch_to_raw_on_error_parsing_failed(self):
def test_fetch_to_raw_on_error_backing_file(self):
def test_fetch_to_raw_on_error_not_convert_to_raw(self):
def test_fetch_to_raw_on_error_image_size(self):
def _test_fetch_verify_image(self, qemu_info, volume_size=1):
def test_fetch_verify_image_with_backing_file(self):
def test_fetch_verify_image_without_file_format(self):
def test_fetch_verify_image_image_size(self):
def test_upload_volume(self):
def test_upload_volume_with_raw_image(self):
def test_upload_volume_on_error(self):
class TestExtractTo(test.TestCase):
def test_extract_to_calls_tar(self):
class TestSetVhdParent(test.TestCase):
def test_vhd_util_call(self):
class TestFixVhdChain(test.TestCase):
def test_empty_chain(self):
def test_single_vhd_file_chain(self):
def test_chain_with_two_elements(self):
class TestGetSize(test.TestCase):
def test_vhd_util_call(self):
class TestResize(test.TestCase):
def test_vhd_util_call(self):
class TestCoalesce(test.TestCase):
def test_vhd_util_call(self):
def fake_context(return_value):
class TestTemporaryFile(test.TestCase):
def test_file_unlinked(self):
def test_file_unlinked_on_error(self):
def sut():
class TestCoalesceChain(test.TestCase):
def test_single_vhd(self):
def test_chain_of_two_vhds(self):
class TestDiscoverChain(test.TestCase):
def test_discovery_calls(self):
class TestXenServerImageToCoalescedVhd(test.TestCase):
def test_calls(self):
\OpenStack\cinder-2014.1\cinder\tests\test_iscsi.py
class TargetAdminTestCase(object):
def setUp(self):
def fake_verify_backing_lun(obj, iqn, tid):
def fake_verify_rtstool(obj):
def fake_get_target(obj, iqn):
def get_script_params(self):
def get_script(self):
def fake_execute(self, *cmd, **kwargs):
def clear_cmds(self):
def verify_cmds(self, cmds):
def verify(self):
def run_commands(self):
def test_target_admin(self):
class TgtAdmTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
def tearDown(self):
class IetAdmTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
class IetAdmBlockIOTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
class IetAdmFileIOTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
class IetAdmAutoIOTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
class LioAdmTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
class ISERTgtAdmTestCase(TgtAdmTestCase):
def setUp(self):
\OpenStack\cinder-2014.1\cinder\tests\test_migrations.py
def _get_connect_string(backend, user="openstack_citest", passwd="openstack_citest", database="openstack_citest"):
def _is_mysql_avail(**kwargs):
def _is_backend_avail(backend, user="openstack_citest", passwd="openstack_citest", database="openstack_citest"):
def _have_mysql():
def get_table(engine, name):
class TestMigrations(test.TestCase):
def setUp(self):
def tearDown(self):
def _reset_databases(self):
def execute_cmd(cmd=None):
def test_walk_versions(self):
def test_mysql_connect_fail(self):
def test_mysql_innodb(self):
def test_postgresql_connect_fail(self):
def test_postgresql_opportunistically(self):
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
def _migrate_down(self, engine, version):
def _migrate_up(self, engine, version, with_data=False):
def _prerun_004(self, engine):
def _check_004(self, engine, data):
def test_migration_005(self):
def _metadatas(self, upgrade_to, downgrade_to=None):
def metadatas_upgraded_to(self, revision):
def metadatas_downgraded_from(self, revision):
def test_upgrade_006_adds_provider_location(self):
def test_downgrade_006_removes_provider_location(self):
def test_upgrade_007_adds_fk(self):
def test_downgrade_007_removes_fk(self):
def test_migration_008(self):
def test_migration_009(self):
def test_migration_010(self):
def test_migration_011(self):
def test_migration_012(self):
def test_migration_013(self):
def test_migration_014(self):
def test_migration_015(self):
def test_migration_016(self):
def test_migration_017(self):
def test_migration_018(self):
def test_migration_019(self):
def test_migration_020(self):
def test_migration_021(self):
def test_migration_022(self):
\OpenStack\cinder-2014.1\cinder\tests\test_misc.py
class ExceptionTestCase(test.TestCase):
def _raise_exc(exc):
def test_exceptions_raise(self):
class ProjectTestCase(test.TestCase):
def test_all_migrations_have_downgrade(self):
\OpenStack\cinder-2014.1\cinder\tests\test_netapp.py
def create_configuration():
class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def log_message(self, format, *args):
class FakeHttplibSocket(object):
def __init__(self, value):
def newclose():
def makefile(self, mode, _other):
class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):
def do_GET(s):
def do_POST(s):
def _get_child_by_name(self, name):
def _get_child_content(self, name):
class FakeDirectCmodeHTTPConnection(object):
def __init__(self, host, timeout=None):
def request(self, method, path, data=None, headers=None):
def set_debuglevel(self, level):
def getresponse(self):
def getresponsebody(self):
class NetAppDirectCmodeISCSIDriverTestCase(test.TestCase):
def setUp(self):
def _custom_setup(self):
def _set_config(self, configuration):
def test_connect(self):
def test_create_destroy(self):
def test_create_vol_snapshot_destroy(self):
def test_map_unmap(self):
def test_cloned_volume_destroy(self):
def test_map_by_creating_igroup(self):
def test_fail_create_vol(self):
def test_vol_stats(self):
def test_create_vol_snapshot_diff_size_resize(self):
def test_create_vol_snapshot_diff_size_subclone(self):
def test_extend_vol_same_size(self):
def test_extend_vol_direct_resize(self):
def test_extend_vol_sub_lun_clone(self):
class NetAppDriverNegativeTestCase(test.TestCase):
def setUp(self):
def test_incorrect_family(self):
def test_incorrect_protocol(self):
def test_non_netapp_driver(self):
class FakeDirect7MODEServerHandler(FakeHTTPRequestHandler):
def do_GET(s):
def do_POST(s):
class FakeDirect7modeHTTPConnection(object):
def __init__(self, host, timeout=None):
def request(self, method, path, data=None, headers=None):
def set_debuglevel(self, level):
def getresponse(self):
def getresponsebody(self):
class NetAppDirect7modeISCSIDriverTestCase_NV(
NetAppDirectCmodeISCSIDriverTestCase):
def setUp(self):
def _custom_setup(self):
def _set_config(self, configuration):
def test_create_on_select_vol(self):
def test_create_fail_on_select_vol(self):
def test_check_for_setup_error_version(self):
class NetAppDirect7modeISCSIDriverTestCase_WV(
NetAppDirect7modeISCSIDriverTestCase_NV):
def setUp(self):
def _custom_setup(self):
def _set_config(self, configuration):
class NetAppApiElementTransTests(test.TestCase):
def setUp(self):
def test_translate_struct_dict_unique_key(self):
def test_translate_struct_dict_nonunique_key(self):
def test_translate_struct_list(self):
def test_translate_struct_tuple(self):
def test_translate_invalid_struct(self):
def test_setter_builtin_types(self):
def test_setter_na_element(self):
def test_setter_child_dict(self):
def test_setter_child_list_tuple(self):
def test_setter_no_value(self):
def test_setter_invalid_value(self):
def test_setter_invalid_key(self):
\OpenStack\cinder-2014.1\cinder\tests\test_netapp_eseries_iscsi.py
def create_configuration():
class FakeEseriesResponse(object):
def __init__(self, code=None, text=None):
def json(self):
class FakeEseriesServerHandler(object):
def do_GET(self, path, params, data, headers):
def do_POST(self, path, params, data, headers):
def do_DELETE(self, path, params, data, headers):
class FakeEseriesHTTPSession(object):
def __init__(self):
def request(self, method, url, params, data, headers, timeout, verify):
class NetAppEseriesIscsiDriverTestCase(test.TestCase):
def setUp(self):
def _custom_setup(self):
def _set_config(self, configuration):
def test_embedded_mode(self):
OpenStack Index | Previous | Next
|