本文整理汇总了Python中nova.db.instance_get函数的典型用法代码示例。如果您正苦于以下问题:Python instance_get函数的具体用法?Python instance_get怎么用?Python instance_get使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了instance_get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_add_fixed_ip_instance_without_vpn_requested_networks
def test_add_fixed_ip_instance_without_vpn_requested_networks(self):
self.mox.StubOutWithMock(db, 'network_get')
self.mox.StubOutWithMock(db, 'fixed_ip_associate_pool')
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(db,
'virtual_interface_get_by_instance_and_network')
self.mox.StubOutWithMock(db, 'fixed_ip_update')
db.fixed_ip_update(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg())
db.virtual_interface_get_by_instance_and_network(mox.IgnoreArg(),
mox.IgnoreArg(), mox.IgnoreArg()).AndReturn({'id': 0})
db.instance_get(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn({'security_groups':
[{'id': 0}]})
db.fixed_ip_associate_pool(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn('192.168.0.101')
db.network_get(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(networks[0])
self.mox.ReplayAll()
self.network.add_fixed_ip_to_instance(self.context, 1, HOST,
networks[0]['id'])
开发者ID:CaptTofu,项目名称:reddwarf,代码行数:25,代码来源:test_network.py
示例2: test_instance_dns
def test_instance_dns(self):
fixedip = '192.168.0.101'
self.mox.StubOutWithMock(db, 'network_get')
self.mox.StubOutWithMock(db, 'network_update')
self.mox.StubOutWithMock(db, 'fixed_ip_associate_pool')
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(db,
'virtual_interface_get_by_instance_and_network')
self.mox.StubOutWithMock(db, 'fixed_ip_update')
db.fixed_ip_update(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg())
db.virtual_interface_get_by_instance_and_network(mox.IgnoreArg(),
mox.IgnoreArg(), mox.IgnoreArg()).AndReturn({'id': 0})
db.instance_get(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn({'security_groups':
[{'id': 0}]})
db.instance_get(self.context,
1).AndReturn({'display_name': HOST})
db.fixed_ip_associate_pool(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(fixedip)
db.network_get(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(networks[0])
db.network_update(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
self.mox.ReplayAll()
self.network.add_fixed_ip_to_instance(self.context, 1, HOST,
networks[0]['id'])
addresses = self.network.instance_dns_manager.get_entries_by_name(HOST)
self.assertEqual(len(addresses), 1)
self.assertEqual(addresses[0], fixedip)
开发者ID:swapniltamse,项目名称:nova,代码行数:34,代码来源:test_network.py
示例3: test_live_migration_dest_hypervisor_version_older_raises
def test_live_migration_dest_hypervisor_version_older_raises(self):
"""Confirm live migration to older hypervisor raises"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance, dest)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 2}]}])
self.mox.ReplayAll()
self.assertRaises(exception.DestinationHypervisorTooOld,
self.driver.schedule_live_migration, self.context,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
开发者ID:bswartz,项目名称:nova,代码行数:34,代码来源:test_scheduler.py
示例4: test_live_migration_dest_check_service_same_host
def test_live_migration_dest_check_service_same_host(self):
"""Confirms exception raises in case dest and src is same host."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(utils, 'service_is_up')
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
# make dest same as src
dest = instance['host']
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
self.mox.ReplayAll()
self.assertRaises(exception.UnableToMigrateToSelf,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=False)
开发者ID:KarimAllah,项目名称:nova,代码行数:27,代码来源:test_scheduler.py
示例5: test_live_migration_same_shared_storage_okay
def test_live_migration_same_shared_storage_okay(self):
"""live migration works with same src and dest shared storage"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, False)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidSharedStorage,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
开发者ID:kambara2,项目名称:nova,代码行数:28,代码来源:test_scheduler.py
示例6: test_live_migration_compute_dest_not_alive
def test_live_migration_compute_dest_not_alive(self):
"""Raise exception when dest compute node is not alive."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(utils, 'service_is_up')
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
# Compute is down
utils.service_is_up('fake_service3').AndReturn(False)
self.mox.ReplayAll()
self.assertRaises(exception.ComputeServiceUnavailable,
self.driver.schedule_live_migration, self.context,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
开发者ID:bswartz,项目名称:nova,代码行数:26,代码来源:test_scheduler.py
示例7: test_live_migration_dest_check_service_lack_memory
def test_live_migration_dest_check_service_lack_memory(self):
"""Confirms exception raises when dest doesn't have enough memory."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
self.driver._get_compute_info(self.context, dest,
'memory_mb').AndReturn(2048)
db.instance_get_all_by_host(self.context, dest).AndReturn(
[dict(memory_mb=1024), dict(memory_mb=512)])
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
开发者ID:KarimAllah,项目名称:nova,代码行数:32,代码来源:test_scheduler.py
示例8: test_live_migration_basic
def test_live_migration_basic(self):
"""Test basic schedule_live_migration functionality"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_common_check')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
self.mox.StubOutWithMock(driver, 'cast_to_compute_host')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self.driver._live_migration_common_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.instance_update_and_get_original(self.context, instance['id'],
{"task_state": task_states.MIGRATING}).AndReturn(
(instance, instance))
driver.cast_to_compute_host(self.context, instance['host'],
'live_migration', update_db=False,
instance_id=instance['id'], dest=dest,
block_migration=block_migration)
self.mox.ReplayAll()
self.driver.schedule_live_migration(self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
开发者ID:kambara2,项目名称:nova,代码行数:34,代码来源:test_scheduler.py
示例9: test_notify_usage_exists_deleted_instance
def test_notify_usage_exists_deleted_instance(self):
# Ensure 'exists' notification generates appropriate usage data.
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
# Set some system metadata
sys_metadata = {'image_md_key1': 'val1',
'image_md_key2': 'val2',
'other_data': 'meow'}
db.instance_system_metadata_update(self.context, instance['uuid'],
sys_metadata, False)
self.compute.terminate_instance(self.context, instance)
instance = db.instance_get(self.context.elevated(read_deleted='yes'),
instance_id)
compute_utils.notify_usage_exists(self.context, instance)
msg = test_notifier.NOTIFICATIONS[-1]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.exists')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance['uuid'])
self.assertEquals(payload['instance_type'], 'm1.tiny')
type_id = instance_types.get_instance_type_by_name('m1.tiny')['id']
self.assertEquals(str(payload['instance_type_id']), str(type_id))
for attr in ('display_name', 'created_at', 'launched_at',
'state', 'state_description',
'bandwidth', 'audit_period_beginning',
'audit_period_ending', 'image_meta'):
self.assertTrue(attr in payload,
msg="Key %s not in payload" % attr)
self.assertEquals(payload['image_meta'],
{'md_key1': 'val1', 'md_key2': 'val2'})
image_ref_url = "%s/images/1" % glance.generate_glance_url()
self.assertEquals(payload['image_ref_url'], image_ref_url)
开发者ID:CiscoAS,项目名称:nova,代码行数:34,代码来源:test_compute_utils.py
示例10: test_notify_about_instance_usage
def test_notify_about_instance_usage(self):
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
# Set some system metadata
sys_metadata = {"image_md_key1": "val1", "image_md_key2": "val2", "other_data": "meow"}
extra_usage_info = {"image_name": "fake_name"}
db.instance_system_metadata_update(self.context, instance["uuid"], sys_metadata, False)
# NOTE(russellb) Make sure our instance has the latest system_metadata
# in it.
instance = db.instance_get(self.context, instance_id)
compute_utils.notify_about_instance_usage(
notify.get_notifier("compute"), self.context, instance, "create.start", extra_usage_info=extra_usage_info
)
self.assertEquals(len(fake_notifier.NOTIFICATIONS), 1)
msg = fake_notifier.NOTIFICATIONS[0]
self.assertEquals(msg.priority, "INFO")
self.assertEquals(msg.event_type, "compute.instance.create.start")
payload = msg.payload
self.assertEquals(payload["tenant_id"], self.project_id)
self.assertEquals(payload["user_id"], self.user_id)
self.assertEquals(payload["instance_id"], instance["uuid"])
self.assertEquals(payload["instance_type"], "m1.tiny")
type_id = flavors.get_flavor_by_name("m1.tiny")["id"]
self.assertEquals(str(payload["instance_type_id"]), str(type_id))
flavor_id = flavors.get_flavor_by_name("m1.tiny")["flavorid"]
self.assertEquals(str(payload["instance_flavor_id"]), str(flavor_id))
for attr in ("display_name", "created_at", "launched_at", "state", "state_description", "image_meta"):
self.assertTrue(attr in payload, msg="Key %s not in payload" % attr)
self.assertEquals(payload["image_meta"], {"md_key1": "val1", "md_key2": "val2"})
self.assertEquals(payload["image_name"], "fake_name")
image_ref_url = "%s/images/1" % glance.generate_glance_url()
self.assertEquals(payload["image_ref_url"], image_ref_url)
self.compute.terminate_instance(self.context, jsonutils.to_primitive(instance))
开发者ID:kraman,项目名称:nova,代码行数:33,代码来源:test_compute_utils.py
示例11: test_live_migration_compute_src_not_alive
def test_live_migration_compute_src_not_alive(self):
"""Raise exception when src compute node is not alive."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
# Volume up
db.service_get_all_by_topic(self.context, 'volume').AndReturn(
['fake_service'])
utils.service_is_up('fake_service').AndReturn(True)
# Compute down
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(['fake_service2'])
utils.service_is_up('fake_service2').AndReturn(False)
self.mox.ReplayAll()
self.assertRaises(exception.ComputeServiceUnavailable,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration)
开发者ID:KarimAllah,项目名称:nova,代码行数:27,代码来源:test_scheduler.py
示例12: test_get_by_id
def test_get_by_id(self):
ctxt = context.get_admin_context()
self.mox.StubOutWithMock(db, "instance_get")
db.instance_get(ctxt, "instid", columns_to_join=[]).AndReturn(self.fake_instance)
self.mox.ReplayAll()
inst = instance.Instance.get_by_id(ctxt, "instid")
self.assertEqual(inst.uuid, self.fake_instance["uuid"])
self.assertRemotes()
开发者ID:ChaosCloud,项目名称:nova,代码行数:8,代码来源:test_instance.py
示例13: test_get_by_id
def test_get_by_id(self):
self.mox.StubOutWithMock(db, 'instance_get')
db.instance_get(self.context, 'instid', columns_to_join=[]
).AndReturn(self.fake_instance)
self.mox.ReplayAll()
inst = instance.Instance.get_by_id(self.context, 'instid')
self.assertEqual(inst.uuid, self.fake_instance['uuid'])
self.assertRemotes()
开发者ID:slagle,项目名称:nova,代码行数:8,代码来源:test_instance.py
示例14: test_get_by_id
def test_get_by_id(self):
self.mox.StubOutWithMock(db, "instance_get")
db.instance_get(self.context, "instid", columns_to_join=["info_cache", "security_groups"]).AndReturn(
self.fake_instance
)
self.mox.ReplayAll()
inst = instance.Instance.get_by_id(self.context, "instid")
self.assertEqual(inst.uuid, self.fake_instance["uuid"])
self.assertRemotes()
开发者ID:ZelinIO,项目名称:nova,代码行数:9,代码来源:test_instance.py
示例15: test_live_migration_dest_host_incompatable_cpu_raises
def test_live_migration_dest_host_incompatable_cpu_raises(self):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
{'method': 'create_shared_storage_test_file'}
).AndReturn(tmp_filename)
rpc.call(self.context, 'src_queue',
{'method': 'check_shared_storage_test_file',
'args': {'filename': tmp_filename}}).AndReturn(True)
rpc.call(self.context, 'dest_queue',
{'method': 'cleanup_shared_storage_test_file',
'args': {'filename': tmp_filename}})
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
db.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
'args': {'cpu_info': 'fake_cpu_info'}}).AndRaise(
rpc.RemoteError())
self.mox.ReplayAll()
self.assertRaises(rpc_common.RemoteError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration)
开发者ID:KarimAllah,项目名称:nova,代码行数:53,代码来源:test_scheduler.py
示例16: test_block_migration_dest_check_service_lack_disk
def test_block_migration_dest_check_service_lack_disk(self):
"""Confirms exception raises when dest doesn't have enough disk."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(self.driver,
'assert_compute_node_has_enough_memory')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
dest = 'fake_host2'
block_migration = True
disk_over_commit = True
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
# Enough memory
self.driver.assert_compute_node_has_enough_memory(self.context,
instance, dest)
# Not enough disk
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1023)
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
instance_disk_info_msg = {
'method': 'get_instance_disk_info',
'args': {
'instance_name': instance['name'],
},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION,
}
instance_disk_info = [{'disk_size': 1024 * (1024 ** 3)}]
rpc.call(self.context,
'src_queue',
instance_disk_info_msg,
None).AndReturn(jsonutils.dumps(instance_disk_info))
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
开发者ID:kambara2,项目名称:nova,代码行数:53,代码来源:test_scheduler.py
示例17: test_discard_a_blessed_instance
def test_discard_a_blessed_instance(self):
instance_id = utils.create_instance(self.context)
self.gridcentric.bless_instance(self.context, instance_id)
blessed_id = instance_id + 1
self.gridcentric.discard_instance(self.context, blessed_id)
try:
db.instance_get(self.context, blessed_id)
self.fail("The blessed instance should no longer exists after being discarded.")
except exception.InstanceNotFound:
pass
开发者ID:jingsu,项目名称:openstack,代码行数:13,代码来源:test_gridcentric.py
示例18: test_finish_revert_resize
def test_finish_revert_resize(self):
"""Ensure that the flavor is reverted to the original on revert"""
context = self.context.elevated()
instance_id = self._create_instance()
def fake(*args, **kwargs):
pass
self.stubs.Set(self.compute.driver, 'finish_migration', fake)
self.stubs.Set(self.compute.driver, 'revert_migration', fake)
self.stubs.Set(self.compute.network_api, 'get_instance_nw_info', fake)
self.compute.run_instance(self.context, instance_id)
# Confirm the instance size before the resize starts
inst_ref = db.instance_get(context, instance_id)
instance_type_ref = db.instance_type_get(context,
inst_ref['instance_type_id'])
self.assertEqual(instance_type_ref['flavorid'], 1)
db.instance_update(self.context, instance_id, {'host': 'foo'})
self.compute.prep_resize(context, inst_ref['uuid'], 3)
migration_ref = db.migration_get_by_instance_and_status(context,
inst_ref['uuid'], 'pre-migrating')
self.compute.resize_instance(context, inst_ref['uuid'],
migration_ref['id'])
self.compute.finish_resize(context, inst_ref['uuid'],
int(migration_ref['id']), {})
# Prove that the instance size is now the new size
inst_ref = db.instance_get(context, instance_id)
instance_type_ref = db.instance_type_get(context,
inst_ref['instance_type_id'])
self.assertEqual(instance_type_ref['flavorid'], 3)
# Finally, revert and confirm the old flavor has been applied
self.compute.revert_resize(context, inst_ref['uuid'],
migration_ref['id'])
self.compute.finish_revert_resize(context, inst_ref['uuid'],
migration_ref['id'])
inst_ref = db.instance_get(context, instance_id)
instance_type_ref = db.instance_type_get(context,
inst_ref['instance_type_id'])
self.assertEqual(instance_type_ref['flavorid'], 1)
self.compute.terminate_instance(context, instance_id)
开发者ID:cp16net,项目名称:reddwarf,代码行数:50,代码来源:test_compute.py
示例19: test_notify_usage_exists_instance_not_found
def test_notify_usage_exists_instance_not_found(self):
# Ensure 'exists' notification generates appropriate usage data.
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
self.compute.terminate_instance(self.context, instance)
compute_utils.notify_usage_exists(self.context, instance)
msg = test_notifier.NOTIFICATIONS[-1]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.exists')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance['uuid'])
self.assertEquals(payload['instance_type'], 'm1.tiny')
type_id = instance_types.get_instance_type_by_name('m1.tiny')['id']
self.assertEquals(str(payload['instance_type_id']), str(type_id))
for attr in ('display_name', 'created_at', 'launched_at',
'state', 'state_description',
'bandwidth', 'audit_period_beginning',
'audit_period_ending', 'image_meta'):
self.assertTrue(attr in payload,
msg="Key %s not in payload" % attr)
self.assertEquals(payload['image_meta'], {})
image_ref_url = "%s/images/1" % glance.generate_glance_url()
self.assertEquals(payload['image_ref_url'], image_ref_url)
开发者ID:CiscoAS,项目名称:nova,代码行数:25,代码来源:test_compute_utils.py
示例20: test_live_migration_common_check_service_different_version
def test_live_migration_common_check_service_different_version(self):
"""Original host and dest host has different hypervisor version."""
dest = 'dummydest'
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
# compute service for destination
s_ref = self._create_compute_service(host=i_ref['host'])
# compute service for original host
s_ref2 = self._create_compute_service(host=dest,
hypervisor_version=12002)
# mocks
driver = self.scheduler.driver
self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage')
driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest)
self.mox.ReplayAll()
self.assertRaises(exception.DestinationHypervisorTooOld,
self.scheduler.driver._live_migration_common_check,
self.context, i_ref, dest)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
开发者ID:cp16net,项目名称:reddwarf,代码行数:25,代码来源:test_scheduler.py
注:本文中的nova.db.instance_get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论