|  OpenStack Study: cinder | OpenStack Index | Previous | Next
         def fake_reserve(context, expire=None, project_id=None, **deltas):
         def fake_commit_and_rollback(context, reservations, project_id=None):
     def test_create_delete_volume(self):
         def fake_reserve(context, expire=None, project_id=None, **deltas):
         def fake_commit(context, reservations, project_id=None):
         def fake_rollback(context, reservations, project_id=None):
     def test_create_delete_volume_with_metadata(self):
     def test_create_volume_with_invalid_metadata(self):
     def test_create_volume_uses_default_availability_zone(self):
         def fake_list_availability_zones():
     def test_create_volume_with_volume_type(self):
         def fake_reserve(context, expire=None, project_id=None, **deltas):
         def fake_commit(context, reservations, project_id=None):
         def fake_rollback(context, reservations, project_id=None):
     def test_create_volume_with_encrypted_volume_type(self):
     def test_create_delete_volume_with_encrypted_volume_type(self):
     def test_extra_capabilities(self):
     def test_extra_capabilities_fail(self):
     def test_delete_busy_volume(self):
     def test_delete_volume_in_error_extending(self):
     def test_create_volume_from_snapshot(self):
     def test_create_volume_from_snapshot_with_types(self, _get_flow):
     def test_create_volume_from_source_with_types(self, _get_flow):
     def test_create_snapshot_driver_not_initialized(self):
     def _mock_synchronized(self, name, *s_args, **s_kwargs):
         def inner_sync1(f):
     def test_create_volume_from_snapshot_check_locks(self):
         def mock_flow_run(*args, **kwargs):
     def test_create_volume_from_volume_check_locks(self):
         def mock_flow_run(*args, **kwargs):
     def test_create_volume_from_volume_delete_lock_taken(self):
         def mock_elevated(*args, **kwargs):
     def test_create_volume_from_snapshot_delete_lock_taken(self):
         def mock_elevated(*args, **kwargs):
     def test_create_volume_from_snapshot_with_encryption(self):
     def test_create_volume_from_encrypted_volume(self):
     def test_create_volume_from_snapshot_fail_bad_size(self):
     def test_create_volume_from_snapshot_fail_wrong_az(self):
         def fake_list_availability_zones():
     def test_create_volume_with_invalid_exclusive_options(self):
     def test_initialize_connection_fetchqos(self, _mock_volume_update, _mock_volume_get, _mock_volume_admin_metadata_get):
     def test_run_attach_detach_volume_for_instance(self):
     def test_run_attach_detach_volume_for_host(self):
     def test_run_attach_detach_volume_with_attach_mode(self):
     def test_run_manager_attach_detach_volume_with_wrong_attach_mode(self):
     def test_run_api_attach_detach_volume_with_wrong_attach_mode(self):
     def test_reserve_volume_success(self, volume_get, volume_update):
     def test_reserve_volume_bad_status(self):
     def test_unreserve_volume_success(self):
     def test_concurrent_volumes_get_different_targets(self):
         def _check(volume_id):
     def test_multi_node(self):
     def _create_snapshot(volume_id, size='0', metadata=None):
     def test_create_delete_snapshot(self):
     def test_create_delete_snapshot_with_metadata(self):
     def test_cannot_delete_volume_in_use(self):
     def test_force_delete_volume(self):
     def test_cannot_force_delete_attached_volume(self):
     def test_cannot_delete_volume_with_snapshots(self):
     def test_can_delete_errored_snapshot(self):
     def test_create_snapshot_force(self):
     def test_delete_busy_snapshot(self):
     def test_delete_no_dev_fails(self):
     def _create_volume_from_image(self, fakeout_copy_image_to_volume=False, fakeout_clone_image=False):
         def fake_local_path(volume):
         def fake_copy_image_to_volume(context, volume, image_service, image_id):
         def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, size=None):
         def fake_clone_image(volume_ref, image_location, image_id, image_meta):
     def test_create_volume_from_image_cloned_status_available(self):
     def test_create_volume_from_image_not_cloned_status_available(self):
     def test_create_volume_from_image_exception(self):
     def test_create_volume_from_exact_sized_image(self):
     def test_create_volume_from_oversized_image(self):
     def test_create_volume_with_mindisk_error(self):
     def _do_test_create_volume_with_size(self, size):
         def fake_reserve(context, expire=None, project_id=None, **deltas):
         def fake_commit(context, reservations, project_id=None):
         def fake_rollback(context, reservations, project_id=None):
     def test_create_volume_int_size(self):
     def test_create_volume_string_size(self):
     def test_create_volume_with_bad_size(self):
         def fake_reserve(context, expire=None, project_id=None, **deltas):
         def fake_commit(context, reservations, project_id=None):
         def fake_rollback(context, reservations, project_id=None):
     def test_begin_roll_detaching_volume(self):
     def test_volume_api_update(self):
     def test_volume_api_update_snapshot(self):
     def test_extend_volume(self, reserve):
     def test_extend_volume_driver_not_initialized(self):
     def test_extend_volume_manager(self):
         def fake_extend(volume, new_size):
     def test_create_volume_from_unelevated_context(self):
         def fake_create_volume(*args, **kwargs):
     def test_create_volume_from_sourcevol(self):
         def fake_create_cloned_volume(volume, src_vref):
     def test_create_volume_from_sourcevol_fail_wrong_az(self):
         def fake_list_availability_zones():
     def test_create_volume_from_sourcevol_with_glance_metadata(self):
         def fake_create_cloned_volume(volume, src_vref):
     def test_create_volume_from_sourcevol_failed_clone(self):
         def fake_error_create_cloned_volume(volume, src_vref):
     def test_list_availability_zones_enabled_service(self):
         def stub_service_get_all_by_topic(*args, **kwargs):
     def test_migrate_volume_driver(self):
     def test_migrate_volume_generic(self):
         def fake_migr(vol, host):
         def fake_delete_volume_rpc(self, ctxt, vol_id):
         def fake_create_volume(self, ctxt, volume, host, req_spec, filters, allow_reschedule=True):
     def _retype_volume_exec(self, driver, snap=False, policy='on-demand', migrate_exc=False, exc=None, diff_equal=False):
     def test_retype_volume_driver_success(self):
     def test_retype_volume_migration_bad_policy(self):
     def test_retype_volume_migration_with_snaps(self):
     def test_retype_volume_migration_failed(self):
     def test_retype_volume_migration_success(self):
     def test_retype_volume_migration_equal_types(self):
     def test_migrate_driver_not_initialized(self):
     def test_update_volume_readonly_flag(self):
 class CopyVolumeToImageTestCase(BaseVolumeTestCase):
     def fake_local_path(self, volume):
     def setUp(self):
     def tearDown(self):
     def test_copy_volume_to_image_status_available(self):
     def test_copy_volume_to_image_status_use(self):
     def test_copy_volume_to_image_exception(self):
 class GetActiveByWindowTestCase(BaseVolumeTestCase):
     def setUp(self):
     def test_volume_get_active_by_window(self):
     def test_snapshot_get_active_by_window(self):
 class DriverTestCase(test.TestCase):
     def setUp(self):
         def _fake_execute(_command, *_args, **_kwargs):
     def tearDown(self):
     def fake_get_target(obj, iqn):
     def _attach_volume(self):
     def _detach_volume(self, volume_id_list):
 class GenericVolumeDriverTestCase(DriverTestCase):
     def test_backup_volume(self):
     def test_restore_backup(self):
 class LVMISCSIVolumeDriverTestCase(DriverTestCase):
     def test_delete_busy_volume(self):
     def test_lvm_migrate_volume_no_loc_info(self):
     def test_lvm_migrate_volume_bad_loc_info(self):
     def test_lvm_migrate_volume_diff_driver(self):
     def test_lvm_migrate_volume_diff_host(self):
     def test_lvm_migrate_volume_in_use(self):
     def test_lvm_volume_group_missing(self):
         def get_all_volume_groups():
     def test_lvm_migrate_volume_proceed(self):
         def fake_execute(*args, **kwargs):
         def get_all_volume_groups():
         def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
     def _get_manage_existing_lvs(name):
     def _setup_stubs_for_manage_existing(self):
     def test_lvm_manage_existing(self):
         def _rename_volume(old_name, new_name):
     def test_lvm_manage_existing_bad_size(self):
     def test_lvm_manage_existing_bad_ref(self):
 class LVMVolumeDriverTestCase(DriverTestCase):
     def test_delete_volume_invalid_parameter(self):
     def test_delete_volume_bad_path(self):
     def test_delete_volume_thinlvm_snap(self):
 class ISCSITestCase(DriverTestCase):
     def setUp(self):
     def _attach_volume(self):
     def test_do_iscsi_discovery(self):
     def test_get_iscsi_properties(self):
     def test_get_volume_stats(self):
         def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
         def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
     def test_validate_connector(self):
 class ISERTestCase(ISCSITestCase):
     def setUp(self):
     def test_get_volume_stats(self):
         def _fake_get_all_physical_volumes(obj, root_helper, vg_name):
         def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):
     def test_get_volume_stats2(self):
 class FibreChannelTestCase(DriverTestCase):
     def test_initialize_connection(self):
 class VolumePolicyTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def _set_rules(self, rules):
     def test_check_policy(self):
     def test_check_policy_with_target(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_configuration.py
 class VolumeConfigurationTest(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_group_grafts_opts(self):
     def test_opts_no_group(self):
     def test_grafting_multiple_opts(self):
     def test_safe_get(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_glance_metadata.py
 class VolumeGlanceMetadataTestCase(test.TestCase):
     def setUp(self):
     def test_vol_glance_metadata_bad_vol_id(self):
     def test_vol_update_glance_metadata(self):
     def test_vols_get_glance_metadata(self):
     def _assert_metadata_equals(self, volume_id, key, value, observed):
     def test_vol_delete_glance_metadata(self):
     def test_vol_glance_metadata_copy_to_snapshot(self):
     def test_vol_glance_metadata_copy_from_volume_to_volume(self):
     def test_volume_glance_metadata_copy_to_volume(self):
     def test_volume_snapshot_glance_metadata_get_nonexistent(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_rpcapi.py
 class VolumeRpcAPITestCase(test.TestCase):
     def setUp(self):
     def test_serialized_volume_has_id(self):
     def _test_volume_api(self, method, rpc_method, **kwargs):
         def _fake_prepare_method(*args, **kwds):
         def _fake_rpc_method(*args, **kwargs):
     def test_create_volume(self):
     def test_create_volume_serialization(self):
     def test_delete_volume(self):
     def test_create_snapshot(self):
     def test_delete_snapshot(self):
     def test_attach_volume_to_instance(self):
     def test_attach_volume_to_host(self):
     def test_detach_volume(self):
     def test_copy_volume_to_image(self):
     def test_initialize_connection(self):
     def test_terminate_connection(self):
     def test_accept_transfer(self):
     def test_extend_volume(self):
     def test_migrate_volume(self):
     def test_migrate_volume_completion(self):
     def test_retype(self):
     def test_manage_existing(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_transfer.py
 class VolumeTransferTestCase(test.TestCase):
     def setUp(self):
     def test_transfer_volume_create_delete(self):
     def test_transfer_invalid_volume(self):
     def test_transfer_accept(self):
     def test_transfer_get(self):
     def test_delete_transfer_with_deleted_volume(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_types.py
 class VolumeTypeTestCase(test.TestCase):
     def setUp(self):
     def test_volume_type_create_then_destroy(self):
     def test_get_all_volume_types(self):
     def test_get_default_volume_type(self):
     def test_default_volume_type_missing_in_db(self):
     def test_non_existent_vol_type_shouldnt_delete(self):
     def test_volume_type_with_volumes_shouldnt_delete(self):
     def test_repeated_vol_types_shouldnt_raise(self):
     def test_invalid_volume_types_params(self):
     def test_volume_type_get_by_id_and_name(self):
     def test_volume_type_search_by_extra_spec(self):
     def test_volume_type_search_by_extra_spec_multiple(self):
     def test_is_encrypted(self):
     def test_get_volume_type_qos_specs(self):
     def test_volume_types_diff(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_types_extra_specs.py
 class VolumeTypeExtraSpecsTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_volume_type_specs_get(self):
     def test_volume_type_extra_specs_delete(self):
     def test_volume_type_extra_specs_update(self):
     def test_volume_type_extra_specs_create(self):
     def test_volume_type_get_with_extra_specs(self):
     def test_volume_type_get_by_name_with_extra_specs(self):
     def test_volume_type_get_all(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_volume_utils.py
 class UsageInfoTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def _create_volume(self, params={}):
 class LVMVolumeDriverTestCase(test.TestCase):
     def test_convert_blocksize_option(self):
 class ClearVolumeTestCase(test.TestCase):
     def test_clear_volume(self):
     def test_clear_volume_zero(self):
     def test_clear_volume_ionice(self):
     def test_clear_volume_zero_ionice(self):
     def test_clear_volume_shred(self):
     def test_clear_volume_shred_not_clear_size(self):
     def test_clear_volume_invalid_opt(self):
     def test_clear_volume_lvm_snap(self):
         def fake_copy_volume(srcstr, deststr, size, blocksize, **kwargs):
 \OpenStack\cinder-2014.1\cinder\tests\test_windows.py
 class TestWindowsDriver(test.TestCase):
     def __init__(self, method):
     def setUp(self):
     def tearDown(self):
     def _setup_stubs(self):
         def fake_wutils__init__(self):
     def fake_local_path(self, volume):
     def test_check_for_setup_errors(self):
     def test_create_volume(self):
     def test_delete_volume(self):
     def test_create_snapshot(self):
     def test_create_volume_from_snapshot(self):
     def test_delete_snapshot(self):
     def test_create_export(self):
     def test_initialize_connection(self):
     def test_terminate_connection(self):
     def test_ensure_export(self):
     def test_remove_export(self):
     def test_copy_image_to_volume(self):
     def test_copy_volume_to_image(self):
     def test_create_cloned_volume(self):
     def test_extend_volume(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_wsgi.py
 class TestLoaderNothingExists(test.TestCase):
     def setUp(self):
     def test_config_not_found(self):
 class TestLoaderNormalFilesystem(test.TestCase):
     def setUp(self):
     def test_config_found(self):
     def test_app_not_found(self):
     def test_app_found(self):
 class TestWSGIServer(test.TestCase):
     def _ipv6_configured():
     def test_no_app(self):
     def test_start_random_port(self):
     def test_start_random_port_with_ipv6(self):
     def test_app(self):
         def hello_world(env, start_response):
     def test_app_using_ssl(self):
         def hello_world(req):
     def test_app_using_ipv6_and_ssl(self):
         def hello_world(req):
 class ExceptionTest(test.TestCase):
     def _wsgi_app(self, inner_app):
     def _do_test_exception_safety_reflected_in_faults(self, expose):
         def fail(req):
     def test_safe_exceptions_are_described_in_faults(self):
     def test_unsafe_exceptions_are_not_described_in_faults(self):
     def _do_test_exception_mapping(self, exception_type, msg):
         def fail(req):
     def test_quota_error_mapping(self):
     def test_non_cinder_notfound_exception_mapping(self):
     def test_non_cinder_exception_mapping(self):
     def test_exception_with_none_code_throws_500(self):
         def fail(req):
     def test_cinder_exception_with_localized_explanation(self, mock_t9n):
         def fail(req):
         def mock_get_non_localized_message(msgid, locale):
         def mock_translate(msgid, locale):
 \OpenStack\cinder-2014.1\cinder\tests\test_xenapi_sm.py
 class MockContext(object):
     def __init__(ctxt, auth_token):
 def simple_context(value):
 def get_configured_driver(server='ignore_server', path='ignore_path'):
 class DriverTestCase(test.TestCase):
     def assert_flag(self, flagname):
     def test_config_options(self):
     def test_do_setup(self):
     def test_create_volume(self):
     def test_delete_volume(self):
     def test_create_export_does_not_raise_exception(self):
     def test_remove_export_does_not_raise_exception(self):
     def test_initialize_connection(self):
     def test_initialize_connection_null_values(self):
     def _setup_mock_driver(self, server, serverpath, sr_base_path="_srbp"):
     def test_create_snapshot(self):
     def test_create_volume_from_snapshot(self):
     def test_delete_snapshot(self):
     def test_copy_volume_to_image_xenserver_case(self):
     def test_copy_volume_to_image_non_xenserver_case(self):
     def test_use_image_utils_to_upload_volume(self):
     def test_use_glance_plugin_to_upload_volume(self):
     def test_copy_image_to_volume_xenserver_case(self):
     def test_copy_image_to_volume_non_xenserver_case(self):
     def test_use_image_utils_to_pipe_bytes_to_volume(self):
     def test_use_glance_plugin_to_copy_image_to_volume_success(self):
     def test_use_glance_plugin_to_copy_image_to_volume_fail(self):
     def test_get_volume_stats_reports_required_keys(self):
     def test_get_volume_stats_reports_unknown_cap(self):
     def test_reported_driver_type(self):
 class ToolsTest(test.TestCase):
     def test_get_this_vm_uuid(self, mock_read_first_line):
     def test_stripped_first_line_of(self):
 \OpenStack\cinder-2014.1\cinder\tests\test_zadara.py
 class FakeRequest(object):
     def __init__(self, method, url, body):
     def read(self):
     def _compare_url(self, url, template_url):
     def _get_parameters(self, data):
     def _get_counter(self):
     def _login(self):
     def _incorrect_access_key(self, params):
     def _create_volume(self):
     def _create_server(self):
     def _attach(self):
     def _detach(self):
     def _expand(self):
     def _create_snapshot(self):
     def _delete_snapshot(self):
     def _create_clone(self):
     def _delete(self):
     def _generate_list_resp(self, header, footer, body, lst, vol):
     def _list_volumes(self):
     def _list_controllers(self):
     def _list_pools(self):
     def _list_servers(self):
     def _get_server_obj(self, name):
     def _list_vol_attachments(self):
     def _list_vol_snapshots(self):
 class FakeHTTPConnection(object):
     def __init__(self, host, port, use_ssl=False):
     def request(self, method, url, body):
     def getresponse(self):
     def close(self):
 class FakeHTTPSConnection(FakeHTTPConnection):
     def __init__(self, host, port):
 class ZadaraVPSADriverTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_create_destroy(self):
     def test_create_destroy_multiple(self):
     def test_destroy_non_existent(self):
     def test_empty_apis(self):
     def test_volume_attach_detach(self):
     def test_volume_attach_multiple_detach(self):
     def test_wrong_attach_params(self):
     def test_wrong_detach_params(self):
     def test_wrong_login_reply(self):
     def test_ssl_use(self):
     def test_bad_http_response(self):
     def test_delete_without_detach(self):
     def test_no_active_ctrl(self):
     def test_create_destroy_snapshot(self):
     def test_expand_volume(self):
     def test_create_destroy_clones(self):
     def test_get_volume_stats(self):
 \OpenStack\cinder-2014.1\cinder\tests\utils.py
 def get_test_admin_context():
 def create_volume(ctxt, host='test_host', display_name='test_volume', display_description='this is a test volume', status='available', migration_status=None, size=1, availability_zone='fake_az', volume_type_id=None, **kwargs):
 def create_snapshot(ctxt, volume_id, display_name='test_snapshot', display_description='this is a test snapshot', status='creating'):
 \OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\test_iscsi.py
 class NetAppDirectISCSIDriverTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_create_lun(self):
     def test_create_lun_with_qos_policy_group(self):
 class NetAppiSCSICModeTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_clone_lun_multiple_zapi_calls(self):
     def test_clone_lun_zero_block_count(self):
 class NetAppiSCSI7ModeTestCase(test.TestCase):
     def setUp(self):
     def tearDown(self):
     def test_clone_lun_multiple_zapi_calls(self):
     def test_clone_lun_zero_block_count(self):
 \OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\volume\drivers\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\volume\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\windows\db_fakes.py
 def get_fake_volume_info():
 def get_fake_volume_info_cloned():
 def get_fake_image_meta():
 def get_fake_snapshot_info():
 def get_fake_connector_info():
 \OpenStack\cinder-2014.1\cinder\tests\windows\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\xenapi\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_san_lookup_service.py
 class TestBrcdFCSanLookupService(brcd_lookup.BrcdFCSanLookupService,
                                 test.TestCase):
     def setUp(self):
     def __init__(self, *args, **kwargs):
     def create_configuration(self):
     def test_get_device_mapping_from_network(self, get_nameserver_info_mock):
     def test_get_nameserver_info(self, get_switch_data_mock):
     def test__get_switch_data(self):
     def test__parse_ns_output(self):
     def test_get_formatted_wwn(self):
 class Channel(object):
     def recv_exit_status(self):
 class Stream(object):
     def __init__(self, buffer=''):
     def readlines(self):
     def close(self):
     def flush(self):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_client_cli.py
 class TestBrcdFCZoneClientCLI(BrcdFCZoneClientCLI, test.TestCase):
     def setUp(self):
     def __init__(self, *args, **kwargs):
     def test_get_active_zone_set(self, get_switch_info_mock):
     def test_get_active_zone_set_ssh_error(self, run_ssh_mock):
     def test_add_zones_new_zone_no_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):
     def test_add_zones_new_zone_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zoneset_mock):
     def test_activate_zoneset(self, ssh_execute_mock):
     def test_deactivate_zoneset(self, ssh_execute_mock):
     def test_delete_zones_activate_false(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):
     def test_delete_zones_activate_true(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zs_mock):
     def test_get_nameserver_info(self, get_switch_info_mock):
     def test_get_nameserver_info_ssh_error(self, run_ssh_mock):
     def test__cfg_save(self, ssh_execute_mock):
     def test__zone_delete(self, apply_zone_change_mock):
     def test__cfg_trans_abort(self, apply_zone_change_mock):
     def test__is_trans_abortable_true(self, run_ssh_mock):
     def test__is_trans_abortable_ssh_error(self, run_ssh_mock):
     def test__is_trans_abortable_false(self, run_ssh_mock):
     def test_apply_zone_change(self, run_ssh_mock):
     def test__get_switch_info(self, run_ssh_mock):
     def test__parse_ns_output(self):
     def test_is_supported_firmware(self, exec_shell_cmd_mock):
     def test_is_supported_firmware_invalid(self, exec_shell_cmd_mock):
     def test_is_supported_firmware_no_ssh_response(self, exec_shell_cmd_mock):
     def test_is_supported_firmware_ssh_error(self, exec_shell_cmd_mock):
 class Channel(object):
     def recv_exit_status(self):
 class Stream(object):
     def __init__(self, buffer=''):
     def readlines(self):
     def splitlines(self):
     def close(self):
     def flush(self):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_driver.py
 class BrcdFcZoneDriverBaseTest(object):
     def setup_config(self, is_normal, mode):
 class TestBrcdFcZoneDriver(BrcdFcZoneDriverBaseTest, test.TestCase):
     def setUp(self):
     def setup_driver(self, config):
     def fake_get_active_zone_set(self, fabric_ip, fabric_user, fabric_pwd):
     def fake_get_san_context(self, target_wwn_list):
     def test_add_connection(self):
     def test_delete_connection(self):
     def test_add_connection_for_initiator_mode(self):
     def test_delete_connection_for_initiator_mode(self):
     def test_add_connection_for_invalid_fabric(self):
     def test_delete_connection_for_invalid_fabric(self):
 class FakeBrcdFCZoneClientCLI(object):
     def __init__(self, ipaddress, username, password, port):
     def get_active_zone_set(self):
     def add_zones(self, zones, isActivate):
     def delete_zones(self, zone_names, isActivate):
     def is_supported_firmware(self):
     def get_nameserver_info(self):
     def close_connection(self):
     def cleanup(self):
 class FakeBrcdFCSanLookupService(object):
     def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):
 class GlobalVars(object):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_lookup_service.py
 class TestFCSanLookupService(FCSanLookupService, test.TestCase):
     def setUp(self):
     def __init__(self, *args, **kwargs):
     def setup_config(self):
     def test_get_device_mapping_from_network(self):
     def test_get_device_mapping_from_network_for_invalid_config(self):
 class FakeBrcdFCSanLookupService(object):
     def __init__(self, **kwargs):
     def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):
 class GlobalParams(object):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_fc_zone_manager.py
 class TestFCZoneManager(ZoneManager, test.TestCase):
     def setUp(self):
     def __init__(self, *args, **kwargs):
     def test_add_connection(self):
     def test_add_connection_error(self):
     def test_delete_connection(self):
     def test_delete_connection_error(self):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_volume_manager_fc.py
 class TestVolumeManager(manager.VolumeManager, test.TestCase):
     def setUp(self):
     def tearDown(self):
     def __init__(self, *args, **kwargs):
     def test_initialize_connection_voltype_fc_mode_fabric(self, utils_mock):
     def test_initialize_connection_voltype_fc_mode_none(self, utils_mock):
     def test_terminate_connection_exception(self):
     def test_terminate_connection_voltype_fc_mode_fabric(self, utils_mock):
     def test_terminate_connection_mode_none(self, utils_mock):
     def test_terminate_connection_conn_info_none(self, utils_mock):
     def test__add_or_delete_connection_add(self, add_connection_mock):
     def test__add_or_delete_connection_delete(self, delete_connection_mock):
     def test__add_or_delete_connection_no_init_target_map(self, del_conn_mock):
 \OpenStack\cinder-2014.1\cinder\tests\zonemanager\__init__.py
 \OpenStack\cinder-2014.1\cinder\tests\__init__.py
 \OpenStack\cinder-2014.1\cinder\transfer\api.py
 class API(base.Base):
     def __init__(self, db_driver=None):
     def get(self, context, transfer_id):
     def delete(self, context, transfer_id):
     def get_all(self, context, filters={}):
     def _get_random_string(self, length):
     def _get_crypt_hash(self, salt, auth_key):
     def create(self, context, volume_id, display_name):
     def accept(self, context, transfer_id, auth_key):
 \OpenStack\cinder-2014.1\cinder\transfer\__init__.py
 \OpenStack\cinder-2014.1\cinder\units.py
 \OpenStack\cinder-2014.1\cinder\utils.py
 def find_config(config_path):
 def as_int(obj, quiet=True):
 def check_exclusive_options(**kwargs):
 def execute(*cmd, **kwargs):
 def check_ssh_injection(cmd_list):
 def create_channel(client, width, height):
 class SSHPool(pools.Pool):
     def __init__(self, ip, port, conn_timeout, login, password=None, privatekey=None, *args, **kwargs):
     def create(self):
     def get(self):
     def remove(self, ssh):
 def cinderdir():
 def last_completed_audit_period(unit=None):
 def generate_password(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
 def generate_username(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
 class LazyPluggable(object):
     def __init__(self, pivot, **backends):
     def __get_backend(self):
     def __getattr__(self, key):
 class ProtectedExpatParser(expatreader.ExpatParser):
     def __init__(self, forbid_dtd=True, forbid_entities=True, *args, **kwargs):
     def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
     def entity_decl(self, entityName, is_parameter_entity, value, base, systemId, publicId, notationName):
     def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
     def reset(self):
 def safe_minidom_parse_string(xml_string):
 def xhtml_escape(value):
 def get_from_path(items, path):
 def is_valid_boolstr(val):
 def monkey_patch():
 def generate_glance_url():
 def make_dev_path(dev, partition=None, base='/dev'):
 def total_seconds(td):
 def sanitize_hostname(hostname):
 def read_cached_file(filename, cache_info, reload_func=None):
 def hash_file(file_like_object):
 def service_is_up(service):
 def read_file_as_root(file_path):
 def temporary_chown(path, owner_uid=None):
 def tempdir(**kwargs):
 def walk_class_hierarchy(clazz, encountered=None):
 def get_root_helper():
 def brick_get_connector_properties():
 def brick_get_connector(protocol, driver=None, execute=processutils.execute, use_multipath=False, device_scan_attempts=3, *args, **kwargs):
 def require_driver_initialized(driver):
 def get_file_mode(path):
 def get_file_gid(path):
 def check_string_length(value, name, min_length=0, max_length=None):
 def add_visible_admin_metadata(context, volume, volume_api):
 \OpenStack\cinder-2014.1\cinder\version.py
 \OpenStack\cinder-2014.1\cinder\volume\api.py
 def wrap_check_policy(func):
  def wrapped(self, context, target_obj, *args, **kwargs):
 def check_policy(context, action, target_obj=None):
 class API(base.Base):
     def __init__(self, db_driver=None, image_service=None):
     def _valid_availability_zone(self, availability_zone):
     def list_availability_zones(self):
     def create(self, context, size, name, description, snapshot=None, image_id=None, volume_type=None, metadata=None, availability_zone=None, source_volume=None, scheduler_hints=None, backup_source_volume=None):
         def check_volume_az_zone(availability_zone):
     def delete(self, context, volume, force=False, unmanage_only=False):
     def update(self, context, volume, fields):
     def get(self, context, volume_id):
     def get_all(self, context, marker=None, limit=None, sort_key='created_at', sort_dir='desc', filters=None):
     def get_snapshot(self, context, snapshot_id):
     def get_volume(self, context, volume_id):
     def get_all_snapshots(self, context, search_opts=None):
     def check_attach(self, volume):
     def check_detach(self, volume):
     def reserve_volume(self, context, volume):
     def unreserve_volume(self, context, volume):
     def begin_detaching(self, context, volume):
     def roll_detaching(self, context, volume):
     def attach(self, context, volume, instance_uuid, host_name, mountpoint, mode):
     def detach(self, context, volume):
     def initialize_connection(self, context, volume, connector):
     def terminate_connection(self, context, volume, connector, force=False):
     def accept_transfer(self, context, volume, new_user, new_project):
     def _create_snapshot(self, context, volume, name, description, force=False, metadata=None):
     def create_snapshot(self, context, volume, name, description, metadata=None):
     def create_snapshot_force(self, context, volume, name, description, metadata=None):
     def delete_snapshot(self, context, snapshot, force=False):
     def update_snapshot(self, context, snapshot, fields):
     def get_volume_metadata(self, context, volume):
     def delete_volume_metadata(self, context, volume, key):
     def _check_metadata_properties(self, metadata=None):
     def update_volume_metadata(self, context, volume, metadata, delete=False):
     def get_volume_metadata_value(self, volume, key):
     def get_volume_admin_metadata(self, context, volume):
     def delete_volume_admin_metadata(self, context, volume, key):
     def update_volume_admin_metadata(self, context, volume, metadata, delete=False):
     def get_snapshot_metadata(self, context, snapshot):
     def delete_snapshot_metadata(self, context, snapshot, key):
     def update_snapshot_metadata(self, context, snapshot, metadata, delete=False):
     def get_snapshot_metadata_value(self, snapshot, key):
     def get_volumes_image_metadata(self, context):
     def get_volume_image_metadata(self, context, volume):
     def _check_volume_availability(self, volume, force):
     def copy_volume_to_image(self, context, volume, metadata, force):
     def extend(self, context, volume, new_size):
     def migrate_volume(self, context, volume, host, force_host_copy):
     def migrate_volume_completion(self, context, volume, new_volume, error):
     def update_readonly_flag(self, context, volume, flag):
     def retype(self, context, volume, new_type, migration_policy=None):
     def manage_existing(self, context, host, ref, name=None, description=None, volume_type=None, metadata=None, availability_zone=None):
 class HostAPI(base.Base):
     def __init__(self):
     def set_host_enabled(self, context, host, enabled):
     def get_host_uptime(self, context, host):
     def host_power_action(self, context, host, action):
     def set_host_maintenance(self, context, host, mode):
 \OpenStack\cinder-2014.1\cinder\volume\configuration.py
 class Configuration(object):
     def __init__(self, volume_opts, config_group=None):
     def _ensure_config_values(self, volume_opts):
     def append_config_values(self, volume_opts):
     def safe_get(self, value):
     def __getattr__(self, value):
 \OpenStack\cinder-2014.1\cinder\volume\driver.py
 class VolumeDriver(object):
     def __init__(self, execute=utils.execute, *args, **kwargs):
     def set_execute(self, execute):
     def set_initialized(self):
     def initialized(self):
     def get_version(self):
     def _is_non_recoverable(self, err, non_recoverable_list):
     def _try_execute(self, *command, **kwargs):
     def check_for_setup_error(self):
     def create_volume(self, volume):
     def create_volume_from_snapshot(self, volume, snapshot):
     def create_cloned_volume(self, volume, src_vref):
     def delete_volume(self, volume):
     def create_snapshot(self, snapshot):
     def delete_snapshot(self, snapshot):
     def local_path(self, volume):
     def ensure_export(self, context, volume):
     def create_export(self, context, volume):
     def remove_export(self, context, volume):
     def initialize_connection(self, volume, connector):
     def terminate_connection(self, volume, connector, **kwargs):
     def attach_volume(self, context, volume, instance_uuid, host_name, mountpoint):
     def detach_volume(self, context, volume):
     def get_volume_stats(self, refresh=False):
     def do_setup(self, context):
     def validate_connector(self, connector):
     def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
     def copy_image_to_volume(self, context, volume, image_service, image_id):
     def copy_volume_to_image(self, context, volume, image_service, image_meta):
     def _attach_volume(self, context, volume, properties, remote=False):
     def _detach_volume(self, context, attach_info, volume, properties, force=False, remote=False):
     def clone_image(self, volume, image_location, image_id, image_meta):
     def backup_volume(self, context, backup, backup_service):
     def restore_backup(self, context, backup, volume, backup_service):
     def clear_download(self, context, volume):
     def extend_volume(self, volume, new_size):
     def migrate_volume(self, context, volume, host):
     def retype(self, context, volume, new_type, diff, host):
     def accept_transfer(self, context, volume, new_user, new_project):
     def manage_existing(self, volume, existing_ref):
     def manage_existing_get_size(self, volume, existing_ref):
     def unmanage(self, volume):
 class ISCSIDriver(VolumeDriver):
     def __init__(self, *args, **kwargs):
     def _do_iscsi_discovery(self, volume):
     def _get_iscsi_properties(self, volume):
     def _run_iscsiadm(self, iscsi_properties, iscsi_command, **kwargs):
     def _run_iscsiadm_bare(self, iscsi_command, **kwargs):
     def _iscsiadm_update(self, iscsi_properties, property_key, property_value, **kwargs):
     def initialize_connection(self, volume, connector):
     def validate_connector(self, connector):
     def terminate_connection(self, volume, connector, **kwargs):
     def get_volume_stats(self, refresh=False):
     def _update_volume_stats(self):
     def get_target_helper(self, db):
 class FakeISCSIDriver(ISCSIDriver):
     def __init__(self, *args, **kwargs):
     def create_volume(self, volume):
     def check_for_setup_error(self):
     def initialize_connection(self, volume, connector):
     def terminate_connection(self, volume, connector, **kwargs):
     def fake_execute(cmd, *_args, **_kwargs):
 class ISERDriver(ISCSIDriver):
     def __init__(self, *args, **kwargs):
     def initialize_connection(self, volume, connector):
     def _update_volume_stats(self):
     def get_target_helper(self, db):
 class FakeISERDriver(FakeISCSIDriver):
     def __init__(self, *args, **kwargs):
     def initialize_connection(self, volume, connector):
     def fake_execute(cmd, *_args, **_kwargs):
 class FibreChannelDriver(VolumeDriver):
     def __init__(self, *args, **kwargs):
     def initialize_connection(self, volume, connector):
 \OpenStack\cinder-2014.1\cinder\volume\drivers\block_device.py
 class BlockDeviceDriver(driver.ISCSIDriver):
     def __init__(self, *args, **kwargs):
     def set_execute(self, execute):
     def check_for_setup_error(self):
     def create_volume(self, volume):
     def initialize_connection(self, volume, connector):
     def terminate_connection(self, volume, connector, **kwargs):
     def create_export(self, context, volume):
     def remove_export(self, context, volume):
     def ensure_export(self, context, volume):
     def delete_volume(self, volume):
     def local_path(self, volume):
     def copy_image_to_volume(self, context, volume, image_service, image_id):
     def copy_volume_to_image(self, context, volume, image_service, image_meta):
     def create_cloned_volume(self, volume, src_vref):
     def get_volume_stats(self, refresh=False):
     def _update_volume_stats(self):
     def _get_used_devices(self):
     def _get_device_size(self, dev_path):
     def _devices_sizes(self):
     def find_appropriate_size_device(self, size):
 \OpenStack\cinder-2014.1\cinder\volume\drivers\coraid.py
 class CoraidRESTClient(object):
     def __init__(self, esm_url):
     def _check_esm_url(self, esm_url):
     def rpc(self, handle, url_params, data, allow_empty_response=False):
     def _rpc(self, handle, url_params, data, allow_empty_response):
 def to_coraid_kb(gb):
 def coraid_volume_size(gb):
 class CoraidAppliance(object):
     def __init__(self, rest_client, username, password, group):
     def _login(self):
     def _set_effective_group(self, groups_map, group):
     def _ensure_session(self):
     def _relogin(self):
     def rpc(self, handle, url_params, data, allow_empty_response=False):
     def _is_session_expired(self, reply):
     def _is_bad_config_state(self, reply):
     def configure(self, json_request):
     def esm_command(self, request):
     def get_volume_info(self, volume_name):
     def get_volume_repository(self, volume_name):
     def get_all_repos(self):
     def ping(self):
     def create_lun(self, repository_name, volume_name, volume_size_in_gb):
     def delete_lun(self, volume_name):
     def resize_volume(self, volume_name, new_volume_size_in_gb):
     def create_snapshot(self, volume_name, snapshot_name):
     def delete_snapshot(self, snapshot_name):
     def create_volume_from_snapshot(self, snapshot_name, volume_name, dest_repository_name):
     def clone_volume(self, src_volume_name, dst_volume_name, dst_repository_name):
 class CoraidDriver(driver.VolumeDriver):
     def __init__(self, *args, **kwargs):
     def appliance(self):
     def check_for_setup_error(self):
     def _get_repository(self, volume_type):
     def create_volume(self, volume):
     def create_cloned_volume(self, volume, src_vref):
     def delete_volume(self, volume):
     def create_snapshot(self, snapshot):
     def delete_snapshot(self, snapshot):
     def create_volume_from_snapshot(self, volume, snapshot):
     def extend_volume(self, volume, new_size):
     def initialize_connection(self, volume, connector):
     def _get_repository_capabilities(self):
     def update_volume_stats(self):
     def get_volume_stats(self, refresh=False):
     def local_path(self, volume):
     def create_export(self, context, volume):
     def remove_export(self, context, volume):
     def terminate_connection(self, volume, connector, **kwargs):
     def ensure_export(self, context, volume):
 \OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_cli_iscsi.py
 class EMCCLIISCSIDriver(driver.ISCSIDriver):
     def __init__(self, *args, **kwargs):
     def check_for_setup_error(self):
     def create_volume(self, volume):
     def create_volume_from_snapshot(self, volume, snapshot):
     def create_cloned_volume(self, volume, src_vref):
     def delete_volume(self, volume):
     def create_snapshot(self, snapshot):
     def delete_snapshot(self, snapshot):
     def ensure_export(self, context, volume):
     def create_export(self, context, volume):
     def remove_export(self, context, volume):
     def check_for_export(self, context, volume_id):
     def extend_volume(self, volume, new_size):
     def initialize_connection(self, volume, connector):
         def do_initialize_connection():
     def _do_iscsi_discovery(self, volume):
     def vnx_get_iscsi_properties(self, volume, connector):
     def terminate_connection(self, volume, connector, **kwargs):
         def do_terminate_connection():
     def get_volume_stats(self, refresh=False):
     def update_volume_stats(self):
 \OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_common.py
 class EMCSMISCommon():
     def __init__(self, prtcl, configuration=None):
     def create_volume(self, volume):
     def create_volume_from_snapshot(self, volume, snapshot):
     def create_cloned_volume(self, volume, src_vref):
     def delete_volume(self, volume):
     def create_snapshot(self, snapshot, volume):
     def delete_snapshot(self, snapshot, volume):
     def _expose_paths(self, configservice, vol_instance, connector):
     def _hide_paths(self, configservice, vol_instance, connector):
     def _add_members(self, configservice, vol_instance):
     def _remove_members(self, configservice, vol_instance):
     def _map_lun(self, volume, connector):
     def _unmap_lun(self, volume, connector):
     def initialize_connection(self, volume, connector):
     def terminate_connection(self, volume, connector):
     def extend_volume(self, volume, new_size):
     def update_volume_stats(self):
     def _get_storage_type(self, volume, filename=None):
     def _get_storage_type_conffile(self, filename=None):
     def _get_masking_view(self, filename=None):
     def _get_timeout(self, filename=None):
     def _get_ecom_cred(self, filename=None):
     def _get_ecom_server(self, filename=None):
     def _get_ecom_connection(self, filename=None):
     def _find_replication_service(self, storage_system):
     def _find_storage_configuration_service(self, storage_system):
     def _find_controller_configuration_service(self, storage_system):
     def _find_storage_hardwareid_service(self, storage_system):
     def _find_pool(self, storage_type, details=False):
     def _parse_pool_instance_id(self, instanceid):
     def _find_lun(self, volume):
     def _find_storage_sync_sv_sv(self, snapshot, volume, waitforsync=True):
     def _find_initiator_names(self, connector):
     def _wait_for_job_complete(self, job):
     def _find_lunmasking_scsi_protocol_controller(self, storage_system, connector):
     def _find_lunmasking_scsi_protocol_controller_for_vol(self, vol_instance, connector):
     def get_num_volumes_mapped(self, volume, connector):
     def _find_avail_device_number(self, storage_system):
     def find_device_number(self, volume, connector):
     def _find_device_masking_group(self):
     def _find_storage_processor_system(self, owningsp, storage_system):
     def _find_iscsi_protocol_endpoints(self, owningsp, storage_system):
     def _getnum(self, num, datatype):
     def _getinstancename(self, classname, bindings):
     def get_target_wwns(self, storage_system, connector):
     def _find_storage_hardwareids(self, connector):
     def _get_volumetype_extraspecs(self, volume):
     def _get_provisioning(self, storage_type):
 \OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_fc.py
 class EMCSMISFCDriver(driver.FibreChannelDriver):
     def __init__(self, *args, **kwargs):
     def check_for_setup_error(self):
     def create_volume(self, volume):
     def create_volume_from_snapshot(self, volume, snapshot):
     def create_cloned_volume(self, volume, src_vref):
     def delete_volume(self, volume):
OpenStack Index / Previous / Next |