OpenStack Study: nova
OpenStack IndexPreviousNext
\OpenStack\nova-2014.1\nova\tests\compute\monitors\test_monitors.py
class FakeResourceMonitor(monitors.ResourceMonitorBase):
def _update_data(self):
def _get_foo_metric1(self, **kwargs):
def _get_foo_metric2(self, **kwargs):
class FakeMonitorClass1(monitors.ResourceMonitorBase):
def get_metrics(self, **kwargs):
def get_metric_names(self):
class FakeMonitorClass2(monitors.ResourceMonitorBase):
def get_metrics(self, **kwargs):
def get_metric_names(self):
class FakeMonitorClass3(monitors.ResourceMonitorBase):
def get_metrics(self, **kwargs):
def get_metric_names(self):
class FakeMonitorClass4(monitors.ResourceMonitorBase):
def get_metrics(self, **kwargs):
def get_metric_names(self):
class ResourceMonitorBaseTestCase(test.TestCase):
def setUp(self):
def test_get_metric_names(self):
def test_get_metrics(self):
class ResourceMonitorsTestCase(test.TestCase):
def setUp(self):
def test_choose_monitors_not_found(self):
def test_choose_monitors_bad(self):
def test_choose_monitors(self):
def test_choose_monitors_none(self):
def test_all_monitors(self):
\OpenStack\nova-2014.1\nova\tests\compute\monitors\__init__.py
\OpenStack\nova-2014.1\nova\tests\compute\test_claims.py
class DummyTracker(object):
def abort_instance_claim(self, *args, **kwargs):
def drop_resize_claim(self, *args, **kwargs):
def new_pci_tracker(self):
class ClaimTestCase(test.NoDBTestCase):
def setUp(self):
def _claim(self, limits=None, overhead=None, **kwargs):
def _fake_instance(self, **kwargs):
def _fake_instance_type(self, **kwargs):
def _fake_resources(self, values=None):
def assertRaisesRegexp(self, re_obj, e, fn, *a, **kw):
def test_cpu_unlimited(self):
def test_memory_unlimited(self):
def test_disk_unlimited_root(self):
def test_disk_unlimited_ephemeral(self):
def test_cpu_oversubscription(self):
def test_memory_with_overhead(self):
def test_memory_with_overhead_insufficient(self):
def test_cpu_insufficient(self):
def test_memory_oversubscription(self):
def test_memory_insufficient(self):
def test_disk_oversubscription(self):
def test_disk_insufficient(self):
def test_disk_and_memory_insufficient(self):
def test_disk_and_cpu_insufficient(self):
def test_disk_and_cpu_and_memory_insufficient(self):
def test_pci_pass(self):
def _set_pci_request(self, claim):
def test_pci_fail(self):
def test_pci_pass_no_requests(self):
def test_abort(self):
def _abort(self):
class ResizeClaimTestCase(ClaimTestCase):
def setUp(self):
def _claim(self, limits=None, overhead=None, **kwargs):
def _set_pci_request(self, claim):
def test_abort(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute.py
def fake_not_implemented(*args, **kwargs):
def get_primitive_instance_by_uuid(context, instance_uuid):
def unify_instance(instance):
class FakeSchedulerAPI(object):
def run_instance(self, ctxt, request_spec, admin_password, injected_files, requested_networks, is_first_time, filter_properties):
def live_migration(self, ctxt, block_migration, disk_over_commit, instance, dest):
def prep_resize(self, ctxt, instance, instance_type, image, request_spec, filter_properties, reservations):
class BaseTestCase(test.TestCase):
def setUp(self):
def fake_get_compute_nodes_in_db(context):
def fake_compute_node_delete(context, compute_node):
def fake_show(meh, context, id):
def fake_get_nw_info(cls, ctxt, instance, *args, **kwargs):
def tearDown(self):
def _create_fake_instance(self, params=None, type_name='m1.tiny', services=False):
def make_fake_sys_meta():
def _objectify(self, db_inst):
def _create_fake_instance_obj(self, params=None, type_name='m1.tiny'):
def _create_instance_type(self, params=None):
def _create_group(self):
def _stub_migrate_server(self):
def _fake_migrate_server(*args, **kwargs):
def _init_aggregate_with_host(self, aggr, aggr_name, zone, host):
class ComputeVolumeTestCase(BaseTestCase):
def setUp(self):
def store_cinfo(context, *args, **kwargs):
def test_attach_volume_serial(self):
def test_attach_volume_raises(self):
def fake_attach(*args, **kwargs):
def test_attach_volume_no_bdm(self):
def test_await_block_device_created_to_slow(self):
def never_get(context, vol_id):
def test_await_block_device_created_slow(self):
def slow_get(context, vol_id):
def test_boot_volume_serial(self):
def test_boot_volume_metadata(self, metadata=True):
def volume_api_get(*args, **kwargs):
def test_boot_volume_no_metadata(self):
def test_boot_image_metadata(self, metadata=True):
def image_api_show(*args, **kwargs):
def test_boot_image_no_metadata(self):
def test_poll_bandwidth_usage_disabled(self):
def test_poll_bandwidth_usage_not_implemented(self):
def test_get_host_volume_bdms(self, mock_get_by_inst, mock_get_by_host):
def test_poll_volume_usage_disabled(self):
def test_poll_volume_usage_returns_no_vols(self):
def test_poll_volume_usage_with_data(self):
def test_detach_volume_usage(self):
def fake_get_volume_encryption_metadata(self, context, volume_id):
def test_prepare_image_mapping(self):
def test_validate_bdm(self):
def fake_get(self, context, res_id):
def fake_check_attach(*args, **kwargs):
def test_validate_bdm_media_service_exceptions(self):
def fake_volume_get_1(self, context, volume_id):
def fake_volume_get_2(self, context, volume_id):
def fake_volume_get_3(self, context, volume_id):
def test_volume_snapshot_create(self):
def test_volume_snapshot_delete(self):
class ComputeTestCase(BaseTestCase):
def test_wrap_instance_fault(self):
def did_it_add_fault(*args):
def failer(self2, context, instance):
def test_wrap_instance_fault_instance_in_args(self):
def did_it_add_fault(*args):
def failer(self2, context, instance):
def test_wrap_instance_fault_no_instance(self):
def did_it_add_fault(*args):
def failer(self2, context, instance):
def test_wrap_instance_event(self):
def did_it_update_start(self2, context, values):
def did_it_update_finish(self2, context, values):
def fake_event(self, context, instance):
def test_wrap_instance_event_log_exception(self):
def did_it_update_start(self2, context, values):
def did_it_update_finish(self2, context, values):
def fake_event(self2, context, instance):
def test_object_compat(self):
def test_fn(_self, context, instance):
def test_object_compat_more_positional_args(self):
def test_fn(_self, context, instance, pos_arg_1, pos_arg_2):
def test_create_instance_with_img_ref_associates_config_drive(self):
def test_create_instance_associates_config_drive(self):
def test_create_instance_unlimited_memory(self):
def test_create_instance_unlimited_disk(self):
def test_create_multiple_instances_then_starve(self):
def test_create_multiple_instance_with_neutron_port(self):
def fake_is_neutron():
def test_create_instance_with_oversubscribed_ram(self):
def test_create_instance_with_oversubscribed_ram_fail(self):
def test_create_instance_with_oversubscribed_cpu(self):
def test_create_instance_with_oversubscribed_disk(self):
def test_create_instance_with_oversubscribed_disk_fail(self):
def test_create_instance_without_node_param(self):
def test_create_instance_no_image(self):
def test_default_access_ip(self):
def _instance_update(ctxt, instance_uuid, **kwargs):
def test_no_default_access_ip(self):
def test_fail_to_schedule_persists(self):
def test_run_instance_setup_block_device_mapping_fail(self):
def fake(*args, **kwargs):
def test_run_instance_spawn_fail(self):
def fake(*args, **kwargs):
def test_run_instance_dealloc_network_instance_not_found(self):
def fake(*args, **kwargs):
def test_run_instance_bails_on_missing_instance(self):
def fake_instance_update(self, *a, **args):
def test_run_instance_bails_on_deleting_instance(self):
def fake_instance_update(self, *a, **args):
def test_run_instance_bails_on_missing_instance_2(self):
def fake_default_block_device_names(self, *a, **args):
def test_can_terminate_on_error_state(self):
def test_run_terminate(self):
def test_run_terminate_with_vol_attached(self):
def fake_check_attach(*args, **kwargs):
def fake_reserve_volume(*args, **kwargs):
def fake_volume_get(self, context, volume_id):
def fake_terminate_connection(self, context, volume_id, connector):
def fake_detach(self, context, volume_id):
def fake_rpc_reserve_block_device_name(self, context, **kwargs):
def test_run_terminate_no_image(self):
def test_terminate_no_network(self):
def test_terminate_no_fixed_ips(self):
def test_run_terminate_timestamps(self):
def test_run_terminate_deallocate_net_failure_sets_error_state(self):
def _fake_deallocate_network(*args, **kwargs):
def test_stop(self):
def test_start(self):
def test_stop_start_no_image(self):
def test_rescue(self):
def fake_rescue(self, context, instance_ref, network_info, image_meta, rescue_password):
def fake_unrescue(self, instance_ref, network_info):
def test_rescue_notifications(self):
def fake_rescue(self, context, instance_ref, network_info, image_meta, rescue_password):
def test_unrescue_notifications(self):
def fake_unrescue(self, instance_ref, network_info):
def test_rescue_handle_err(self):
def test_power_on(self):
def fake_driver_power_on(self, context, instance, network_info, block_device_info):
def test_power_off(self):
def fake_driver_power_off(self, instance):
def test_pause(self):
def test_suspend(self):
def test_suspend_error(self):
def fake(*args, **kwargs):
def test_suspend_not_implemented(self):
def fake(*args, **kwargs):
def test_rebuild(self):
def test_rebuild_driver(self):
def fake(**kwargs):
def test_rebuild_no_image(self):
def test_rebuild_launched_at_time(self):
def test_rebuild_with_injected_files(self):
def _spawn(context, instance, image_meta, injected_files, admin_password, network_info, block_device_info):
def _test_reboot(self, soft, test_delete=False, test_unrescue=False, fail_reboot=False, fail_running=False):
def fake_reboot(*args, **kwargs):
def test_reboot_soft(self):
def test_reboot_soft_and_delete(self):
def test_reboot_soft_and_rescued(self):
def test_reboot_soft_and_delete_and_rescued(self):
def test_reboot_hard(self):
def test_reboot_hard_and_delete(self):
def test_reboot_hard_and_rescued(self):
def test_reboot_hard_and_delete_and_rescued(self):
def test_reboot_fail(self):
def test_reboot_fail_running(self):
def test_get_instance_volume_block_device_info_source_image(self):
def test_get_instance_volume_block_device_info_passed_bdms(self):
def test_set_admin_password(self):
def test_set_admin_password_bad_state(self):
def fake_driver_get_info(self2, _instance):
def _do_test_set_admin_password_driver_error(self, exc, expected_vm_state, expected_task_state, expected_exception):
def fake_sleep(_time):
def fake_driver_set_pass(self2, _instance, _pwd):
def test_set_admin_password_driver_not_authorized(self):
def test_set_admin_password_driver_not_implemented(self):
def test_inject_network_info(self):
def fake_driver_inject_network(self, instance, network_info):
def test_reset_network(self):
def fake_driver_reset_network(self, instance):
def _get_snapshotting_instance(self):
def test_snapshot(self):
def test_snapshot_no_image(self):
def _test_snapshot_fails(self, raise_during_cleanup):
def fake_snapshot(*args, **kwargs):
def fake_delete(self_, context, image_id):
def test_snapshot_fails(self):
def test_snapshot_fails_cleanup_ignores_exception(self):
def test_snapshot_fails_with_glance_error(self):
def fake_snapshot(*args, **kwargs):
def test_snapshot_handles_cases_when_instance_is_deleted(self):
def test_snapshot_handles_cases_when_instance_is_not_found(self):
def _assert_state(self, state_dict):
def test_console_output(self):
def test_console_output_tail(self):
def test_console_output_not_implemented(self):
def fake_not_implemented(*args, **kwargs):
def test_novnc_vnc_console(self):
def test_validate_console_port_vnc(self):
def fake_driver_get_console(*args, **kwargs):
def test_validate_console_port_spice(self):
def fake_driver_get_console(*args, **kwargs):
def test_validate_console_port_rdp(self):
def fake_driver_get_console(*args, **kwargs):
def test_validate_console_port_wrong_port(self):
def fake_driver_get_console(*args, **kwargs):
def test_xvpvnc_vnc_console(self):
def test_invalid_vnc_console_type(self):
def test_missing_vnc_console_type(self):
def test_get_vnc_console_not_implemented(self):
def test_spicehtml5_spice_console(self):
def test_invalid_spice_console_type(self):
def test_missing_spice_console_type(self):
def test_rdphtml5_rdp_console(self):
def test_invalid_rdp_console_type(self):
def test_missing_rdp_console_type(self):
def test_vnc_console_instance_not_ready(self):
def fake_driver_get_console(*args, **kwargs):
def test_spice_console_instance_not_ready(self):
def fake_driver_get_console(*args, **kwargs):
def test_rdp_console_instance_not_ready(self):
def fake_driver_get_console(*args, **kwargs):
def test_diagnostics(self):
def test_add_fixed_ip_usage_notification(self):
def dummy(*args, **kwargs):
def test_remove_fixed_ip_usage_notification(self):
def dummy(*args, **kwargs):
def test_run_instance_usage_notification(self, request_spec={}):
def test_run_instance_image_usage_notification(self):
def test_run_instance_usage_notification_volume_meta(self):
def test_run_instance_end_notification_on_abort(self):
def build_inst_abort(*args, **kwargs):
def test_run_instance_error_notification_on_reschedule(self):
def build_inst_fail(*args, **kwargs):
def test_run_instance_error_notification_on_failure(self):
def build_inst_fail(*args, **kwargs):
def test_terminate_usage_notification(self):
def test_run_instance_existing(self):
def test_run_instance_queries_macs(self):
def test_run_instance_reschedules_on_anti_affinity_violation(self):
def test_instance_set_to_error_on_uncaught_exception(self):
def test_delete_instance_succedes_on_volume_fail(self):
def fake_cleanup_volumes(context, instance):
def test_delete_instance_keeps_net_on_power_off_fail(self):
def test_delete_instance_loses_net_on_other_fail(self):
def test_delete_instance_deletes_console_auth_tokens(self):
def fake_delete_tokens(*args, **kwargs):
def test_delete_instance_deletes_console_auth_tokens_cells(self):
def fake_delete_tokens(*args, **kwargs):
def test_instance_termination_exception_sets_error(self):
def fake_delete_instance(context, instance, bdms, reservations=None):
def test_network_is_deallocated_on_spawn_failure(self):
def test_lock(self):
def check_task_state(task_state):
def _check_locked_by(self, instance_uuid, locked_by):
def test_override_owner_lock(self):
def test_upgrade_owner_lock(self):
def _test_state_revert(self, instance, operation, pre_task_state, kwargs=None):
def _get_an_exception(*args, **kwargs):
def test_state_revert(self):
def _ensure_quota_reservations_committed(self, expect_project=False, expect_user=False):
def _ensure_quota_reservations_rolledback(self, expect_project=False, expect_user=False):
def test_quotas_successful_delete(self):
def test_quotas_failed_delete(self):
def fake_shutdown_instance(*args, **kwargs):
def test_quotas_successful_soft_delete(self):
def test_quotas_failed_soft_delete(self):
def fake_soft_delete(*args, **kwargs):
def test_quotas_destroy_of_soft_deleted_instance(self):
def _stub_out_resize_network_methods(self):
def fake(cls, ctxt, instance, *args, **kwargs):
def _test_finish_resize(self, power_on):
def _mig_save(context):
def _instance_save1():
def _instance_save2(expected_task_state=None):
def _instance_save3(expected_task_state=None):
def test_finish_resize_from_active(self):
def test_finish_resize_from_stopped(self):
def test_finish_resize_with_volumes(self):
def fake_volume_get(self, context, volume_id):
def fake_volume_check_attach(self, context, volume_id, instance):
def fake_get_volume_encryption_metadata(self, context, volume_id):
def fake_init_conn(self, context, volume_id, session):
def fake_attach(self, context, volume_id, instance_uuid, device_name):
def fake_get_volume_connector(*args, **kwargs):
def fake_attach_volume(*args, **kwargs):
def fake_terminate_connection(self, context, volume, connector):
def fake(*args, **kwargs):
def fake_init_conn_with_data(self, context, volume, session):
def fake_detach(self, context, volume_uuid):
def test_finish_resize_handles_error(self):
def throw_up(*args, **kwargs):
def fake(*args, **kwargs):
def test_rebuild_instance_notification(self):
def test_finish_resize_instance_notification(self):
def test_resize_instance_notification(self):
def test_prep_resize_instance_migration_error_on_same_host(self):
def test_prep_resize_instance_migration_error_on_none_host(self):
def test_resize_instance_driver_error(self):
def throw_up(*args, **kwargs):
def test_resize_instance_driver_rollback(self):
def throw_up(*args, **kwargs):
def test_resize_instance(self):
def _test_confirm_resize(self, power_on):
def fake(*args, **kwargs):
def fake_confirm_migration_driver(*args, **kwargs):
def test_confirm_resize_from_active(self):
def test_confirm_resize_from_stopped(self):
def _test_finish_revert_resize(self, power_on, remove_old_vm_state=False):
def fake(*args, **kwargs):
def fake_finish_revert_migration_driver(*args, **kwargs):
def test_finish_revert_resize_from_active(self):
def test_finish_revert_resize_from_stopped(self):
def test_finish_revert_resize_from_stopped_remove_old_vm_state(self):
def _test_cleanup_stored_instance_types(self, old, new, revert=False):
def test_cleanup_stored_instance_types_for_resize(self):
def test_cleanup_stored_instance_types_for_resize_with_update(self):
def test_cleanup_stored_instance_types_for_migration(self):
def test_cleanup_stored_instance_types_for_migration_with_update(self):
def test_get_by_flavor_id(self):
def test_resize_same_source_fails(self):
def test_resize_instance_handles_migration_error(self):
def raise_migration_failure(*args):
def test_pre_live_migration_instance_has_no_fixed_ip(self):
def test_pre_live_migration_works_correctly(self):
def stupid(*args, **kwargs):
def test_live_migration_exception_rolls_back(self):
def test_live_migration_works_correctly(self):
def test_post_live_migration_no_shared_storage_working_correctly(self):
def fakecleanup(*args, **kwargs):
def test_post_live_migration_working_correctly(self):
def test_post_live_migration_terminate_volume_connections(self):
def _begin_post_live_migration_at_destination(self):
def _finish_post_live_migration_at_destination(self):
def test_post_live_migration_at_destination_with_compute_info(self):
def test_post_live_migration_at_destination_without_compute_info(self):
def test_rollback_live_migration_at_destination_correctly(self):
def test_run_kill_vm(self):
def test_add_instance_fault(self):
def fake_db_fault_create(ctxt, values):
def test_add_instance_fault_with_remote_error(self):
def fake_db_fault_create(ctxt, values):
def test_add_instance_fault_user_error(self):
def fake_db_fault_create(ctxt, values):
def test_add_instance_fault_no_exc_info(self):
def fake_db_fault_create(ctxt, values):
def test_add_instance_fault_long_message(self):
def fake_db_fault_create(ctxt, values):
def _test_cleanup_running(self, action):
def test_cleanup_running_deleted_instances_unrecognized_value(self):
def test_cleanup_running_deleted_instances_reap(self):
def test_cleanup_running_deleted_instances_shutdown(self):
def test_cleanup_running_deleted_instances_shutdown_notimpl(self):
def test_cleanup_running_deleted_instances_shutdown_error(self):
def test_running_deleted_instances(self):
def test_get_instance_nw_info(self):
def test_heal_instance_info_cache(self):
def fake_instance_get_all_by_host(context, host, columns_to_join, use_slave=False):
def fake_instance_get_by_uuid(context, instance_uuid, columns_to_join, use_slave=False):
def fake_get_instance_nw_info(context, instance, use_slave=False):
def test_poll_rescued_instances(self):
def fake_instance_get_all_by_filters(context, filters, columns_to_join, use_slave=False):
def fake_unrescue(context, instance):
def test_poll_unconfirmed_resizes(self):
def fake_instance_get_by_uuid(context, instance_uuid, columns_to_join=None, use_slave=False):
def fake_migration_get_unconfirmed_by_dest_compute(context, resize_confirm_window, dest_compute, use_slave=False):
def fake_migration_update(context, mid, updates):
def fake_confirm_resize(context, instance, migration=None):
def fetch_instance_migration_status(instance_uuid):
def test_instance_build_timeout_disabled(self):
def test_instance_build_timeout_mixed_instances(self):
def test_get_resource_tracker_fail(self):
def test_instance_update_host_check(self):
def fail_get(nodename):
def test_destroy_evacuated_instance_on_shared_storage(self):
def test_destroy_evacuated_instance_with_disks(self):
def test_destroy_evacuated_instance_not_implemented(self):
def test_complete_partial_deletion(self):
def fake_destroy():
def test_init_instance_for_partial_deletion(self):
def fake_partial_deletion(context, instance):
def test_partial_deletion_raise_exception(self):
def test_add_remove_fixed_ip_updates_instance_updated_at(self):
def _noop(*args, **kwargs):
def test_no_pending_deletes_for_soft_deleted_instances(self):
def test_reclaim_queued_deletes(self):
def test_reclaim_queued_deletes_continue_on_error(self):
def test_sync_power_states(self):
def _test_lifecycle_event(self, lifecycle_event, power_state):
def test_lifecycle_events(self):
def test_lifecycle_event_non_existent_instance(self):
def test_allow_confirm_resize_on_instance_in_deleting_task_state(self):
def fake_drop_resize_claim(*args, **kwargs):
def fake_get_resource_tracker(self):
def fake_setup_networks_on_host(self, *args, **kwargs):
def _get_instance_and_bdm_for_dev_defaults_tests(self):
def test_default_block_device_names_empty_instance_root_dev(self):
def test_default_block_device_names_empty_root_device(self):
def test_default_block_device_names_no_root_device(self):
def test_reserve_block_device_name(self):
class ComputeAPITestCase(BaseTestCase):
def setUp(self):
def fake_get_nw_info(cls, ctxt, instance):
def fake_show(obj, context, image_id):
def _run_instance(self, params=None):
def test_create_with_too_little_ram(self):
def test_create_with_too_little_disk(self):
def test_create_with_too_large_image(self):
def test_create_just_enough_ram_and_disk(self):
def test_create_with_no_ram_and_disk_reqs(self):
def test_create_with_deleted_image(self):
def test_create_instance_defaults_display_name(self):
def test_create_instance_sets_system_metadata(self):
def test_create_saves_type_in_system_metadata(self):
def test_create_instance_associates_security_groups(self):
def test_create_instance_with_invalid_security_group_raises(self):
def test_create_with_large_user_data(self):
def test_create_with_malformed_user_data(self):
def test_create_with_base64_user_data(self):
def test_populate_instance_for_create(self):
def test_default_hostname_generator(self):
def _fake_populate(base_options, *args, **kwargs):
def test_instance_create_adds_to_instance_group(self):
def test_instance_create_auto_creates_group(self):
def test_destroy_instance_disassociates_security_groups(self):
def test_destroy_security_group_disassociates_instances(self):
def _test_rebuild(self, vm_state):
def fake_rpc_rebuild(context, **kwargs):
def test_rebuild(self):
def test_rebuild_in_error_state(self):
def test_rebuild_in_error_not_launched(self):
def test_rebuild_no_image(self):
def test_rebuild_with_deleted_image(self):
def test_rebuild_with_too_little_ram(self):
def fake_extract_flavor(_inst, prefix):
def test_rebuild_with_too_little_disk(self):
def fake_extract_flavor(_inst, prefix):
def test_rebuild_with_just_enough_ram_and_disk(self):
def fake_extract_flavor(_inst, prefix):
def test_rebuild_with_no_ram_and_disk_reqs(self):
def fake_extract_flavor(_inst, prefix):
def test_rebuild_with_too_large_image(self):
def fake_extract_flavor(_inst, prefix):
def test_hostname_create(self):
def test_set_admin_password(self):
def fake_set_admin_password(self, context, **kwargs):
def test_rescue_unrescue(self):
def _fake_rescue_block_devices(self, instance, status="in-use"):
def test_rescue_volume_backed_no_image(self, mock_get_vol, mock_get_bdms):
def test_rescue_volume_backed_placeholder_image(self, mock_get_vol, mock_get_bdms):
def test_get(self):
def fake_db_get(_context, _instance_uuid, columns_to_join=None, use_slave=False):
def test_get_with_admin_context(self):
def fake_db_get(context, instance_uuid, columns_to_join=None, use_slave=False):
def test_get_with_integer_id(self):
def fake_db_get(_context, _instance_id, columns_to_join=None):
def test_get_all_by_name_regexp(self):
def test_get_all_by_multiple_options_at_once(self, fixed_get, network_get):
def test_get_all_by_image(self):
def test_get_all_by_flavor(self):
def test_get_all_by_state(self):
def test_get_all_by_metadata(self):
def test_all_instance_metadata(self):
def test_instance_metadata(self):
def fake_change_instance_metadata(inst, ctxt, diff, instance=None, instance_uuid=None):
def test_disallow_metadata_changes_during_building(self):
def fake_change_instance_metadata(inst, ctxt, diff, instance=None, instance_uuid=None):
def test_get_instance_faults(self):
def return_fault(_ctxt, instance_uuids):
def _parse_db_block_device_mapping(bdm_ref):
def test_update_block_device_mapping(self):
def _test_check_and_transform_bdm(self, bdms, expected_bdms, image_bdms=None, base_options=None, legacy_bdms=False, legacy_image_bdms=False):
def test_check_and_transform_legacy_bdm_no_image_bdms(self):
def test_check_and_transform_legacy_bdm_legacy_image_bdms(self):
def test_check_and_transform_legacy_bdm_image_bdms(self):
def test_check_and_transform_bdm_no_image_bdms(self):
def test_check_and_transform_bdm_image_bdms(self):
def test_check_and_transform_bdm_legacy_image_bdms(self):
def test_check_and_transform_image(self):
def test_volume_size(self):
def test_is_volume_backed_instance(self):
def test_is_volume_backed_instance_no_bdms(self):
def test_reservation_id_one_instance(self):
def test_reservation_ids_two_instances(self):
def test_multi_instance_display_name_template(self):
def test_instance_architecture(self):
def test_instance_unknown_architecture(self):
def test_instance_name_template(self):
def test_add_remove_fixed_ip(self):
def test_attach_volume_invalid(self):
def test_no_attach_volume_in_rescue_state(self):
def fake(*args, **kwargs):
def fake_volume_get(self, context, volume_id):
def test_no_attach_volume_in_suspended_state(self):
def test_no_detach_volume_in_rescue_state(self):
def test_no_rescue_in_volume_state_attaching(self, mock_get_vol, mock_get_bdms):
def test_vnc_console(self):
def test_get_vnc_console_no_host(self):
def test_spice_console(self):
def test_get_spice_console_no_host(self):
def test_rdp_console(self):
def test_get_rdp_console_no_host(self):
def test_console_output(self):
def test_console_output_no_host(self):
def test_attach_interface(self):
def test_detach_interface(self):
def test_attach_volume(self):
def test_attach_volume_no_device(self):
def fake_check_attach(*args, **kwargs):
def fake_reserve_volume(*args, **kwargs):
def fake_volume_get(self, context, volume_id):
def fake_rpc_attach_volume(self, context, **kwargs):
def fake_rpc_reserve_block_device_name(self, context, **kwargs):
def test_detach_volume(self):
def fake_check_detach(*args, **kwargs):
def fake_begin_detaching(*args, **kwargs):
def fake_rpc_detach_volume(self, context, **kwargs):
def test_detach_invalid_volume(self):
def test_detach_unattached_volume(self):
def test_detach_suspended_instance_fails(self):
def test_detach_volume_libvirt_is_down(self):
def fake_libvirt_driver_instance_exists(*args, **kwargs):
def fake_libvirt_driver_detach_volume_fails(*args, **kwargs):
def fake_roll_detaching(*args, **kwargs):
def test_terminate_with_volumes(self):
def fake_volume_get(self, context, volume_id):
def fake_detach(self, context, volume_id_param):
def fake_terminate_connection(self, context, volume_id, connector):
def test_terminate_deletes_all_bdms(self):
def test_inject_network_info(self):
def test_reset_network(self):
def test_lock(self):
def test_unlock(self):
def test_get_lock(self):
def test_add_remove_security_group(self):
def test_get_diagnostics(self):
def test_secgroup_refresh(self):
def rule_get(*args, **kwargs):
def group_get(*args, **kwargs):
def test_secgroup_refresh_once(self):
def rule_get(*args, **kwargs):
def group_get(*args, **kwargs):
def test_secgroup_refresh_none(self):
def rule_get(*args, **kwargs):
def group_get(*args, **kwargs):
def test_secrule_refresh(self):
def group_get(*args, **kwargs):
def test_secrule_refresh_once(self):
def group_get(*args, **kwargs):
def test_secrule_refresh_none(self):
def group_get(*args, **kwargs):
def test_live_migrate(self):
def test_evacuate(self):
def fake_service_is_up(*args, **kwargs):
def fake_rebuild_instance(*args, **kwargs):
def test_fail_evacuate_from_non_existing_host(self):
def test_fail_evacuate_from_running_host(self):
def fake_service_is_up(*args, **kwargs):
def test_fail_evacuate_instance_in_wrong_state(self):
def test_get_migrations(self):
def fake_rpc_method(context, method, **kwargs):
def _create_service_entries(context, values={'avail_zone1':
class ComputeAPIAggrTestCase(BaseTestCase):
def setUp(self):
def test_aggregate_no_zone(self):
def test_check_az_for_aggregate(self):
def test_update_aggregate(self):
def test_update_aggregate_no_az(self):
def test_update_aggregate_az_change(self):
def test_update_aggregate_az_fails(self):
def test_update_aggregate_metadata(self):
def test_update_aggregate_metadata_no_az(self):
def test_update_aggregate_metadata_az_change(self):
def test_update_aggregate_metadata_az_fails(self):
def test_delete_aggregate(self):
def test_delete_non_empty_aggregate(self):
def test_add_host_to_aggregate(self):
def fake_add_aggregate_host(*args, **kwargs):
def test_add_host_to_aggr_with_no_az(self):
def test_add_host_no_az_metadata(self):
def fake_aggregate_metadata_get_by_metadata_key(*args, **kwargs):
def test_add_host_to_multi_az(self):
def test_add_host_to_aggregate_multiple(self):
def test_add_host_to_aggregate_raise_not_found(self):
def test_remove_host_from_aggregate_active(self):
def fake_remove_aggregate_host(*args, **kwargs):
def test_remove_host_from_aggregate_raise_not_found(self):
def test_aggregate_list(self):
def test_aggregate_list_with_hosts(self):
class ComputeAggrTestCase(BaseTestCase):
def setUp(self):
def test_add_aggregate_host(self):
def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore):
def test_remove_aggregate_host(self):
def fake_driver_remove_from_aggregate(context, aggregate, host, **_ignore):
def test_add_aggregate_host_passes_slave_info_to_driver(self):
def driver_add_to_aggregate(context, aggregate, host, **kwargs):
def test_remove_from_aggregate_passes_slave_info_to_driver(self):
def driver_remove_from_aggregate(context, aggregate, host, **kwargs):
class ComputePolicyTestCase(BaseTestCase):
def setUp(self):
def test_actions_are_prefixed(self):
def test_wrapped_method(self):
def test_create_fail(self):
def test_create_attach_volume_fail(self):
def test_create_attach_network_fail(self):
def test_get_fail(self):
def test_get_all_fail(self):
def test_get_instance_faults(self):
def test_force_host_fail(self):
def test_force_host_pass(self):
class DisabledInstanceTypesTestCase(BaseTestCase):
def setUp(self):
def test_can_build_instance_from_visible_instance_type(self):
def test_cannot_build_instance_from_disabled_instance_type(self):
def test_can_resize_to_visible_instance_type(self):
def fake_get_flavor_by_flavor_id(flavor_id, ctxt=None, read_deleted="yes"):
def test_cannot_resize_to_disabled_instance_type(self):
def fake_get_flavor_by_flavor_id(flavor_id, ctxt=None, read_deleted="yes"):
class ComputeReschedulingTestCase(BaseTestCase):
def setUp(self):
def fake_update(*args, **kwargs):
def _reschedule(self, request_spec=None, filter_properties=None, exc_info=None):
def test_reschedule_no_filter_properties(self):
def test_reschedule_no_retry_info(self):
def test_reschedule_no_request_spec(self):
def test_reschedule_success(self):
class ComputeReschedulingResizeTestCase(ComputeReschedulingTestCase):
def setUp(self):
def _reschedule(self, request_spec=None, filter_properties=None, exc_info=None):
class InnerTestingException(Exception):
class ComputeRescheduleOrErrorTestCase(BaseTestCase):
def setUp(self):
def test_reschedule_or_error_called(self):
def test_shutdown_instance_fail(self):
def test_shutdown_instance_fail_instance_info_cache_not_found(self):
def test_reschedule_fail(self):
def test_reschedule_false(self):
def test_reschedule_true(self):
def test_no_reschedule_on_delete_during_spawn(self):
def test_no_reschedule_on_unexpected_task_state(self):
def test_no_reschedule_on_block_device_fail(self):
class ComputeRescheduleResizeOrReraiseTestCase(BaseTestCase):
def setUp(self):
def test_reschedule_resize_or_reraise_called(self):
def test_reschedule_fails_with_exception(self):
def test_reschedule_false(self):
def test_reschedule_true(self):
class ComputeInactiveImageTestCase(BaseTestCase):
def setUp(self):
def fake_show(meh, context, id):
def test_create_instance_with_deleted_image(self):
class EvacuateHostTestCase(BaseTestCase):
def setUp(self):
def tearDown(self):
def _rebuild(self, on_shared_storage=True):
def fake(cls, ctxt, instance, *args, **kwargs):
def test_rebuild_on_host_updated_target(self):
def fake_get_compute_info(context, host):
def test_rebuild_on_host_updated_target_node_not_found(self):
def fake_get_compute_info(context, host):
def test_rebuild_with_instance_in_stopped_state(self):
def test_rebuild_with_wrong_shared_storage(self):
def test_rebuild_on_host_with_volumes(self):
def fake_volume_get(self, context, volume):
def fake_detach(self, context, volume):
def fake_terminate_connection(self, context, volume, connector):
def test_rebuild_on_host_with_shared_storage(self):
def test_rebuild_on_host_without_shared_storage(self):
def test_rebuild_on_host_instance_exists(self):
def test_driver_doesnt_support_recreate(self):
class ComputeInjectedFilesTestCase(BaseTestCase):
def setUp(self):
def _spawn(self, context, instance, image_meta, injected_files, admin_password, nw_info, block_device_info, db_api=None):
def _test(self, injected_files, decoded_files):
def test_injected_none(self):
def test_injected_empty(self):
def test_injected_success(self):
def test_injected_invalid(self):
def test_reschedule(self):
def _roe(context, instance, exc_info, requested_networks, admin_password, injected_files, is_first_time, request_spec, filter_properties, bdms=None, legacy_bdm_in_spec=False):
def spawn_explode(context, instance, image_meta, injected_files, admin_password, nw_info, block_device_info):
class CheckConfigDriveTestCase(test.TestCase):
def setUp(self):
def _assertCheck(self, expected, config_drive):
def _assertInvalid(self, config_drive):
def test_config_drive_false_values(self):
def test_config_drive_true_values(self):
def test_config_drive_bogus_values_raise(self):
class CheckRequestedImageTestCase(test.TestCase):
def setUp(self):
def test_no_image_specified(self):
def test_image_status_must_be_active(self):
def test_image_min_ram_check(self):
def test_image_min_disk_check(self):
def test_image_too_large(self):
def test_root_gb_zero_disables_size_check(self):
def test_root_gb_zero_disables_min_disk(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute_api.py
class _ComputeAPIUnitTestMixIn(object):
def setUp(self):
def _create_flavor(self, params=None):
def _create_instance_obj(self, params=None, flavor=None):
def make_fake_sys_meta():
def test_create_quota_exceeded_messages(self):
def test_suspend(self):
def test_resume(self):
def test_start(self):
def test_start_invalid_state(self):
def test_start_no_host(self):
def _test_stop(self, vm_state, force=False):
def test_stop(self):
def test_stop_stopped_instance_with_bypass(self):
def test_stop_invalid_state(self):
def test_stop_a_stopped_inst(self):
def test_stop_no_host(self):
def _test_reboot_type(self, vm_state, reboot_type, task_state=None):
def _test_reboot_type_fails(self, reboot_type, **updates):
def test_reboot_hard_active(self):
def test_reboot_hard_error(self):
def test_reboot_hard_rebooting(self):
def test_reboot_hard_rescued(self):
def test_reboot_hard_error_not_launched(self):
def test_reboot_soft(self):
def test_reboot_soft_error(self):
def test_reboot_soft_paused(self):
def test_reboot_soft_stopped(self):
def test_reboot_soft_suspended(self):
def test_reboot_soft_rebooting(self):
def test_reboot_soft_rebooting_hard(self):
def test_reboot_soft_rescued(self):
def test_reboot_soft_error_not_launched(self):
def _test_delete_resized_part(self, inst):
def _test_delete_shelved_part(self, inst):
def _test_downed_host_part(self, inst, updates, delete_time, delete_type):
def _test_delete(self, delete_type, **attrs):
def test_delete(self):
def test_delete_if_not_launched(self):
def test_delete_in_resizing(self):
def test_delete_in_resized(self):
def test_delete_shelved(self):
def test_delete_shelved_offloaded(self):
def test_delete_shelved_image_not_found(self):
def test_delete_shelved_image_not_authorized(self):
def test_delete_shelved_exception(self):
def test_delete_with_down_host(self):
def test_delete_soft_with_down_host(self):
def test_delete_soft(self):
def test_delete_forced(self):
def test_delete_fast_if_host_not_set(self):
def test_local_delete_with_deleted_volume(self):
def _fake_do_delete(context, instance, bdms, rservations=None, local=False):
def test_delete_disabled(self):
def test_delete_soft_rollback(self):
def _test_confirm_resize(self, mig_ref_passed=False):
def _check_mig(expected_task_state=None):
def test_confirm_resize(self):
def test_confirm_resize_with_migration_ref(self):
def _test_revert_resize(self):
def _check_state(expected_task_state=None):
def _check_mig(expected_task_state=None):
def test_revert_resize(self):
def test_revert_resize_concurent_fail(self):
def _test_resize(self, flavor_id_passed=True, same_host=False, allow_same_host=False, allow_mig_same_host=False, project_id=None, extra_kwargs=None, same_flavor=False):
def _test_migrate(self, *args, **kwargs):
def test_resize(self):
def test_resize_with_kwargs(self):
def test_resize_same_host_and_allowed(self):
def test_resize_same_host_and_not_allowed(self):
def test_resize_different_project_id(self):
def test_migrate(self):
def test_migrate_with_kwargs(self):
def test_migrate_same_host_and_allowed(self):
def test_migrate_same_host_and_not_allowed(self):
def test_migrate_different_project_id(self):
def test_resize_invalid_flavor_fails(self):
def test_resize_disabled_flavor_fails(self):
def test_resize_quota_exceeds_fails(self):
def test_pause(self):
def test_unpause(self):
def test_swap_volume_volume_api_usage(self):
def fake_vol_api_begin_detaching(context, volume_id):
def fake_vol_api_roll_detaching(context, volume_id):
def fake_vol_api_reserve(context, volume_id):
def fake_vol_api_unreserve(context, volume_id):
def fake_swap_volume_exc(context, instance, old_volume_id, new_volume_id):
def _test_snapshot_and_backup(self, is_snapshot=True, with_base_ref=False, min_ram=None, min_disk=None, create_fails=False):
def check_state(expected_task_state=None):
def test_snapshot(self):
def test_snapshot_fails(self):
def test_snapshot_invalid_state(self):
def test_snapshot_with_base_image_ref(self):
def test_snapshot_min_ram(self):
def test_snapshot_min_disk(self):
def test_backup(self):
def test_backup_fails(self):
def test_backup_invalid_state(self):
def test_backup_with_base_image_ref(self):
def test_snapshot_volume_backed(self):
def fake_get_all_by_instance(context, instance, use_slave=False):
def fake_image_create(context, image_meta, data):
def fake_volume_get(context, volume_id):
def fake_volume_create_snapshot(context, volume_id, name, description):
def test_volume_snapshot_create(self):
def test_volume_snapshot_delete(self):
def _create_instance_with_disabled_disk_config(self, object=False):
def _setup_fake_image_with_disabled_disk_config(self):
def fake_show(obj, context, image_id):
def test_resize_with_disabled_auto_disk_config_fails(self):
def test_create_with_disabled_auto_disk_config_fails(self):
def test_rebuild_with_disabled_auto_disk_config_fails(self):
def test_rebuild(self, _record_action_start, _checks_for_create_and_rebuild, _check_auto_disk_config, _get_image, bdm_get_by_instance_uuid, get_flavor, instance_save):
def test_rebuild_change_image(self, _record_action_start, _checks_for_create_and_rebuild, _check_auto_disk_config, _get_image, bdm_get_by_instance_uuid, get_flavor, instance_save):
def get_image(context, image_href):
def test_restore(self, action_start, instance_save, quota_reserve, quota_commit):
def test_external_instance_event(self):
def test_volume_ops_invalid_task_state(self):
class ComputeAPIUnitTestCase(_ComputeAPIUnitTestMixIn, test.NoDBTestCase):
def setUp(self):
def test_resize_same_flavor_fails(self):
class ComputeAPIAPICellUnitTestCase(_ComputeAPIUnitTestMixIn,
test.NoDBTestCase):
def setUp(self):
def test_resize_same_flavor_fails(self):
class ComputeAPIComputeCellUnitTestCase(_ComputeAPIUnitTestMixIn,
test.NoDBTestCase):
def setUp(self):
def test_resize_same_flavor_passes(self):
class DiffDictTestCase(test.NoDBTestCase):
def test_no_change(self):
def test_new_key(self):
def test_changed_key(self):
def test_removed_key(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute_cells.py
def stub_call_to_cells(context, instance, method, *args, **kwargs):
def stub_cast_to_cells(context, instance, method, *args, **kwargs):
def deploy_stubs(stubs, api, original_instance=None):
def wrap_create_instance(func):
def wrapper(self, *args, **kwargs):
class CellsComputeAPITestCase(test_compute.ComputeAPITestCase):
def setUp(self):
def _fake_cell_read_only(*args, **kwargs):
def _fake_validate_cell(*args, **kwargs):
def _nop_update(context, instance, **kwargs):
def tearDown(self):
def test_instance_metadata(self):
def test_evacuate(self):
def test_delete_instance_no_cell(self):
def test_soft_delete_instance_no_cell(self):
def test_get_migrations(self):
def test_rebuild_sig(self, mock_msg):
def wire(version):
class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase):
def setUp(self):
def tearDown(self):
\OpenStack\nova-2014.1\nova\tests\compute\test_compute_mgr.py
class ComputeManagerUnitTestCase(test.NoDBTestCase):
def setUp(self):
def test_allocate_network_succeeds_after_retries(self):
def test_allocate_network_fails(self):
def test_allocate_network_neg_conf_value_treated_as_zero(self):
def test_init_host(self):
def _do_mock_calls(defer_iptables_apply):
def test_cleanup_host(self, mock_instance_list):
def test_init_host_with_deleted_migration(self):
def test_init_instance_failed_resume_sets_error(self):
def test_init_instance_stuck_in_deleting(self):
def _test_init_instance_reverts_crashed_migrations(self, old_vm_state=None):
def test_init_instance_reverts_crashed_migration_from_active(self):
def test_init_instance_reverts_crashed_migration_from_stopped(self):
def test_init_instance_reverts_crashed_migration_no_old_state(self):
def test_init_instance_sets_building_error(self):
def _test_init_instance_sets_building_tasks_error(self, instance):
def test_init_instance_sets_building_tasks_error_scheduling(self):
def test_init_instance_sets_building_tasks_error_block_device(self):
def test_init_instance_sets_building_tasks_error_networking(self):
def test_init_instance_sets_building_tasks_error_spawning(self):
def _test_init_instance_cleans_image_states(self, instance):
def test_init_instance_cleans_image_state_pending_upload(self):
def test_init_instance_cleans_image_state_uploading(self):
def test_init_instance_cleans_image_state_snapshot(self):
def test_init_instance_cleans_image_state_snapshot_pending(self):
def test_init_instance_errors_when_not_migrating(self):
def test_init_instance_deletes_error_deleting_instance(self):
def _test_init_instance_retries_reboot(self, instance, reboot_type, return_power_state):
def test_init_instance_retries_reboot_pending(self):
def test_init_instance_retries_reboot_pending_hard(self):
def test_init_instance_retries_reboot_started(self):
def test_init_instance_retries_reboot_started_hard(self):
def _test_init_instance_cleans_reboot_state(self, instance):
def test_init_instance_cleans_image_state_reboot_started(self):
def test_init_instance_cleans_image_state_reboot_started_hard(self):
def test_get_instances_on_driver(self):
def test_get_instances_on_driver_fallback(self):
def test_instance_usage_audit(self):
def _get_sync_instance(self, power_state, vm_state, task_state=None):
def test_sync_instance_power_state_match(self):
def test_sync_instance_power_state_running_stopped(self):
def _test_sync_to_stop(self, power_state, vm_state, driver_power_state, stop=True, force=False):
def test_sync_instance_power_state_to_stop(self):
def test_sync_instance_power_state_to_no_stop(self):
def test_run_pending_deletes(self):
def test_swap_volume_volume_api_usage(self):
def fake_vol_api_begin_detaching(context, volume_id):
def fake_vol_api_roll_detaching(context, volume_id):
def fake_vol_api_func(context, volume, *args):
def fake_vol_get(context, volume_id):
def fake_vol_attach(context, volume_id, instance_uuid, connector):
def fake_vol_api_reserve(context, volume_id):
def fake_vol_unreserve(context, volume_id):
def fake_vol_detach(context, volume_id):
def fake_vol_migrate_volume_completion(context, old_volume_id, new_volume_id, error=False):
def fake_func_exc(*args, **kwargs):
def test_check_can_live_migrate_source(self):
def _test_check_can_live_migrate_destination(self, do_raise=False, has_mig_data=False):
def test_check_can_live_migrate_destination_success(self):
def test_check_can_live_migrate_destination_success_w_mig_data(self):
def test_check_can_live_migrate_destination_fail(self):
def test_prepare_for_instance_event(self):
def test_prepare_for_instance_event_again(self):
def test_process_instance_event(self):
def test_external_instance_event(self):
OpenStack IndexPreviousNext
|