OpenStack Study: nova
OpenStack IndexPreviousNext
def test_inject_auto_disk_config_when_present(self):
def test_inject_auto_disk_config_none_as_false(self):
class GetConsoleOutputTestCase(VMOpsTestBase):
def setUp(self):
def test_get_console_output_works(self):
def test_get_console_output_throws_nova_exception(self):
def test_get_dom_id_works(self):
def test_get_dom_id_works_with_rescue_vm(self):
def test_get_dom_id_raises_not_found(self):
def test_get_dom_id_works_with_vmref(self):
class SpawnTestCase(VMOpsTestBase):
def _stub_out_common(self):
def _test_spawn(self, name_label_param=None, block_device_info_param=None, rescue=False, include_root_vdi=True, throw_exception=None, attach_pci_dev=False):
def test_spawn(self):
def test_spawn_with_alternate_options(self):
def test_spawn_with_pci_available_on_the_host(self):
def test_spawn_performs_rollback_and_throws_exception(self):
def _test_finish_migration(self, power_on=True, resize_instance=True, throw_exception=None):
def test_finish_migration(self):
def test_finish_migration_no_power_on(self):
def test_finish_migrate_performs_rollback_on_error(self):
def test_remove_hostname(self):
def test_reset_network(self):
def test_inject_hostname(self):
def test_inject_hostname_with_rescue_prefix(self):
def test_inject_hostname_with_windows_name_truncation(self):
def test_wait_for_instance_to_start(self):
def test_attach_orig_disk_for_rescue(self):
def test_agent_update_setup(self):
class MigrateDiskAndPowerOffTestCase(VMOpsTestBase):
def test_migrate_disk_and_power_off_works_down(self, migrate_up, migrate_down, *mocks):
def test_migrate_disk_and_power_off_works_up(self, migrate_up, migrate_down, *mocks):
def test_migrate_disk_and_power_off_resize_down_ephemeral_fails(self, migrate_up, migrate_down, *mocks):
class MigrateDiskResizingUpTestCase(VMOpsTestBase):
def _fake_snapshot_attached_here(self, session, instance, vm_ref, label, userdevice, post_snapshot_callback):
def test_migrate_disk_resizing_up_works_no_ephemeral(self, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):
def test_migrate_disk_resizing_up_works_with_two_ephemeral(self, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):
def test_migrate_disk_resizing_up_rollback(self, mock_restore, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):
class CreateVMRecordTestCase(VMOpsTestBase):
def test_create_vm_record_with_vm_device_id(self, mock_create_vm, mock_get_vm_device_id, mock_determine_vm_mode):
class BootableTestCase(VMOpsTestBase):
def setUp(self):
def _get_blocked(self):
def test_acquire_bootlock(self):
def test_release_bootlock(self):
def test_set_bootable(self):
def test_set_not_bootable(self):
class ResizeVdisTestCase(VMOpsTestBase):
def test_resize_up_vdis_root(self, mock_resize):
def test_resize_up_vdis_zero_disks(self, mock_resize):
def test_resize_up_vdis_no_vdis_like_initial_spawn(self, mock_resize):
def test_resize_up_vdis_ephemeral(self, mock_sizes, mock_resize):
def test_resize_up_vdis_ephemeral_with_generate(self, mock_sizes, mock_generate, mock_resize):
class MigrateDiskResizingDownTestCase(VMOpsTestBase):
def test_migrate_disk_resizing_down_works_no_ephemeral( self, mock_destroy_vdi, mock_migrate_vhd, mock_resize_disk, mock_get_vdi_for_vm_safely, mock_update_instance_progress, mock_apply_orig_vm_name_label, mock_resize_ensure_vm_is_shutdown):
\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_vm_utils.py
def get_fake_connection_data(sr_type):
def _get_fake_session(error=None):
def contextified(result):
def _fake_noop(*args, **kwargs):
class VMUtilsTestBase(stubs.XenAPITestBaseNoDB):
class LookupTestCase(VMUtilsTestBase):
def setUp(self):
def _do_mock(self, result):
def test_normal(self):
def test_no_result(self):
def test_too_many(self):
def test_rescue_none(self):
def test_rescue_found(self):
def test_rescue_too_many(self):
class GenerateConfigDriveTestCase(VMUtilsTestBase):
def test_no_admin_pass(self):
def test_vdi_cleaned_up(self, mock_find, mock_create_vdi, mock_attached, mock_destroy):
class XenAPIGetUUID(VMUtilsTestBase):
def test_get_this_vm_uuid_new_kernel(self):
def test_get_this_vm_uuid_old_kernel_reboot(self):
class FakeSession(object):
def call_xenapi(self, *args):
def call_plugin(self, *args):
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries, callback, *args, **kwargs):
class FetchVhdImageTestCase(VMUtilsTestBase):
def setUp(self):
def _stub_glance_download_vhd(self, raise_exc=None):
def _stub_bittorrent_download_vhd(self, raise_exc=None):
def test_fetch_vhd_image_works_with_glance(self):
def test_fetch_vhd_image_works_with_bittorrent(self):
def test_fetch_vhd_image_cleans_up_vdi_on_fail(self):
def test_fallback_to_default_handler(self):
def test_default_handler_doesnt_fallback_to_itself(self):
class TestImageCompression(VMUtilsTestBase):
def test_image_compression(self):
class ResizeHelpersTestCase(VMUtilsTestBase):
def test_repair_filesystem(self):
def _call_tune2fs_remove_journal(self, path):
def _call_tune2fs_add_journal(self, path):
def _call_parted(self, path, start, end):
def test_resize_part_and_fs_down_succeeds(self):
def test_log_progress_if_required(self):
def test_log_progress_if_not_required(self):
def test_resize_part_and_fs_down_fails_disk_too_big(self):
def test_resize_part_and_fs_up_succeeds(self):
def test_resize_disk_throws_on_zero_size(self):
def test_auto_config_disk_returns_early_on_zero_size(self):
class CheckVDISizeTestCase(VMUtilsTestBase):
def setUp(self):
def test_not_too_large(self):
def test_too_large(self):
def test_zero_root_gb_disables_check(self):
class GetInstanceForVdisForSrTestCase(VMUtilsTestBase):
def setUp(self):
def test_get_instance_vdis_for_sr(self):
def test_get_instance_vdis_for_sr_no_vbd(self):
def test_get_vdi_uuid_for_volume_with_sr_uuid(self):
def test_get_vdi_uuid_for_volume_failure(self):
def bad_introduce_sr(session, sr_uuid, label, sr_params):
def test_get_vdi_uuid_for_volume_from_iscsi_vol_missing_sr_uuid(self):
class VMRefOrRaiseVMFoundTestCase(VMUtilsTestBase):
def test_lookup_call(self):
def test_return_value(self):
class VMRefOrRaiseVMNotFoundTestCase(VMUtilsTestBase):
def test_exception_raised(self):
def test_exception_msg_contains_vm_name(self):
class BittorrentTestCase(VMUtilsTestBase):
def setUp(self):
def test_image_uses_bittorrent(self):
def _test_create_image(self, cache_type):
def fake_create_cached_image(*args):
def fake_fetch_image(*args):
def test_create_image_cached(self):
def test_create_image_uncached(self):
class ShutdownTestCase(VMUtilsTestBase):
def test_hardshutdown_should_return_true_when_vm_is_shutdown(self):
def test_cleanshutdown_should_return_true_when_vm_is_shutdown(self):
class CreateVBDTestCase(VMUtilsTestBase):
def setUp(self):
def _generate_vbd_rec(self):
def test_create_vbd_default_args(self):
def test_create_vbd_osvol(self):
def test_create_vbd_extra_args(self):
def test_attach_cd(self):
class UnplugVbdTestCase(VMUtilsTestBase):
def test_unplug_vbd_works(self, mock_sleep):
def test_unplug_vbd_raises_unexpected_error(self):
def test_unplug_vbd_already_detached_works(self):
def test_unplug_vbd_already_raises_unexpected_xenapi_error(self):
def _test_uplug_vbd_retries(self, mock_sleep, error):
def test_uplug_vbd_retries_on_rejected(self, mock_sleep):
def test_uplug_vbd_retries_on_internal_error(self, mock_sleep):
class VDIOtherConfigTestCase(VMUtilsTestBase):
def setUp(self):
def test_create_vdi(self):
def test_create_image(self):
def fake_fetch_image(*args):
def VDI_add_to_other_config(ref, key, value):
def test_import_migrated_vhds(self):
def VDI_add_to_other_config(ref, key, value):
def call_plugin_serialized(*args, **kwargs):
class GenerateDiskTestCase(VMUtilsTestBase):
def setUp(self):
def tearDown(self):
def _expect_parted_calls(self):
def _check_vdi(self, vdi_ref, check_attached=True):
def test_generate_disk_with_no_fs_given(self):
def test_generate_disk_swap(self):
def test_generate_disk_ephemeral(self):
def test_generate_disk_ensure_cleanup_called(self):
def test_generate_disk_ephemeral_local_not_attached(self):
class GenerateEphemeralTestCase(VMUtilsTestBase):
def setUp(self):
def test_get_ephemeral_disk_sizes_simple(self):
def test_get_ephemeral_disk_sizes_three_disks_2000(self):
def test_get_ephemeral_disk_sizes_two_disks_1024(self):
def _expect_generate_disk(self, size, device, name_label):
def test_generate_ephemeral_adds_one_disk(self):
def test_generate_ephemeral_adds_multiple_disks(self):
def test_generate_ephemeral_cleans_up_on_error(self):
class FakeFile(object):
def __init__(self):
def seek(self, offset):
class StreamDiskTestCase(VMUtilsTestBase):
def setUp(self):
def test_non_ami(self):
def test_ami_disk(self):
class VMUtilsSRPath(VMUtilsTestBase):
def setUp(self):
def test_defined(self):
def test_default(self):
class CreateKernelRamdiskTestCase(VMUtilsTestBase):
def setUp(self):
def test_create_kernel_and_ramdisk_no_create(self):
def test_create_kernel_and_ramdisk_create_both_cached(self):
def test_create_kernel_and_ramdisk_create_kernel_not_cached(self):
class ScanSrTestCase(VMUtilsTestBase):
def test_scan_default_sr(self, mock_safe_find_sr, mock_scan_sr):
def test_scan_sr_works(self):
def test_scan_sr_unknown_error_fails_once(self):
def test_scan_sr_known_error_retries_then_throws(self, mock_sleep):
def test_scan_sr_known_error_retries_then_succeeds(self, mock_sleep):
def fake_call_xenapi(*args):
class CreateVmTestCase(VMUtilsTestBase):
def test_vss_provider(self, mock_extract):
def test_invalid_cpu_mask_raises(self, mock_extract):
def test_destroy_vm(self, mock_extract):
def test_destroy_vm_silently_fails(self, mock_extract):
class DetermineVmModeTestCase(VMUtilsTestBase):
def test_determine_vm_mode_returns_xen_mode(self):
def test_determine_vm_mode_returns_hvm_mode(self):
def test_determine_vm_mode_returns_xen_for_linux(self):
def test_determine_vm_mode_returns_hvm_for_windows(self):
def test_determine_vm_mode_returns_hvm_by_default(self):
def test_determine_vm_mode_returns_xen_for_VHD(self):
def test_determine_vm_mode_returns_xen_for_DISK(self):
class CallXenAPIHelpersTestCase(VMUtilsTestBase):
def test_vm_get_vbd_refs(self):
def test_vbd_get_rec(self):
def test_vdi_get_rec(self):
def test_vdi_snapshot(self):
def test_vdi_get_virtual_size(self):
def test_vdi_resize(self, mock_get_resize_func_name):
def test_update_vdi_virtual_size_works(self, mock_get_size, mock_resize):
def test_update_vdi_virtual_size_skips_resize_down(self, mock_get_size, mock_resize):
def test_update_vdi_virtual_size_raise_if_disk_big(self, mock_get_size, mock_resize):
class GetVdiForVMTestCase(VMUtilsTestBase):
def test_get_vdi_for_vm_safely(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_rec):
def test_get_vdi_for_vm_safely_fails(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_rec):
class GetAllVdiForVMTestCase(VMUtilsTestBase):
def _setup_get_all_vdi_uuids_for_vm(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):
def fake_vbd_get_rec(session, vbd_ref):
def fake_vdi_get_uuid(session, vdi_ref):
def test_get_all_vdi_uuids_for_vm_works(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):
def test_get_all_vdi_uuids_for_vm_finds_none(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):
class GetAllVdisTestCase(VMUtilsTestBase):
def test_get_all_vdis_in_sr(self):
def fake_get_rec(record_type, ref):
class VDIAttachedHere(VMUtilsTestBase):
def test_sync_called(self, mock_execute, mock_wait_for_device, mock_remap_vbd_dev, mock_create_vbd, mock_get_this_vm_ref, mock_destroy_vbd):
class SnapshotAttachedHereTestCase(VMUtilsTestBase):
def test_snapshot_attached_here(self, mock_impl):
def fake_impl(session, instance, vm_ref, label, userdevice, post_snapshot_callback):
def test_snapshot_attached_here_impl(self, mock_get_vdi_for_vm_safely, mock_get_vhd_parent_uuid, mock_vdi_snapshot, mock_vdi_get_uuid, mock_wait_for_vhd_coalesce, mock_walk_vdi_chain, mock_safe_destroy_vdis):
def test_wait_for_vhd_coalesce(self, mock_scan_sr, mock_another_child_vhd, mock_get_vhd_parent_uuid, mock_sleep):
def fake_scan_sr(session, sr_ref):
def fake_get_vhd_parent_uuid(session, vdi_ref):
def fake_call_xenapi(method, *args):
class ImportMigratedDisksTestCase(VMUtilsTestBase):
def test_import_all_migrated_disks(self, mock_root, mock_ephemeral):
def test_import_migrated_root_disk(self, mock_migrate):
def test_import_migrate_ephemeral_disks(self, mock_migrate):
def test_import_migrated_vhds(self, mock_get_sr_path, mock_scan_sr, mock_set_info):
def test_get_vhd_parent_uuid_rec_provided(self):
class MigrateVHDTestCase(VMUtilsTestBase):
def _assert_transfer_called(self, session, label):
def test_migrate_vhd_root(self):
def test_migrate_vhd_ephemeral(self):
def test_migrate_vhd_converts_exceptions(self):
class StripBaseMirrorTestCase(VMUtilsTestBase):
def test_strip_base_mirror_from_vdi_works(self):
def test_strip_base_mirror_from_vdi_hides_error(self):
def test_strip_base_mirror_from_vdis(self, mock_strip):
def call_xenapi(method, arg):
class DeviceIdTestCase(VMUtilsTestBase):
def test_device_id_is_none_if_not_specified_in_meta_data(self):
def test_get_device_id_if_hypervisor_version_is_greater_than_6_1(self):
def test_raise_exception_if_device_id_not_supported_by_hyp_version(self):
class CreateVmRecordTestCase(VMUtilsTestBase):
def test_create_vm_record_linux(self, mock_extract_flavor):
def test_create_vm_record_windows(self, mock_extract_flavor):
def _test_create_vm_record(self, mock_extract_flavor, instance, is_viridian):
def test_list_vms(self):
class ResizeFunctionTestCase(test.NoDBTestCase):
def _call_get_resize_func_name(self, brand, version):
def _test_is_resize(self, brand, version):
def _test_is_resize_online(self, brand, version):
def test_xenserver_5_5(self):
def test_xenserver_6_0(self):
def test_xcp_1_1(self):
def test_xcp_1_2(self):
def test_xcp_2_0(self):
def test_random_brand(self):
def test_default(self):
def test_empty(self):
def test_bad_version(self):
class VMInfoTests(VMUtilsTestBase):
def setUp(self):
def test_get_power_state_valid(self):
def test_get_power_state_invalid(self):
def test_compile_info(self):
def call_xenapi(method, *args):
\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_volumeops.py
class VolumeAttachTestCase(test.NoDBTestCase):
def test_detach_volume_call(self):
def regcall(label):
def test_attach_volume_call(self):
def test_attach_volume_no_hotplug(self):
def _test_connect_volume(self, hotplug, vm_running, plugged):
def test_connect_volume_no_hotplug_vm_running(self):
def test_connect_volume_no_hotplug_vm_not_running(self):
def test_connect_volume_hotplug_vm_stopped(self):
def test_connect_volume_hotplug_vm_running(self):
def test_connect_volume(self):
def fake_call_xenapi(self, method, *args, **kwargs):
\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_volume_utils.py
class SROps(stubs.XenAPITestBaseNoDB):
def test_find_sr_valid_uuid(self):
def test_find_sr_invalid_uuid(self):
class ISCSIParametersTestCase(stubs.XenAPITestBaseNoDB):
def test_target_host(self):
def test_target_port(self):
class IntroduceTestCase(stubs.XenAPITestBaseNoDB):
def test_introduce_vdi_retry(self, mock_sleep, mock_get_vdi_ref):
def fake_get_vdi_ref(session, sr_ref, vdi_uuid, target_lun):
def fake_call_xenapi(method, *args):
def test_introduce_vdi_exception(self, mock_sleep, mock_get_vdi_ref):
def fake_call_xenapi(method, *args):
\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_xenapi.py
def get_session():
def set_image_fixtures():
def get_fake_device_info():
def stub_vm_utils_with_vdi_attached_here(function):
def decorated_function(self, *args, **kwargs):
def create_instance_with_system_metadata(context, instance_values):
class XenAPIVolumeTestCase(stubs.XenAPITestBaseNoDB):
def setUp(self):
def _make_connection_info(cls):
def test_mountpoint_to_number(self):
def test_parse_volume_info_parsing_auth_details(self):
def test_get_device_number_raise_exception_on_wrong_mountpoint(self):
def test_attach_volume(self):
def test_attach_volume_raise_exception(self):
class XenAPIVMTestCase(stubs.XenAPITestBase):
def setUp(self):
def fake_inject_instance_metadata(self, instance, vm):
def fake_safe_copy_vdi(session, sr_ref, instance, vdi_to_copy_ref):
def tearDown(self):
def test_init_host(self):
def test_instance_exists(self):
def test_instance_not_exists(self):
def test_list_instances_0(self):
def test_list_instance_uuids_0(self):
def test_list_instance_uuids(self):
def test_get_rrd_server(self):
def test_get_diagnostics(self):
def fake_get_rrd(host, vm_uuid):
def test_get_vnc_console(self):
def test_get_vnc_console_for_rescue(self):
def test_get_vnc_console_instance_not_ready(self):
def test_get_vnc_console_rescue_not_ready(self):
def test_instance_snapshot_fails_with_no_primary_vdi(self):
def create_bad_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=False, osvol=False):
def test_instance_snapshot(self):
def fake_image_upload(_self, ctx, session, inst, vdi_uuids, img_id):
def create_vm_record(self, conn, os_type, name):
def check_vm_record(self, conn, instance_type_id, check_injection):
def check_vm_params_for_windows(self):
def check_vm_params_for_linux(self):
def check_vm_params_for_linux_with_external_kernel(self):
def _list_vdis(self):
def _list_vms(self):
def _check_vdis(self, start_list, end_list):
def _test_spawn(self, image_ref, kernel_id, ramdisk_id, instance_type_id="3", os_type="linux", hostname="test", architecture="x86-64", instance_id=1, injected_files=None, check_injection=False, create_record=True, empty_dns=False, block_device_info=None, key_data=None):
def fake_inject_instance_metadata(self, instance, vm):
def test_spawn_ipxe_iso_success(self):
def test_spawn_ipxe_iso_no_network_name(self):
def test_spawn_ipxe_iso_no_boot_menu_url(self):
def test_spawn_ipxe_iso_unknown_network_name(self):
def test_spawn_empty_dns(self):
def test_spawn_not_enough_memory(self):
def test_spawn_fail_cleanup_1(self):
def test_spawn_fail_cleanup_2(self):
def test_spawn_fail_cleanup_3(self):
def test_spawn_raw_glance(self):
def test_spawn_vhd_glance_linux(self):
def test_spawn_vhd_glance_windows(self):
def test_spawn_iso_glance(self):
def test_spawn_glance(self):
def fake_fetch_disk_image(context, session, instance, name_label, image_id, image_type):
def test_spawn_boot_from_volume_no_image_meta(self):
def test_spawn_boot_from_volume_no_glance_image_meta(self):
def test_spawn_boot_from_volume_with_image_meta(self):
def test_spawn_netinject_file(self):
def _tee_handler(cmd, **kwargs):
def _readlink_handler(cmd_parts, **kwargs):
def test_spawn_netinject_xenstore(self):
def _mount_handler(cmd, *ignore_args, **ignore_kwargs):
def _umount_handler(cmd, *ignore_args, **ignore_kwargs):
def _tee_handler(cmd, *ignore_args, **ignore_kwargs):
def test_spawn_injects_auto_disk_config_to_xenstore(self):
def test_spawn_vlanmanager(self):
def dummy(*args, **kwargs):
def test_spawn_with_network_qos(self):
def test_spawn_ssh_key_injection(self):
def fake_inject_file(self, method, args):
def fake_encrypt_text(sshkey, new_pass):
def test_spawn_ssh_key_injection_non_rsa(self):
def fake_inject_file(self, method, args):
def fake_encrypt_text(sshkey, new_pass):
def test_spawn_injected_files(self):
def fake_inject_file(self, method, args):
def test_spawn_agent_upgrade(self):
def fake_agent_build(_self, *args):
def test_spawn_agent_upgrade_fails_silently(self):
def fake_agent_build(_self, *args):
def fake_agent_update(self, method, args):
def test_spawn_with_resetnetwork_alternative_returncode(self):
def fake_resetnetwork(self, method, args):
def _test_spawn_fails_silently_with(self, trigger, expected_exception):
def fake_agent_version(self, method, args):
def fake_add_instance_fault(*args, **kwargs):
def test_spawn_fails_with_agent_timeout(self):
def test_spawn_fails_with_agent_not_implemented(self):
def test_spawn_fails_with_agent_error(self):
def test_spawn_fails_with_agent_bad_return(self):
def fake_agent_version(self, method, args):
def test_spawn_fails_agent_not_implemented(self):
def fake_agent_version(self, method, args):
def test_rescue(self):
def test_rescue_preserve_disk_on_failure(self):
def fake_start(*args, **kwargs):
def test_unrescue(self):
def test_unrescue_not_in_rescue(self):
def test_finish_revert_migration(self):
def test_reboot_hard(self):
def test_poll_rebooting_instances(self):
def test_reboot_soft(self):
def test_reboot_halted(self):
def test_reboot_unknown_state(self):
def test_reboot_rescued(self):
def test_get_console_output_succeeds(self):
def fake_get_console_output(instance):
def _test_maintenance_mode(self, find_host, find_aggregate):
def fake_call_xenapi(method, *args):
def fake_aggregate_get(context, host, key):
def fake_host_find(context, session, src, dst):
def test_maintenance_mode(self):
def test_maintenance_mode_no_host(self):
def test_maintenance_mode_no_aggregate(self):
def test_uuid_find(self):
def test_session_virtapi(self):
def fake_aggregate_get_by_host(self, *args, **kwargs):
def test_per_instance_usage_running(self):
def test_per_instance_usage_suspended(self):
def test_per_instance_usage_halted(self):
def _create_instance(self, instance_id=1, spawn=True, obj=False, **attrs):
def test_destroy_clean_up_kernel_and_ramdisk(self):
def fake_lookup_kernel_ramdisk(session, vm_ref):
def fake_destroy_kernel_ramdisk(session, instance, kernel, ramdisk):
class XenAPIDiffieHellmanTestCase(test.NoDBTestCase):
def setUp(self):
def test_shared(self):
def _test_encryption(self, message):
def test_encrypt_simple_message(self):
def test_encrypt_message_with_newlines_at_end(self):
def test_encrypt_many_newlines_at_end(self):
def test_encrypt_newlines_inside_message(self):
def test_encrypt_with_leading_newlines(self):
def test_encrypt_really_long_message(self):
class XenAPIMigrateInstance(stubs.XenAPITestBase):
def setUp(self):
def fake_inject_instance_metadata(self, instance, vm):
def test_migrate_disk_and_power_off(self):
def test_migrate_disk_and_power_off_passes_exceptions(self):
def fake_raise(*args, **kwargs):
def test_migrate_disk_and_power_off_throws_on_zero_gb_resize_down(self):
def test_migrate_disk_and_power_off_with_zero_gb_old_and_new_works(self):
def _test_revert_migrate(self, power_on):
def fake_vm_start(*args, **kwargs):
def fake_vdi_resize(*args, **kwargs):
def fake_finish_revert_migration(*args, **kwargs):
def test_revert_migrate_power_on(self):
def test_revert_migrate_power_off(self):
def _test_finish_migrate(self, power_on):
def fake_vm_start(*args, **kwargs):
def fake_vdi_resize(*args, **kwargs):
def test_finish_migrate_power_on(self):
def test_finish_migrate_power_off(self):
def test_finish_migrate_no_local_storage(self):
def fake_vdi_resize(*args, **kwargs):
def test_finish_migrate_no_resize_vdi(self):
def fake_vdi_resize(*args, **kwargs):
def test_migrate_too_many_partitions_no_resize_down(self):
def fake_get_partitions(partition):
def test_migrate_bad_fs_type_no_resize_down(self):
def fake_get_partitions(partition):
def test_migrate_rollback_when_resize_down_fs_fails(self):
def test_resize_ensure_vm_is_shutdown_cleanly(self):
def test_resize_ensure_vm_is_shutdown_forced(self):
def test_resize_ensure_vm_is_shutdown_fails(self):
def test_resize_ensure_vm_is_shutdown_already_shutdown(self):
class XenAPIImageTypeTestCase(test.NoDBTestCase):
def test_to_string(self):
def _assert_role(self, expected_role, image_type_id):
def test_get_image_role_kernel(self):
def test_get_image_role_ramdisk(self):
def test_get_image_role_disk(self):
def test_get_image_role_disk_raw(self):
def test_get_image_role_disk_vhd(self):
class XenAPIDetermineDiskImageTestCase(test.NoDBTestCase):
def assert_disk_type(self, image_meta, expected_disk_type):
def test_machine(self):
def test_raw(self):
def test_vhd(self):
def test_none(self):
class XenAPIHostTestCase(stubs.XenAPITestBase):
def setUp(self):
def test_host_state(self):
def test_host_state_vcpus_used(self):
def test_pci_passthrough_devices_whitelist(self):
def test_pci_passthrough_devices_no_whitelist(self):
def test_host_state_missing_sr(self):
def fake_safe_find_sr(session):
def _test_host_action(self, method, action, expected=None):
def test_host_reboot(self):
def test_host_shutdown(self):
def test_host_startup(self):
def test_host_maintenance_on(self):
def test_host_maintenance_off(self):
def test_set_enable_host_enable(self):
def test_set_enable_host_disable(self):
def test_get_host_uptime(self):
def test_supported_instances_is_included_in_host_state(self):
def test_supported_instances_is_calculated_by_to_supported_instances(self):
def to_supported_instances(somedata):
def test_update_stats_caches_hostname(self):
class ToSupportedInstancesTestCase(test.NoDBTestCase):
def test_default_return_value(self):
def test_return_value(self):
def test_invalid_values_do_not_break(self):
def test_multiple_values(self):
class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase):
def setUp(self):
def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=True, osvol=False):
def assertIsPartitionCalled(self, called):
def fake_resize_part_and_fs(dev, start, old, new):
def test_instance_not_auto_disk_config(self):
def test_instance_auto_disk_config_doesnt_pass_fail_safes(self):
def fake_get_partitions(dev):
def test_instance_auto_disk_config_passes_fail_safes(self):
def fake_get_partitions(dev):
class XenAPIGenerateLocal(stubs.XenAPITestBase):
def setUp(self):
def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=True, osvol=False, empty=False, unpluggable=True):
def assertCalled(self, instance, disk_image_type=vm_utils.ImageType.DISK_VHD):
def test_generate_swap(self):
def fake_generate_swap(*args, **kwargs):
def test_generate_ephemeral(self):
def fake_generate_ephemeral(*args):
def test_generate_iso_blank_root_disk(self):
def fake_generate_ephemeral(*args):
def fake_generate_iso(*args):
class XenAPIBWCountersTestCase(stubs.XenAPITestBaseNoDB):
def setUp(self):
def _fake_get_vif_device_map(vm_rec):
def _fake_list_vms(cls, session):
def _fake_fetch_bandwidth_mt(session):
def _fake_fetch_bandwidth(session):
def test_get_all_bw_counters(self):
def test_get_all_bw_counters_in_failure_case(self):
class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):
def setUp(self):
def _create_instance_ref(self):
def _create_test_security_group(self):
def _validate_security_group(self):
def test_static_filters(self):
def test_filters_for_instance_with_ip_v6(self):
def test_filters_for_instance_without_ip_v6(self):
def test_multinic_iptables(self):
def test_do_refresh_security_group_rules(self):
def test_provider_firewall_rules(self):
class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):
def test_safe_find_sr_raise_exception(self):
def test_safe_find_sr_local_storage(self):
def test_safe_find_sr_by_other_criteria(self):
def test_safe_find_sr_default(self):
def _create_service_entries(context, values={'avail_zone1':
class XenAPIAggregateTestCase(stubs.XenAPITestBase):
def setUp(self):
def test_pool_add_to_aggregate_called_by_driver(self):
def pool_add_to_aggregate(context, aggregate, host, slave_info=None):
def test_pool_remove_from_aggregate_called_by_driver(self):
def pool_remove_from_aggregate(context, aggregate, host, slave_info=None):
def test_add_to_aggregate_for_first_host_sets_metadata(self):
def fake_init_pool(id, name):
def test_join_slave(self):
def fake_join_slave(id, compute_uuid, host, url, user, password):
def test_add_to_aggregate_first_host(self):
def fake_pool_set_name_label(self, session, pool_ref, name):
def test_remove_from_aggregate_called(self):
def fake_remove_from_aggregate(context, aggregate, host):
def test_remove_from_empty_aggregate(self):
def test_remove_slave(self):
def fake_eject_slave(id, compute_uuid, host_uuid):
def test_remove_master_solo(self):
def fake_clear_pool(id):
def test_remote_master_non_empty_pool(self):
def _aggregate_setup(self, aggr_name='fake_aggregate', aggr_zone='fake_zone', aggr_state=pool_states.CREATED, hosts=['host'], metadata=None):
def test_add_host_to_aggregate_invalid_changing_status(self):
def test_add_host_to_aggregate_invalid_dismissed_status(self):
def test_add_host_to_aggregate_invalid_error_status(self):
def test_remove_host_from_aggregate_error(self):
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
def test_remove_host_from_aggregate_invalid_changing_status(self):
def test_add_aggregate_host_raise_err(self):
def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore):
class MockComputeAPI(object):
def __init__(self):
def add_aggregate_host(self, ctxt, aggregate, host_param, host, slave_info):
def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host, slave_info):
class StubDependencies(object):
def __init__(self):
def _is_hv_pool(self, *_ignore):
def _get_metadata(self, *_ignore):
def _create_slave_info(self, *ignore):
class ResourcePoolWithStubs(StubDependencies, pool.ResourcePool):
class HypervisorPoolTestCase(test.NoDBTestCase):
def test_slave_asks_master_to_add_slave_to_pool(self):
def test_slave_asks_master_to_remove_slave_from_pool(self):
class SwapXapiHostTestCase(test.NoDBTestCase):
def test_swapping(self):
def test_no_port(self):
def test_no_path(self):
class XenAPILiveMigrateTestCase(stubs.XenAPITestBaseNoDB):
def setUp(self):
def test_live_migration_calls_vmops(self):
def fake_live_migrate(context, instance_ref, dest, post_method, recover_method, block_migration, migrate_data):
def test_pre_live_migration(self):
def test_post_live_migration_at_destination(self):
def fake_fw(instance, network_info):
def fake_create_kernel_and_ramdisk(context, session, instance, name_label):
def fake_get_vm_opaque_ref(instance):
def fake_strip_base_mirror_from_vdis(session, vm_ref):
def test_check_can_live_migrate_destination_with_block_migration(self):
def test_check_live_migrate_destination_verifies_ip(self):
def test_check_can_live_migrate_destination_block_migration_fails(self):
def _add_default_live_migrate_stubs(self, conn):
def fake_generate_vdi_map(destination_sr_ref, _vm_ref):
def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):
def fake_get_vm_opaque_ref(instance):
def fake_lookup_kernel_ramdisk(session, vm):
def test_check_can_live_migrate_source_with_block_migrate(self):
def test_check_can_live_migrate_source_with_block_migrate_iscsi(self):
def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):
def fake_make_plugin_call(plugin, method, **args):
def test_check_can_live_migrate_source_with_block_iscsi_fails(self):
def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):
def fake_make_plugin_call(plugin, method, **args):
def test_check_can_live_migrate_source_with_block_migrate_fails(self):
def test_check_can_live_migrate_works(self):
def fake_aggregate_get_by_host(context, host, key=None):
def test_check_can_live_migrate_fails(self):
def fake_aggregate_get_by_host(context, host, key=None):
def test_live_migration(self):
def fake_get_vm_opaque_ref(instance):
def fake_get_host_opaque_ref(context, destination_hostname):
def post_method(context, instance, destination_hostname, block_migration, migrate_data):
def test_live_migration_on_failure(self):
def fake_get_vm_opaque_ref(instance):
def fake_get_host_opaque_ref(context, destination_hostname):
def fake_call_xenapi(*args):
def recover_method(context, instance, destination_hostname, block_migration):
def test_live_migration_calls_post_migration(self):
def post_method(context, instance, destination_hostname, block_migration, migrate_data):
def test_live_migration_block_cleans_srs(self):
def fake_get_iscsi_srs(context, instance):
def fake_forget_sr(context, instance):
def post_method(context, instance, destination_hostname, block_migration, migrate_data):
def test_live_migration_with_block_migration_raises_invalid_param(self):
def recover_method(context, instance, destination_hostname, block_migration):
def test_live_migration_with_block_migration_fails_migrate_send(self):
def recover_method(context, instance, destination_hostname, block_migration):
def test_live_migrate_block_migration_xapi_call_parameters(self):
def fake_generate_vdi_map(destination_sr_ref, _vm_ref):
def dummy_callback(*args, **kwargs):
def test_live_migrate_pool_migration_xapi_call_parameters(self):
def fake_get_host_opaque_ref(context, destination):
def dummy_callback(*args, **kwargs):
def test_generate_vdi_map(self):
def fake_find_sr(_session):
def fake_get_instance_vdis_for_sr(_session, _vm_ref, _sr_ref):
def test_rollback_live_migration_at_destination(self):
class XenAPIInjectMetadataTestCase(stubs.XenAPITestBaseNoDB):
def setUp(self):
def fake_get_vm_opaque_ref(inst, instance):
def fake_add_to_param_xenstore(inst, vm_ref, key, val):
def fake_remove_from_param_xenstore(inst, vm_ref, key):
def fake_write_to_xenstore(inst, instance, path, value, vm_ref=None):
def fake_delete_from_xenstore(inst, instance, path, vm_ref=None):
def test_inject_instance_metadata(self):
def test_change_instance_metadata_add(self):
def test_change_instance_metadata_update(self):
def test_change_instance_metadata_delete(self):
def test_change_instance_metadata_not_found(self):
class XenAPISessionTestCase(test.NoDBTestCase):
def _get_mock_xapisession(self, software_version):
def test_local_session(self):
def test_remote_session(self):
def test_get_product_version_product_brand_does_not_fail(self):
def test_get_product_version_product_brand_xs_6(self):
def test_verify_plugin_version_same(self):
def test_verify_plugin_version_compatible(self):
def test_verify_plugin_version_bad_maj(self):
def test_verify_plugin_version_bad_min(self):
def test_verify_current_version_matches(self):
class XenAPIFakeTestCase(test.NoDBTestCase):
def test_query_matches(self):
def test_query_bad_format(self):
\OpenStack\nova-2014.1\nova\tests\virt\xenapi\__init__.py
\OpenStack\nova-2014.1\nova\tests\virt\__init__.py
\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_base.py
class VolumeEncryptorTestCase(test.TestCase):
def _create(self, device_path):
def setUp(self):
\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_cryptsetup.py
def fake__get_key(context):
class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self, connection_info):
def setUp(self):
def fake_execute(*cmd, **kwargs):
def test__open_volume(self):
def test_attach_volume(self):
def test__close_volume(self):
def test_detach_volume(self):
\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_luks.py
class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):
def _create(self, connection_info):
def setUp(self):
def test__format_volume(self):
def test__open_volume(self):
def test_attach_volume(self):
def test__close_volume(self):
def test_detach_volume(self):
\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_nop.py
class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):
def _create(self, connection_info):
def setUp(self):
def test_attach_volume(self):
def test_detach_volume(self):
\OpenStack\nova-2014.1\nova\tests\volume\encryptors\__init__.py
\OpenStack\nova-2014.1\nova\tests\volume\test_cinder.py
class FakeCinderClient(object):
def __init__(self):
class CinderApiTestCase(test.NoDBTestCase):
def setUp(self):
def test_get(self):
def test_get_failed(self):
def test_create(self):
def test_create_failed(self):
def test_get_all(self):
def test_check_attach_volume_status_error(self):
def test_check_attach_volume_already_attached(self):
def test_check_attach_availability_zone_differs(self):
def test_check_attach(self):
def test_check_detach(self):
def test_reserve_volume(self):
def test_unreserve_volume(self):
def test_begin_detaching(self):
def test_roll_detaching(self):
def test_attach(self):
def test_detach(self):
def test_initialize_connection(self):
def test_terminate_connection(self):
def test_delete(self):
def test_update(self):
def test_get_snapshot(self):
def test_get_snapshot_failed(self):
def test_get_all_snapshots(self):
def test_create_snapshot(self):
def test_create_force(self):
def test_delete_snapshot(self):
def test_get_volume_metadata(self):
def test_get_volume_metadata_value(self):
def test_delete_volume_metadata(self):
def test_update_volume_metadata(self):
def test_update_snapshot_status(self):
def test_get_volume_encryption_metadata(self):
\OpenStack\nova-2014.1\nova\tests\volume\__init__.py
\OpenStack\nova-2014.1\nova\tests\__init__.py
\OpenStack\nova-2014.1\nova\utils.py
def vpn_ping(address, port, timeout=0.05, session_id=None):
def _get_root_helper():
def execute(*cmd, **kwargs):
def trycmd(*args, **kwargs):
def novadir():
def generate_uid(topic, size=8):
def last_completed_audit_period(unit=None, before=None):
def generate_password(length=None, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
def get_my_ipv4_address():
def _get_ipv4_address_for_interface(iface):
def get_my_linklocal(interface):
class LazyPluggable(object):
def __init__(self, pivot, config_group=None, **backends):
def __get_backend(self):
def __getattr__(self, key):
def xhtml_escape(value):
def utf8(value):
def check_isinstance(obj, cls):
def parse_server_string(server_str):
def is_int_like(val):
def is_valid_ipv4(address):
def is_valid_ipv6(address):
def is_valid_ip_address(address):
def is_valid_ipv6_cidr(address):
def get_shortened_ipv6(address):
def get_shortened_ipv6_cidr(address):
def is_valid_cidr(address):
def get_ip_version(network):
def monkey_patch():
def convert_to_list_dict(lst, label):
def make_dev_path(dev, partition=None, base='/dev'):
def sanitize_hostname(hostname):
def read_cached_file(filename, cache_info, reload_func=None):
def temporary_mutation(obj, **kwargs):
def is_dict_like(thing):
def get(thing, attr, default):
def set_value(thing, attr, val):
def delete(thing, attr):
def generate_mac_address():
def read_file_as_root(file_path):
def temporary_chown(path, owner_uid=None):
def chown(path, owner_uid=None):
def tempdir(**kwargs):
def walk_class_hierarchy(clazz, encountered=None):
class UndoManager(object):
def __init__(self):
def undo_with(self, undo_func):
def _rollback(self):
def rollback_and_reraise(self, msg=None, **kwargs):
def mkfs(fs, path, label=None, run_as_root=False):
def last_bytes(file_like_object, num):
def metadata_to_dict(metadata):
def dict_to_metadata(metadata):
def instance_meta(instance):
def instance_sys_meta(instance):
def get_wrapped_function(function):
def _get_wrapped_function(function):
def expects_func_args(*args):
def _decorator_checker(dec):
def _decorator(f):
class ExceptionHelper(object):
def __init__(self, target):
def __getattr__(self, name):
def wrapper(*args, **kwargs):
\OpenStack\nova-2014.1\nova\version.py
def _load_config():
def vendor_string():
def product_string():
def package_string():
def version_string_with_package():
\OpenStack\nova-2014.1\nova\virt\baremetal\baremetal_states.py
\OpenStack\nova-2014.1\nova\virt\baremetal\base.py
class NodeDriver(object):
def __init__(self, virtapi):
def cache_images(self, context, node, instance, **kwargs):
def destroy_images(self, context, node, instance):
def activate_bootloader(self, context, node, instance, **kwargs):
def deactivate_bootloader(self, context, node, instance):
def activate_node(self, context, node, instance):
def deactivate_node(self, context, node, instance):
def get_console_output(self, node, instance):
def dhcp_options_for_instance(self, instance):
class PowerManager(object):
def __init__(self, **kwargs):
def activate_node(self):
def reboot_node(self):
def deactivate_node(self):
def is_power_on(self):
def start_console(self):
def stop_console(self):
\OpenStack\nova-2014.1\nova\virt\baremetal\common.py
class ConnectionFailed(exception.NovaException):
class Connection(object):
def __init__(self, host, username, password, port=22, keyfile=None):
def ssh_connect(connection):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\api.py
def bm_node_get_all(context, service_host=None):
def bm_node_get_associated(context, service_host=None):
def bm_node_get_unassociated(context, service_host=None):
def bm_node_find_free(context, service_host=None, memory_mb=None, cpus=None, local_gb=None):
def bm_node_get(context, bm_node_id):
def bm_node_get_by_instance_uuid(context, instance_uuid):
def bm_node_get_by_node_uuid(context, node_uuid):
def bm_node_create(context, values):
def bm_node_destroy(context, bm_node_id):
def bm_node_update(context, bm_node_id, values):
def bm_node_associate_and_update(context, node_uuid, values):
def bm_interface_get(context, if_id):
def bm_interface_get_all(context):
def bm_interface_destroy(context, if_id):
def bm_interface_create(context, bm_node_id, address, datapath_id, port_no):
def bm_interface_set_vif_uuid(context, if_id, vif_uuid):
def bm_interface_get_by_vif_uuid(context, vif_uuid):
def bm_interface_get_all_by_bm_node_id(context, bm_node_id):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\migration.py
def db_sync(version=None):
def db_version():
def db_initial_version():
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\api.py
def model_query(context, *args, **kwargs):
def _save(ref, session=None):
def _build_node_order_by(query):
def bm_node_get_all(context, service_host=None):
def bm_node_get_associated(context, service_host=None):
def bm_node_get_unassociated(context, service_host=None):
def bm_node_find_free(context, service_host=None, cpus=None, memory_mb=None, local_gb=None):
def bm_node_get(context, bm_node_id):
def bm_node_get_by_instance_uuid(context, instance_uuid):
def bm_node_get_by_node_uuid(context, bm_node_uuid):
def bm_node_create(context, values):
def bm_node_update(context, bm_node_id, values):
def bm_node_associate_and_update(context, node_uuid, values):
def bm_node_destroy(context, bm_node_id):
def bm_interface_get(context, if_id):
def bm_interface_get_all(context):
def bm_interface_destroy(context, if_id):
def bm_interface_create(context, bm_node_id, address, datapath_id, port_no):
def bm_interface_set_vif_uuid(context, if_id, vif_uuid):
def bm_interface_get_by_vif_uuid(context, vif_uuid):
def bm_interface_get_all_by_bm_node_id(context, bm_node_id):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\001_init.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\002_drop_bm_deployments.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\003_add_uuid_to_bm_nodes.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\004_add_instance_name_to_bm_nodes.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\005_drop_unused_columns_from_nodes.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\006_move_prov_mac_address.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\007_drop_prov_mac_address.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\008_remove_bm_pxe_ips_table.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\009_add_ephemeral_mb_to_bm_nodes.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\010_add_preserve_ephemeral.py
def upgrade(migrate_engine):
def downgrade(migrate_engine):
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\__init__.py
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\__init__.py
\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migration.py
def db_sync(version=None):
def db_version():
OpenStack IndexPreviousNext
|