OpenStack Study: cinder
OpenStack IndexPreviousNext
def manage_existing(self, context, topic, volume_id, request_spec, filter_properties=None):
def _manage_existing_set_error(self, context, ex, request_spec):
def _set_volume_state_and_notify(self, method, updates, context, ex, request_spec, msg=None):
\OpenStack\cinder-2014.1\cinder\scheduler\rpcapi.py
class SchedulerAPI(object):
def __init__(self):
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None):
def migrate_volume_to_host(self, ctxt, topic, volume_id, host, force_host_copy=False, request_spec=None, filter_properties=None):
def retype(self, ctxt, topic, volume_id, request_spec=None, filter_properties=None):
def manage_existing(self, ctxt, topic, volume_id, request_spec=None, filter_properties=None):
def update_service_capabilities(self, ctxt, service_name, host, capabilities):
\OpenStack\cinder-2014.1\cinder\scheduler\scheduler_options.py
class SchedulerOptions(object):
def __init__(self):
def _get_file_handle(self, filename):
def _get_file_timestamp(self, filename):
def _load_file(self, handle):
def _get_time_now(self):
def get_configuration(self, filename=None):
\OpenStack\cinder-2014.1\cinder\scheduler\simple.py
\OpenStack\cinder-2014.1\cinder\scheduler\weights\capacity.py
class CapacityWeigher(weights.BaseHostWeigher):
def _weight_multiplier(self):
def _weigh_object(self, host_state, weight_properties):
class AllocatedCapacityWeigher(weights.BaseHostWeigher):
def _weight_multiplier(self):
def _weigh_object(self, host_state, weight_properties):
\OpenStack\cinder-2014.1\cinder\scheduler\weights\chance.py
class ChanceWeigher(weights.BaseHostWeigher):
def _weigh_object(self, host_state, weight_properties):
\OpenStack\cinder-2014.1\cinder\scheduler\weights\__init__.py
\OpenStack\cinder-2014.1\cinder\scheduler\__init__.py
\OpenStack\cinder-2014.1\cinder\service.py
class Service(service.Service):
def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, service_name=None, *args, **kwargs):
def start(self):
def basic_config_check(self):
def _create_service_ref(self, context):
def __getattr__(self, key):
def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, service_name=None):
def kill(self):
def stop(self):
def wait(self):
def periodic_tasks(self, raise_on_error=False):
def report_state(self):
class WSGIService(object):
def __init__(self, name, loader=None):
def _get_manager(self):
def start(self):
def stop(self):
def wait(self):
def process_launcher():
def serve(server, workers=None):
def wait():
class Launcher(object):
def __init__(self):
def get_launcher():
\OpenStack\cinder-2014.1\cinder\test.py
class TestingException(Exception):
class Database(fixtures.Fixture):
def __init__(self, db_session, db_migrate, sql_connection, sqlite_db, sqlite_clean_db):
def setUp(self):
class TestCase(testtools.TestCase):
def setUp(self):
def tearDown(self):
def flags(self, **kw):
def log_level(self, level):
def start_service(self, name, host=None, **kwargs):
def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001):
def raise_assertion(msg):
def assertGreater(self, first, second, msg=None):
def assertGreaterEqual(self, first, second, msg=None):
\OpenStack\cinder-2014.1\cinder\tests\api\common.py
def compare_links(actual, expected):
def compare_media_types(actual, expected):
def compare_tree_to_dict(actual, expected, keys):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_admin_actions.py
def app():
class AdminActionsTest(test.TestCase):
def setUp(self):
def test_reset_status_as_admin(self):
def test_reset_status_as_non_admin(self):
def test_malformed_reset_status_body(self):
def test_invalid_status_for_volume(self):
def test_reset_status_for_missing_volume(self):
def test_reset_attached_status(self):
def test_invalid_reset_attached_status(self):
def test_snapshot_reset_status(self):
def test_invalid_status_for_snapshot(self):
def test_force_delete(self):
def test_force_delete_snapshot(self):
def test_force_detach_instance_attached_volume(self):
def test_force_detach_host_attached_volume(self):
def test_attach_in_used_volume_by_instance(self):
def test_attach_in_used_volume_by_host(self):
def test_invalid_iscsi_connector(self):
def test_attach_attaching_volume_with_different_instance(self):
def test_attach_attaching_volume_with_different_mode(self):
def _migrate_volume_prep(self):
def _migrate_volume_exec(self, ctx, volume, host, expected_status, force_host_copy=False):
def test_migrate_volume_success(self):
def test_migrate_volume_as_non_admin(self):
def test_migrate_volume_without_host_parameter(self):
def test_migrate_volume_host_no_exist(self):
def test_migrate_volume_same_host(self):
def test_migrate_volume_migrating(self):
def test_migrate_volume_with_snap(self):
def test_migrate_volume_bad_force_host_copy1(self):
def test_migrate_volume_bad_force_host_copy2(self):
def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error, expected_status, expected_id, no_body=False):
def test_migrate_volume_comp_as_non_admin(self):
def test_migrate_volume_comp_no_mig_status(self):
def test_migrate_volume_comp_bad_mig_status(self):
def test_migrate_volume_comp_no_action(self):
def test_migrate_volume_comp_from_nova(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_availability_zones.py
def list_availability_zones(self):
class FakeRequest(object):
class ControllerTestCase(cinder.test.TestCase):
def setUp(self):
def test_list_hosts(self):
class XMLSerializerTest(cinder.test.TestCase):
def test_index_xml(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_backups.py
class BackupsAPITestCase(test.TestCase):
def setUp(self):
def _create_backup(volume_id=1, display_name='test_backup', display_description='this is a test backup', container='volumebackups', status='creating', size=0, object_count=0):
def _get_backup_attrib(backup_id, attrib_name):
def test_show_backup(self):
def test_show_backup_xml_content_type(self):
def test_show_backup_with_backup_NotFound(self):
def test_list_backups_json(self):
def test_list_backups_xml(self):
def test_list_backups_detail_json(self):
def test_list_backups_detail_xml(self):
def test_create_backup_json(self, _mock_service_get_all_by_topic):
def test_create_backup_xml(self, _mock_service_get_all_by_topic):
def test_create_backup_with_no_body(self):
def test_create_backup_with_body_KeyError(self):
def test_create_backup_with_VolumeNotFound(self):
def test_create_backup_with_InvalidVolume(self):
def test_create_backup_WithOUT_enabled_backup_service( self, _mock_service_get_all_by_topic):
def test_is_backup_service_enabled(self, _mock_service_get_all_by_topic):
def test_delete_backup_available(self):
def test_delete_backup_error(self):
def test_delete_backup_with_backup_NotFound(self):
def test_delete_backup_with_InvalidBackup(self):
def test_restore_backup_volume_id_specified_json(self):
def test_restore_backup_volume_id_specified_xml(self):
def test_restore_backup_with_no_body(self):
def test_restore_backup_with_body_KeyError(self):
def test_restore_backup_volume_id_unspecified(self, _mock_volume_api_create):
def fake_volume_api_create(context, size, name, description):
def test_restore_backup_with_InvalidInput(self, _mock_volume_api_restore):
def test_restore_backup_with_InvalidVolume(self):
def test_restore_backup_with_InvalidBackup(self):
def test_restore_backup_with_BackupNotFound(self):
def test_restore_backup_with_VolumeNotFound(self):
def test_restore_backup_with_VolumeSizeExceedsAvailableQuota( self, _mock_backup_restore):
def test_restore_backup_with_VolumeLimitExceeded(self, _mock_backup_restore):
def test_restore_backup_to_undersized_volume(self):
def test_restore_backup_to_oversized_volume(self):
def test_export_record_as_non_admin(self):
def test_export_backup_record_id_specified_json(self, _mock_export_record_rpc):
def test_export_record_backup_id_specified_xml(self, _mock_export_record_rpc):
def test_export_record_with_bad_backup_id(self):
def test_export_record_for_unavailable_backup(self):
def test_export_record_with_unavailable_service(self, _mock_export_record_rpc):
def test_import_record_as_non_admin(self):
def test_import_record_volume_id_specified_json(self, _mock_import_record_rpc, _mock_list_services):
def test_import_record_volume_id_specified_xml(self, _mock_import_record_rpc, _mock_list_services):
def test_import_record_with_no_backup_services(self, _mock_list_services):
def test_import_backup_with_missing_backup_services(self, _mock_import_record, _mock_list_services):
def test_import_record_with_missing_body_elements(self):
def test_import_record_with_no_body(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_extended_snapshot_attributes.py
def _get_default_snapshot_param():
def fake_snapshot_get(self, context, snapshot_id):
def fake_snapshot_get_all(self, context, search_opts=None):
class ExtendedSnapshotAttributesTest(test.TestCase):
def setUp(self):
def _make_request(self, url):
def _get_snapshot(self, body):
def _get_snapshots(self, body):
def assertSnapshotAttributes(self, snapshot, project_id, progress):
def test_show(self):
def test_detail(self):
class ExtendedSnapshotAttributesXmlTest(ExtendedSnapshotAttributesTest):
def _get_snapshot(self, body):
def _get_snapshots(self, body):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_hosts.py
def stub_service_get_all(self, req):
class FakeRequest(object):
class FakeRequestWithcinderZone(object):
class HostTestCase(test.TestCase):
def setUp(self):
def _test_host_update(self, host, key, val, expected_value):
def test_list_hosts(self):
def test_list_hosts_with_zone(self):
def test_bad_status_value(self):
def test_bad_update_key(self):
def test_bad_update_key_and_correct_udpate_key(self):
def test_good_udpate_keys(self):
def test_bad_host(self):
def test_show_forbidden(self):
def test_show_host_not_exist(self):
class HostSerializerTest(test.TestCase):
def setUp(self):
def test_index_serializer(self):
def test_update_serializer_with_status(self):
def test_update_deserializer(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_qos_specs_manage.py
def stub_qos_specs(id):
def stub_qos_associates(id):
def return_qos_specs_get_all(context):
def return_qos_specs_get_qos_specs(context, id):
def return_qos_specs_delete(context, id, force):
def return_qos_specs_delete_keys(context, id, keys):
def return_qos_specs_update(context, id, specs):
def return_qos_specs_create(context, name, specs):
def return_qos_specs_get_by_name(context, name):
def return_get_qos_associations(context, id):
def return_associate_qos_specs(context, id, type_id):
def return_disassociate_all(context, id):
class QoSSpecManageApiTest(test.TestCase):
def setUp(self):
def test_index(self):
def test_index_xml_response(self):
def test_qos_specs_delete(self):
def test_qos_specs_delete_not_found(self):
def test_qos_specs_delete_inuse(self):
def test_qos_specs_delete_inuse_force(self):
def test_qos_specs_delete_keys(self):
def test_qos_specs_delete_keys_qos_notfound(self):
def test_qos_specs_delete_keys_badkey(self):
def test_create(self):
def test_create_conflict(self):
def test_create_failed(self):
def _create_qos_specs_bad_body(self, body):
def test_create_no_body(self):
def test_create_missing_specs_name(self):
def test_create_malformed_entity(self):
def test_update(self):
def test_update_not_found(self):
def test_update_invalid_input(self):
def test_update_failed(self):
def test_show(self):
def test_show_xml_response(self):
def test_get_associations(self):
def test_get_associations_xml_response(self):
def test_get_associations_not_found(self):
def test_get_associations_failed(self):
def test_associate(self):
def test_associate_no_type(self):
def test_associate_not_found(self):
def test_associate_fail(self):
def test_disassociate(self):
def test_disassociate_no_type(self):
def test_disassociate_not_found(self):
def test_disassociate_failed(self):
def test_disassociate_all(self):
def test_disassociate_all_not_found(self):
def test_disassociate_all_failed(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_quotas.py
def make_body(root=True, gigabytes=1000, snapshots=10, volumes=10, tenant_id='foo'):
class QuotaSetsControllerTest(test.TestCase):
def setUp(self):
def test_defaults(self):
def test_show(self):
def test_show_not_authorized(self):
def test_update(self):
def test_update_wrong_key(self):
def test_update_invalid_key_value(self):
def test_update_bad_quota_limit(self):
def test_update_no_admin(self):
def test_update_without_quota_set_field(self):
def test_update_empty_body(self):
def test_delete(self):
def test_delete_no_admin(self):
class QuotaSerializerTest(test.TestCase):
def setUp(self):
def test_update_serializer(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_quotas_classes.py
def make_body(root=True, gigabytes=1000, snapshots=10, volumes=10, volume_types_faked=None, tenant_id='foo'):
def make_response_body(root=True, ctxt=None, quota_class='foo', request_body=None, tenant_id='foo'):
class QuotaClassSetsControllerTest(test.TestCase):
def setUp(self):
def test_show(self):
def test_show_not_authorized(self):
def test_update(self):
def test_update_wrong_key(self):
def test_update_invalid_key_value(self):
def test_update_bad_quota_limit(self):
def test_update_no_admin(self):
def test_update_with_more_volume_types(self):
class QuotaClassesSerializerTest(test.TestCase):
def setUp(self):
def test_update_serializer(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_scheduler_hints.py
class SchedulerHintsTestCase(test.TestCase):
def setUp(self):
def test_create_server_without_hints(self):
def fake_create(*args, **kwargs):
def test_create_server_with_hints(self):
def fake_create(*args, **kwargs):
def test_create_server_bad_hints(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_services.py
class FakeRequest(object):
class FakeRequestWithService(object):
class FakeRequestWithBinary(object):
class FakeRequestWithHost(object):
class FakeRequestWithHostService(object):
class FakeRequestWithHostBinary(object):
def fake_service_get_all(context):
def fake_service_get_by_host_binary(context, host, binary):
def fake_service_get_by_id(value):
def fake_service_update(context, service_id, values):
def fake_policy_enforce(context, action, target):
def fake_utcnow():
class ServicesTest(test.TestCase):
def setUp(self):
def test_services_list(self):
def test_services_detail(self):
def test_services_list_with_host(self):
def test_services_detail_with_host(self):
def test_services_list_with_service(self):
def test_services_detail_with_service(self):
def test_services_list_with_binary(self):
def test_services_detail_with_binary(self):
def test_services_list_with_host_service(self):
def test_services_detail_with_host_service(self):
def test_services_list_with_host_binary(self):
def test_services_detail_with_host_binary(self):
def test_services_enable_with_service_key(self):
def test_services_enable_with_binary_key(self):
def test_services_disable_with_service_key(self):
def test_services_disable_with_binary_key(self):
def test_services_disable_log_reason(self):
def test_services_disable_log_reason_none(self):
def test_invalid_reason_field(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_snapshot_actions.py
class SnapshotActionsTest(test.TestCase):
def setUp(self):
def test_update_snapshot_status(self):
def test_update_snapshot_status_invalid_status(self):
def test_update_snapshot_status_without_status(self):
def stub_snapshot_get(context, snapshot_id):
def stub_snapshot_update(self, context, id, **kwargs):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_types_extra_specs.py
def return_create_volume_type_extra_specs(context, volume_type_id, extra_specs):
def return_volume_type_extra_specs(context, volume_type_id):
def return_empty_volume_type_extra_specs(context, volume_type_id):
def delete_volume_type_extra_specs(context, volume_type_id, key):
def delete_volume_type_extra_specs_not_found(context, volume_type_id, key):
def stub_volume_type_extra_specs():
def volume_type_get(context, volume_type_id):
class VolumeTypesExtraSpecsTest(test.TestCase):
def setUp(self):
def test_index(self):
def test_index_no_data(self):
def test_show(self):
def test_show_spec_not_found(self):
def test_delete(self):
def test_delete_not_found(self):
def test_create(self):
def test_create_key_allowed_chars( self, volume_type_extra_specs_update_or_create):
def test_create_too_many_keys_allowed_chars( self, volume_type_extra_specs_update_or_create):
def test_update_item(self):
def test_update_item_too_many_keys(self):
def test_update_item_body_uri_mismatch(self):
def _extra_specs_empty_update(self, body):
def test_update_no_body(self):
def test_update_empty_body(self):
def _extra_specs_create_bad_body(self, body):
def test_create_no_body(self):
def test_create_missing_volume(self):
def test_create_malformed_entity(self):
def test_create_invalid_key(self):
def test_create_invalid_too_many_key(self):
class VolumeTypeExtraSpecsSerializerTest(test.TestCase):
def test_index_create_serializer(self):
def test_update_show_serializer(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_types_manage.py
def stub_volume_type(id):
def return_volume_types_get_volume_type(context, id):
def return_volume_types_destroy(context, name):
def return_volume_types_with_volumes_destroy(context, id):
def return_volume_types_create(context, name, specs):
def return_volume_types_create_duplicate_type(context, name, specs):
def return_volume_types_get_by_name(context, name):
class VolumeTypesManageApiTest(test.TestCase):
def setUp(self):
def tearDown(self):
def test_volume_types_delete(self):
def test_volume_types_delete_not_found(self):
def test_volume_types_with_volumes_destroy(self):
def test_create(self):
def test_create_duplicate_type_fail(self):
def _create_volume_type_bad_body(self, body):
def test_create_no_body(self):
def test_create_missing_volume(self):
def test_create_malformed_entity(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_used_limits.py
class FakeRequest(object):
def __init__(self, context):
class UsedLimitsTestCase(test.TestCase):
def setUp(self):
def test_used_limits(self):
def stub_get_project_quotas(context, project_id, usages=True):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_actions.py
class VolumeActionsTest(test.TestCase):
def setUp(self):
def test_simple_api_actions(self):
def test_initialize_connection(self):
def test_initialize_connection_without_connector(self):
def test_initialize_connection_exception(self):
def test_terminate_connection(self):
def test_terminate_connection_without_connector(self):
def test_terminate_connection_with_exception(self):
def test_attach_to_instance(self):
def test_attach_to_host(self):
def test_attach_with_invalid_arguments(self):
def test_begin_detaching(self):
def fake_begin_detaching(*args, **kwargs):
def test_roll_detaching(self):
def fake_roll_detaching(*args, **kwargs):
def test_extend_volume(self):
def fake_extend_volume(*args, **kwargs):
def test_update_readonly_flag(self):
def fake_update_readonly_flag(*args, **kwargs):
def make_update_readonly_flag_test(self, readonly, return_code):
class VolumeRetypeActionsTest(VolumeActionsTest):
def setUp(self):
def get_vol_type(*args, **kwargs):
def _retype_volume_exec(self, expected_status, new_type='foo'):
def test_retype_volume_success(self, _mock_get_qspecs):
def test_retype_volume_no_body(self):
def test_retype_volume_bad_policy(self):
def test_retype_volume_bad_status(self):
def test_retype_type_no_exist(self):
def test_retype_same_type(self):
def test_retype_over_quota(self):
def _retype_volume_diff_qos(self, vol_status, consumer, expected_status, _mock_get_qspecs):
def fake_get_qos(ctxt, qos_id):
def test_retype_volume_diff_qos_fe_in_use(self):
def test_retype_volume_diff_qos_fe_available(self):
def test_retype_volume_diff_qos_be(self):
def stub_volume_get(self, context, volume_id):
def stub_upload_volume_to_image_service(self, context, volume, metadata, force):
class VolumeImageActionsTest(test.TestCase):
def setUp(self):
def test_copy_volume_to_image(self):
def test_copy_volume_to_image_volumenotfound(self):
def stub_volume_get_raise_exc(self, context, volume_id):
def test_copy_volume_to_image_invalidvolume(self):
def stub_upload_volume_to_image_service_raise(self, context, volume, metadata, force):
def test_copy_volume_to_image_valueerror(self):
def stub_upload_volume_to_image_service_raise(self, context, volume, metadata, force):
def test_copy_volume_to_image_remoteerror(self):
def stub_upload_volume_to_image_service_raise(self, context, volume, metadata, force):
def test_volume_upload_image_typeerror(self):
def test_volume_upload_image_without_type(self):
def test_extend_volume_valueerror(self):
def test_copy_volume_to_image_notimagename(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_encryption_metadata.py
def return_volume_type_encryption_metadata(context, volume_type_id):
def stub_volume_type_encryption():
class VolumeEncryptionMetadataTest(test.TestCase):
def _create_volume(context, display_name='test_volume', display_description='this is a test volume', status='creating', availability_zone='fake_az', host='fake_host', size=1):
def setUp(self):
def test_index(self):
def test_index_bad_tenant_id(self):
def test_index_bad_volume_id(self):
def test_show_key(self):
def test_show_control(self):
def test_show_provider(self):
def test_show_bad_tenant_id(self):
def test_show_bad_volume_id(self):
def test_retrieve_key_admin(self):
def test_show_volume_not_encrypted_type(self):
def test_index_volume_not_encrypted_type(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_host_attribute.py
def fake_volume_get(*args, **kwargs):
def fake_volume_get_all(*args, **kwargs):
def app():
class VolumeHostAttributeTest(test.TestCase):
def setUp(self):
def test_get_volume_allowed(self):
def test_get_volume_unallowed(self):
def test_list_detail_volumes_allowed(self):
def test_list_detail_volumes_unallowed(self):
def test_list_simple_volumes_no_host(self):
def test_get_volume_xml(self):
def test_list_volumes_detail_xml(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_image_metadata.py
def fake_volume_get(*args, **kwargs):
def fake_volume_get_all(*args, **kwargs):
def fake_get_volume_image_metadata(*args, **kwargs):
def fake_get_volumes_image_metadata(*args, **kwargs):
class VolumeImageMetadataTest(test.TestCase):
def setUp(self):
def _make_request(self, url):
def _get_image_metadata(self, body):
def _get_image_metadata_list(self, body):
def test_get_volume(self):
def test_list_detail_volumes(self):
class ImageMetadataXMLDeserializer(common.MetadataXMLDeserializer):
class VolumeImageMetadataXMLTest(VolumeImageMetadataTest):
def _get_image_metadata(self, body):
def _get_image_metadata_list(self, body):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_manage.py
def app():
def db_service_get_by_host_and_topic(context, host, topic):
def vt_get_volume_type_by_name(context, name):
def vt_get_volume_type(context, vt_id):
def api_manage(*args, **kwargs):
class VolumeManageTest(test.TestCase):
def setUp(self):
def _get_resp(self, body):
def test_manage_volume_ok(self, mock_api_manage):
def test_manage_volume_missing_host(self):
def test_manage_volume_missing_ref(self):
def test_manage_volume_volume_type_by_uuid(self):
def test_manage_volume_volume_type_by_name(self):
def test_manage_volume_bad_volume_type_by_uuid(self):
def test_manage_volume_bad_volume_type_by_name(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_migration_status_attribute.py
def fake_volume_get(*args, **kwargs):
def fake_volume_get_all(*args, **kwargs):
def app():
class VolumeMigStatusAttributeTest(test.TestCase):
def setUp(self):
def test_get_volume_allowed(self):
def test_get_volume_unallowed(self):
def test_list_detail_volumes_allowed(self):
def test_list_detail_volumes_unallowed(self):
def test_list_simple_volumes_no_migration_status(self):
def test_get_volume_xml(self):
def test_list_volumes_detail_xml(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_tenant_attribute.py
def fake_volume_get(*args, **kwargs):
def fake_volume_get_all(*args, **kwargs):
def app():
class VolumeTenantAttributeTest(test.TestCase):
def setUp(self):
def test_get_volume_allowed(self):
def test_get_volume_unallowed(self):
def test_list_detail_volumes_allowed(self):
def test_list_detail_volumes_unallowed(self):
def test_list_simple_volumes_no_tenant_id(self):
def test_get_volume_xml(self):
def test_list_volumes_detail_xml(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_transfer.py
class VolumeTransferAPITestCase(test.TestCase):
def setUp(self):
def _create_transfer(self, volume_id=1, display_name='test_transfer'):
def _create_volume(display_name='test_volume', display_description='this is a test volume', status='available', size=1):
def test_show_transfer(self):
def test_show_transfer_xml_content_type(self):
def test_show_transfer_with_transfer_NotFound(self):
def test_list_transfers_json(self):
def test_list_transfers_xml(self):
def test_list_transfers_detail_json(self):
def test_list_transfers_detail_xml(self):
def test_create_transfer_json(self):
def test_create_transfer_xml(self):
def test_create_transfer_with_no_body(self):
def test_create_transfer_with_body_KeyError(self):
def test_create_transfer_with_VolumeNotFound(self):
def test_create_transfer_with_InvalidVolume(self):
def test_delete_transfer_awaiting_transfer(self):
def test_delete_transfer_with_transfer_NotFound(self):
def test_accept_transfer_volume_id_specified_json(self):
def test_accept_transfer_volume_id_specified_xml(self):
def test_accept_transfer_with_no_body(self):
def test_accept_transfer_with_body_KeyError(self):
def test_accept_transfer_invalid_id_auth_key(self):
def test_accept_transfer_with_invalid_transfer(self):
def test_accept_transfer_with_VolumeSizeExceedsAvailableQuota(self):
def fake_transfer_api_accept_throwing_VolumeSizeExceedsAvailableQuota( cls, context, transfer, volume_id):
def test_accept_transfer_with_VolumeLimitExceeded(self):
def fake_transfer_api_accept_throwing_VolumeLimitExceeded(cls, context, transfer, volume_id):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_type_encryption.py
def return_volume_type_encryption(context, volume_type_id):
def stub_volume_type_encryption():
def volume_type_encryption_get(context, volume_type_id):
class VolumeTypeEncryptionTest(test.TestCase):
def setUp(self):
def _get_response(self, volume_type, admin=True, url='/v2/fake/types/%s/encryption', req_method='GET', req_body=None, req_headers=None):
def _create_type_and_encryption(self, volume_type, body=None):
def test_index(self):
def test_index_invalid_type(self):
def test_show_key_size(self):
def test_show_provider(self):
def test_show_item_not_found(self):
def _create(self, cipher, control_location, key_size, provider):
def test_create_json(self):
def test_create_xml(self):
def test_create_invalid_volume_type(self):
def test_create_encryption_type_exists(self):
def test_create_volume_exists(self):
def _encryption_create_bad_body(self, body, msg='Create body is not valid.'):
def test_create_no_body(self):
def test_create_malformed_entity(self):
def test_create_negative_key_size(self):
def test_create_none_key_size(self):
def test_create_invalid_control_location(self):
def test_create_no_provider(self):
def test_delete(self):
def test_delete_with_volume_in_use(self):
def test_update_item(self):
def _encryption_update_bad_body(self, update_body, msg):
def test_update_too_many_items(self):
def test_update_key_size_non_integer(self):
def test_update_item_invalid_body(self):
def _encryption_empty_update(self, update_body):
def test_update_no_body(self):
def test_update_empty_body(self):
def test_update_with_volume_in_use(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\test_volume_unmanage.py
def app():
def api_get(self, context, volume_id):
def db_snapshot_get_all_for_volume(context, volume_id):
class VolumeUnmanageTest(test.TestCase):
def setUp(self):
def _get_resp(self, volume_id):
def test_unmanage_volume_ok(self, mock_rpcapi, mock_db):
def test_unmanage_volume_bad_volume_id(self):
def test_unmanage_volume_attached_(self):
def test_unmanage_volume_with_snapshots(self):
\OpenStack\cinder-2014.1\cinder\tests\api\contrib\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\api\extensions\foxinsocks.py
class FoxInSocksController(object):
def index(self, req):
class FoxInSocksServerControllerExtension(wsgi.Controller):
def _add_tweedle(self, req, id, body):
def _delete_tweedle(self, req, id, body):
def _fail(self, req, id, body):
class FoxInSocksFlavorGooseControllerExtension(wsgi.Controller):
def show(self, req, resp_obj, id):
class FoxInSocksFlavorBandsControllerExtension(wsgi.Controller):
def show(self, req, resp_obj, id):
class Foxinsocks(extensions.ExtensionDescriptor):
def __init__(self, ext_mgr):
def get_resources(self):
def get_controller_extensions(self):
\OpenStack\cinder-2014.1\cinder\tests\api\extensions\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\api\fakes.py
class Context(object):
class FakeRouter(wsgi.Router):
def __init__(self, ext_mgr=None):
def __call__(self, req):
def fake_wsgi(self, req):
def wsgi_app(inner_app_v2=None, fake_auth=True, fake_auth_context=None, use_no_auth=False, ext_mgr=None):
def stub_out_key_pair_funcs(stubs, have_key_pair=True):
def key_pair(context, user_id):
def one_key_pair(context, user_id, name):
def no_key_pair(context, user_id):
class FakeToken(object):
def __getitem__(self, key):
def __init__(self, **kwargs):
class FakeRequestContext(context.RequestContext):
def __init__(self, *args, **kwargs):
class HTTPRequest(webob.Request):
def blank(cls, *args, **kwargs):
class TestRouter(wsgi.Router):
def __init__(self, controller):
class FakeAuthDatabase(object):
def auth_token_get(context, token_hash):
def auth_token_create(context, token):
def auth_token_destroy(context, token_id):
class FakeRateLimiter(object):
def __init__(self, application):
def __call__(self, req):
def get_fake_uuid(token=0):
\OpenStack\cinder-2014.1\cinder\tests\api\middleware\test_auth.py
class TestCinderKeystoneContextMiddleware(test.TestCase):
def setUp(self):
def fake_app(req):
def test_no_user_or_user_id(self):
def test_user_only(self):
def test_user_id_only(self):
def test_user_id_trumps_user(self):
def test_tenant_id_name(self):
def test_request_id_extracted_from_env(self):
\OpenStack\cinder-2014.1\cinder\tests\api\middleware\test_faults.py
class TestFaults(test.TestCase):
def _prepare_xml(self, xml_string):
def test_400_fault_json(self):
def test_413_fault_json(self):
def test_raise(self):
def raiser(req):
def test_raise_403(self):
def raiser(req):
def test_raise_http_with_localized_explanation(self):
def _mock_translation(msg, locale):
def raiser(req):
def test_raise_invalid_with_localized_explanation(self, mock_translation):
def translation(domain, localedir=None, languages=None, fallback=None):
def raiser(req):
def test_fault_has_status_int(self):
def test_xml_serializer(self):
class FaultsXMLSerializationTestV11(test.TestCase):
def _prepare_xml(self, xml_string):
def test_400_fault(self):
def test_413_fault(self):
def test_404_fault(self):
\OpenStack\cinder-2014.1\cinder\tests\api\middleware\test_sizelimit.py
class TestLimitingReader(test.TestCase):
def test_limiting_reader(self):
def test_limiting_reader_fails(self):
def _consume_all_iter():
def _consume_all_read():
class TestRequestBodySizeLimiter(test.TestCase):
def setUp(self):
def fake_app(req):
def test_content_length_acceptable(self):
def test_content_length_too_large(self):
def test_request_too_large_no_content_length(self):
\OpenStack\cinder-2014.1\cinder\tests\api\middleware\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\api\openstack\test_wsgi.py
class RequestTest(test.TestCase):
def test_content_type_missing(self):
def test_content_type_unsupported(self):
def test_content_type_with_charset(self):
def test_content_type_from_accept(self):
def test_content_type_from_accept_best(self):
def test_content_type_from_query_extension(self):
def test_content_type_accept_and_query_extension(self):
def test_content_type_accept_default(self):
def test_best_match_language(self):
def fake_best_match(self, offers, default_match=None):
def test_cache_and_retrieve_resources(self):
class ActionDispatcherTest(test.TestCase):
def test_dispatch(self):
def test_dispatch_action_None(self):
def test_dispatch_default(self):
class DictSerializerTest(test.TestCase):
def test_dispatch_default(self):
class XMLDictSerializerTest(test.TestCase):
def test_xml(self):
class JSONDictSerializerTest(test.TestCase):
def test_json(self):
class TextDeserializerTest(test.TestCase):
def test_dispatch_default(self):
class JSONDeserializerTest(test.TestCase):
def test_json(self):
class XMLDeserializerTest(test.TestCase):
def test_xml(self):
def test_xml_empty(self):
class MetadataXMLDeserializerTest(test.TestCase):
def test_xml_meta_parsing_special_character(self):
class ResourceTest(test.TestCase):
def test_resource_call(self):
def test_resource_not_authorized(self):
def test_dispatch(self):
def test_get_method_undefined_controller_action(self):
def test_get_method_action_json(self):
def test_get_method_action_xml(self):
def test_get_method_action_bad_body(self):
def test_get_method_unknown_controller_action(self):
def test_get_method_action_method(self):
def test_get_action_args(self):
def test_get_body_bad_content(self):
def test_get_body_no_content_type(self):
def test_get_body_no_content_body(self):
def test_get_body(self):
def test_deserialize_badtype(self):
def test_deserialize_default(self):
def test_deserialize_decorator(self):
def test_register_actions(self):
def test_register_extensions(self):
def test_get_method_extensions(self):
def test_get_method_action_extensions(self):
def test_get_method_action_whitelist_extensions(self):
def test_pre_process_extensions_regular(self):
def extension1(req, resp_obj):
def extension2(req, resp_obj):
def test_pre_process_extensions_generator(self):
def extension1(req):
def extension2(req):
def test_pre_process_extensions_generator_response(self):
def extension1(req):
def extension2(req):
def test_post_process_extensions_regular(self):
def extension1(req, resp_obj):
def extension2(req, resp_obj):
def test_post_process_extensions_regular_response(self):
def extension1(req, resp_obj):
def extension2(req, resp_obj):
def test_post_process_extensions_generator(self):
def extension1(req):
def extension2(req):
def test_post_process_extensions_generator_response(self):
def extension1(req):
def extension2(req):
class ResponseObjectTest(test.TestCase):
def test_default_code(self):
def test_modified_code(self):
def test_override_default_code(self):
def test_override_modified_code(self):
def test_set_header(self):
def test_get_header(self):
def test_del_header(self):
def test_header_isolation(self):
def test_default_serializers(self):
def test_bind_serializers(self):
def test_get_serializer(self):
def test_get_serializer_defaults(self):
def test_serialize(self):
class ValidBodyTest(test.TestCase):
def setUp(self):
def test_is_valid_body(self):
def test_is_valid_body_none(self):
def test_is_valid_body_empty(self):
def test_is_valid_body_no_entity(self):
def test_is_valid_body_malformed_entity(self):
\OpenStack\cinder-2014.1\cinder\tests\api\openstack\__init__.py
\OpenStack\cinder-2014.1\cinder\tests\api\test_common.py
class LimiterTest(test.TestCase):
def setUp(self):
def test_limiter_offset_zero(self):
def test_limiter_offset_medium(self):
def test_limiter_offset_over_max(self):
def test_limiter_offset_blank(self):
def test_limiter_offset_bad(self):
def test_limiter_nothing(self):
def test_limiter_limit_zero(self):
def test_limiter_limit_bad(self):
def test_limiter_limit_medium(self):
def test_limiter_limit_over_max(self):
def test_limiter_limit_and_offset(self):
def test_limiter_custom_max_limit(self):
def test_limiter_negative_limit(self):
def test_limiter_negative_offset(self):
class PaginationParamsTest(test.TestCase):
def test_nonnumerical_limit(self):
def test_no_params(self):
def test_valid_marker(self):
def test_valid_limit(self):
def test_invalid_limit(self):
def test_valid_limit_and_marker(self):
class MiscFunctionsTest(test.TestCase):
def test_remove_major_version_from_href(self):
def test_remove_version_from_href(self):
def test_remove_version_from_href_2(self):
def test_remove_version_from_href_3(self):
def test_remove_version_from_href_4(self):
def test_remove_version_from_href_bad_request(self):
def test_remove_version_from_href_bad_request_2(self):
def test_remove_version_from_href_bad_request_3(self):
\OpenStack\cinder-2014.1\cinder\tests\api\test_extensions.py
class ExtensionTestCase(test.TestCase):
def setUp(self):
class ExtensionControllerTest(ExtensionTestCase):
def setUp(self):
def test_list_extensions_json(self):
def test_get_extension_json(self):
def test_get_non_existing_extension_json(self):
def test_list_extensions_xml(self):
def test_get_extension_xml(self):
\OpenStack\cinder-2014.1\cinder\tests\api\test_router.py
class FakeController(object):
def __init__(self, ext_mgr=None):
def index(self, req):
def detail(self, req):
def create_resource(ext_mgr):
class VolumeRouterTestCase(test.TestCase):
def setUp(self):
def test_versions(self):
def test_versions_action_args_index(self):
def test_versions_action_args_multi(self):
def test_versions_get_most_recent_update(self):
def test_versions_create_version_entry(self):
def test_versions_create_feed(self):
def test_versions_multi(self):
def test_versions_multi_disable_v1(self):
def test_versions_multi_disable_v2(self):
def test_versions_index(self):
def test_versions_index_disable_v1(self):
def test_versions_index_disable_v2(self):
def test_volumes(self):
def test_volumes_detail(self):
def test_types(self):
def test_snapshots(self):
def test_snapshots_detail(self):
\OpenStack\cinder-2014.1\cinder\tests\api\test_wsgi.py
class Test(test.TestCase):
def test_debug(self):
def test_router(self):
\OpenStack\cinder-2014.1\cinder\tests\api\test_xmlutil.py
class SelectorTest(test.TestCase):
def test_empty_selector(self):
def test_dict_selector(self):
def test_datum_selector(self):
def test_list_selector(self):
def test_items_selector(self):
def test_missing_key_selector(self):
def test_constant_selector(self):
class TemplateElementTest(test.TestCase):
def test_element_initial_attributes(self):
def test_element_get_attributes(self):
def test_element_set_attributes(self):
def test_element_attribute_keys(self):
def test_element_attribute_items(self):
def test_element_selector_none(self):
def test_element_selector_string(self):
def test_element_selector(self):
def test_element_subselector_none(self):
def test_element_subselector_string(self):
OpenStack IndexPreviousNext
|