OpenStack Study: cinder
OpenStack Index | Previous | Next
def test_check_system_pwd_not_sync(self):
def list_system():
def test_connect(self):
def test_create_destroy(self):
def test_create_vol_snapshot_destroy(self):
def test_map_unmap(self):
def test_cloned_volume_destroy(self):
def test_map_by_creating_host(self):
def test_vol_stats(self):
def test_create_vol_snapshot_diff_size_resize(self):
def test_create_vol_snapshot_diff_size_subclone(self):
\OpenStack\cinder-2014.1\cinder\tests\test_netapp_nfs.py
def create_configuration():
class FakeVolume(object):
def __init__(self, size=0):
def __getitem__(self, key):
def __setitem__(self, key, val):
class FakeSnapshot(object):
def __init__(self, volume_size=0):
def __getitem__(self, key):
class FakeResponse(object):
def __init__(self, status):
class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
def setUp(self):
def test_create_snapshot(self):
def test_create_volume_from_snapshot(self):
def _prepare_delete_snapshot_mock(self, snapshot_exists):
def test_delete_existing_snapshot(self):
def test_delete_missing_snapshot(self):
def _custom_setup(self):
def test_check_for_setup_error(self):
def test_do_setup(self):
def _prepare_clone_mock(self, status):
def _prepare_info_by_ip_response(self):
def test_clone_volume(self):
def test_register_img_in_cache_noshare(self):
def test_register_img_in_cache_with_share(self):
def test_find_image_in_cache_no_shares(self):
def test_find_image_in_cache_shares(self):
def test_find_old_cache_files_notexists(self):
def test_find_old_cache_files_exists(self):
def test_delete_files_till_bytes_free_success(self):
def test_clean_image_cache_exec(self):
def test_clean_image_cache_noexec(self):
def test_clone_image_fromcache(self):
def get_img_info(self, format):
def test_clone_image_cloneableshare_nospace(self):
def test_clone_image_cloneableshare_raw(self):
def test_clone_image_cloneableshare_notraw(self):
def test_clone_image_file_not_discovered(self):
def test_clone_image_resizefails(self):
def test_is_cloneable_share_badformats(self):
def test_is_cloneable_share_goodformat1(self):
def test_is_cloneable_share_goodformat2(self):
def test_is_cloneable_share_goodformat3(self):
def test_is_cloneable_share_goodformat4(self):
def test_is_cloneable_share_goodformat5(self):
def test_check_share_in_use_no_conn(self):
def test_check_share_in_use_invalid_conn(self):
def test_check_share_in_use_incorrect_host(self):
def test_check_share_in_use_success(self):
def test_construct_image_url_loc(self):
def test_construct_image_url_direct(self):
class NetappDirectCmodeNfsDriverOnlyTestCase(test.TestCase):
def setUp(self):
def _custom_setup(self):
def test_create_volume(self, mock_volume_extra_specs):
def test_create_volume_with_qos_policy(self, mock_volume_extra_specs):
def test_copy_img_to_vol_copyoffload_success(self):
def test_copy_img_to_vol_copyoffload_failure(self):
def test_copyoffload_frm_cache_success(self):
def test_copyoffload_frm_img_service_success(self):
def test_cache_copyoffload_workflow_success(self):
def test_img_service_raw_copyoffload_workflow_success(self, mock_qemu_img_info):
def test_img_service_qcow2_copyoffload_workflow_success(self, mock_exists, mock_qemu_img_info, mock_cvrt_image):
class NetappDirect7modeNfsDriverTestCase(NetappDirectCmodeNfsDriverTestCase):
def _custom_setup(self):
def _prepare_delete_snapshot_mock(self, snapshot_exists):
def test_check_for_setup_error_version(self):
def test_check_for_setup_error(self):
def test_do_setup(self):
def _prepare_clone_mock(self, status):
def test_clone_volume_clear(self):
\OpenStack\cinder-2014.1\cinder\tests\test_netapp_ssc.py
class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def log_message(self, format, *args):
class FakeHttplibSocket(object):
def __init__(self, value):
def newclose():
def makefile(self, mode, _other):
class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):
def do_GET(s):
def do_POST(s):
class FakeDirectCmodeHTTPConnection(object):
def __init__(self, host, timeout=None):
def request(self, method, path, data=None, headers=None):
def set_debuglevel(self, level):
def getresponse(self):
def getresponsebody(self):
def createNetAppVolume(**kwargs):
class SscUtilsTestCase(test.TestCase):
def setUp(self):
def test_cl_vols_ssc_all(self):
def test_cl_vols_ssc_single(self):
def test_get_cluster_ssc(self):
def test_vols_for_boolean_specs(self):
def test_vols_for_optional_specs(self):
def test_query_cl_vols_for_ssc(self):
def test_query_aggr_options(self):
def test_query_aggr_storage_disk(self):
\OpenStack\cinder-2014.1\cinder\tests\test_nexenta.py
class TestNexentaISCSIDriver(test.TestCase):
def __init__(self, method):
def setUp(self):
def test_setup_error(self):
def test_setup_error_fail(self):
def test_local_path(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_create_cloned_volume(self):
def test_migrate_volume(self):
def test_create_snapshot(self):
def test_create_volume_from_snapshot(self):
def test_delete_snapshot(self):
def _stub_export_method(self, module, method, args, error, raise_exception, fail=False):
def _stub_all_export_methods(self, fail=False):
def test_create_export(self):
def __get_test(i):
def _test_create_export_fail(self):
def test_ensure_export(self):
def test_remove_export(self):
def test_remove_export_fail_0(self):
def test_remove_export_fail_1(self):
def test_get_volume_stats(self):
class TestNexentaJSONRPC(test.TestCase):
def setUp(self):
def test_call(self):
def test_call_deep(self):
def test_call_auto(self):
def test_call_error(self):
def test_call_fail(self):
class TestNexentaNfsDriver(test.TestCase):
def _create_volume_db_entry(self):
def setUp(self):
def test_check_for_setup_error(self):
def test_initialize_connection(self):
def test_do_create_volume(self):
def test_create_sparsed_file(self):
def test_create_regular_file(self):
def test_set_rw_permissions_for_all(self):
def test_local_path(self):
def test_remote_path(self):
def test_share_folder(self):
def test_load_shares_config(self):
def test_get_capacity_info(self):
def test_get_share_datasets(self):
def test_delete_snapshot(self):
def test_delete_volume(self):
class TestNexentaUtils(test.TestCase):
def test_str2size(self):
def test_str2gib_size(self):
def test_parse_nms_url(self):
\OpenStack\cinder-2014.1\cinder\tests\test_nfs.py
class DumbVolume(object):
def __setitem__(self, key, value):
def __getitem__(self, item):
class RemoteFsDriverTestCase(test.TestCase):
def setUp(self):
def test_create_sparsed_file(self):
def test_create_regular_file(self):
def test_create_qcow2_file(self):
def test_set_rw_permissions_for_all(self):
class NfsDriverTestCase(test.TestCase):
def setUp(self):
def stub_out_not_replaying(self, obj, attr_name):
def test_local_path(self):
def test_copy_image_to_volume(self):
def fake_local_path(volume):
def test_get_mount_point_for_share(self):
def test_get_capacity_info(self):
def test_get_capacity_info_for_share_and_mount_point_with_spaces(self):
def test_load_shares_config(self):
def test_ensure_shares_mounted_should_save_mounting_successfully(self):
def test_ensure_shares_mounted_should_not_save_mounting_with_error(self):
def test_setup_should_throw_error_if_shares_config_not_configured(self):
def test_setup_should_throw_error_if_oversub_ratio_less_than_zero(self):
def test_setup_should_throw_error_if_used_ratio_less_than_zero(self):
def test_setup_should_throw_error_if_used_ratio_greater_than_one(self):
def test_setup_should_throw_exception_if_nfs_client_is_not_installed(self):
def test_find_share_should_throw_error_if_there_is_no_mounted_shares(self):
def test_find_share(self):
def test_find_share_should_throw_error_if_there_is_no_enough_place(self):
def _simple_volume(self):
def test_create_sparsed_volume(self):
def test_create_nonsparsed_volume(self):
def test_create_volume_should_ensure_nfs_mounted(self):
def test_create_volume_should_return_provider_location(self):
def test_delete_volume(self):
def test_delete_should_ensure_share_mounted(self):
def test_delete_should_not_delete_if_provider_location_not_provided(self):
def test_get_volume_stats(self):
\OpenStack\cinder-2014.1\cinder\tests\test_policy.py
class PolicyFileTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_modified_policy_reloads(self):
class PolicyTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_enforce_nonexistent_action_throws(self):
def test_enforce_bad_action_throws(self):
def test_enforce_good_action(self):
def test_enforce_http_true(self):
def fakeurlopen(url, post_data):
def test_enforce_http_false(self):
def fakeurlopen(url, post_data):
def test_templatized_enforcement(self):
def test_early_AND_enforcement(self):
def test_early_OR_enforcement(self):
def test_ignore_case_role_check(self):
class DefaultPolicyTestCase(test.TestCase):
def setUp(self):
def _set_brain(self, default_rule):
def tearDown(self):
def test_policy_called(self):
def test_not_found_policy_calls_default(self):
def test_default_not_found(self):
class ContextIsAdminPolicyTestCase(test.TestCase):
def setUp(self):
def test_default_admin_role_is_admin(self):
def test_custom_admin_role_is_admin(self):
def test_context_is_admin_undefined(self):
\OpenStack\cinder-2014.1\cinder\tests\test_qos_specs.py
def fake_db_qos_specs_create(context, values):
class QoSSpecsTestCase(test.TestCase):
def setUp(self):
def _create_qos_specs(self, name, values=None):
def test_create(self):
def test_update(self):
def fake_db_update(context, specs_id, values):
def test_delete(self):
def fake_db_associations_get(context, id):
def fake_db_delete(context, id):
def fake_disassociate_all(context, id):
def test_delete_keys(self):
def fake_db_qos_delete_key(context, id, key):
def fake_qos_specs_get(context, id):
def test_get_associations(self):
def fake_db_associate_get(context, id):
def test_associate_qos_with_type(self):
def fake_qos_specs_get(context, id):
def fake_db_associate(context, id, type_id):
def fake_vol_type_qos_get(type_id):
def test_disassociate_qos_specs(self):
def fake_qos_specs_get(context, id):
def fake_db_disassociate(context, id, type_id):
def test_disassociate_all(self):
def fake_db_disassociate_all(context, id):
def fake_qos_specs_get(context, id):
def test_get_all_specs(self):
def test_get_qos_specs(self):
def test_get_qos_specs_by_name(self):
\OpenStack\cinder-2014.1\cinder\tests\test_quota.py
class QuotaIntegrationTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _create_volume(self, size=1):
def _create_snapshot(self, volume):
def test_too_many_volumes(self):
def test_too_many_volumes_of_type(self):
def test_too_many_snapshots_of_type(self):
def test_too_many_gigabytes(self):
def test_too_many_combined_gigabytes(self):
def test_no_snapshot_gb_quota_flag(self):
def test_too_many_gigabytes_of_type(self):
class FakeContext(object):
def __init__(self, project_id, quota_class):
def elevated(self):
class FakeDriver(object):
def __init__(self, by_project=None, by_class=None, reservations=None):
def get_by_project(self, context, project_id, resource):
def get_by_class(self, context, quota_class, resource):
def get_default(self, context, resource):
def get_defaults(self, context, resources):
def get_class_quotas(self, context, resources, quota_class, defaults=True):
def get_project_quotas(self, context, resources, project_id, quota_class=None, defaults=True, usages=True):
def limit_check(self, context, resources, values, project_id=None):
def reserve(self, context, resources, deltas, expire=None, project_id=None):
def commit(self, context, reservations, project_id=None):
def rollback(self, context, reservations, project_id=None):
def destroy_all_by_project(self, context, project_id):
def expire(self, context):
class BaseResourceTestCase(test.TestCase):
def test_no_flag(self):
def test_with_flag(self):
def test_with_flag_no_quota(self):
def test_quota_no_project_no_class(self):
def test_quota_with_project_no_class(self):
def test_quota_no_project_with_class(self):
def test_quota_with_project_with_class(self):
def test_quota_override_project_with_class(self):
def test_quota_with_project_override_class(self):
class VolumeTypeResourceTestCase(test.TestCase):
def test_name_and_flag(self):
class QuotaEngineTestCase(test.TestCase):
def test_init(self):
def test_init_override_string(self):
def test_init_override_obj(self):
def test_register_resource(self):
def test_register_resources(self):
def test_get_by_project(self):
def test_get_by_class(self):
def _make_quota_obj(self, driver):
def test_get_defaults(self):
def test_get_class_quotas(self):
def test_get_project_quotas(self):
def test_count_no_resource(self):
def test_count_wrong_resource(self):
def test_count(self):
def fake_count(context, *args, **kwargs):
def test_limit_check(self):
def test_reserve(self):
def test_commit(self):
def test_rollback(self):
def test_destroy_all_by_project(self):
def test_expire(self):
def test_resource_names(self):
class VolumeTypeQuotaEngineTestCase(test.TestCase):
def test_default_resources(self):
def fake_vtga(context, inactive=False, filters=None):
def test_volume_type_resources(self):
def fake_vtga(context, inactive=False, filters=None):
class DbQuotaDriverTestCase(test.TestCase):
def setUp(self):
def test_get_defaults(self):
def _stub_quota_class_get_default(self):
def fake_qcgd(context):
def _stub_volume_type_get_all(self):
def fake_vtga(context, inactive=False, filters=None):
def _stub_quota_class_get_all_by_name(self):
def fake_qcgabn(context, quota_class):
def test_get_class_quotas(self):
def test_get_class_quotas_no_defaults(self):
def _stub_get_by_project(self):
def fake_qgabp(context, project_id):
def fake_qugabp(context, project_id):
def test_get_project_quotas(self):
def test_get_project_quotas_alt_context_no_class(self):
def test_get_project_quotas_alt_context_with_class(self):
def test_get_project_quotas_no_defaults(self):
def test_get_project_quotas_no_usages(self):
def _stub_get_project_quotas(self):
def fake_get_project_quotas(context, resources, project_id, quota_class=None, defaults=True, usages=True):
def test_get_quotas_has_sync_unknown(self):
def test_get_quotas_no_sync_unknown(self):
def test_get_quotas_has_sync_no_sync_resource(self):
def test_get_quotas_no_sync_has_sync_resource(self):
def test_get_quotas_has_sync(self):
def _stub_quota_reserve(self):
def fake_quota_reserve(context, resources, quotas, deltas, expire, until_refresh, max_age, project_id=None):
def test_reserve_bad_expire(self):
def test_reserve_default_expire(self):
def test_reserve_int_expire(self):
def test_reserve_timedelta_expire(self):
def test_reserve_datetime_expire(self):
def test_reserve_until_refresh(self):
def test_reserve_max_age(self):
def _stub_quota_destroy_all_by_project(self):
def fake_quota_destroy_all_by_project(context, project_id):
def test_destroy_by_project(self):
class FakeSession(object):
def begin(self):
def __enter__(self):
def __exit__(self, exc_type, exc_value, exc_traceback):
class FakeUsage(sqa_models.QuotaUsage):
def save(self, *args, **kwargs):
class QuotaReserveSqlAlchemyTestCase(test.TestCase):
def setUp(self):
def make_sync(res_name):
def fake_get_session():
def fake_get_quota_usages(context, session, project_id):
def fake_quota_usage_create(context, project_id, resource, in_use, reserved, until_refresh, session=None, save=True):
def fake_reservation_create(context, uuid, usage_id, project_id, resource, delta, expire, session=None):
def _make_quota_usage(self, project_id, resource, in_use, reserved, until_refresh, created_at, updated_at):
def init_usage(self, project_id, resource, in_use, reserved, until_refresh=None, created_at=None, updated_at=None):
def compare_usage(self, usage_dict, expected):
def _make_reservation(self, uuid, usage_id, project_id, resource, delta, expire, created_at, updated_at):
def compare_reservation(self, reservations, expected):
def test_quota_reserve_create_usages(self):
def test_quota_reserve_negative_in_use(self):
def test_quota_reserve_until_refresh(self):
def test_quota_reserve_max_age(self):
def test_quota_reserve_no_refresh(self):
def test_quota_reserve_unders(self):
def test_quota_reserve_overs(self):
def test_quota_reserve_reduction(self):
\OpenStack\cinder-2014.1\cinder\tests\test_rbd.py
class MockException(Exception):
def __init__(self, *args, **kwargs):
class MockImageNotFoundException(MockException):
class MockImageBusyException(MockException):
def _common_inner_inner1(inst, *args, **kwargs):
def _common_inner_inner2(mock_rados, mock_rbd, mock_client, mock_proxy):
class TestUtil(test.TestCase):
def test_ascii_str(self):
class RBDTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_create_volume(self):
def test_create_volume_no_layering(self):
def test_delete_volume(self):
def delete_volume_not_found(self):
def test_delete_busy_volume(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_get_clone_info(self):
def test_get_clone_info_w_snap(self):
def test_get_clone_info_w_exception(self):
def test_get_clone_info_deleted_volume(self):
def test_create_cloned_volume(self):
def test_create_cloned_volume_w_flatten(self):
def test_create_cloned_volume_w_clone_exception(self):
def test_good_locations(self):
def test_bad_locations(self):
def test_cloneable(self):
def test_uncloneable_different_fsid(self):
def test_uncloneable_unreadable(self):
def test_uncloneable_bad_format(self):
def _copy_image(self):
def test_copy_image_no_volume_tmp(self):
def test_copy_image_volume_tmp(self):
def test_update_volume_stats(self):
def test_update_volume_stats_error(self):
def test_get_mon_addrs(self):
def test_initialize_connection(self):
def test_clone(self):
def mock__enter__(inst):
def test_extend_volume(self):
def test_rbd_volume_proxy_init(self):
def test_connect_to_rados(self):
class RBDImageIOWrapperTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def test_init(self):
def test_inc_offset(self):
def test_rbd_image(self):
def test_rbd_user(self):
def test_rbd_pool(self):
def test_rbd_conf(self):
def test_read(self):
def mock_read(offset, length):
def test_write(self):
def test_seekable(self):
def test_seek(self):
def test_tell(self):
def test_flush(self):
def test_fileno(self):
def test_close(self):
class ManagedRBDTestCase(DriverTestCase):
def setUp(self):
def _create_volume_from_image(self, expected_status, raw=False, clone_error=False):
def test_create_vol_from_image_status_available(self):
def _mock_clone_image(volume, image_location, image_id, image_meta):
def test_create_vol_from_non_raw_image_status_available(self):
def _mock_clone_image(volume, image_location, image_id, image_meta):
def test_create_vol_from_image_status_error(self):
def test_clone_failure(self):
def test_clone_success(self):
\OpenStack\cinder-2014.1\cinder\tests\test_scality.py
class ScalityDriverTestCase(test.TestCase):
def _makedirs(self, path):
def _create_fake_config(self):
def _create_fake_mount(self):
def _remove_fake_config(self):
def _configure_driver(self):
def _execute_wrapper(self, cmd, *args, **kwargs):
def _set_access_wrapper(self, is_visible):
def _access_wrapper(path, flags):
def setUp(self):
def tearDown(self):
def test_setup_no_config(self):
def test_setup_missing_config(self):
def test_setup_no_mount_helper(self):
def test_setup_make_voldir(self):
def test_local_path(self):
def test_create_volume(self):
def test_delete_volume(self):
def test_create_snapshot(self):
def test_delete_snapshot(self):
def test_initialize_connection(self):
def test_copy_image_to_volume(self):
def test_copy_volume_to_image(self):
def test_create_cloned_volume(self):
def test_extend_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\test_service.py
class FakeManager(manager.Manager):
def __init__(self, host=None, db_driver=None, service_name=None):
def test_method(self):
class ExtendedService(service.Service):
def test_method(self):
class ServiceManagerTestCase(test.TestCase):
def test_message_gets_to_manager(self):
def test_override_manager_method(self):
class ServiceFlagsTestCase(test.TestCase):
def test_service_enabled_on_create_based_on_flag(self):
def test_service_disabled_on_create_based_on_flag(self):
class ServiceTestCase(test.TestCase):
def setUp(self):
def test_create(self):
def test_report_state_newly_disconnected(self):
def test_report_state_newly_connected(self):
def test_service_with_long_report_interval(self):
class TestWSGIService(test.TestCase):
def setUp(self):
def test_service_random_port(self):
class OSCompatibilityTestCase(test.TestCase):
def _test_service_launcher(self, fake_os):
def test_process_launcher_on_windows(self):
def test_process_launcher_on_linux(self):
\OpenStack\cinder-2014.1\cinder\tests\test_sheepdog.py
class FakeImageService:
def download(self, context, image_id, path):
class SheepdogTestCase(test.TestCase):
def setUp(self):
def test_update_volume_stats(self):
def fake_stats(*args):
def test_update_volume_stats_error(self):
def fake_stats(*args):
def test_check_for_setup_error_0_5(self):
def fake_stats(*args):
def test_check_for_setup_error_0_6(self):
def fake_stats(*args):
def test_copy_image_to_volume(self):
def fake_temp_file(dir):
def fake_try_execute(obj, *command, **kwargs):
def test_extend_volume(self):
\OpenStack\cinder-2014.1\cinder\tests\test_solidfire.py
def create_configuration():
class SolidFireVolumeTestCase(test.TestCase):
def setUp(self):
def fake_issue_api_request(obj, method, params, version='1.0'):
def fake_issue_api_request_fails(obj, method, params, version='1.0'):
def fake_set_qos_by_volume_type(self, type_id, ctxt):
def fake_volume_get(obj, key, default=None):
def fake_update_cluster_status(self):
def fake_get_model_info(self, account, vid):
def test_create_with_qos_type(self):
def test_create_volume(self):
def test_create_volume_non_512(self):
def test_create_snapshot(self):
def test_create_clone(self):
def test_initialize_connector_with_blocksizes(self):
def test_create_volume_with_qos(self):
def test_create_volume_fails(self):
def test_create_sfaccount(self):
def test_create_sfaccount_fails(self):
def test_get_sfaccount_by_name(self):
def test_get_sfaccount_by_name_fails(self):
def test_delete_volume(self):
def test_delete_volume_fails_no_volume(self):
def test_delete_volume_fails_account_lookup(self):
def test_get_cluster_info(self):
def test_get_cluster_info_fail(self):
def test_extend_volume(self):
def test_extend_volume_fails_no_volume(self):
def test_extend_volume_fails_account_lookup(self):
def test_set_by_qos_spec_with_scoping(self):
def test_set_by_qos_spec(self):
def test_set_by_qos_by_type_only(self):
def test_retype(self):
def test_retype_with_qos_spec(self):
def _fake_get_volume_type(ctxt, type_id):
def _fake_get_qos_spec(ctxt, spec_id):
\OpenStack\cinder-2014.1\cinder\tests\test_storwize_svc.py
class StorwizeSVCManagementSimulator:
def __init__(self, pool_name):
def _state_transition(self, function, fcmap):
def _find_unused_id(d):
def _is_invalid_name(name):
def _cmd_to_dict(arg_list):
def _print_info_cmd(rows, delim=' ', nohdr=False, **kwargs):
def _print_info_obj_cmd(header, row, delim=' ', nohdr=False):
def _convert_bytes_units(bytestr):
def _convert_units_bytes(num, unit):
def _cmd_lslicense(self, **kwargs):
def _cmd_lssystem(self, **kwargs):
def _cmd_lsmdiskgrp(self, **kwargs):
def _cmd_lsnodecanister(self, **kwargs):
def _cmd_lsnode(self, **kwargs):
def _cmd_lsportip(self, **kwargs):
def _cmd_lsfabric(self, **kwargs):
def _cmd_mkvdisk(self, **kwargs):
def _cmd_rmvdisk(self, **kwargs):
def _cmd_expandvdisksize(self, **kwargs):
def _get_fcmap_info(self, vol_name):
def _cmd_lsvdisk(self, **kwargs):
def _cmd_lsiogrp(self, **kwargs):
def _add_port_to_host(self, host_info, **kwargs):
def _cmd_mkhost(self, **kwargs):
def _cmd_addhostport(self, **kwargs):
def _cmd_chhost(self, **kwargs):
def _cmd_rmhost(self, **kwargs):
def _cmd_lshost(self, **kwargs):
def _cmd_lsiscsiauth(self, **kwargs):
def _cmd_mkvdiskhostmap(self, **kwargs):
def _cmd_rmvdiskhostmap(self, **kwargs):
def _cmd_lshostvdiskmap(self, **kwargs):
def _cmd_lsvdiskhostmap(self, **kwargs):
def _cmd_mkfcmap(self, **kwargs):
def _cmd_prestartfcmap(self, **kwargs):
def _cmd_startfcmap(self, **kwargs):
def _cmd_stopfcmap(self, **kwargs):
def _cmd_rmfcmap(self, **kwargs):
def _cmd_lsvdiskfcmappings(self, **kwargs):
def _cmd_chfcmap(self, **kwargs):
def _cmd_lsfcmap(self, **kwargs):
def _cmd_migratevdisk(self, **kwargs):
def _cmd_addvdiskcopy(self, **kwargs):
def _cmd_lsvdiskcopy(self, **kwargs):
def _cmd_rmvdiskcopy(self, **kwargs):
def _cmd_chvdisk(self, **kwargs):
def _cmd_movevdisk(self, **kwargs):
def _cmd_addvdiskaccess(self, **kwargs):
def _cmd_rmvdiskaccess(self, **kwargs):
def _add_host_to_list(self, connector):
def _host_in_list(self, host_name):
def execute_command(self, cmd, check_exit_code=True):
def error_injection(self, cmd, error):
class StorwizeSVCFakeDriver(storwize_svc.StorwizeSVCDriver):
def __init__(self, *args, **kwargs):
def set_fake_storage(self, fake):
def _run_ssh(self, cmd, check_exit_code=True, attempts=1):
class StorwizeSVCDriverTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def _set_flag(self, flag, value):
def _reset_flags(self):
def _assert_vol_exists(self, name, exists):
def test_storwize_svc_connectivity(self):
def _generate_vol_info(self, vol_name, vol_id):
def _create_volume(self, **kwargs):
def _delete_volume(self, volume):
def _create_test_vol(self, opts):
def test_storwize_svc_snapshots(self):
def test_storwize_svc_create_volfromsnap_clone(self):
def test_storwize_svc_volumes(self):
def test_storwize_svc_volume_params(self):
def test_storwize_svc_unicode_host_and_volume_names(self):
def test_storwize_svc_validate_connector(self):
def test_storwize_svc_host_maps(self):
def test_storwize_svc_multi_host_maps(self):
def test_storwize_svc_delete_volume_snapshots(self):
def assertLessEqual(self, a, b, msg=None):
def test_storwize_svc_get_volume_stats(self):
def test_storwize_svc_extend_volume(self):
def _check_loc_info(self, capabilities, expected):
def test_storwize_svc_migrate_bad_loc_info(self):
def test_storwize_svc_volume_migrate(self):
def test_storwize_svc_retype_no_copy(self):
def test_storwize_svc_retype_only_change_iogrp(self):
def test_storwize_svc_retype_need_copy(self):
def test_set_storage_code_level_success(self):
def test_storwize_vdisk_copy_ops(self):
def test_storwize_initiator_target_map(self):
def _get_vdisk_uid(self, vdisk_name):
def _create_volume_and_return_uid(self, volume_name):
def test_manage_existing_bad_ref(self):
def test_manage_existing_bad_uid(self):
def test_manage_existing_good_uid_not_mapped(self):
def test_manage_existing_good_uid_mapped(self):
def test_manage_existing_good_uid_mapped_with_override(self):
class CLIResponseTestCase(test.TestCase):
def test_empty(self):
def test_header(self):
def test_select(self):
def test_lsnode_all(self):
def test_lsnode_single(self):
class StorwizeHelpersTestCase(test.TestCase):
def setUp(self):
def test_compression_enabled(self):
\OpenStack\cinder-2014.1\cinder\tests\test_test.py
class IsolationTestCase(test.TestCase):
def test_service_isolation(self):
def test_rpc_consumer_isolation(self):
\OpenStack\cinder-2014.1\cinder\tests\test_test_utils.py
class TestUtilsTestCase(test.TestCase):
def test_get_test_admin_context(self):
\OpenStack\cinder-2014.1\cinder\tests\test_utils.py
def _get_local_mock_open(fake_data='abcd efgh'):
class ExecuteTestCase(test.TestCase):
def test_retry_on_failure(self):
def test_unknown_kwargs_raises_error(self):
def test_check_exit_code_boolean(self):
def test_no_retry_on_success(self):
class GetFromPathTestCase(test.TestCase):
def test_tolerates_nones(self):
def test_does_select(self):
def test_flattens_lists(self):
def test_bad_xpath(self):
def test_real_failure1(self):
def test_accepts_dictionaries(self):
class GenericUtilsTestCase(test.TestCase):
def test_find_config(self, mock_exists):
def test_as_int(self):
def test_check_exclusive_options(self):
def test_require_driver_intialized(self):
def test_hostname_unicode_sanitization(self):
def test_hostname_sanitize_periods(self):
def test_hostname_sanitize_dashes(self):
def test_hostname_sanitize_characters(self):
def test_hostname_translate(self):
def test_generate_glance_url(self):
def test_read_cached_file(self, mock_mtime, mock_open):
def test_read_modified_cached_file(self, mock_mtime, mock_open):
def test_generate_password(self):
def test_read_file_as_root(self):
def fake_execute(*args, **kwargs):
def test_temporary_chown(self):
def fake_execute(*args, **kwargs):
def test_service_is_up(self, mock_utcnow):
def test_safe_parse_xml(self):
def killer_body():
def test_xhtml_escape(self):
def test_hash_file(self):
def test_check_ssh_injection(self):
def test_check_ssh_injection_on_error(self):
def test_create_channel(self, mock_client):
def test_get_file_mode(self, mock_stat):
def test_get_file_gid(self, mock_stat):
class MonkeyPatchTestCase(test.TestCase):
def setUp(self):
def test_monkey_patch(self):
class AuditPeriodTest(test.TestCase):
def setUp(self):
def test_hour(self):
def test_hour_with_offset_before_current(self):
def test_hour_with_offset_after_current(self):
def test_day(self):
def test_day_with_offset_before_current(self):
def test_day_with_offset_after_current(self):
def test_month(self):
def test_month_with_offset_before_current(self):
def test_month_with_offset_after_current(self):
def test_year(self):
def test_year_with_offset_before_current(self):
def test_year_with_offset_after_current(self):
class FakeSSHClient(object):
def __init__(self):
def set_missing_host_key_policy(self, policy):
def connect(self, ip, port=22, username=None, password=None, pkey=None, timeout=10):
def get_transport(self):
def close(self):
def __call__(self, *args, **kwargs):
class FakeSock(object):
def settimeout(self, timeout):
class FakeTransport(object):
def __init__(self):
def set_keepalive(self, timeout):
def is_active(self):
class SSHPoolTestCase(test.TestCase):
def test_single_ssh_connect(self, mock_sshclient, mock_pkey):
def test_closed_reopend_ssh_connections(self, mock_sshclient):
class BrickUtils(test.TestCase):
def test_brick_get_connector_properties(self):
def test_brick_get_connector(self):
class StringLengthTestCase(test.TestCase):
def test_check_string_length(self):
\OpenStack\cinder-2014.1\cinder\tests\test_vmware_api.py
class RetryTest(test.TestCase):
def test_retry(self):
def func(*args, **kwargs):
def func2(*args, **kwargs):
def test_retry_with_expected_exceptions(self):
def func(*args, **kwargs):
def test_retry_with_max_retries(self):
def func(*args, **kwargs):
def test_retry_with_unexpected_exception(self):
def func(*args, **kwargs):
class VMwareAPISessionTest(test.TestCase):
def setUp(self):
def _create_api_session(self, _create_session, retry_count=10, task_poll_interval=1):
def test_create_session(self):
def test_create_session_with_existing_session(self):
def test_invoke_api(self):
def api(*args, **kwargs):
def test_invoke_api_with_expected_exception(self):
def api(*args, **kwargs):
def test_invoke_api_with_vim_fault_exception(self):
def api(*args, **kwargs):
def test_invoke_api_with_empty_response(self):
def api(*args, **kwargs):
def test_invoke_api_with_stale_session(self):
def api(*args, **kwargs):
\OpenStack\cinder-2014.1\cinder\tests\test_vmware_vmdk.py
class FakeVim(object):
def service_content(self):
def client(self):
def Login(self, session_manager, userName, password):
def Logout(self, session_manager):
def TerminateSession(self, session_manager, sessionId):
def SessionIsActive(self, session_manager, sessionID, userName):
class FakeTaskInfo(object):
def __init__(self, state, result=None):
class FakeMor(object):
def __init__(self, type, val):
class FakeObject(object):
def __init__(self):
def __setitem__(self, key, value):
def __getitem__(self, item):
class FakeManagedObjectReference(object):
def __init__(self, lis=[]):
class FakeDatastoreSummary(object):
def __init__(self, freeSpace, capacity, datastore=None, name=None):
class FakeSnapshotTree(object):
def __init__(self, tree=None, name=None, snapshot=None, childSnapshotList=None):
class FakeElem(object):
def __init__(self, prop_set=None):
class FakeProp(object):
def __init__(self, name=None, val=None):
class FakeRetrieveResult(object):
def __init__(self, objects, token):
class FakeObj(object):
def __init__(self, obj=None):
class VMwareEsxVmdkDriverTestCase(test.TestCase):
def setUp(self):
def test_retry(self):
def test_create_session(self):
def test_do_setup(self):
def test_check_for_setup_error(self):
def test_get_volume_stats(self):
def test_create_volume(self):
def test_success_wait_for_task(self):
def test_failed_wait_for_task(self):
def test_delete_volume_without_backing(self):
def test_delete_volume_with_backing(self):
def test_create_export(self):
def test_ensure_export(self):
def test_remove_export(self):
def test_terminate_connection(self):
def test_create_backing_in_inventory_multi_hosts(self):
def test_init_conn_with_instance_and_backing(self):
def test_get_volume_group_folder(self):
def test_select_datastore_summary(self):
def test_get_folder_ds_summary(self, volumeops, session):
def test_get_disk_type(self):
def test_init_conn_with_instance_no_backing(self):
def test_init_conn_without_instance(self):
def test_create_snapshot_without_backing(self):
def test_create_snapshot_with_backing(self):
def test_create_snapshot_when_attached(self):
def test_delete_snapshot_without_backing(self):
def test_delete_snapshot_with_backing(self):
def test_delete_snapshot_when_attached(self):
def test_create_cloned_volume_without_backing(self, mock_vops):
def test_create_cloned_volume_with_backing(self, mock_vops):
def test_create_backing_by_copying(self, volumeops, create_backing, _extend_virtual_disk):
def _test_create_backing_by_copying(self, volumeops, create_backing, _extend_virtual_disk):
def test_create_volume_from_snapshot_without_backing(self, mock_vops):
def test_create_volume_from_snap_without_backing_snap(self, mock_vops):
def test_create_volume_from_snapshot(self, mock_vops):
def test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):
def _test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):
def test_copy_image_to_volume_non_vmdk(self):
def test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):
def _test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):
def test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):
def _test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):
def test_copy_volume_to_image_non_vmdk(self):
def test_copy_volume_to_image_when_attached(self):
def test_copy_volume_to_image_vmdk(self):
def test_retrieve_properties_ex_fault_checker(self):
def test_extend_vmdk_virtual_disk(self, volume_ops):
def _test_extend_vmdk_virtual_disk(self, volume_ops):
class VMwareVcVmdkDriverTestCase(VMwareEsxVmdkDriverTestCase):
def setUp(self):
def test_get_pbm_wsdl_location(self):
def expected_wsdl(version):
def test_get_vc_version(self, session):
def test_do_setup(self, session, _get_vc_version, _get_pbm_wsdl_location):
def test_create_backing_by_copying(self, volumeops, create_backing, extend_virtual_disk):
def test_init_conn_with_instance_and_backing(self):
def test_get_volume_group_folder(self):
def test_init_conn_with_instance_and_backing_and_relocation(self):
def test_clone_backing_linked(self, volume_ops, _extend_vmdk_virtual_disk):
def test_clone_backing_full(self, volume_ops, _select_ds_for_volume, _extend_vmdk_virtual_disk):
def test_create_volume_from_snapshot_without_backing(self, mock_vops):
def test_create_volume_from_snap_without_backing_snap(self, mock_vops):
def test_create_volume_from_snapshot(self, mock_vops):
def test_create_cloned_volume_without_backing(self, mock_vops):
def test_create_cloned_volume_with_backing(self, mock_vops):
def test_create_linked_cloned_volume_with_backing(self, get_clone_type, mock_vops):
def test_create_linked_cloned_volume_when_attached(self, get_clone_type, mock_vops):
def test_get_storage_profile(self, get_volume_type_extra_specs):
def test_filter_ds_by_profile(self, volumeops, session, hubs_to_ds, ds_to_hubs):
def test_get_folder_ds_summary(self, volumeops, session):
def filter_ds(datastores, storage_profile):
def test_extend_vmdk_virtual_disk(self, volume_ops):
def test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):
def test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):
def test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):
\OpenStack\cinder-2014.1\cinder\tests\test_vmware_volumeops.py
class VolumeOpsTestCase(test.TestCase):
def setUp(self):
def test_split_datastore_path(self):
def vm(self, val):
def test_get_backing(self):
def test_delete_backing(self):
def test_get_host(self):
def test_get_hosts(self):
def test_continue_retrieval(self):
def test_cancel_retrieval(self):
def test_is_usable(self):
def _create_host_mounts(self, access_mode, host, set_accessible=True, is_accessible=True, mounted=True):
def test_get_connected_hosts(self):
def test_is_valid(self):
def _is_valid(host_mounts, is_valid):
def test_get_dss_rp(self):
def test_get_parent(self):
def test_get_dc(self):
def mock_invoke_api(vim_util, method, vim, the_object, arg):
def test_get_vmfolder(self):
def test_create_folder_not_present(self):
def test_create_folder_already_present(self):
def test_get_create_spec(self):
def test_create_backing(self, get_create_spec):
def test_get_datastore(self):
def test_get_summary(self):
def test_get_relocate_spec(self):
def test_relocate_backing(self, get_relocate_spec):
def test_move_backing_to_folder(self):
def test_create_snapshot_operation(self):
def test_get_snapshot_from_tree(self):
def test_get_snapshot(self):
def test_delete_snapshot(self):
def test_get_folder(self):
def test_get_clone_spec(self):
def test_clone_backing(self, get_clone_spec):
def test_delete_file(self):
def test_get_path_name(self):
def test_get_entity_name(self):
def test_get_vmdk_path(self):
def test_copy_vmdk_file(self):
def test_delete_vmdk_file(self):
def test_extend_virtual_disk(self):
\OpenStack\cinder-2014.1\cinder\tests\test_volume.py
class FakeImageService:
def __init__(self, db_driver=None, image_service=None):
def show(self, context, image_id):
class BaseVolumeTestCase(test.TestCase):
def setUp(self):
def tearDown(self):
def fake_get_target(obj, iqn):
def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
class VolumeTestCase(BaseVolumeTestCase):
def setUp(self):
def test_init_host_clears_downloads(self):
def test_create_driver_not_initialized(self, reserve, commit, rollback):
def fake_reserve(context, expire=None, project_id=None, **deltas):
def fake_commit_and_rollback(context, reservations, project_id=None):
def test_delete_driver_not_initialized(self, reserve, commit, rollback):
OpenStack Index | Previous | Next
|