Home

OpenStack Study: cinder

OpenStack Index

Previous

Next

    def _assoc_storagevolume(self, objectpath):

    def _default_assoc(self, objectpath):

    def _assocnames_lunmaskctrl(self):

    def _default_assocnames(self, objectpath):

    def _getinstance_storagevolume(self, objectpath):

    def _getinstance_syncsvsv(self, objectpath):

    def _getinstance_lunmask(self):

    def _getinstance_unit(self, objectpath):

    def _getinstance_job(self, jobpath):

    def _default_getinstance(self, objectpath):

    def _enum_replicationservices(self):

    def _enum_stconfsvcs(self):

    def _enum_ctrlconfsvcs(self):

    def _enum_pools(self):

    def _enum_pool_details(self):

    def _enum_storagevolumes(self):

    def _enum_syncsvsvs(self):

    def _create_sync(self, objpath1, objpath2, percentsynced):

    def _enum_unitnames(self):

    def _enum_lunmaskctrls(self):

    def _enum_processors(self):

    def _enum_hdwidmgmts(self):

    def _enum_storhdwids(self):

    def _default_enum(self):

class EMCSMISISCSIDriverTestCase(test.TestCase):

    def setUp(self):

    def create_fake_config_file(self):

    def fake_ecom_connection(self):

    def fake_do_iscsi_discovery(self, volume):

    def fake_sleep(self, seconds):

    def test_get_volume_stats(self):

    def test_create_destroy(self):

    def test_create_volume_snapshot_destroy(self):

    def test_map_unmap(self):

    def test_create_volume_failed(self):

    def test_create_volume_snapshot_unsupported(self):

    def test_create_volume_snapshot_replica_failed(self):

    def test_create_volume_snapshot_sync_failed(self):

    def test_create_volume_clone_replica_failed(self):

    def test_create_volume_clone_sync_failed(self):

    def test_delete_volume_notfound(self):

    def test_delete_volume_failed(self):

    def test_extend_volume(self):

    def _cleanup(self):

    def tearDown(self):

class EMCSMISFCDriverTestCase(test.TestCase):

    def setUp(self):

    def create_fake_config_file(self):

    def fake_ecom_connection(self):

    def fake_sleep(self, seconds):

    def test_get_volume_stats(self):

    def test_create_destroy(self):

    def test_create_volume_snapshot_destroy(self):

    def test_map_unmap(self):

    def test_create_volume_failed(self):

    def test_create_volume_snapshot_unsupported(self):

    def test_create_volume_snapshot_replica_failed(self):

    def test_create_volume_snapshot_sync_failed(self):

    def test_create_volume_clone_replica_failed(self):

    def test_create_volume_clone_sync_failed(self):

    def test_delete_volume_notfound(self):

    def test_delete_volume_failed(self):

    def test_extend_volume(self):

    def test_create_volume_with_volume_type(self, _mock_volume_type):

    def _cleanup(self):

    def tearDown(self):

\OpenStack\cinder-2014.1\cinder\tests\test_emc_vnxdirect.py

class EMCVNXCLIDriverTestData():

class EMCVNXCLIDriverISCSITestCase(test.TestCase):

    def _fake_cli_executor(self, *cmd, **kwargv):

    def setUp(self):

    def _restore(self, back_os_path_exists):

    def test_create_destroy_volume_withoutExtraSpec(self):

    def test_create_destroy_volume_withExtraSpec(self):

    def test_get_volume_stats(self):

    def test_create_destroy_volume_snapshot(self):

    def test_initialize_connection(self, _mock_iscsi_discovery):

    def test_terminate_connection(self):

    def test_create_volume_failed(self):

    def test_create_volume_snapshot_failed(self):

    def test_create_volume_from_snapshot(self):

    def test_create_volume_from_snapshot_sync_failed(self):

    def test_create_cloned_volume(self):

    def test_create_volume_clone_sync_failed(self):

    def test_delete_volume_failed(self):

    def test_extend_volume(self):

    def test_extend_volume_has_snapshot(self):

    def test_extend_volume_failed(self):

    def test_create_remove_export(self):

\OpenStack\cinder-2014.1\cinder\tests\test_eqlx.py

class DellEQLSanISCSIDriverTestCase(test.TestCase):

    def setUp(self):

    def _fake_get_iscsi_properties(self, volume):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_delete_absent_volume(self):

    def test_ensure_export(self):

    def test_create_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_create_cloned_volume(self):

    def test_delete_snapshot(self):

    def test_extend_volume(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_do_setup(self):

    def test_update_volume_stats(self):

    def test_get_volume_stats(self):

    def test_get_space_in_gb(self):

    def test_get_output(self):

        def _fake_recv(ignore_arg):

    def test_get_prefixed_value(self):

    def test_ssh_execute(self):

    def test_ssh_execute_error(self):

    def test_with_timeout(self):

        def no_timeout(cmd, *args, **kwargs):

        def w_timeout(cmd, *args, **kwargs):

    def test_local_path(self):

\OpenStack\cinder-2014.1\cinder\tests\test_exception.py

class FakeNotifier(object):

    def __init__(self):

    def notify(self, context, publisher, event, priority, payload):

def good_function():

def bad_function_error():

def bad_function_exception():

class CinderExceptionTestCase(test.TestCase):

    def test_default_error_msg(self):

    def test_error_msg(self):

    def test_default_error_msg_with_kwargs(self):

    def test_error_msg_exception_with_kwargs(self):

    def test_default_error_code(self):

    def test_error_code_from_kwarg(self):

\OpenStack\cinder-2014.1\cinder\tests\test_glusterfs.py

class DumbVolume(object):

    def __setitem__(self, key, value):

    def __getitem__(self, item):

class FakeDb(object):

    def volume_get(self, *a, **kw):

    def snapshot_get_all_for_volume(self, *a, **kw):

class GlusterFsDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def stub_out_not_replaying(self, obj, attr_name):

    def test_set_execute(self):

        def my_execute(*a, **k):

    def test_local_path(self):

    def test_mount_glusterfs_should_mount_correctly(self):

    def test_mount_glusterfs_should_suppress_already_mounted_error(self):

    def test_mount_glusterfs_should_reraise_already_mounted_error(self):

    def test_mount_glusterfs_should_create_mountpoint_if_not_yet(self):

    def test_get_hash_str(self):

    def test_get_mount_point_for_share(self):

    def test_get_available_capacity_with_df(self):

    def test_load_shares_config(self):

    def test_ensure_share_mounted(self):

    def test_ensure_shares_mounted_should_save_mounting_successfully(self):

    def test_ensure_shares_mounted_should_not_save_mounting_with_error(self):

    def test_setup_should_throw_error_if_shares_config_not_configured(self):

    def test_setup_should_throw_exception_if_client_is_not_installed(self):

    def _fake_load_shares_config(self, conf):

    def _fake_NamedTemporaryFile(self, prefix=None, dir=None):

    def test_setup_set_share_permissions(self):

    def test_find_share_should_throw_error_if_there_is_no_mounted_shares(self):

    def test_find_share(self):

    def test_find_share_should_throw_error_if_there_is_no_enough_place(self):

    def _simple_volume(self, id=None):

    def test_create_sparsed_volume(self):

    def test_create_nonsparsed_volume(self):

    def test_create_qcow2_volume(self):

    def test_create_volume_should_ensure_glusterfs_mounted(self):

    def test_create_volume_should_return_provider_location(self):

    def test_create_cloned_volume(self):

    def test_delete_volume(self):

    def test_delete_should_ensure_share_mounted(self):

    def test_delete_should_not_delete_if_provider_location_not_provided(self):

    def test_delete_volume_with_info_file(self, mock_path_exists, mock_remove):

    def test_create_snapshot(self):

    def test_delete_snapshot_bottom(self):

    def test_delete_snapshot_middle(self):

    def test_delete_snapshot_not_in_info(self):

    def test_read_info_file(self):

    def test_extend_volume(self):

    def test_create_snapshot_online(self):

    def test_create_snapshot_online_novafailure(self):

    def test_delete_snapshot_online_1(self):

    def test_delete_snapshot_online_2(self):

    def test_delete_snapshot_online_novafailure(self):

    def test_get_backing_chain_for_path(self):

    def test_copy_volume_from_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_initialize_connection(self):

    def test_get_mount_point_base(self):

    def test_backup_volume(self):

    def test_backup_volume_previous_snap(self):

    def test_backup_snap_failure_1(self):

    def test_backup_snap_failure_2(self):

    def test_backup_failure_unsupported_format(self):

\OpenStack\cinder-2014.1\cinder\tests\test_gpfs.py

class FakeQemuImgInfo(object):

    def __init__(self):

class GPFSDriverTestCase(test.TestCase):

    def _execute_wrapper(self, cmd, *args, **kwargs):

    def setUp(self):

    def tearDown(self):

    def test_different(self):

    def test_sizestr(self):

    def test_get_gpfs_state_ok(self, mock_exec):

    def test_get_gpfs_state_fail_mmgetstate(self, mock_exec):

    def test_check_gpfs_state_ok(self, mock_get_gpfs_state):

    def test_check_gpfs_state_fail_not_active(self, mock_get_gpfs_state):

    def test_get_fs_from_path_ok(self, mock_exec):

    def test_get_fs_from_path_fail_path(self, mock_exec):

    def test_get_fs_from_path_fail_raise(self, mock_exec):

    def test_get_gpfs_cluster_id_ok(self, mock_exec):

    def test_get_gpfs_cluster_id_fail_id(self, mock_exec):

    def test_get_gpfs_cluster_id_fail_raise(self, mock_exec):

    def test_get_fileset_from_path_ok(self, mock_exec):

    def test_get_fileset_from_path_fail_mmlsattr(self, mock_exec):

    def test_get_fileset_from_path_fail_find_fileset(self, mock_exec):

    def test_verify_gpfs_pool_ok(self, mock_exec):

    def test_verify_gpfs_pool_fail_pool(self, mock_exec):

    def test_verify_gpfs_pool_fail_raise(self, mock_exec):

    def test_update_volume_storage_pool_ok(self, mock_exec, mock_verify_pool):

    def test_update_volume_storage_pool_ok_pool_none(self, mock_exec, mock_verify_pool):

    def test_update_volume_storage_pool_fail_pool(self, mock_exec, mock_verify_pool):

    def test_update_volume_storage_pool_fail_mmchattr(self, mock_exec, mock_verify_pool):

    def test_get_gpfs_fs_release_level_ok(self, mock_exec, mock_fs_from_path):

    def test_get_gpfs_fs_release_level_fail_mmlsfs(self, mock_exec, mock_fs_from_path):

    def test_get_gpfs_cluster_release_level_ok(self, mock_exec):

    def test_get_gpfs_cluster_release_level_fail_mmlsconfig(self, mock_exec):

    def test_is_gpfs_path_fail_mmlsattr(self, mock_exec):

    def test_is_same_fileset_ok(self, mock_exec, mock_get_fileset_from_path):

    def test_same_cluster_ok(self, mock_exec, mock_avail_capacity):

    def test_set_rw_permission(self, mock_exec):

    def test_can_migrate_locally(self, mock_exec):

    def test_do_setup_ok(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):

    def test_do_setup_fail_get_cluster_id(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):

    def test_do_setup_fail_get_fs_from_path(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_fs_from_path, mock_verify_gpfs_pool):

    def test_do_setup_fail_volume(self, mock_exec, mock_get_gpfs_cluster_id, mock_get_filesystem_from_path, mock_verify_gpfs_pool):

    def test_check_for_setup_error_fail_conf(self, mock_get_gpfs_fs_rel_lev, mock_is_gpfs_path, mock_check_gpfs_state):

    def test_create_sparse_file(self, mock_exec):

    def test_allocate_file_blocks(self, mock_exec):

    def test_gpfs_change_attributes(self, mock_exec):

    def test_set_volume_attributes(self, mock_change_attributes):

    def test_set_volume_attributes_no_attributes(self, mock_change_attributes):

    def test_set_volume_attributes_no_options(self, mock_change_attributes):

    def test_create_volume(self, mock_gpfs_path_state, mock_local_path, mock_sparse_file, mock_rw_permission, mock_set_volume_attributes, mock_allocate_file_blocks, mock_exec):

    def test_create_volume_no_sparse_volume(self, mock_gpfs_path_state, mock_local_path, mock_sparse_file, mock_rw_permission, mock_set_volume_attributes, mock_allocate_file_blocks, mock_exec):

    def test_create_volume_from_snapshot(self, mock_local_path, mock_create_gpfs_copy, mock_rw_permission, mock_gpfs_redirect, mock_resize_volume_file):

    def test_create_cloned_volume(self, mock_local_path, mock_create_gpfs_clone, mock_rw_permission, mock_resize_volume_file):

    def test_delete_gpfs_file_ok(self, mock_exec):

    def test_delete_gpfs_file_ok_parent(self, mock_exec, mock_path_exists):

    def test_delete_volume(self, mock_verify_gpfs_path_state, mock_local_path, mock_delete_gpfs_file):

    def test_gpfs_redirect_ok(self, mock_exec):

    def test_gpfs_redirect_fail_depth(self, mock_exec):

    def test_gpfs_redirect_fail_match(self, mock_exec):

    def test_create_gpfs_clone(self, mock_exec, mock_redirect, mock_cr_gpfs_cp, mock_cr_gpfs_snap):

    def test_create_gpfs_copy(self, mock_exec):

    def test_create_gpfs_snap(self, mock_exec):

    def test_is_gpfs_parent_file_ok(self, mock_exec):

    def test_create_snapshot(self, mock_local_path, mock_create_gpfs_snap, mock_set_rw_permission, mock_gpfs_redirect):

    def test_delete_snapshot(self, mock_local_path, mock_exec):

    def test_ensure_export(self):

    def test_create_export(self):

    def test_remove_export(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_get_volume_stats(self):

    def test_get_volume_stats_none_stats(self, mock_upd_vol_stats):

    def test_clone_image_pub(self, mock_exec):

    def test_is_cloneable_ok(self, mock_is_gpfs_path):

    def test_is_cloneable_fail_config(self, mock_is_gpfs_path):

    def test_is_cloneable_fail_path(self, mock_is_gpfs_path):

    def test_clone_image(self, mock_verify_gpfs_path_state, mock_is_cloneable, mock_local_path, mock_is_gpfs_parent_file, mock_create_gpfs_snap, mock_qemu_img_info, mock_create_gpfs_copy, mock_conv_image, mock_set_rw_permission, mock_resize_volume_file):

    def test_clone_image_not_cloneable(self, mock_verify_gpfs_path_state, mock_is_cloneable):

    def test_clone_image_format(self, mock_verify_gpfs_path_state, mock_is_cloneable, mock_local_path, mock_is_gpfs_parent_file, mock_create_gpfs_snap, mock_qemu_img_info, mock_create_gpfs_copy, mock_copyfile, mock_conv_image, mock_set_rw_permission, mock_resize_volume_file):

    def test_copy_image_to_volume(self, mock_verify_gpfs_path_state, mock_fetch_to_raw, mock_local_path, mock_resize_volume_file):

    def test_resize_volume_file_ok(self, mock_local_path, mock_resize_image, mock_qemu_img_info):

    def test_resize_volume_file_fail(self, mock_local_path, mock_resize_image, mock_qemu_img_info):

    def test_extend_volume(self, mock_resize_volume_file):

    def test_copy_volume_to_image(self, mock_upload_volume, mock_local_path):

    def test_backup_volume(self, mock_local_path, mock_create_gpfs_clone, mock_gpfs_redirect, mock_temp_chown, mock_file_open, mock_delete_gpfs_file):

    def test_restore_backup(self, mock_local_path, mock_temp_chown, mock_file_open):

    def test_migrate_volume_ok(self, mock_local, mock_exec):

    def test_migrate_volume_fail_dest_path(self, mock_local, mock_exec):

    def test_migrate_volume_fail_mpb(self, mock_local, mock_exec):

    def test_migrate_volume_fail_mv(self, mock_local, mock_exec):

    def test_migrate_volume_ok_pub(self, mock_migrate_volume):

    def test_retype_ok(self, mock_different, mock_strg_pool, mock_migrate_vol):

    def test_retype_diff_backend(self, mock_different, mock_strg_pool, mock_migrate_vol):

    def test_retype_diff_pools_migrated(self, mock_different, mock_strg_pool, mock_migrate_vol):

    def test_retype_diff_pools(self, mock_different, mock_strg_pool, mock_migrate_vol):

    def test_retype_no_diff_hit(self, mock_different, mock_strg_pool, mock_migrate_vol):

    def test_mkfs_ok(self, mock_exec):

    def test_mkfs_fail_mk(self, mock_exec):

    def test_get_available_capacity_ok(self, mock_exec):

    def test_get_available_capacity_fail_mounted(self, mock_exec, mock_path_state):

    def test_verify_gpfs_path_state_ok(self, mock_is_gpfs_path):

    def test_verify_gpfs_path_state_fail_path(self, mock_is_gpfs_path):

    def _fake_qemu_qcow2_image_info(self, path):

    def _fake_qemu_raw_image_info(self, path):

    def _fake_retype_arguments(self):

\OpenStack\cinder-2014.1\cinder\tests\test_hds.py

class SimulatedHusBackend:

    def __init__(self):

    def get_version(self, cmd, ver, ip0, ip1, user, pw):

    def get_iscsi_info(self, cmd, ver, ip0, ip1, user, pw):

    def get_hdp_info(self, cmd, ver, ip0, ip1, user, pw):

    def create_lu(self, cmd, ver, ip0, ip1, user, pw, id, hdp, start, end, size):

    def extend_vol(self, cmd, ver, ip0, ip1, user, pw, id, lu, size):

    def delete_lu(self, cmd, ver, ip0, ip1, user, pw, id, lun):

    def create_dup(self, cmd, ver, ip0, ip1, user, pw, id, src_lun, hdp, start, end, size):

    def add_iscsi_conn(self, cmd, ver, ip0, ip1, user, pw, id, lun, ctl, port, iqn, initiator):

    def del_iscsi_conn(self, cmd, ver, ip0, ip1, user, pw, id, lun, ctl, port, iqn, initiator):

class HUSiSCSIDriverTest(test.TestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def tearDown(self):

    def test_get_volume_stats(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_extend_volume(self):

    def test_create_snapshot(self):

    def test_create_clone(self):

    def test_delete_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

\OpenStack\cinder-2014.1\cinder\tests\test_hp3par.py

class HP3PARBaseDriver(object):

    def setup_configuration(self):

    def setup_mock_client(self, _m_client, driver, conf=None, m_conf=None):

    def test_create_volume(self):

    def test_create_volume_qos(self, _mock_volume_types):

    def test_delete_volume(self):

    def test_create_cloned_volume(self):

    def test_migrate_volume(self):

    def test_migrate_volume_diff_host(self):

    def test_migrate_volume_diff_domain(self):

    def test_migrate_volume_attached(self):

    def test_attach_volume(self):

    def test_detach_volume(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_delete_snapshot_in_use(self):

    def test_delete_snapshot_not_found(self):

    def test_create_volume_from_snapshot(self):

    def test_create_volume_from_snapshot_and_extend(self):

    def test_create_volume_from_snapshot_and_extend_copy_fail(self):

    def test_create_volume_from_snapshot_qos(self, _mock_volume_types):

    def test_terminate_connection(self):

    def test_update_volume_key_value_pair(self):

    def test_clear_volume_key_value_pair(self):

    def test_extend_volume(self):

    def test_extend_volume_non_base(self):

    def test_extend_volume_non_base_failure(self):

    def test_get_ports(self):

    def test_get_by_qos_spec_with_scoping(self):

    def test_get_by_qos_spec(self):

    def test_get_by_qos_by_type_only(self):

class TestHP3PARFCDriver(HP3PARBaseDriver, test.TestCase):

    def setUp(self):

    def tearDown(self):

    def setup_driver(self, config=None, mock_conf=None):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_get_volume_stats(self):

    def test_create_host(self):

    def test_create_invalid_host(self):

    def test_create_modify_host(self):

    def test_modify_host_with_new_wwn(self):

    def test_modify_host_with_unknown_wwn_and_new_wwn(self):

class TestHP3PARISCSIDriver(HP3PARBaseDriver, test.TestCase):

    def setUp(self):

    def tearDown(self):

    def setup_driver(self, config=None, mock_conf=None):

    def test_initialize_connection(self):

    def test_get_volume_stats(self):

    def test_create_host(self):

    def test_create_invalid_host(self):

    def test_create_modify_host(self):

    def test_get_least_used_nsp_for_host_single(self):

    def test_get_least_used_nsp_for_host_new(self):

    def test_get_least_used_nsp_for_host_reuse(self):

    def test_get_least_used_nps_for_host_fc(self):

    def test_invalid_iscsi_ip(self):

    def test_get_least_used_nsp(self):

\OpenStack\cinder-2014.1\cinder\tests\test_hplefthand.py

class HPLeftHandBaseDriver():

class TestHPLeftHandCLIQISCSIDriver(HPLeftHandBaseDriver, test.TestCase):

    def _fake_cliq_run(self, verb, cliq_args, check_exit_code=True):

        def create_volume(cliq_args):

        def delete_volume(cliq_args):

        def extend_volume(cliq_args):

        def assign_volume(cliq_args):

        def unassign_volume(cliq_args):

        def create_snapshot(cliq_args):

        def delete_snapshot(cliq_args):

        def create_volume_from_snapshot(cliq_args):

        def get_cluster_info(cliq_args):

        def get_volume_info(cliq_args):

        def get_snapshot_info(cliq_args):

        def get_server_info(cliq_args):

        def create_server(cliq_args):

        def test_error(cliq_args):

    def setUp(self):

    def tearDown(self):

    def default_mock_conf(self):

    def setup_driver(self, config=None):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_extend_volume(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_get_volume_stats(self):

class TestHPLeftHandRESTISCSIDriver(HPLeftHandBaseDriver, test.TestCase):

    def setUp(self):

    def tearDown(self):

    def default_mock_conf(self):

    def setup_driver(self, _mock_client, config=None):

    def test_create_volume(self):

    def test_create_volume_with_es(self, _mock_volume_type):

    def test_delete_volume(self):

    def test_extend_volume(self):

    def test_initialize_connection(self):

    def test_initialize_connection_with_chaps(self):

    def test_terminate_connection(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_create_cloned_volume(self):

    def test_extra_spec_mapping(self, _mock_get_volume_type):

    def test_extra_spec_mapping_invalid_value(self, _mock_get_volume_type):

    def test_retype_with_no_LH_extra_specs(self):

    def test_retype_with_only_LH_extra_specs(self):

    def test_retype_with_both_extra_specs(self):

    def test_retype_same_extra_specs(self):

    def test_migrate_no_location(self):

    def test_migrate_incorrect_vip(self):

    def test_migrate_with_location(self):

    def test_migrate_with_Snapshots(self):

    def test_create_volume_with_ao_true(self, _mock_volume_type):

    def test_create_volume_with_ao_false(self, _mock_volume_type):

\OpenStack\cinder-2014.1\cinder\tests\test_hp_msa.py

class TestHPMSAClient(test.TestCase):

    def setUp(self):

    def test_login(self, mock_url_open):

    def test_build_request_url(self):

    def test_request(self, mock_url_open):

    def test_assert_response_ok(self):

    def test_vdisk_exists(self, mock_request):

    def test_vdisk_stats(self, mock_request):

    def test_get_lun(self, mock_request):

    def test_get_ports(self, mock_request):

    def test_get_fc_ports(self, mock_request):

class FakeConfiguration(object):

    def safe_get(self, key):

class TestHPMSACommon(test.TestCase):

    def setUp(self):

    def test_do_setup(self, mock_login, mock_logout, mock_vdisk_exists):

    def test_vol_name(self):

    def test_check_flags(self):

    def test_assert_connector_ok(self):

    def test_update_volume_stats(self, mock_stats):

    def test_create_volume(self, mock_create):

    def test_delete_volume(self, mock_delete):

    def test_create_cloned_volume(self, mock_stats, mock_copy):

    def test_create_volume_from_snapshot(self, mock_stats, mock_copy):

    def test_extend_volume(self, mock_extend):

    def test_create_snapshot(self, mock_create):

    def test_delete_snapshot(self, mock_delete):

    def test_map_volume(self, mock_map):

    def test_unmap_volume(self, mock_unmap):

class TestHPMSAFC(test.TestCase):

    def setUp(self, mock_setup):

        def fake_init(self, *args, **kwargs):

    def _test_with_mock(self, mock, method, args, expected=None):

    def test_create_volume(self, mock_create):

    def test_create_cloned_volume(self, mock_create):

    def test_create_volume_from_snapshot(self, mock_create):

    def test_delete_volume(self, mock_delete):

    def test_create_snapshot(self, mock_create):

    def test_delete_snapshot(self, mock_delete):

    def test_extend_volume(self, mock_extend):

    def test_initialize_connection(self, mock_login, mock_map, mock_ports, mock_logout):

    def test_terminate_connection(self, mock_login, mock_unmap, mock_logout):

    def test_get_volume_stats(self, mock_login, mock_stats, mock_logout):

\OpenStack\cinder-2014.1\cinder\tests\test_huawei_hvs.py

def Fake_sleep(time):

class FakeHVSCommon(rest_common.HVSCommon):

    def __init__(self, configuration):

    def _parse_volume_type(self, volume):

    def _change_file_mode(self, filepath):

    def call(self, url=False, data=None, method=None):

class FakeHVSiSCSIStorage(huawei_hvs.HuaweiHVSISCSIDriver):

    def __init__(self, configuration):

    def do_setup(self, context):

class FakeHVSFCStorage(huawei_hvs.HuaweiHVSFCDriver):

    def __init__(self, configuration):

    def do_setup(self, context):

class HVSRESTiSCSIDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_log_in_success(self):

    def test_log_out_success(self):

    def test_create_volume_success(self):

    def test_extend_volume_success(self):

    def test_create_snapshot_success(self):

    def test_delete_volume_success(self):

    def test_delete_snapshot_success(self):

    def test_colone_volume_success(self):

    def test_create_volume_from_snapshot_success(self):

    def test_initialize_connection_success(self):

    def test_terminate_connection_success(self):

    def test_initialize_connection_no_view_success(self):

    def test_terminate_connectio_no_view_success(self):

    def test_get_volume_stats(self):

    def test_create_snapshot_fail(self):

    def test_create_volume_fail(self):

    def test_delete_volume_fail(self):

    def test_delete_snapshot_fail(self):

    def test_initialize_connection_fail(self):

    def create_fake_conf_file(self):

class HVSRESTFCDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_log_in_Success(self):

    def test_create_volume_success(self):

    def test_extend_volume_success(self):

    def test_create_snapshot_success(self):

    def test_delete_volume_success(self):

    def test_delete_snapshot_success(self):

    def test_colone_volume_success(self):

    def test_create_volume_from_snapshot_success(self):

    def test_initialize_connection_success(self):

    def test_terminate_connection_success(self):

    def test_initialize_connection_no_view_success(self):

    def test_terminate_connection_no_viewn_success(self):

    def test_get_volume_stats(self):

    def test_create_snapshot_fail(self):

    def test_create_volume_fail(self):

    def test_delete_volume_fail(self):

    def test_delete_snapshot_fail(self):

    def create_fake_conf_file(self):

\OpenStack\cinder-2014.1\cinder\tests\test_huawei_t_dorado.py

class FakeChannel():

    def __init__(self):

    def resize_pty(self, width=80, height=24):

    def settimeout(self, time):

    def send(self, s):

    def recv(self, nbytes):

    def close(self):

class FakeSSHClient():

    def invoke_shell(self):

    def get_transport(self):

    def close(self):

class FakeSSHPool():

    def __init__(self, ip, port, conn_timeout, login, password=None, *args, **kwargs):

    def create(self):

    def get(self):

    def put(self, ssh):

    def remove(self, ssh):

def Fake_sleep(time):

def Fake_change_file_mode(obj, filepath):

def create_fake_conf_file(filename):

def modify_conf(conf, item, val, attrib=None):

def set_error_flg(cmd):

def reset_error_flg(cmd):

class HuaweiTCLIResSimulator():

    def _paras_name(self, params):

    def cli_showsys(self, params):

    def cli_createlun(self, params):

    def cli_showlun(self, params):

    def cli_dellun(self, params):

    def cli_showrg(self, params):

    def cli_showpool(self, params):

    def cli_createluncopy(self, params):

    def cli_chgluncopystatus(self, params):

    def cli_showluncopy(self, params):

    def cli_delluncopy(self, params):

    def cli_createsnapshot(self, params):

    def cli_showsnapshot(self, params):

    def cli_actvsnapshot(self, params):

    def cli_disablesnapshot(self, params):

    def cli_delsnapshot(self, params):

    def cli_showrespool(self, params):

    def cli_showiscsitgtname(self, params):

    def cli_showiscsiip(self, params):

    def cli_showhostgroup(self, params):

    def cli_createhostgroup(self, params):

    def cli_showhost(self, params):

    def cli_addhost(self, params):

    def cli_delhost(self, params):

    def cli_showiscsiini(self, params):

    def cli_addiscsiini(self, params):

    def cli_deliscsiini(self, params):

    def cli_showhostport(self, params):

    def cli_addhostport(self, params):

    def cli_delhostport(self, params):

    def cli_showhostmap(self, params):

    def cli_addhostmap(self, params):

    def cli_delhostmap(self, params):

    def cli_showfreeport(self, params):

    def cli_showhostpath(self, params):

    def cli_showfcmode(self, params):

    def cli_chglun(self, params):

    def cli_addluntoextlun(self, params):

    def cli_rmlunfromextlun(self, patams):

class HuaweiDorado5100CLIResSimulator(HuaweiTCLIResSimulator):

    def cli_showsys(self, params):

    def cli_showlun(self, params):

class HuaweiDorado2100G2CLIResSimulator(HuaweiTCLIResSimulator):

    def cli_showsys(self, params):

    def cli_createlun(self, params):

    def cli_showlun(self, params):

class HuaweiTISCSIDriverTestCase(test.TestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def tearDown(self):

    def test_conf_invalid(self):

    def test_volume_type(self):

    def test_create_delete_volume(self):

    def test_create_delete_cloned_volume(self):

    def test_extend_volume(self):

    def test_create_delete_snapshot(self):

    def test_create_delete_snapshot_volume(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_get_volume_stats(self):

class HuaweiTFCDriverTestCase(test.TestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def tearDown(self):

    def test_validate_connector_failed(self):

    def test_create_delete_volume(self):

    def test_create_delete_snapshot(self):

    def test_create_cloned_volume(self):

    def test_create_snapshot_volume(self):

    def test_initialize_terminitat_connection(self):

    def _test_get_volume_stats(self):

class HuaweiDorado5100FCDriverTestCase(HuaweiTFCDriverTestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def test_create_cloned_volume(self):

    def test_create_snapshot_volume(self):

class HuaweiDorado2100G2FCDriverTestCase(HuaweiTFCDriverTestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def test_create_cloned_volume(self):

    def test_create_delete_snapshot(self):

    def test_create_snapshot_volume(self):

    def test_extend_volume(self):

class HuaweiDorado5100ISCSIDriverTestCase(HuaweiTISCSIDriverTestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def test_create_delete_cloned_volume(self):

    def test_create_delete_snapshot_volume(self):

    def test_volume_type(self):

class HuaweiDorado2100G2ISCSIDriverTestCase(HuaweiTISCSIDriverTestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def _init_driver(self):

    def test_conf_invalid(self):

    def test_create_delete_cloned_volume(self):

    def test_create_delete_snapshot(self):

    def test_create_delete_snapshot_volume(self):

    def test_initialize_connection(self):

    def test_extend_volume(self):

class SSHMethodTestCase(test.TestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def tearDown(self):

    def test_reach_max_connection_limit(self):

    def test_socket_timeout(self):

    def _fake_recv1(self, nbytes):

    def _fake_recv2(self, nBytes):

class HuaweiUtilsTestCase(test.TestCase):

    def __init__(self, *args, **kwargs):

    def setUp(self):

    def tearDown(self):

    def test_parse_xml_file_ioerror(self):

    def test_is_xml_item_exist(self):

    def test_is_xml_item_valid(self):

    def test_get_conf_host_os_type(self):

\OpenStack\cinder-2014.1\cinder\tests\test_ibmnas.py

class FakeEnv(object):

    def __setitem__(self, key, value):

    def __getitem__(self, item):

class IBMNASDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _set_flag(self, flag, value):

    def _reset_flags(self):

    def test_check_for_setup_error(self):

    def test_get_provider_location(self):

    def test_get_export_path(self):

    def test_create_ibmnas_snap_mount_point_provided(self):

    def test_create_ibmnas_snap_no_mount_point_provided(self):

    def test_create_ibmnas_copy(self):

    def test_resize_volume_file(self):

    def test_extend_volume(self):

    def test_delete_snapfiles(self):

    def test_delete_volume_no_provider_location(self):

    def test_delete_volume(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_create_cloned_volume(self):

    def test_create_volume_from_snapshot(self):

\OpenStack\cinder-2014.1\cinder\tests\test_ibm_xiv_ds8k.py

class XIVDS8KFakeProxyDriver(object):

    def __init__(self, xiv_ds8k_info, logger, expt, driver=None):

    def setup(self, context):

    def create_volume(self, volume):

    def volume_exists(self, volume):

    def delete_volume(self, volume):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector):

    def is_volume_attached(self, volume, connector):

class XIVDS8KVolumeDriverTest(test.TestCase):

    def setUp(self):

    def test_initialized_should_set_xiv_ds8k_info(self):

    def test_setup_should_fail_if_credentials_are_invalid(self):

    def test_setup_should_fail_if_connection_is_invalid(self):

    def test_create_volume(self):

    def test_volume_exists(self):

    def test_delete_volume(self):

    def test_delete_volume_should_fail_for_not_existing_volume(self):

    def test_create_volume_should_fail_if_no_pool_space_left(self):

    def test_initialize_connection(self):

    def test_initialize_connection_should_fail_for_non_existing_volume(self):

    def test_terminate_connection(self):

    def test_terminate_connection_should_fail_on_non_existing_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\test_image_utils.py

class FakeImageService:

    def __init__(self):

    def download(self, context, image_id, data):

    def show(self, context, image_id):

    def update(self, context, image_id, metadata, path):

class TestUtils(test.TestCase):

    def setUp(self):

    def test_resize_image(self):

    def test_convert_image(self):

    def test_qemu_img_info(self):

    def test_qemu_img_info_alt(self):

    def _test_fetch_to_raw(self, has_qemu=True, src_inf=None, dest_inf=None):

    def test_fetch_to_raw(self):

    def test_fetch_to_raw_no_qemu_img(self):

    def test_fetch_to_raw_on_error_parsing_failed(self):

    def test_fetch_to_raw_on_error_backing_file(self):

    def test_fetch_to_raw_on_error_not_convert_to_raw(self):

    def test_fetch_to_raw_on_error_image_size(self):

    def _test_fetch_verify_image(self, qemu_info, volume_size=1):

    def test_fetch_verify_image_with_backing_file(self):

    def test_fetch_verify_image_without_file_format(self):

    def test_fetch_verify_image_image_size(self):

    def test_upload_volume(self):

    def test_upload_volume_with_raw_image(self):

    def test_upload_volume_on_error(self):

class TestExtractTo(test.TestCase):

    def test_extract_to_calls_tar(self):

class TestSetVhdParent(test.TestCase):

    def test_vhd_util_call(self):

class TestFixVhdChain(test.TestCase):

    def test_empty_chain(self):

    def test_single_vhd_file_chain(self):

    def test_chain_with_two_elements(self):

class TestGetSize(test.TestCase):

    def test_vhd_util_call(self):

class TestResize(test.TestCase):

    def test_vhd_util_call(self):

class TestCoalesce(test.TestCase):

    def test_vhd_util_call(self):

def fake_context(return_value):

class TestTemporaryFile(test.TestCase):

    def test_file_unlinked(self):

    def test_file_unlinked_on_error(self):

        def sut():

class TestCoalesceChain(test.TestCase):

    def test_single_vhd(self):

    def test_chain_of_two_vhds(self):

class TestDiscoverChain(test.TestCase):

    def test_discovery_calls(self):

class TestXenServerImageToCoalescedVhd(test.TestCase):

    def test_calls(self):

\OpenStack\cinder-2014.1\cinder\tests\test_iscsi.py

class TargetAdminTestCase(object):

    def setUp(self):

    def fake_verify_backing_lun(obj, iqn, tid):

    def fake_verify_rtstool(obj):

    def fake_get_target(obj, iqn):

    def get_script_params(self):

    def get_script(self):

    def fake_execute(self, *cmd, **kwargs):

    def clear_cmds(self):

    def verify_cmds(self, cmds):

    def verify(self):

    def run_commands(self):

    def test_target_admin(self):

class TgtAdmTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

    def tearDown(self):

class IetAdmTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

class IetAdmBlockIOTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

class IetAdmFileIOTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

class IetAdmAutoIOTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

class LioAdmTestCase(test.TestCase, TargetAdminTestCase):

    def setUp(self):

class ISERTgtAdmTestCase(TgtAdmTestCase):

    def setUp(self):

\OpenStack\cinder-2014.1\cinder\tests\test_migrations.py

def _get_connect_string(backend, user="openstack_citest", passwd="openstack_citest", database="openstack_citest"):

def _is_mysql_avail(**kwargs):

def _is_backend_avail(backend, user="openstack_citest", passwd="openstack_citest", database="openstack_citest"):

def _have_mysql():

def get_table(engine, name):

class TestMigrations(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _reset_databases(self):

        def execute_cmd(cmd=None):

    def test_walk_versions(self):

    def test_mysql_connect_fail(self):

    def test_mysql_innodb(self):

    def test_postgresql_connect_fail(self):

    def test_postgresql_opportunistically(self):

    def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):

    def _migrate_down(self, engine, version):

    def _migrate_up(self, engine, version, with_data=False):

    def _prerun_004(self, engine):

    def _check_004(self, engine, data):

    def test_migration_005(self):

    def _metadatas(self, upgrade_to, downgrade_to=None):

    def metadatas_upgraded_to(self, revision):

    def metadatas_downgraded_from(self, revision):

    def test_upgrade_006_adds_provider_location(self):

    def test_downgrade_006_removes_provider_location(self):

    def test_upgrade_007_adds_fk(self):

    def test_downgrade_007_removes_fk(self):

    def test_migration_008(self):

    def test_migration_009(self):

    def test_migration_010(self):

    def test_migration_011(self):

    def test_migration_012(self):

    def test_migration_013(self):

    def test_migration_014(self):

    def test_migration_015(self):

    def test_migration_016(self):

    def test_migration_017(self):

    def test_migration_018(self):

    def test_migration_019(self):

    def test_migration_020(self):

    def test_migration_021(self):

    def test_migration_022(self):

\OpenStack\cinder-2014.1\cinder\tests\test_misc.py

class ExceptionTestCase(test.TestCase):

    def _raise_exc(exc):

    def test_exceptions_raise(self):

class ProjectTestCase(test.TestCase):

    def test_all_migrations_have_downgrade(self):

\OpenStack\cinder-2014.1\cinder\tests\test_netapp.py

def create_configuration():

class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

    def log_message(self, format, *args):

class FakeHttplibSocket(object):

    def __init__(self, value):

        def newclose():

    def makefile(self, mode, _other):

class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):

    def do_GET(s):

    def do_POST(s):

    def _get_child_by_name(self, name):

    def _get_child_content(self, name):

class FakeDirectCmodeHTTPConnection(object):

    def __init__(self, host, timeout=None):

    def request(self, method, path, data=None, headers=None):

    def set_debuglevel(self, level):

    def getresponse(self):

    def getresponsebody(self):

class NetAppDirectCmodeISCSIDriverTestCase(test.TestCase):

    def setUp(self):

    def _custom_setup(self):

    def _set_config(self, configuration):

    def test_connect(self):

    def test_create_destroy(self):

    def test_create_vol_snapshot_destroy(self):

    def test_map_unmap(self):

    def test_cloned_volume_destroy(self):

    def test_map_by_creating_igroup(self):

    def test_fail_create_vol(self):

    def test_vol_stats(self):

    def test_create_vol_snapshot_diff_size_resize(self):

    def test_create_vol_snapshot_diff_size_subclone(self):

    def test_extend_vol_same_size(self):

    def test_extend_vol_direct_resize(self):

    def test_extend_vol_sub_lun_clone(self):

class NetAppDriverNegativeTestCase(test.TestCase):

    def setUp(self):

    def test_incorrect_family(self):

    def test_incorrect_protocol(self):

    def test_non_netapp_driver(self):

class FakeDirect7MODEServerHandler(FakeHTTPRequestHandler):

    def do_GET(s):

    def do_POST(s):

class FakeDirect7modeHTTPConnection(object):

    def __init__(self, host, timeout=None):

    def request(self, method, path, data=None, headers=None):

    def set_debuglevel(self, level):

    def getresponse(self):

    def getresponsebody(self):

class NetAppDirect7modeISCSIDriverTestCase_NV(NetAppDirectCmodeISCSIDriverTestCase):

    def setUp(self):

    def _custom_setup(self):

    def _set_config(self, configuration):

    def test_create_on_select_vol(self):

    def test_create_fail_on_select_vol(self):

    def test_check_for_setup_error_version(self):

class NetAppDirect7modeISCSIDriverTestCase_WV(NetAppDirect7modeISCSIDriverTestCase_NV):

    def setUp(self):

    def _custom_setup(self):

    def _set_config(self, configuration):

class NetAppApiElementTransTests(test.TestCase):

    def setUp(self):

    def test_translate_struct_dict_unique_key(self):

    def test_translate_struct_dict_nonunique_key(self):

    def test_translate_struct_list(self):

    def test_translate_struct_tuple(self):

    def test_translate_invalid_struct(self):

    def test_setter_builtin_types(self):

    def test_setter_na_element(self):

    def test_setter_child_dict(self):

    def test_setter_child_list_tuple(self):

    def test_setter_no_value(self):

    def test_setter_invalid_value(self):

    def test_setter_invalid_key(self):

\OpenStack\cinder-2014.1\cinder\tests\test_netapp_eseries_iscsi.py

def create_configuration():

class FakeEseriesResponse(object):

    def __init__(self, code=None, text=None):

    def json(self):

class FakeEseriesServerHandler(object):

    def do_GET(self, path, params, data, headers):

    def do_POST(self, path, params, data, headers):

    def do_DELETE(self, path, params, data, headers):

class FakeEseriesHTTPSession(object):

    def __init__(self):

    def request(self, method, url, params, data, headers, timeout, verify):

class NetAppEseriesIscsiDriverTestCase(test.TestCase):

    def setUp(self):

    def _custom_setup(self):

    def _set_config(self, configuration):

    def test_embedded_mode(self):

OpenStack Index

Previous

Next