¡@

Home 

OpenStack Study: cinder

OpenStack Index

Previous

Next

\OpenStack\cinder-2014.1\cinder\tests\db\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\declare_conf.py

\OpenStack\cinder-2014.1\cinder\tests\fake_driver.py

class FakeISCSIDriver(lvm.LVMISCSIDriver):

    def __init__(self, *args, **kwargs):

    def check_for_setup_error(self):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector, **kwargs):

    def fake_execute(cmd, *_args, **_kwargs):

class FakeISERDriver(FakeISCSIDriver):

    def __init__(self, *args, **kwargs):

    def initialize_connection(self, volume, connector):

    def fake_execute(cmd, *_args, **_kwargs):

class LoggingVolumeDriver(driver.VolumeDriver):

    def check_for_setup_error(self):

    def create_volume(self, volume):

    def delete_volume(self, volume):

    def clear_volume(self, volume):

    def local_path(self, volume):

    def ensure_export(self, context, volume):

    def create_export(self, context, volume):

    def remove_export(self, context, volume):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector):

    def clear_logs():

    def log_action(action, parameters):

    def all_logs():

    def logs_like(action, **kwargs):

\OpenStack\cinder-2014.1\cinder\tests\fake_notifier.py

def reset():

class FakeNotifier(object):

    def __init__(self, transport, publisher_id, serializer=None):

    def prepare(self, publisher_id=None):

    def _notify(self, priority, ctxt, event_type, payload):

def stub_notifier(stubs):

\OpenStack\cinder-2014.1\cinder\tests\fake_utils.py

def fake_execute_get_log():

def fake_execute_clear_log():

def fake_execute_set_repliers(repliers):

def fake_execute_default_reply_handler(*ignore_args, **ignore_kwargs):

def fake_execute(*cmd_parts, **kwargs):

def stub_out_utils_execute(stubs):

\OpenStack\cinder-2014.1\cinder\tests\glance\stubs.py

class StubGlanceClient(object):

    def __init__(self, images=None):

    def list(self, filters=None, marker=None, limit=30):

    def get(self, image_id):

    def data(self, image_id):

    def create(self, **metadata):

    def update(self, image_id, **metadata):

    def delete(self, image_id):

class FakeImage(object):

    def __init__(self, metadata):

    def __getattr__(self, key):

    def __setattr__(self, key, value):

\OpenStack\cinder-2014.1\cinder\tests\glance\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\image\fake.py

class _FakeImageService(object):

    def __init__(self):

    def detail(self, context, **kwargs):

    def download(self, context, image_id, data):

    def show(self, context, image_id):

    def create(self, context, metadata, data=None):

    def update(self, context, image_id, metadata, data=None, purge_props=False):

    def delete(self, context, image_id):

    def get_location(self, context, image_id):

def FakeImageService():

def FakeImageService_reset():

def stub_out_image_service(stubs):

    def fake_get_remote_image_service(context, image_href):

\OpenStack\cinder-2014.1\cinder\tests\image\test_glance.py

class NullWriter(object):

    def write(self, *arg, **kwargs):

class TestGlanceSerializer(test.TestCase):

    def test_serialize(self):

class TestGlanceImageService(test.TestCase):

    def setUp(self):

    def _create_image_service(self, client):

        def _fake_create_glance_client(context, netloc, use_ssl, version):

    def _make_fixture(**kwargs):

    def _make_datetime_fixture(self):

    def test_create_with_instance_id(self):

    def test_create_without_instance_id(self):

    def test_create(self):

    def test_create_and_show_non_existing_image(self):

    def test_detail_private_image(self):

    def test_detail_marker(self):

    def test_detail_limit(self):

    def test_detail_default_limit(self):

    def test_detail_marker_and_limit(self):

    def test_detail_invalid_marker(self):

    def test_update(self):

    def test_delete(self):

    def test_show_passes_through_to_client(self):

    def test_show_raises_when_no_authtoken_in_the_context(self):

    def test_detail_passes_through_to_client(self):

    def test_show_makes_datetimes(self):

    def test_detail_makes_datetimes(self):

    def test_download_with_retries(self):

    def test_client_forbidden_converts_to_imagenotauthed(self):

    def test_client_httpforbidden_converts_to_imagenotauthed(self):

    def test_client_notfound_converts_to_imagenotfound(self):

    def test_client_httpnotfound_converts_to_imagenotfound(self):

    def test_glance_client_image_id(self):

    def test_glance_client_image_ref(self):

class TestGlanceClientVersion(test.TestCase):

    def setUp(self):

        def fake_get_model(self):

    def test_glance_version_by_flag(self):

    def test_glance_version_by_arg(self):

        def get(self, image_id):

class TestGlanceImageServiceClient(test.TestCase):

    def setUp(self):

    def test_create_glance_client(self):

    def test_create_glance_client_auth_strategy_is_not_keystone(self):

    def test_create_glance_client_glance_request_default_timeout(self):

    def tearDown(self):

\OpenStack\cinder-2014.1\cinder\tests\image\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\integrated\api\client.py

class OpenStackApiException(Exception):

    def __init__(self, message=None, response=None):

class OpenStackApiAuthenticationException(OpenStackApiException):

    def __init__(self, response=None, message=None):

class OpenStackApiAuthorizationException(OpenStackApiException):

    def __init__(self, response=None, message=None):

class OpenStackApiNotFoundException(OpenStackApiException):

    def __init__(self, response=None, message=None):

class TestOpenStackClient(object):

    def __init__(self, auth_user, auth_key, auth_uri):

    def request(self, url, method='GET', body=None, headers=None, ssl_verify=True, stream=False):

    def _authenticate(self):

    def api_request(self, relative_uri, check_response_status=None, **kwargs):

    def _decode_json(self, response):

    def api_get(self, relative_uri, **kwargs):

    def api_post(self, relative_uri, body, **kwargs):

    def api_put(self, relative_uri, body, **kwargs):

    def api_delete(self, relative_uri, **kwargs):

    def get_volume(self, volume_id):

    def get_volumes(self, detail=True):

    def post_volume(self, volume):

    def delete_volume(self, volume_id):

    def put_volume(self, volume_id, volume):

\OpenStack\cinder-2014.1\cinder\tests\integrated\api\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\integrated\integrated_helpers.py

def generate_random_alphanumeric(length):

def generate_random_numeric(length):

def generate_new_element(items, prefix, numeric=False):

class _IntegratedTestBase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _start_api_service(self):

    def _get_flags(self):

    def get_unused_server_name(self):

    def get_invalid_image(self):

    def _build_minimal_create_server_request(self):

\OpenStack\cinder-2014.1\cinder\tests\integrated\test_extensions.py

class ExtensionsTest(integrated_helpers._IntegratedTestBase):

    def _get_flags(self):

    def test_get_foxnsocks(self):

\OpenStack\cinder-2014.1\cinder\tests\integrated\test_login.py

class LoginTest(integrated_helpers._IntegratedTestBase):

    def test_login(self):

\OpenStack\cinder-2014.1\cinder\tests\integrated\test_volumes.py

class VolumesTest(integrated_helpers._IntegratedTestBase):

    def setUp(self):

    def _start_api_service(self):

    def _get_flags(self):

    def test_get_volumes_summary(self):

    def test_get_volumes(self):

    def _poll_while(self, volume_id, continue_states, max_retries=5):

    def test_create_and_delete_volume(self):

    def test_create_volume_with_metadata(self):

    def test_create_volume_in_availability_zone(self):

    def test_create_and_update_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\integrated\test_xml.py

class XmlTests(integrated_helpers._IntegratedTestBase):

    def test_namespace_volumes(self):

\OpenStack\cinder-2014.1\cinder\tests\integrated\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\keymgr\fake.py

def fake_api():

\OpenStack\cinder-2014.1\cinder\tests\keymgr\mock_key_mgr.py

class MockKeyManager(key_mgr.KeyManager):

    def __init__(self):

    def _generate_hex_key(self, **kwargs):

    def _generate_key(self, **kwargs):

    def create_key(self, ctxt, **kwargs):

    def _generate_key_id(self):

    def store_key(self, ctxt, key, **kwargs):

    def copy_key(self, ctxt, key_id, **kwargs):

    def get_key(self, ctxt, key_id, **kwargs):

    def delete_key(self, ctxt, key_id, **kwargs):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_conf_key_mgr.py

class ConfKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):

    def __init__(self, *args, **kwargs):

    def _create_key_manager(self):

    def setUp(self):

    def test___init__(self):

    def test_create_key(self):

    def test_create_null_context(self):

    def test_store_key(self):

    def test_store_null_context(self):

    def test_store_key_invalid(self):

    def test_copy_key(self):

    def test_copy_null_context(self):

    def test_delete_key(self):

    def test_delete_null_context(self):

    def test_delete_unknown_key(self):

    def test_get_key(self):

    def test_get_null_context(self):

    def test_get_unknown_key(self):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_key.py

class KeyTestCase(test.TestCase):

    def _create_key(self):

    def setUp(self):

class SymmetricKeyTestCase(KeyTestCase):

    def _create_key(self):

    def setUp(self):

    def test_get_algorithm(self):

    def test_get_format(self):

    def test_get_encoded(self):

    def test___eq__(self):

    def test___ne__(self):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_key_mgr.py

class KeyManagerTestCase(test.TestCase):

    def __init__(self, *args, **kwargs):

    def _create_key_manager(self):

    def setUp(self):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_mock_key_mgr.py

class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):

    def _create_key_manager(self):

    def setUp(self):

    def test_create_key(self):

    def test_create_key_with_length(self):

    def test_create_null_context(self):

    def test_store_key(self):

    def test_store_null_context(self):

    def test_copy_key(self):

    def test_copy_null_context(self):

    def test_get_key(self):

    def test_get_null_context(self):

    def test_get_unknown_key(self):

    def test_delete_key(self):

    def test_delete_null_context(self):

    def test_delete_unknown_key(self):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\test_not_implemented_key_mgr.py

class NotImplementedKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):

    def _create_key_manager(self):

    def setUp(self):

    def test_create_key(self):

    def test_store_key(self):

    def test_copy_key(self):

    def test_get_key(self):

    def test_delete_key(self):

\OpenStack\cinder-2014.1\cinder\tests\keymgr\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\example_a.py

def example_function_a():

class ExampleClassA():

    def example_method(self):

    def example_method_add(self, arg1, arg2):

\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\example_b.py

def example_function_b():

class ExampleClassB():

    def example_method(self):

    def example_method_add(self, arg1, arg2):

\OpenStack\cinder-2014.1\cinder\tests\monkey_patch_example\__init__.py

def example_decorator(name, function):

\OpenStack\cinder-2014.1\cinder\tests\runtime_conf.py

\OpenStack\cinder-2014.1\cinder\tests\scheduler\fakes.py

class FakeFilterScheduler(filter_scheduler.FilterScheduler):

    def __init__(self, *args, **kwargs):

class FakeHostManager(host_manager.HostManager):

    def __init__(self):

class FakeHostState(host_manager.HostState):

    def __init__(self, host, attribute_dict):

def mock_host_manager_db_calls(mock_obj):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_allocated_capacity_weigher.py

class AllocatedCapacityWeigherTestCase(test.TestCase):

    def setUp(self):

    def _get_weighed_host(self, hosts, weight_properties=None):

    def _get_all_hosts(self, _mock_service_get_all_by_topic):

    def test_default_of_spreading_first(self):

    def test_capacity_weight_multiplier1(self):

    def test_capacity_weight_multiplier2(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_capacity_weigher.py

class CapacityWeigherTestCase(test.TestCase):

    def setUp(self):

    def _get_weighed_host(self, hosts, weight_properties=None):

    def _get_all_hosts(self, _mock_service_get_all_by_topic):

    def test_default_of_spreading_first(self):

    def test_capacity_weight_multiplier1(self):

    def test_capacity_weight_multiplier2(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_chance_weigher.py

class ChanceWeigherTestCase(test.TestCase):

    def setUp(self):

    def fake_random(self, reset=False):

    def test_chance_weigher(self, _mock_random):

    def test_host_manager_choosing_chance_weigher(self):

    def test_use_of_chance_weigher_via_host_manager(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_filter_scheduler.py

class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):

    def test_create_volume_no_hosts(self):

    def test_create_volume_non_admin(self, _mock_get_all_host_states):

        def fake_get(ctxt):

    def test_schedule_happy_day(self, _mock_service_get_all_by_topic):

    def test_max_attempts(self):

    def test_invalid_max_attempts(self):

    def test_retry_disabled(self):

    def test_retry_attempt_one(self):

    def test_retry_attempt_two(self):

    def test_retry_exceeded_max_attempts(self):

    def test_add_retry_host(self):

    def test_post_select_populate(self):

    def _host_passes_filters_setup(self, mock_obj):

    def test_host_passes_filters_happy_day(self, _mock_service_get_topic):

    def test_host_passes_filters_no_capacity(self, _mock_service_get_topic):

    def test_retype_policy_never_migrate_pass(self, _mock_service_get_topic):

    def test_retype_policy_never_migrate_fail(self, _mock_service_get_topic):

    def test_retype_policy_demand_migrate_pass(self, _mock_service_get_topic):

    def test_retype_policy_demand_migrate_fail(self, _mock_service_get_topic):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_host_filters.py

class HostFiltersTestCase(test.TestCase):

    def setUp(self):

    def test_capacity_filter_passes(self, _mock_serv_is_up):

    def test_capacity_filter_fails(self, _mock_serv_is_up):

    def test_capacity_filter_passes_infinite(self, _mock_serv_is_up):

    def test_capacity_filter_passes_unknown(self, _mock_serv_is_up):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_host_manager.py

class FakeFilterClass1(filters.BaseHostFilter):

    def host_passes(self, host_state, filter_properties):

class FakeFilterClass2(filters.BaseHostFilter):

    def host_passes(self, host_state, filter_properties):

class HostManagerTestCase(test.TestCase):

    def setUp(self):

    def test_choose_host_filters_not_found(self):

    def test_choose_host_filters(self):

    def test_get_filtered_hosts(self, _mock_choose_host_filters):

    def test_update_service_capabilities(self, _mock_utcnow):

    def test_get_all_host_states(self, _mock_service_is_up, _mock_service_get_all_by_topic):

class HostStateTestCase(test.TestCase):

    def test_update_from_volume_capability(self):

    def test_update_from_volume_infinite_capability(self):

    def test_update_from_volume_unknown_capability(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_rpcapi.py

class SchedulerRpcAPITestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _test_scheduler_api(self, method, rpc_method, fanout=False, **kwargs):

        def _fake_prepare_method(*args, **kwds):

        def _fake_rpc_method(*args, **kwargs):

    def test_update_service_capabilities(self):

    def test_create_volume(self):

    def test_migrate_volume_to_host(self):

    def test_retype(self):

    def test_manage_existing(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_scheduler.py

class SchedulerManagerTestCase(test.TestCase):

    def setUp(self):

    def test_1_correct_init(self):

    def test_update_service_capabilities_empty_dict(self, _mock_update_cap):

    def test_update_service_capabilities_correct(self, _mock_update_cap):

    def test_create_volume_exception_puts_volume_in_error_state( self, _mock_volume_update, _mock_sched_create):

    def test_migrate_volume_exception_returns_volume_state( self, _mock_volume_update, _mock_host_passes):

    def test_chance_simple_scheduler_mocked(self):

    def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get, _mock_vol_update):

class SchedulerTestCase(test.TestCase):

    def setUp(self):

    def test_update_service_capabilities(self, _mock_update_cap):

class SchedulerDriverBaseTestCase(SchedulerTestCase):

    def test_unimplemented_schedule(self):

class SchedulerDriverModuleTestCase(test.TestCase):

    def setUp(self):

    def test_volume_host_update_db(self, _mock_utcnow, _mock_vol_update):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\test_scheduler_options.py

class FakeSchedulerOptions(scheduler_options.SchedulerOptions):

    def __init__(self, last_checked, now, file_old, file_now, data, filedata):

    def _get_file_timestamp(self, filename):

    def _get_file_handle(self, filename):

    def _get_time_now(self):

class SchedulerOptionsTestCase(test.TestCase):

    def test_get_configuration_first_time_no_flag(self):

    def test_get_configuration_first_time_empty_file(self):

    def test_get_configuration_first_time_happy_day(self):

    def test_get_configuration_second_time_no_change(self):

    def test_get_configuration_second_time_too_fast(self):

    def test_get_configuration_second_time_change(self):

\OpenStack\cinder-2014.1\cinder\tests\scheduler\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\test_api.py

class FakeHttplibSocket(object):

    def __init__(self, response_string):

    def makefile(self, _mode, _other):

class FakeHttplibConnection(object):

    def __init__(self, app, host, is_secure=False):

    def request(self, method, path, data, headers):

    def getresponse(self):

    def getresponsebody(self):

    def close(self):

\OpenStack\cinder-2014.1\cinder\tests\test_api_urlmap.py

class TestParseFunctions(test.TestCase):

    def test_unquote_header_value_without_quotes(self):

    def test_unquote_header_value_with_quotes(self):

    def test_parse_list_header(self):

    def test_parse_options_header(self):

    def test_parse_options_header_without_value(self):

class TestAccept(test.TestCase):

    def test_best_match_ValueError(self):

    def test_best_match(self):

    def test_match_mask_one_asterisk(self):

    def test_match_mask_two_asterisk(self):

    def test_match_mask_no_asterisk(self):

    def test_content_type_params(self):

    def test_content_type_params_wrong_content_type(self):

class TestUrlMapFactory(test.TestCase):

    def setUp(self):

    def test_not_found_app_in_local_conf(self):

    def test_not_found_app_not_in_local_conf(self):

    def test_not_found_app_is_none(self):

class TestURLMap(test.TestCase):

    def setUp(self):

    def test_match_with_applications(self):

    def test_match_without_applications(self):

    def test_match_path_info_equals_app_url(self):

    def test_match_path_info_equals_app_url_many_app(self):

    def test_set_script_name(self):

    def test_munge_path(self):

    def test_content_type_strategy_without_version(self):

    def test_content_type_strategy_with_version(self):

    def test_path_strategy_wrong_path_info(self):

    def test_path_strategy_mime_type_only(self):

    def test_path_strategy(self):

    def test_path_strategy_wrong_mime_type(self):

    def test_accept_strategy_version_not_in_params(self):

    def test_accept_strategy_version(self):

\OpenStack\cinder-2014.1\cinder\tests\test_backup.py

class FakeBackupException(Exception):

class BackupTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _create_backup_db_entry(self, volume_id=1, display_name='test_backup', display_description='this is a test backup', container='volumebackups', status='creating', size=1, object_count=0, project_id='fake'):

    def _create_volume_db_entry(self, display_name='test_volume', display_description='this is a test volume', status='backing-up', size=1):

    def _create_exported_record_entry(self, vol_size=1):

    def _create_export_record_db_entry(self, volume_id='0000', status='creating', project_id='fake'):

    def test_init_host(self):

    def test_create_backup_with_bad_volume_status(self):

    def test_create_backup_with_bad_backup_status(self):

    def test_create_backup_with_error(self, _mock_volume_backup):

    def test_create_backup(self, _mock_volume_backup):

    def test_restore_backup_with_bad_volume_status(self):

    def test_restore_backup_with_bad_backup_status(self):

    def test_restore_backup_with_driver_error(self, _mock_volume_restore):

    def test_restore_backup_with_bad_service(self):

    def test_restore_backup(self, _mock_volume_restore):

    def test_delete_backup_with_bad_backup_status(self):

    def test_delete_backup_with_error(self):

    def test_delete_backup_with_bad_service(self):

    def test_delete_backup_with_no_service(self):

    def test_delete_backup(self):

    def test_list_backup(self):

    def test_backup_get_all_by_project_with_deleted(self):

    def test_backup_get_all_by_host_with_deleted(self):

    def test_backup_manager_driver_name(self):

    def test_export_record_with_bad_service(self):

    def test_export_record_with_bad_backup_status(self):

    def test_export_record(self):

    def test_import_record_with_verify_not_implemented(self):

    def test_import_record_with_verify(self):

    def test_import_record_with_bad_service(self):

    def test_import_record_with_invalid_backup(self):

    def test_import_record_with_verify_invalid_backup(self):

\OpenStack\cinder-2014.1\cinder\tests\test_backup_ceph.py

class MockException(Exception):

    def __init__(self, *args, **kwargs):

class MockImageNotFoundException(MockException):

class MockImageBusyException(MockException):

class MockObjectNotFoundException(MockException):

    def _common_inner_inner1(inst, *args, **kwargs):

        def _common_inner_inner2(mock_rados, mock_rbd, mock_time, mock_sleep, mock_popen):

class BackupCephTestCase(test.TestCase):

    def _create_volume_db_entry(self, id, size):

    def _create_backup_db_entry(self, backupid, volid, size):

    def time_inc(self):

    def _get_wrapped_rbd_io(self, rbd_image):

    def _setup_mock_popen(self, mock_popen, retval=None, p1hook=None, p2hook=None):

    def setUp(self):

    def tearDown(self):

    def test_get_rbd_support(self):

    def test_get_most_recent_snap(self):

    def test_get_backup_snap_name(self):

        def get_backup_snaps(inst, *args):

    def test_get_backup_snaps(self):

    def test_transfer_data_from_rbd_to_file(self):

    def test_transfer_data_from_rbd_to_rbd(self):

        def mock_write_data(data, offset):

    def test_transfer_data_from_file_to_rbd(self):

        def mock_write_data(data, offset):

    def test_transfer_data_from_file_to_file(self):

    def test_backup_volume_from_file(self):

        def mock_write_data(data, offset):

    def test_get_backup_base_name(self):

    def test_backup_volume_from_rbd(self, mock_popen, mock_fnctl):

        def mock_write_data():

        def mock_read_data():

    def test_backup_vol_length_0(self):

    def test_restore(self):

        def mock_read_data(offset, length):

    def test_discard_bytes(self):

    def test_delete_backup_snapshot(self):

    def test_try_delete_base_image_diff_format(self, mock_meta_backup):

    def test_try_delete_base_image(self, mock_meta_backup):

    def test_try_delete_base_image_busy(self):

    def test_delete(self, mock_meta_backup):

    def test_delete_image_not_found(self, mock_meta_backup):

    def test_diff_restore_allowed(self):

    def test_piped_execute(self, mock_popen, mock_fcntl):

    def test_restore_metdata(self):

        def mock_read(*args):

    def test_backup_metata_already_exists(self, mock_meta_backup):

        def mock_set(json_meta):

    def test_backup_metata_error(self):

    def test_restore_invalid_metadata_version(self):

        def mock_read(*args):

    def _common_inner_inner1(inst, *args, **kwargs):

        def _common_inner_inner2(mock_rados, mock_rbd):

class VolumeMetadataBackupTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_name(self):

    def test_exists(self):

    def test_set(self):

        def mock_read(*args):

        def _mock_write(data):

    def test_get(self):

    def remove_if_exists(self):

\OpenStack\cinder-2014.1\cinder\tests\test_backup_driver_base.py

class BackupBaseDriverTestCase(test.TestCase):

    def _create_volume_db_entry(self, id, size):

    def _create_backup_db_entry(self, backupid, volid, size):

    def setUp(self):

    def test_backup(self):

    def test_restore(self):

    def test_delete(self):

    def test_get_metadata(self):

    def test_put_metadata(self):

    def test_get_put_metadata(self):

    def test_export_record(self):

    def test_import_record(self):

    def test_verify(self):

    def tearDown(self):

class BackupMetadataAPITestCase(test.TestCase):

    def _create_volume_db_entry(self, id, size):

    def setUp(self):

    def _add_metadata(self, vol_meta=False, vol_glance_meta=False):

    def test_get(self):

    def test_put(self):

    def test_put_invalid_version(self):

    def test_v1_restore_factory(self):

    def test_restore_vol_glance_meta(self):

    def test_restore_vol_meta(self):

    def test_restore_vol_base_meta(self):

    def test_filter(self):

    def test_save_vol_glance_meta(self):

    def test_save_vol_meta(self):

    def test_save_vol_base_meta(self):

    def test_is_serializable(self):

    def test_is_not_serializable(self):

    def tearDown(self):

\OpenStack\cinder-2014.1\cinder\tests\test_backup_swift.py

def fake_md5(arg):

class BackupSwiftTestCase(test.TestCase):

    def _create_volume_db_entry(self):

    def _create_backup_db_entry(self, container='test-container'):

    def setUp(self):

    def tearDown(self):

    def test_backup_uncompressed(self):

    def test_backup_bz2(self):

    def test_backup_zlib(self):

    def test_backup_default_container(self):

    def test_backup_custom_container(self):

    def test_create_backup_put_object_wraps_socket_error(self):

    def test_restore(self):

    def test_restore_wraps_socket_error(self):

    def test_restore_unsupported_version(self):

    def test_delete(self):

    def test_delete_wraps_socket_error(self):

    def test_get_compressor(self):

\OpenStack\cinder-2014.1\cinder\tests\test_backup_tsm.py

class TSMBackupSimulator:

    def __init__(self):

    def _cmd_backup(self, **kwargs):

    def _backup_exists(self, path):

    def _cmd_restore(self, **kwargs):

    def _cmd_delete(self, **kwargs):

    def _cmd_to_dict(self, arg_list):

    def _exec_dsmc_cmd(self, cmd):

    def exec_cmd(self, cmd):

    def error_injection(self, cmd, error):

def fake_exec(*cmd, **kwargs):

def fake_stat_image(path):

def fake_stat_file(path):

def fake_stat_illegal(path):

class BackupTSMTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _create_volume_db_entry(self, volume_id):

    def _create_backup_db_entry(self, backup_id, mode):

    def test_backup_image(self):

    def test_backup_file(self):

    def test_backup_invalid_mode(self):

\OpenStack\cinder-2014.1\cinder\tests\test_block_device.py

class TestBlockDeviceDriver(cinder.test.TestCase):

    def setUp(self):

    def test_initialize_connection(self):

    def test_initialize_connection_different_hosts(self):

    def test_delete_not_volume_provider_location(self):

    def test_delete_volume_path_exist(self):

    def test_delete_path_is_not_in_list_of_available_devices(self):

    def test_create_volume(self):

    def test_update_volume_stats(self):

    def test_create_cloned_volume(self):

    def test_copy_image_to_volume(self):

    def test_copy_volume_to_image(self):

    def test_get_used_devices(self):

    def test_get_device_size(self):

    def test_devices_sizes(self):

    def test_find_appropriate_size_device_no_free_disks(self):

    def test_find_appropriate_size_device_not_big_enough_disk(self):

    def test_find_appropriate_size_device(self):

\OpenStack\cinder-2014.1\cinder\tests\test_conf.py

class ConfigTestCase(test.TestCase):

    def setUp(self):

    def test_declare(self):

    def test_runtime_and_unknown_conf(self):

    def test_long_vs_short_conf(self):

    def test_conf_leak_left(self):

    def test_conf_leak_right(self):

    def test_conf_overrides(self):

\OpenStack\cinder-2014.1\cinder\tests\test_context.py

class ContextTestCase(test.TestCase):

    def test_request_context_sets_is_admin(self):

    def test_request_context_sets_is_admin_upcase(self):

    def test_request_context_read_deleted(self):

    def test_request_context_read_deleted_invalid(self):

    def test_extra_args_to_context_get_logged(self):

        def fake_warn(log_msg):

    def test_service_catalog_nova_only(self):

    def test_user_identity(self):

\OpenStack\cinder-2014.1\cinder\tests\test_coraid.py

def to_coraid_kb(gb):

def coraid_volume_size(gb):

def compare(a, b):

def pack_data(request):

class FakeRpcBadRequest(Exception):

class FakeRpcIsNotCalled(Exception):

    def __init__(self, handle, url_params, data):

    def __str__(self):

class FakeRpcHandle(object):

    def __init__(self, handle, url_params, data, result):

    def set_called(self):

    def __call__(self, handle, url_params, data, allow_empty_response=False):

class FakeRpc(object):

    def __init__(self):

    def handle(self, handle, url_params, data, result):

    def __call__(self, handle_name, url_params, data, allow_empty_response=False):

class CoraidDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def mock_volume_types(self, repositories=[]):

class CoraidDriverLoginSuccessTestCase(CoraidDriverTestCase):

    def setUp(self):

class CoraidDriverApplianceTestCase(CoraidDriverLoginSuccessTestCase):

    def test_resize_volume(self):

class CoraidDriverIntegrationalTestCase(CoraidDriverLoginSuccessTestCase):

    def setUp(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_ping_ok(self):

    def test_ping_failed(self):

        def rpc(handle, url_params, data, allow_empty_response=True):

    def test_delete_not_existing_lun(self):

    def test_delete_not_existing_volumeappliance_is_ok(self):

        def delete_lun(volume_name):

        def ping():

    def test_delete_not_existing_volume_sleepingappliance(self):

        def delete_lun(volume_name):

        def ping():

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_initialize_connection(self):

    def test_get_repository_capabilities(self):

    def test_create_cloned_volume(self):

    def test_create_cloned_volume_with_resize(self):

    def test_create_cloned_volume_in_different_repository(self):

    def test_extend_volume(self):

class AutoReloginCoraidTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _test_auto_relogin_fail(self, state):

    def test_auto_relogin_fail_admin(self):

    def test_auto_relogin_fail_inactivity(self):

    def test_auto_relogin_fail_absolute(self):

    def test_auto_relogin_success(self):

class CoraidDriverImageTestCases(CoraidDriverTestCase):

    def setUp(self):

    def test_copy_volume_to_image(self):

    def test_copy_image_to_volume(self):

class CoraidResetConnectionTestCase(CoraidDriverTestCase):

    def test_create_new_appliance_for_every_request(self):

\OpenStack\cinder-2014.1\cinder\tests\test_create_volume_flow.py

class fake_scheduler_rpc_api(object):

    def __init__(self, expected_spec, test_inst):

    def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None):

class fake_volume_api(object):

    def __init__(self, expected_spec, test_inst):

    def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None):

class fake_db(object):

    def volume_get(self, *args, **kwargs):

    def volume_update(self, *args, **kwargs):

    def snapshot_get(self, *args, **kwargs):

class CreateVolumeFlowTestCase(test.TestCase):

    def time_inc(self):

    def setUp(self):

    def test_cast_create_volume(self):

    def tearDown(self):

\OpenStack\cinder-2014.1\cinder\tests\test_db_api.py

def _quota_reserve(context, project_id):

 def get_sync(resource, usage):

class ModelsObjectComparatorMixin(object):

    def _dict_from_object(self, obj, ignored_keys):

    def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):

    def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):

    def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):

class BaseTest(test.TestCase, ModelsObjectComparatorMixin):

    def setUp(self):

class DBAPIServiceTestCase(BaseTest):

    def _get_base_values(self):

    def _create_service(self, values):

    def test_service_create(self):

    def test_service_destroy(self):

    def test_service_update(self):

    def test_service_update_not_found_exception(self):

    def test_service_get(self):

    def test_service_get_not_found_exception(self):

    def test_service_get_by_host_and_topic(self):

    def test_service_get_all(self):

    def test_service_get_all_by_topic(self):

    def test_service_get_all_by_host(self):

    def test_service_get_by_args(self):

    def test_service_get_by_args_not_found_exception(self):

    def test_service_get_all_volume_sorted(self):

class DBAPIVolumeTestCase(BaseTest):

    def test_volume_create(self):

    def test_volume_allocate_iscsi_target_no_more_targets(self):

    def test_volume_allocate_iscsi_target(self):

    def test_volume_attached_invalid_uuid(self):

    def test_volume_attached_to_instance(self):

    def test_volume_attached_to_host(self):

    def test_volume_data_get_for_host(self):

    def test_volume_data_get_for_project(self):

    def test_volume_detached_from_instance(self):

    def test_volume_detached_from_host(self):

    def test_volume_get(self):

    def test_volume_destroy(self):

    def test_volume_get_all(self):

    def test_volume_get_all_marker_passed(self):

    def test_volume_get_all_by_host(self):

    def test_volume_get_all_by_instance_uuid(self):

    def test_volume_get_all_by_instance_uuid_empty(self):

    def test_volume_get_all_by_project(self):

    def test_volume_get_by_name(self):

    def test_volume_list_by_status(self):

    def _assertEqualsVolumeOrderResult(self, correct_order, limit=None, sort_key='created_at', sort_dir='asc', filters=None, project_id=None, match_keys=['id', 'display_name', 'volume_metadata', 'created_at']):

    def test_volume_get_by_filter(self):

    def test_volume_get_all_filters_limit(self):

    def test_volume_get_no_migration_targets(self):

    def test_volume_get_iscsi_target_num(self):

    def test_volume_get_iscsi_target_num_nonexistent(self):

    def test_volume_update(self):

    def test_volume_update_nonexistent(self):

    def test_volume_metadata_get(self):

    def test_volume_metadata_update(self):

    def test_volume_metadata_update_delete(self):

    def test_volume_metadata_delete(self):

class DBAPISnapshotTestCase(BaseTest):

    def test_snapshot_data_get_for_project(self):

    def test_snapshot_get_all(self):

    def test_snapshot_metadata_get(self):

    def test_snapshot_metadata_update(self):

    def test_snapshot_metadata_update_delete(self):

    def test_snapshot_metadata_delete(self):

class DBAPIVolumeTypeTestCase(BaseTest):

    def setUp(self):

    def test_volume_type_create_exists(self):

class DBAPIEncryptionTestCase(BaseTest):

    def setUp(self):

    def _get_values(self, one=False, updated=False):

        def compose(val, step):

    def test_volume_type_encryption_create(self):

    def test_volume_type_encryption_update(self):

    def test_volume_type_encryption_get(self):

    def test_volume_type_update_with_no_create(self):

    def test_volume_type_encryption_delete(self):

class DBAPIReservationTestCase(BaseTest):

    def setUp(self):

    def test_reservation_create(self):

    def test_reservation_get(self):

    def test_reservation_get_nonexistent(self):

    def test_reservation_commit(self):

    def test_reservation_rollback(self):

    def test_reservation_get_all_by_project(self):

    def test_reservation_expire(self):

    def test_reservation_destroy(self):

class DBAPIQuotaClassTestCase(BaseTest):

    def setUp(self):

    def test_quota_class_get(self):

    def test_quota_class_destroy(self):

    def test_quota_class_get_not_found(self):

    def test_quota_class_get_all_by_name(self):

    def test_quota_class_update(self):

    def test_quota_class_destroy_all_by_name(self):

class DBAPIQuotaTestCase(BaseTest):

    def test_quota_create(self):

    def test_quota_get(self):

    def test_quota_get_all_by_project(self):

    def test_quota_update(self):

    def test_quota_update_nonexistent(self):

    def test_quota_get_nonexistent(self):

    def test_quota_reserve(self):

    def test_quota_destroy(self):

    def test_quota_destroy_all_by_project(self):

    def test_quota_usage_get_nonexistent(self):

    def test_quota_usage_get(self):

    def test_quota_usage_get_all_by_project(self):

class DBAPIIscsiTargetTestCase(BaseTest):

    def _get_base_values(self):

    def test_iscsi_target_create_safe(self):

    def test_iscsi_target_count_by_host(self):

    def test_integrity_error(self):

class DBAPIBackupTestCase(BaseTest):

    def setUp(self):

    def _get_values(self, one=False):

        def compose(val, step):

    def test_backup_create(self):

    def test_backup_get(self):

    def tests_backup_get_all(self):

    def test_backup_get_all_by_host(self):

    def test_backup_get_all_by_project(self):

    def test_backup_update_nonexistent(self):

    def test_backup_update(self):

    def test_backup_destroy(self):

    def test_backup_not_found(self):

\OpenStack\cinder-2014.1\cinder\tests\test_drivers_compatibility.py

class VolumeDriverCompatibility(test.TestCase):

    def fake_update_cluster_status(self):

    def setUp(self):

    def tearDown(self):

    def _load_driver(self, driver):

    def _driver_module_name(self):

    def test_nexenta_old(self):

    def test_nexenta_new(self):

    def test_solidfire_old(self):

    def test_solidfire_old2(self):

    def test_solidfire_new(self):

    def test_storwize_svc_old(self):

    def test_storwize_svc_old2(self):

    def test_storwize_svc_new(self):

    def test_windows_old(self):

    def test_windows_new(self):

    def test_xiv_old(self):

    def test_xiv_ds8k_new(self):

    def test_netapp_7m_iscsi_old(self):

    def test_netapp_cm_iscsi_old(self):

    def test_netapp_7m_nfs_old(self):

    def test_netapp_cm_nfs_old(self):

    def test_hp_lefthand_rest_old(self):

    def test_hp_lefthand_rest_new(self):

    def test_gpfs_old(self):

    def test_gpfs_new(self):

\OpenStack\cinder-2014.1\cinder\tests\test_emc_smis.py

class EMC_StorageVolume(dict):

class SE_ConcreteJob(dict):

class SE_StorageHardwareID(dict):

class FakeCIMInstanceName(dict):

    def fake_getinstancename(self, classname, bindings):

class FakeDB():

    def volume_update(self, context, volume_id, model_update):

    def snapshot_update(self, context, snapshot_id, model_update):

    def volume_get(self, context, volume_id):

class EMCSMISCommonData():

class FakeEcomConnection():

    def __init__(self, *args, **kwargs):

    def InvokeMethod(self, MethodName, Service, ElementName=None, InPool=None, ElementType=None, Size=None, SyncType=None, SourceElement=None, Operation=None, Synchronization=None, TheElements=None, TheElement=None, LUNames=None, InitiatorPortIDs=None, DeviceAccesses=None, ProtocolControllers=None, MaskingGroup=None, Members=None, HardwareId=None):

    def EnumerateInstanceNames(self, name):

    def EnumerateInstances(self, name):

    def GetInstance(self, objectpath, LocalOnly=False):

    def Associators(self, objectpath, resultClass='EMC_StorageHardwareID'):

    def AssociatorNames(self, objectpath, resultClass='EMC_LunMaskingSCSIProtocolController'):

    def ReferenceNames(self, objectpath, ResultClass='CIM_ProtocolControllerForUnit'):

    def _ref_unitnames(self):

    def _default_ref(self, objectpath):

    def _assoc_hdwid(self):

    def _assoc_endpoint(self):

OpenStack Index

Previous

Next