OpenStack Study: nova
OpenStack IndexPreviousNext
def do_test(_process_instance_event, get_instance_nw_info):
def test_retry_reboot_pending_soft(self):
def test_retry_reboot_pending_hard(self):
def test_retry_reboot_starting_soft_off(self):
def test_retry_reboot_starting_hard_off(self):
def test_retry_reboot_starting_hard_on(self):
def test_retry_reboot_no_reboot(self):
class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
def setUp(self):
def fake_network_info():
def _do_build_instance_update(self, reschedule_update=False):
def _build_and_run_instance_update(self):
def _build_resources_instance_update(self, stub=True):
def _notify_about_instance_usage(self, event, stub=True, **kwargs):
def test_build_and_run_instance_called_with_proper_args(self):
def test_build_abort_exception(self):
def test_rescheduled_exception(self):
def test_rescheduled_exception_do_not_deallocate_network(self):
def test_rescheduled_exception_deallocate_network_if_dhcp(self):
def test_unexpected_exception(self):
def test_instance_not_found(self):
def test_reschedule_on_exception(self):
def test_spawn_network_alloc_failure(self):
def test_spawn_waits_for_network_and_saves_info_cache(self, gps):
def test_reschedule_on_resources_unavailable(self):
def test_build_resources_buildabort_reraise(self):
def test_build_resources_reraises_on_failed_bdm_prep(self):
def test_failed_bdm_prep_from_delete_raises_unexpected(self):
def test_build_resources_aborts_on_failed_network_alloc(self):
def test_failed_network_alloc_from_delete_raises_unexpected(self):
def test_build_resources_cleans_up_and_reraises_on_spawn_failure(self):
def fake_spawn():
def test_build_resources_aborts_on_cleanup_failure(self):
def fake_spawn():
def test_cleanup_cleans_volumes(self):
def test_cleanup_reraises_volume_cleanup_failure(self):
def test_build_networks_if_not_allocated(self):
def test_build_networks_if_allocated_false(self):
def test_return_networks_if_found(self):
def fake_network_info():
class ComputeManagerMigrationTestCase(test.NoDBTestCase):
def setUp(self):
def test_finish_resize_failure(self):
def test_resize_instance_failure(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute_utils.py
class ComputeValidateDeviceTestCase(test.TestCase):
def setUp(self):
def _update_flavor(self, flavor_info):
def _validate_device(self, device=None):
def _fake_bdm(device):
def test_wrap(self):
def test_wrap_plus_one(self):
def test_later(self):
def test_gap(self):
def test_no_bdms(self):
def test_lxc_names_work(self):
def test_name_conversion(self):
def test_invalid_bdms(self):
def test_invalid_device_prefix(self):
def test_device_in_use(self):
def test_swap(self):
def test_swap_no_ephemeral(self):
def test_ephemeral_xenapi(self):
def test_swap_xenapi(self):
def test_swap_and_ephemeral_xenapi(self):
def test_swap_and_one_attachment_xenapi(self):
class DefaultDeviceNamesForInstanceTestCase(test.NoDBTestCase):
def setUp(self):
def fake_extract_flavor(instance):
def fake_driver_matches(driver_string):
def tearDown(self):
def _test_default_device_names(self, *block_device_lists):
def test_only_block_device_mapping(self):
def test_with_ephemerals(self):
def test_with_swap(self):
def test_all_together(self):
class UsageInfoTestCase(test.TestCase):
def setUp(self):
def fake_get_nw_info(cls, ctxt, instance):
def fake_show(meh, context, id):
def _create_instance(self, params={}):
def test_notify_usage_exists(self):
def test_notify_usage_exists_deleted_instance(self):
def test_notify_usage_exists_instance_not_found(self):
def test_notify_about_instance_usage(self):
def test_notify_about_aggregate_update_with_id(self):
def test_notify_about_aggregate_update_with_name(self):
def test_notify_about_aggregate_update_without_name_id(self):
class ComputeGetImageMetadataTestCase(test.TestCase):
def setUp(self):
def instance_obj(self):
def _fake_show(self, ctx, image_id):
def test_get_image_meta(self):
def test_get_image_meta_no_image(self):
def fake_show(ctx, image_id):
def test_get_image_meta_no_image_system_meta(self):
def test_get_image_meta_no_image_no_image_system_meta(self):
def fake_show(ctx, image_id):
class ComputeUtilsGetNWInfo(test.TestCase):
def test_instance_object_none_info_cache(self):
def test_instance_dict_none_info_cache(self):
class ComputeUtilsGetRebootTypes(test.TestCase):
def setUp(self):
def test_get_reboot_type_started_soft(self):
def test_get_reboot_type_pending_soft(self):
def test_get_reboot_type_hard(self):
def test_get_reboot_not_running_hard(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute_xen.py
class ComputeXenTestCase(stubs.XenAPITestBaseNoDB):
def setUp(self):
def test_sync_power_states_instance_not_found(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_host_api.py
class ComputeHostAPITestCase(test.TestCase):
def setUp(self):
def _compare_obj(self, obj, db_obj):
def _compare_objs(self, obj_list, db_obj_list):
def _mock_rpc_call(self, method, **kwargs):
def _mock_assert_host_exists(self):
def fake_assert_host_exists(context, host_name, must_be_up=False):
def test_set_host_enabled(self):
def test_host_name_from_assert_hosts_exists(self):
def test_get_host_uptime(self):
def test_get_host_uptime_service_down(self):
def fake_service_get_by_compute_host(context, host_name):
def fake_service_is_up(service):
def test_host_power_action(self):
def test_set_host_maintenance(self):
def test_service_get_all_no_zones(self):
def test_service_get_all(self):
def test_service_get_by_compute_host(self):
def test_service_update(self):
def test_instance_get_all_by_host(self):
def test_task_log_get_all(self):
def test_service_delete(self):
class ComputeHostAPICellsTestCase(ComputeHostAPITestCase):
def setUp(self):
def _mock_rpc_call(self, method, **kwargs):
def test_service_get_all_no_zones(self):
def _test_service_get_all(self, fake_filters, **kwargs):
def test_service_get_all(self):
def test_service_get_all_set_zones(self):
def test_service_get_by_compute_host(self):
def test_service_update(self):
def test_service_delete(self):
def test_instance_get_all_by_host(self):
def test_task_log_get_all(self):
def test_get_host_uptime_service_down(self):
def test_get_host_uptime(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_keypairs.py
class KeypairAPITestCase(test_compute.BaseTestCase):
def setUp(self):
def _keypair_db_call_stubs(self):
def db_key_pair_get_all_by_user(context, user_id):
def db_key_pair_create(context, keypair):
def db_key_pair_destroy(context, user_id, name):
def db_key_pair_get(context, user_id, name):
def _check_notifications(self, action='create', key_name='foo'):
class CreateImportSharedTestMixIn(object):
def assertKeyNameRaises(self, exc_class, expected_message, name):
def assertInvalidKeypair(self, expected_message, name):
def test_name_too_short(self):
def test_name_too_long(self):
def test_invalid_chars(self):
def test_already_exists(self):
def db_key_pair_create_duplicate(context, keypair):
def test_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
class CreateKeypairTestCase(KeypairAPITestCase, CreateImportSharedTestMixIn):
def test_success(self):
class ImportKeypairTestCase(KeypairAPITestCase, CreateImportSharedTestMixIn):
def test_success(self):
def test_bad_key_data(self):
class GetKeypairTestCase(KeypairAPITestCase):
def test_success(self):
class GetKeypairsTestCase(KeypairAPITestCase):
def test_success(self):
class DeleteKeypairTestCase(KeypairAPITestCase):
def test_success(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_multiple_nodes.py
class BaseTestCase(test.TestCase):
def tearDown(self):
class FakeDriverSingleNodeTestCase(BaseTestCase):
def setUp(self):
def test_get_host_stats(self):
def test_get_available_resource(self):
class FakeDriverMultiNodeTestCase(BaseTestCase):
def setUp(self):
def test_get_host_stats(self):
def test_get_available_resource(self):
class MultiNodeComputeTestCase(BaseTestCase):
def setUp(self):
def fake_get_compute_nodes_in_db(context):
def fake_compute_node_delete(context, compute_node):
def test_update_available_resource_add_remove_node(self):
def test_compute_manager_removes_deleted_node(self):
def fake_get_compute_nodes_in_db(context):
def fake_compute_node_delete(context, compute_node):
\OpenStack\nova-2014.1\nova\tests\compute\test_resource_tracker.py
class UnsupportedVirtDriver(driver.ComputeDriver):
def __init__(self):
def get_host_ip_addr(self):
def get_available_resource(self, nodename):
class FakeVirtDriver(driver.ComputeDriver):
def __init__(self, pci_support=False):
def get_host_ip_addr(self):
def get_available_resource(self, nodename):
def estimate_instance_overhead(self, instance_info):
class BaseTestCase(test.TestCase):
def setUp(self):
def _create_compute_node(self, values=None):
def _create_service(self, host="fakehost", compute=None):
def _fake_instance_system_metadata(self, instance_type, prefix=''):
def _fake_instance(self, stash=True, **kwargs):
def _fake_flavor_create(self, **kwargs):
def _fake_instance_get_all_by_host_and_node(self, context, host, nodename):
def _fake_flavor_get(self, ctxt, id_):
def _fake_instance_update_and_get_original(self, context, instance_uuid, values):
def _driver(self):
def _tracker(self, host=None):
class UnsupportedDriverTestCase(BaseTestCase):
def setUp(self):
def _driver(self):
def test_disabled(self):
def test_disabled_claim(self):
def test_disabled_instance_claim(self):
def test_disabled_instance_context_claim(self):
def test_disabled_updated_usage(self):
def test_disabled_resize_claim(self):
def test_disabled_resize_context_claim(self):
class MissingServiceTestCase(BaseTestCase):
def setUp(self):
def test_missing_service(self):
class MissingComputeNodeTestCase(BaseTestCase):
def setUp(self):
def _fake_create_compute_node(self, context, values):
def _fake_service_get_by_compute_host(self, ctx, host):
def test_create_compute_node(self):
def test_enabled(self):
class BaseTrackerTestCase(BaseTestCase):
def setUp(self):
def _fake_service_get_by_compute_host(self, ctx, host):
def _fake_compute_node_update(self, ctx, compute_node_id, values, prune_stats=False):
def _fake_compute_node_delete(self, ctx, compute_node_id):
def _fake_migration_get_in_progress_by_host_and_node(self, ctxt, host, node):
def _fake_migration_update(self, ctxt, migration_id, values):
def _limits(self, memory_mb=FAKE_VIRT_MEMORY_MB + FAKE_VIRT_MEMORY_OVERHEAD, disk_gb=FAKE_VIRT_LOCAL_GB, vcpus=FAKE_VIRT_VCPUS):
def _assert(self, value, field, tracker=None):
class TrackerTestCase(BaseTrackerTestCase):
def test_free_ram_resource_value(self):
def test_free_disk_resource_value(self):
def test_update_compute_node(self):
def test_init(self):
class TrackerPciStatsTestCase(BaseTrackerTestCase):
def test_update_compute_node(self):
def test_init(self):
def _driver(self):
class InstanceClaimTestCase(BaseTrackerTestCase):
def test_update_usage_only_for_tracked(self):
def test_claim_and_audit(self):
def test_claim_and_abort(self):
def test_instance_claim_with_oversubscription(self):
def test_additive_claims(self):
def test_context_claim_with_exception(self):
def test_instance_context_claim(self):
def test_update_load_stats_for_instance(self):
def test_cpu_stats(self):
def test_skip_deleted_instances(self):
class ResizeClaimTestCase(BaseTrackerTestCase):
def setUp(self):
def _fake_migration_create(mig_self, ctxt):
def _fake_migration_create(self, context, values=None):
def test_claim(self):
def test_abort(self):
def test_additive_claims(self):
def test_claim_and_audit(self):
def test_same_host(self):
def test_revert(self):
def test_revert_reserve_source(self):
def test_resize_filter(self):
def test_dupe_filter(self):
def test_set_instance_host_and_node(self):
class NoInstanceTypesInSysMetadata(ResizeClaimTestCase):
def setUp(self):
class OrphanTestCase(BaseTrackerTestCase):
def _driver(self):
def test_usage(self):
def test_find(self):
class ComputeMonitorTestCase(BaseTestCase):
def setUp(self):
def test_get_host_metrics_none(self):
def test_get_host_metrics_one_failed(self):
def test_get_host_metrics(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_rpcapi.py
class ComputeRpcAPITestCase(test.TestCase):
def setUp(self):
def test_serialized_instance_has_name(self):
def _test_compute_api(self, method, rpc_method, **kwargs):
def test_add_aggregate_host(self):
def test_add_fixed_ip_to_instance(self):
def test_attach_interface(self):
def test_attach_volume(self):
def test_change_instance_metadata(self):
def test_check_can_live_migrate_destination(self):
def test_check_can_live_migrate_source(self):
def test_check_instance_shared_storage(self):
def test_confirm_resize_cast(self):
def test_confirm_resize_call(self):
def test_detach_interface(self):
def test_detach_volume(self):
def test_finish_resize(self):
def test_finish_revert_resize(self):
def test_get_console_output(self):
def test_get_console_pool_info(self):
def test_get_console_topic(self):
def test_get_diagnostics(self):
def test_get_vnc_console(self):
def test_get_spice_console(self):
def test_get_rdp_console(self):
def test_validate_console_port(self):
def test_host_maintenance_mode(self):
def test_host_power_action(self):
def test_inject_network_info(self):
def test_live_migration(self):
def test_post_live_migration_at_destination(self):
def test_pause_instance(self):
def test_soft_delete_instance(self):
def test_swap_volume(self):
def test_restore_instance(self):
def test_pre_live_migration(self):
def test_prep_resize(self):
def test_reboot_instance(self):
def test_rebuild_instance(self):
def test_rebuild_instance_preserve_ephemeral(self):
def test_reserve_block_device_name(self):
def refresh_provider_fw_rules(self):
def test_refresh_security_group_rules(self):
def test_refresh_security_group_members(self):
def test_remove_aggregate_host(self):
def test_remove_fixed_ip_from_instance(self):
def test_remove_volume_connection(self):
def test_rescue_instance(self):
def test_reset_network(self):
def test_resize_instance(self):
def test_resume_instance(self):
def test_revert_resize(self):
def test_rollback_live_migration_at_destination(self):
def test_run_instance(self):
def test_set_admin_password(self):
def test_set_host_enabled(self):
def test_get_host_uptime(self):
def test_backup_instance(self):
def test_snapshot_instance(self):
def test_start_instance(self):
def test_stop_instance_cast(self):
def test_stop_instance_call(self):
def test_suspend_instance(self):
def test_terminate_instance(self):
def test_unpause_instance(self):
def test_unrescue_instance(self):
def test_shelve_instance(self):
def test_shelve_offload_instance(self):
def test_unshelve_instance(self):
def test_volume_snapshot_create(self):
def test_volume_snapshot_delete(self):
def test_external_instance_event(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_shelve.py
def _fake_resources():
class ShelveComputeManagerTestCase(test_compute.BaseTestCase):
def _shelve_instance(self, shelved_offload_time):
def test_shelve(self):
def test_shelve_offload(self):
def test_shelve_volume_backed(self):
def test_unshelve(self):
def fake_delete(self2, ctxt, image_id):
def test_unshelve_volume_backed(self):
def test_shelved_poll_none_exist(self):
def test_shelved_poll_not_timedout(self):
def test_shelved_poll_timedout(self):
def fake_destroy(inst, nw_info, bdm):
class ShelveComputeAPITestCase(test_compute.BaseTestCase):
def test_shelve(self):
def fake_init(self2):
def fake_create(self2, ctxt, metadata):
def test_unshelve(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_stats.py
class StatsTestCase(test.NoDBTestCase):
def setUp(self):
def _create_instance(self, values=None):
def test_os_type_count(self):
def test_update_project_count(self):
def _get():
def test_instance_count(self):
def test_add_stats_for_instance(self):
def test_calculate_workload(self):
def test_update_stats_for_instance_no_change(self):
def test_update_stats_for_instance_vm_change(self):
def test_update_stats_for_instance_task_change(self):
def test_update_stats_for_instance_deleted(self):
def test_io_workload(self):
def test_io_workload_saved_to_stats(self):
def test_clear(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_virtapi.py
class VirtAPIBaseTest(test.NoDBTestCase, test.APICoverage):
def setUp(self):
def set_up_virtapi(self):
def assertExpected(self, method, *args, **kwargs):
def test_instance_update(self):
def test_provider_fw_rule_get_all(self):
def test_agent_build_get_by_triple(self):
def test_wait_for_instance_event(self):
class FakeVirtAPITest(VirtAPIBaseTest):
def set_up_virtapi(self):
def assertExpected(self, method, *args, **kwargs):
class FakeCompute(object):
def __init__(self):
def _instance_update(self, context, instance_uuid, **kwargs):
def _event_waiter(self):
def _prepare_for_instance_event(self, instance, event_name):
class ComputeVirtAPITest(VirtAPIBaseTest):
def set_up_virtapi(self):
def assertExpected(self, method, *args, **kwargs):
def test_wait_for_instance_event(self):
def test_wait_for_instance_event_failed(self):
def _failer():
def do_test():
def test_wait_for_instance_event_failed_callback(self):
def _failer():
def do_test():
def test_wait_for_instance_event_timeout(self):
def _failer():
def do_test(timeout):
\OpenStack\nova-2014.1\nova\tests\compute\test_vmmode.py
class ComputeVMModeTest(test.NoDBTestCase):
def test_case(self):
def test_legacy_pv(self):
def test_legacy_hv(self):
def test_bogus(self):
def test_good(self):
def test_name_pv_compat(self):
def test_name_hv_compat(self):
def test_name_hvm(self):
def test_name_none(self):
def test_name_invalid(self):
\OpenStack\nova-2014.1\nova\tests\compute\__init__.py
\OpenStack\nova-2014.1\nova\tests\conductor\tasks\test_live_migrate.py
class LiveMigrationTaskTestCase(test.NoDBTestCase):
def setUp(self):
def _generate_task(self):
def test_execute_with_destination(self):
def test_execute_without_destination(self):
def test_check_instance_is_running_passes(self):
def test_check_instance_is_running_fails_when_shutdown(self):
def test_check_instance_host_is_up(self):
def test_check_instance_host_is_up_fails_if_not_up(self):
def test_check_instance_host_is_up_fails_if_not_found(self):
def test_check_requested_destination(self):
def test_check_requested_destination_fails_with_same_dest(self):
def test_check_requested_destination_fails_when_destination_is_up(self):
def test_check_requested_destination_fails_with_not_enough_memory(self):
def test_check_requested_destination_fails_with_hypervisor_diff(self):
def test_check_requested_destination_fails_with_hypervisor_too_old(self):
def test_find_destination_works(self):
def test_find_destination_no_image_works(self):
def _test_find_destination_retry_hypervisor_raises(self, error):
def test_find_destination_retry_with_old_hypervisor(self):
def test_find_destination_retry_with_invalid_hypervisor_type(self):
def test_find_destination_retry_with_invalid_livem_checks(self):
def test_find_destination_retry_exceeds_max(self):
def test_find_destination_when_runs_out_of_hosts(self):
def test_not_implemented_rollback(self):
\OpenStack\nova-2014.1\nova\tests\conductor\tasks\__init__.py
\OpenStack\nova-2014.1\nova\tests\conductor\test_conductor.py
class FakeContext(context.RequestContext):
def elevated(self):
class _BaseTestCase(object):
def setUp(self):
def fake_deserialize_context(serializer, ctxt_dict):
def _create_fake_instance(self, params=None, type_name='m1.tiny'):
def _do_update(self, instance_uuid, **updates):
def test_instance_update(self):
def test_action_event_start(self):
def test_action_event_finish(self):
def test_instance_update_invalid_key(self):
def test_migration_get_in_progress_by_host_and_node(self):
def test_instance_get_by_uuid(self):
def _setup_aggregate_with_host(self):
def test_aggregate_host_add(self):
def test_aggregate_host_delete(self):
def test_aggregate_metadata_get_by_host(self):
def test_bw_usage_update(self):
def test_provider_fw_rule_get_all(self):
def test_agent_build_get_by_triple(self):
def test_block_device_mapping_get_all_by_instance(self):
def test_instance_get_active_by_window_joined(self):
def test_instance_destroy(self):
def test_instance_info_cache_delete(self):
def test_vol_get_usage_by_time(self):
def test_vol_usage_update(self):
def test_compute_node_create(self):
def test_compute_node_update(self):
def test_compute_node_update_with_non_json_stats(self):
def test_compute_node_delete(self):
def test_instance_fault_create(self):
def test_task_log_get(self):
def test_task_log_get_with_no_state(self):
def test_task_log_begin_task(self):
def test_task_log_end_task(self):
def test_notify_usage_exists(self):
def test_security_groups_trigger_members_refresh(self):
def test_network_migrate_instance_start(self):
def test_network_migrate_instance_finish(self):
def test_quota_commit(self):
def test_quota_rollback(self):
def test_get_ec2_ids(self):
def test_compute_unrescue(self):
class ConductorTestCase(_BaseTestCase, test.TestCase):
def setUp(self):
def test_instance_info_cache_update(self):
def test_migration_get(self):
def test_migration_get_unconfirmed_by_dest_compute(self):
def test_compute_confirm_resize(self):
def test_migration_create(self):
def test_block_device_mapping_update_or_create(self):
def test_instance_get_all_by_filters(self):
def test_instance_get_all_by_filters_use_slave(self):
def test_instance_get_all_by_host(self):
def _test_stubbed(self, name, dbargs, condargs, db_result_listified=False, db_exception=None):
def test_service_get_all(self):
def test_service_get_by_host_and_topic(self):
def test_service_get_all_by_topic(self):
def test_service_get_all_by_host(self):
def test_service_get_by_compute_host(self):
def test_service_get_by_args(self):
def test_service_get_by_compute_host_not_found(self):
def test_service_get_by_args_not_found(self):
def test_security_groups_trigger_handler(self):
def test_compute_confirm_resize_with_objects(self):
def _test_object_action(self, is_classmethod, raise_exception):
def test_object_action(self):
def test_object_action_on_raise(self):
def test_object_class_action(self):
def test_object_class_action_on_raise(self):
def test_object_action_copies_object(self):
def test_aggregate_metadata_add(self):
def test_aggregate_metadata_delete(self):
def test_security_group_get_by_instance(self):
def test_security_group_rule_get_by_security_group(self):
def _test_action_event_expected_exceptions(self, db_method, conductor_method, error):
def test_action_event_start_expected_exceptions(self):
def test_action_event_finish_expected_exceptions(self):
class ConductorRPCAPITestCase(_BaseTestCase, test.TestCase):
def setUp(self):
def test_block_device_mapping_update_or_create(self):
def test_instance_get_all_by_filters(self):
def test_instance_get_all_by_filters_use_slave(self):
def _test_stubbed(self, name, dbargs, condargs, db_result_listified=False, db_exception=None):
def test_service_get_all(self):
def test_service_get_by_host_and_topic(self):
def test_service_get_all_by_topic(self):
def test_service_get_all_by_host(self):
def test_service_get_by_compute_host(self):
def test_service_get_by_args(self):
def test_service_get_by_compute_host_not_found(self):
def test_service_get_by_args_not_found(self):
def test_security_groups_trigger_handler(self):
class ConductorAPITestCase(_BaseTestCase, test.TestCase):
def setUp(self):
def _do_update(self, instance_uuid, **updates):
def test_bw_usage_get(self):
def test_block_device_mapping_update_or_create(self):
def _test_stubbed(self, name, *args, **kwargs):
def test_service_get_all(self):
def test_service_get_by_host_and_topic(self):
def test_service_get_all_by_topic(self):
def test_service_get_all_by_host(self):
def test_service_get_by_compute_host(self):
def test_service_get_by_args(self):
def test_service_get_by_compute_host_not_found(self):
def test_service_get_by_args_not_found(self):
def test_service_create(self):
def test_service_destroy(self):
def test_service_update(self):
def test_instance_get_all_by_host_and_node(self):
def test_instance_get_all_by_host(self):
def test_wait_until_ready(self):
def fake_ping(context, message, timeout):
def test_security_groups_trigger_handler(self):
class ConductorLocalAPITestCase(ConductorAPITestCase):
def setUp(self):
def test_client_exceptions(self):
def test_wait_until_ready(self):
class ConductorImportTest(test.TestCase):
def test_import_conductor_local(self):
def test_import_conductor_rpc(self):
def test_import_conductor_override_to_local(self):
class ConductorPolicyTest(test.TestCase):
def test_all_allowed_keys(self):
def fake_db_instance_update(self, *args, **kwargs):
def test_allowed_keys_are_real(self):
class _BaseTaskTestCase(object):
def setUp(self):
def fake_deserialize_context(serializer, ctxt_dict):
def test_live_migrate(self):
def test_cold_migrate(self):
def test_build_instances(self):
def test_unshelve_instance_on_host(self):
def test_unshelve_instance_schedule_and_rebuild(self):
def test_unshelve_instance_schedule_and_rebuild_novalid_host(self):
def fake_schedule_instances(context, image, filter_properties, *instances):
def test_unshelve_instance_schedule_and_rebuild_volume_backed(self):
class ConductorTaskTestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
def setUp(self):
def test_migrate_server_fails_with_rebuild(self):
def test_migrate_server_fails_with_flavor(self):
def _build_request_spec(self, instance):
def test_migrate_server_deals_with_expected_exceptions(self):
def test_migrate_server_deals_with_unexpected_exceptions(self):
def test_set_vm_state_and_notify(self):
def test_cold_migrate_no_valid_host_back_in_active_state(self):
def test_cold_migrate_no_valid_host_back_in_stopped_state(self):
def test_cold_migrate_exception_host_in_error_state_and_raise(self):
class ConductorTaskRPCAPITestCase(_BaseTaskTestCase,
test_compute.BaseTestCase):
def setUp(self):
class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
def setUp(self):
class ConductorLocalComputeTaskAPITestCase(ConductorTaskAPITestCase):
def setUp(self):
class ConductorV2ManagerProxyTestCase(test.NoDBTestCase):
def test_v2_manager_proxy(self):
\OpenStack\nova-2014.1\nova\tests\conductor\__init__.py
\OpenStack\nova-2014.1\nova\tests\conf_fixture.py
class ConfFixture(config_fixture.Config):
def setUp(self):
\OpenStack\nova-2014.1\nova\tests\console\test_console.py
class ConsoleTestCase(test.TestCase):
def setUp(self):
def _create_instance(self):
def test_get_pool_for_instance_host(self):
def test_get_pool_creates_new_pool_if_needed(self):
def test_get_pool_does_not_create_new_pool_if_exists(self):
def test_add_console(self):
def test_add_console_does_not_duplicate(self):
def test_remove_console(self):
class ConsoleAPITestCase(test.TestCase):
def setUp(self):
def _fake_db_console_get(_ctxt, _console_uuid, _instance_uuid):
def _fake_db_console_get_all_by_instance(_ctxt, _instance_uuid, columns_to_join):
def _fake_instance_get_by_uuid(_ctxt, _instance_uuid):
def test_get_consoles(self):
def test_get_console(self):
def test_delete_console(self):
def test_create_console(self):
\OpenStack\nova-2014.1\nova\tests\console\test_rpcapi.py
class ConsoleRpcAPITestCase(test.NoDBTestCase):
def _test_console_api(self, method, rpc_method, **kwargs):
def test_add_console(self):
def test_remove_console(self):
\OpenStack\nova-2014.1\nova\tests\console\__init__.py
\OpenStack\nova-2014.1\nova\tests\consoleauth\test_consoleauth.py
class ConsoleauthTestCase(test.TestCase):
def setUp(self):
def test_tokens_expire(self):
def _stub_validate_console_port(self, result):
def fake_validate_console_port(ctxt, instance, port, console_type):
def test_multiple_tokens_for_instance(self):
def test_delete_tokens_for_instance(self):
def test_wrong_token_has_port(self):
def test_delete_expired_tokens(self):
class ControlauthMemcacheEncodingTestCase(test.TestCase):
def setUp(self):
def test_authorize_console_encoding(self):
def test_check_token_encoding(self):
def test_delete_tokens_for_instance_encoding(self):
class CellsConsoleauthTestCase(ConsoleauthTestCase):
def setUp(self):
def _stub_validate_console_port(self, result):
def fake_validate_console_port(ctxt, instance_uuid, console_port, console_type):
\OpenStack\nova-2014.1\nova\tests\consoleauth\test_rpcapi.py
class ConsoleAuthRpcAPITestCase(test.NoDBTestCase):
def _test_consoleauth_api(self, method, **kwargs):
def test_authorize_console(self):
def test_check_token(self):
def test_delete_tokens_for_instnace(self):
\OpenStack\nova-2014.1\nova\tests\consoleauth\__init__.py
\OpenStack\nova-2014.1\nova\tests\db\fakes.py
class FakeModel(object):
def __init__(self, values):
def __getattr__(self, name):
def __getitem__(self, key):
def __repr__(self):
def get(self, name):
def stub_out(stubs, funcs):
def stub_out_db_network_api(stubs):
def fake_floating_ip_allocate_address(context, project_id, pool, auto_assigned=False):
def fake_floating_ip_deallocate(context, address):
def fake_floating_ip_disassociate(context, address):
def fake_floating_ip_fixed_ip_associate(context, floating_address, fixed_address, host):
def fake_floating_ip_get_all_by_host(context, host):
def fake_floating_ip_get_by_address(context, address):
def fake_floating_ip_set_auto_assigned(contex, address):
def fake_fixed_ip_associate(context, address, instance_id):
def fake_fixed_ip_associate_pool(context, network_id, instance_id):
def fake_fixed_ip_create(context, values):
def fake_fixed_ip_disassociate(context, address):
def fake_fixed_ip_disassociate_all_by_timeout(context, host, time):
def fake_fixed_ip_get_all(context):
def fake_fixed_ip_get_by_instance(context, instance_uuid):
def fake_fixed_ip_get_by_address(context, address):
def fake_fixed_ip_update(context, address, values):
def fake_flavor_get(context, id):
def fake_virtual_interface_create(context, values):
def fake_virtual_interface_delete_by_instance(context, instance_id):
def fake_virtual_interface_get_by_instance(context, instance_id):
def fake_virtual_interface_get_by_instance_and_network(context, instance_id, network_id):
def fake_network_create_safe(context, values):
def fake_network_get(context, network_id):
def fake_network_get_all(context):
def fake_network_get_all_by_host(context, host):
def fake_network_set_host(context, network_id, host_id):
def fake_network_update(context, network_id, values):
def fake_project_get_networks(context, project_id):
def stub_out_db_instance_api(stubs, injected=True):
def fake_flavor_get_all(context, inactive=0, filters=None):
def fake_flavor_get_by_name(context, name):
def fake_flavor_get(context, id):
def fake_fixed_ip_get_by_instance(context, instance_id):
\OpenStack\nova-2014.1\nova\tests\db\test_db_api.py
def _reservation_get(context, uuid):
def _quota_reserve(context, project_id, user_id):
def get_sync(resource, usage):
class DbTestCase(test.TestCase):
def setUp(self):
def create_instance_with_args(self, **kwargs):
def fake_metadata(self, content):
def create_metadata_for_instance(self, instance_uuid):
class DecoratorTestCase(test.TestCase):
def _test_decorator_wraps_helper(self, decorator):
def test_func():
def test_require_context_decorator_wraps_functions_properly(self):
def test_require_admin_context_decorator_wraps_functions_properly(self):
def test_require_deadlock_retry_wraps_functions_properly(self):
def _get_fake_aggr_values():
def _get_fake_aggr_metadata():
def _get_fake_aggr_hosts():
def _create_aggregate(context=context.get_admin_context(), values=_get_fake_aggr_values(), metadata=_get_fake_aggr_metadata()):
def _create_aggregate_with_hosts(context=context.get_admin_context(), values=_get_fake_aggr_values(), metadata=_get_fake_aggr_metadata(), hosts=_get_fake_aggr_hosts()):
class NotDbApiTestCase(DbTestCase):
def setUp(self):
def test_instance_get_all_by_filters_regex_unsupported_db(self):
def test_instance_get_all_by_filters_paginate(self):
def test_convert_objects_related_datetimes(self):
class AggregateDBApiTestCase(test.TestCase):
def setUp(self):
def test_aggregate_create_no_metadata(self):
def test_aggregate_create_avoid_name_conflict(self):
def test_aggregate_create_raise_exist_exc(self):
def test_aggregate_get_raise_not_found(self):
def test_aggregate_metadata_get_raise_not_found(self):
def test_aggregate_create_with_metadata(self):
def test_aggregate_create_delete_create_with_metadata(self):
def test_aggregate_get(self):
def test_aggregate_get_by_host(self):
def test_aggregate_get_by_host_with_key(self):
def test_aggregate_metadata_get_by_host(self):
def test_aggregate_metadata_get_by_metadata_key(self):
def test_aggregate_metadata_get_by_host_with_key(self):
def test_aggregate_host_get_by_metadata_key(self):
def test_aggregate_get_by_host_not_found(self):
def test_aggregate_delete_raise_not_found(self):
def test_aggregate_delete(self):
def test_aggregate_update(self):
def test_aggregate_update_with_metadata(self):
def test_aggregate_update_with_existing_metadata(self):
def test_aggregate_update_zone_with_existing_metadata(self):
def test_aggregate_update_raise_not_found(self):
def test_aggregate_update_raise_name_exist(self):
def test_aggregate_get_all(self):
def test_aggregate_get_all_non_deleted(self):
def test_aggregate_metadata_add(self):
def test_aggregate_metadata_add_retry(self):
def counted():
def test_aggregate_metadata_update(self):
def test_aggregate_metadata_delete(self):
def test_aggregate_remove_availability_zone(self):
def test_aggregate_metadata_delete_raise_not_found(self):
def test_aggregate_host_add(self):
def test_aggregate_host_re_add(self):
def test_aggregate_host_add_duplicate_works(self):
def test_aggregate_host_add_duplicate_raise_exist_exc(self):
def test_aggregate_host_add_raise_not_found(self):
def test_aggregate_host_delete(self):
def test_aggregate_host_delete_raise_not_found(self):
class SqlAlchemyDbApiTestCase(DbTestCase):
def test_instance_get_all_by_host(self):
def test_instance_get_all_uuids_by_host(self):
def test_instance_get_active_by_window_joined(self):
class MigrationTestCase(test.TestCase):
def setUp(self):
def _create(self, status='migrating', source_compute='host1', source_node='a', dest_compute='host2', dest_node='b', system_metadata=None):
def _assert_in_progress(self, migrations):
def test_migration_get_in_progress_joins(self):
def test_in_progress_host1_nodea(self):
def test_in_progress_host1_nodeb(self):
def test_in_progress_host2_nodeb(self):
def test_instance_join(self):
def test_get_migrations_by_filters(self):
def test_only_admin_can_get_all_migrations_by_filters(self):
def test_migration_get_unconfirmed_by_dest_compute(self):
def test_migration_update_not_found(self):
class ModelsObjectComparatorMixin(object):
def _dict_from_object(self, obj, ignored_keys):
def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
def _assertEqualOrderedListOfObjects(self, objs1, objs2, ignored_keys=None):
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
class InstanceSystemMetadataTestCase(test.TestCase):
def setUp(self):
def test_instance_system_metadata_get(self):
def test_instance_system_metadata_update_new_pair(self):
def test_instance_system_metadata_update_existent_pair(self):
def test_instance_system_metadata_update_delete_true(self):
def test_instance_system_metadata_update_nonexistent(self):
class ReservationTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def test_reservation_commit(self):
def test_reservation_rollback(self):
def test_reservation_expire(self):
class SecurityGroupRuleTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _get_base_values(self):
def _get_base_rule_values(self):
def _create_security_group(self, values):
def _create_security_group_rule(self, values):
def test_security_group_rule_create(self):
def _test_security_group_rule_get_by_security_group(self, columns=None):
def test_security_group_rule_get_by_security_group(self):
def test_security_group_rule_get_by_security_group_no_joins(self):
def test_security_group_rule_get_by_security_group_grantee(self):
def test_security_group_rule_destroy(self):
def test_security_group_rule_destroy_not_found_exception(self):
def test_security_group_rule_get(self):
def test_security_group_rule_get_not_found_exception(self):
def test_security_group_rule_count_by_group(self):
class SecurityGroupTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _get_base_values(self):
def _create_security_group(self, values):
def test_security_group_create(self):
def test_security_group_destroy(self):
def test_security_group_get(self):
def test_security_group_get_with_instance_columns(self):
def test_security_group_get_no_instances(self):
def test_security_group_get_not_found_exception(self):
def test_security_group_get_by_name(self):
def test_security_group_get_by_project(self):
def test_security_group_get_by_instance(self):
def test_security_group_get_all(self):
def test_security_group_in_use(self):
def test_security_group_ensure_default(self):
def test_security_group_update(self):
def test_security_group_update_to_duplicate(self):
class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _assertEqualInstances(self, instance1, instance2):
def _assertEqualListsOfInstances(self, list1, list2):
def create_instance_with_args(self, **kwargs):
def test_instance_create(self):
def test_instance_create_with_object_values(self):
def test_instance_update_with_object_values(self):
def test_instance_update_no_metadata_clobber(self):
def test_instance_get_all_with_meta(self):
def test_instance_update(self):
def test_instance_update_bad_str_dates(self):
def test_instance_update_good_str_dates(self):
def test_create_instance_unique_hostname(self):
def test_instance_get_all_by_filters_with_meta(self):
def test_instance_get_all_by_filters_without_meta(self):
def test_instance_get_all_by_filters(self):
def test_instance_metadata_get_multi(self):
def test_instance_metadata_get_multi_no_uuids(self):
def test_instance_system_system_metadata_get_multi(self):
def test_instance_system_metadata_get_multi_no_uuids(self):
def test_instance_get_all_by_filters_regex(self):
def test_instance_get_all_by_filters_changes_since(self):
def test_instance_get_all_by_filters_exact_match(self):
def test_instance_get_all_by_filters_metadata(self):
def test_instance_get_all_by_filters_system_metadata(self):
def test_instance_get_all_by_filters_unicode_value(self):
def test_instance_get_all_by_filters_tags(self):
def test_instance_get_by_uuid(self):
def test_instance_get_by_uuid_join_empty(self):
def test_instance_get_by_uuid_join_meta(self):
def test_instance_get_by_uuid_join_sys_meta(self):
def test_instance_get_all_by_filters_deleted(self):
def test_instance_get_all_by_filters_deleted_and_soft_deleted(self):
def test_instance_get_all_by_filters_deleted_no_soft_deleted(self):
def test_instance_get_all_by_filters_alive_and_soft_deleted(self):
def test_instance_get_all_by_filters_not_deleted(self):
def test_instance_get_all_by_filters_cleaned(self):
def test_instance_get_all_by_host_and_node_no_join(self):
def test_instance_get_all_hung_in_rebooting(self):
def test_instance_update_with_expected_vm_state(self):
def test_instance_update_with_unexpected_vm_state(self):
def test_instance_update_with_instance_uuid(self):
def test_delete_instance_metadata_on_instance_destroy(self):
def test_delete_instance_faults_on_instance_destroy(self):
def test_instance_update_with_and_get_original(self):
def test_instance_update_and_get_original_metadata(self):
def test_instance_update_and_get_original_metadata_none_join(self):
def test_instance_update_unique_name(self):
def _test_instance_update_updates_metadata(self, metadata_type):
def set_and_check(meta):
def test_security_group_in_use(self):
def test_instance_update_updates_system_metadata(self):
def test_instance_update_updates_metadata(self):
def test_instance_floating_address_get_all(self):
def test_instance_stringified_ips(self):
def test_instance_destroy(self):
def test_instance_destroy_already_destroyed(self):
class InstanceMetadataTestCase(test.TestCase):
def setUp(self):
def test_instance_metadata_get(self):
def test_instance_metadata_delete(self):
def test_instance_metadata_update(self):
class ServiceTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _get_base_values(self):
def _create_service(self, values):
def test_service_create(self):
def test_service_destroy(self):
def test_service_update(self):
def test_service_update_not_found_exception(self):
def test_service_get(self):
def test_service_get_with_compute_node(self):
def test_service_get_not_found_exception(self):
def test_service_get_by_host_and_topic(self):
def test_service_get_all(self):
def test_service_get_all_by_topic(self):
def test_service_get_all_by_host(self):
def test_service_get_by_compute_host(self):
def test_service_get_by_compute_host_not_found(self):
def test_service_get_by_args(self):
def test_service_get_by_args_not_found_exception(self):
def test_service_binary_exists_exception(self):
def test_service_topic_exists_exceptions(self):
class BaseInstanceTypeTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _get_base_values(self):
def _create_flavor(self, values, projects=None):
class InstanceActionTestCase(test.TestCase, ModelsObjectComparatorMixin):
def setUp(self):
def _create_action_values(self, uuid, action='run_instance', ctxt=None, extra=None):
def _create_event_values(self, uuid, event='schedule', ctxt=None, extra=None):
def _assertActionSaved(self, action, uuid):
def _assertActionEventSaved(self, event, action_id):
OpenStack IndexPreviousNext
|