本文整理汇总了Python中nova.db.compute_node_get_all函数的典型用法代码示例。如果您正苦于以下问题:Python compute_node_get_all函数的具体用法?Python compute_node_get_all怎么用?Python compute_node_get_all使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了compute_node_get_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: mox_host_manager_db_calls
def mox_host_manager_db_calls(mock, context):
    """Stub the two DB lookups and record what each should return.

    ``mock`` must provide ``StubOutWithMock``. ``context`` is accepted but
    not used: both recorded calls ignore their first argument.
    """
    # Compute-node listing answers with the module-level COMPUTE_NODES.
    mock.StubOutWithMock(db, 'compute_node_get_all')
    node_lookup = db.compute_node_get_all(mox.IgnoreArg())
    node_lookup.AndReturn(COMPUTE_NODES)

    # Instance listing must join 'instance_type' and answers with INSTANCES.
    mock.StubOutWithMock(db, 'instance_get_all')
    instance_lookup = db.instance_get_all(
        mox.IgnoreArg(), columns_to_join=['instance_type'])
    instance_lookup.AndReturn(INSTANCES)
开发者ID:A7Zulu,项目名称:nova,代码行数:7,代码来源:fakes.py
示例2: test_get_all_host_states
def test_get_all_host_states(self):
    """Check the host states built from the stubbed compute-node list.

    The DB stub returns fakes.COMPUTE_NODES and one "No service" warning is
    recorded on the stubbed logger. The test then expects four host states
    whose ``service``, ``free_ram_mb`` and ``free_disk_mb`` match the
    values asserted below.
    """
    context = 'fake_context'
    topic = 'compute'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    self.mox.StubOutWithMock(host_manager.LOG, 'warn')

    db.compute_node_get_all(context).AndReturn(fakes.COMPUTE_NODES)
    # Invalid service
    host_manager.LOG.warn("No service for compute ID 5")
    self.mox.ReplayAll()

    host_states = self.host_manager.get_all_host_states(context, topic)
    self.assertEqual(len(host_states), 4)

    # Check that .service is set properly.
    # range() is used because xrange() does not exist on Python 3.
    for i in range(4):
        compute_node = fakes.COMPUTE_NODES[i]
        host = compute_node['service']['host']
        self.assertEqual(host_states[host].service,
                         compute_node['service'])

    # (host, expected free_ram_mb, expected free_disk_mb)
    expected_resources = (
        ('host1', 512, 524288),     # 524288 MB == 512 GB
        ('host2', 1024, 1048576),   # 1048576 MB == 1024 GB
        ('host3', 3072, 3145728),   # 3145728 MB == 3072 GB
        ('host4', 8192, 8388608),   # 8388608 MB == 8192 GB
    )
    for host, free_ram_mb, free_disk_mb in expected_resources:
        self.assertEqual(host_states[host].free_ram_mb, free_ram_mb)
        self.assertEqual(host_states[host].free_disk_mb, free_disk_mb)
开发者ID:Cerberus98,项目名称:nova,代码行数:34,代码来源:test_host_manager.py
示例3: test_get_all_host_states
def test_get_all_host_states(self):
    """Ensure .service is set and each state mirrors its compute node.

    For the first four fake compute nodes, the state stored under
    (host, hypervisor_hostname) must carry the node's service, its decoded
    stats, its free RAM and its free disk converted from GB to MB.
    """
    ctxt = 'fake_context'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    db.compute_node_get_all(ctxt).AndReturn(ironic_fakes.COMPUTE_NODES)
    self.mox.ReplayAll()

    self.host_manager.get_all_host_states(ctxt)
    host_states_map = self.host_manager.host_state_map
    self.assertEqual(len(host_states_map), 4)

    for index in range(4):
        node_record = ironic_fakes.COMPUTE_NODES[index]
        state_key = (node_record['service']['host'],
                     node_record['hypervisor_hostname'])
        state = host_states_map[state_key]

        self.assertEqual(node_record['service'], state.service)
        self.assertEqual(jsonutils.loads(node_record['stats']), state.stats)
        self.assertEqual(node_record['free_ram_mb'], state.free_ram_mb)
        self.assertEqual(node_record['free_disk_gb'] * 1024,
                         state.free_disk_mb)
开发者ID:COSHPC,项目名称:ironic,代码行数:25,代码来源:test_ironic_host_manager.py
示例4: test_run_instance_no_hosts
def test_run_instance_no_hosts(self):
    """Schedule one instance while the DB reports no compute nodes.

    Recorded expectations: the instance is updated to the ERROR vm_state
    with no task_state, and a NoValidHost fault is attached to the updated
    instance reference.
    """
    sched = fakes.FakeFilterScheduler()

    uuid = 'fake-uuid1'
    fake_context = context.RequestContext('user', 'project')
    instance_properties = {'project_id': 1, 'os_type': 'Linux'}
    request_spec = {
        'instance_type': {'memory_mb': 1, 'root_gb': 1, 'ephemeral_gb': 0},
        'instance_properties': instance_properties,
        'instance_uuids': [uuid],
    }

    self.mox.StubOutWithMock(compute_utils, 'add_instance_fault_from_exc')
    self.mox.StubOutWithMock(db, 'instance_update_and_get_original')

    # Only the updated reference is needed for the fault expectation below;
    # the original reference is deliberately discarded.
    _, new_ref = db.instance_update_and_get_original(
        fake_context, uuid,
        {'vm_state': vm_states.ERROR, 'task_state': None},
    ).AndReturn(({}, {}))
    compute_utils.add_instance_fault_from_exc(
        fake_context, mox.IsA(conductor_api.LocalAPI), new_ref,
        mox.IsA(exception.NoValidHost), mox.IgnoreArg())

    # An empty node list leaves the scheduler without any candidate host.
    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    db.compute_node_get_all(mox.IgnoreArg()).AndReturn([])

    self.mox.ReplayAll()
    sched.schedule_run_instance(
        fake_context, request_spec, None, None, None, None, {})
开发者ID:itskewpie,项目名称:nova,代码行数:30,代码来源:test_filter_scheduler.py
示例5: test_get_all
def test_get_all(self):
    """ComputeNodeList.get_all returns one object per row from the DB API."""
    self.mox.StubOutWithMock(db, "compute_node_get_all")
    db.compute_node_get_all(self.context).AndReturn([fake_compute_node])
    self.mox.ReplayAll()

    node_list = compute_node.ComputeNodeList.get_all(self.context)

    self.assertEqual(1, len(node_list))
    self.compare_obj(
        node_list[0],
        fake_compute_node,
        subs=self.subs(),
        comparators=self.comparators(),
    )
开发者ID:mathslinux,项目名称:nova,代码行数:7,代码来源:test_compute_node.py
示例6: test_run_instance_no_hosts
def test_run_instance_no_hosts(self):
    """Schedule one instance while the DB reports no compute nodes.

    Recorded expectations: the instance is updated to the ERROR vm_state
    with no task_state, and a NoValidHost fault is attached to the updated
    instance reference.
    """
    sched = fakes.FakeFilterScheduler()

    uuid = "fake-uuid1"
    fake_context = context.RequestContext("user", "project")
    instance_properties = {"project_id": 1, "os_type": "Linux"}
    request_spec = {
        "instance_type": {"memory_mb": 1, "root_gb": 1, "ephemeral_gb": 0},
        "instance_properties": instance_properties,
        "instance_uuids": [uuid],
    }

    self.mox.StubOutWithMock(compute_utils, "add_instance_fault_from_exc")
    self.mox.StubOutWithMock(db, "instance_update_and_get_original")

    # Only the updated reference is needed for the fault expectation below;
    # the original reference is deliberately discarded.
    _, new_ref = db.instance_update_and_get_original(
        fake_context, uuid, {"vm_state": vm_states.ERROR, "task_state": None}
    ).AndReturn(({}, {}))
    compute_utils.add_instance_fault_from_exc(
        fake_context,
        mox.IsA(conductor_api.LocalAPI),
        new_ref,
        mox.IsA(exception.NoValidHost),
        mox.IgnoreArg(),
    )

    # An empty node list leaves the scheduler without any candidate host.
    self.mox.StubOutWithMock(db, "compute_node_get_all")
    db.compute_node_get_all(mox.IgnoreArg()).AndReturn([])

    self.mox.ReplayAll()
    sched.schedule_run_instance(
        fake_context, request_spec, None, None, None, None, {}, False)
开发者ID:pacharya-pf9,项目名称:nova,代码行数:29,代码来源:test_filter_scheduler.py
示例7: test_get_all
def test_get_all(self):
    """ComputeNodeList.get_all returns one object per row from the DB API."""
    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    db.compute_node_get_all(self.context).AndReturn([fake_compute_node])
    self.mox.ReplayAll()

    node_list = compute_node.ComputeNodeList.get_all(self.context)

    self.assertEqual(1, len(node_list))
    only_node = node_list[0]
    self._compare(only_node, fake_compute_node)
开发者ID:prometheanfire,项目名称:nova,代码行数:7,代码来源:test_compute_node.py
示例8: test_get_all_host_states
def test_get_all_host_states(self):
    """Ensure .service is set and we have the values we expect to.

    The host manager is given the fake ironic service states, then the four
    resulting host states are checked against the fake compute nodes and a
    table of expected free RAM / free disk figures.
    """
    ctxt = "fake_context"

    self.mox.StubOutWithMock(db, "compute_node_get_all")
    db.compute_node_get_all(ctxt).AndReturn(ironic_fakes.COMPUTE_NODES)
    self.mox.ReplayAll()

    self.host_manager.service_states = ironic_fakes.IRONIC_SERVICE_STATE
    self.host_manager.get_all_host_states(ctxt)
    host_states_map = self.host_manager.host_state_map
    self.assertEqual(len(host_states_map), 4)

    # Every state must point back at the service of its compute node.
    for index in range(4):
        node_record = ironic_fakes.COMPUTE_NODES[index]
        state_key = (node_record["service"]["host"],
                     node_record["hypervisor_hostname"])
        self.assertEqual(node_record["service"],
                         host_states_map[state_key].service)

    # (state key, expected free_ram_mb, expected free_disk_mb)
    expected_resources = (
        (("host1", "node1uuid"), 1024, 10240),
        (("host2", "node2uuid"), 2048, 20480),
        (("host3", "node3uuid"), 3072, 30720),
        (("host4", "node4uuid"), 4096, 40960),
    )
    for state_key, free_ram_mb, free_disk_mb in expected_resources:
        self.assertEqual(free_ram_mb, host_states_map[state_key].free_ram_mb)
        self.assertEqual(free_disk_mb,
                         host_states_map[state_key].free_disk_mb)
开发者ID:rahulgopan,项目名称:ironic,代码行数:29,代码来源:test_ironic_host_manager.py
示例9: testProcessUpdates_compute_stopped_exception
def testProcessUpdates_compute_stopped_exception(self):
    """Run processUpdates() with a dead compute service and a failing notify.

    A connected VmHost with id '1' is placed in the inventory cache, the
    compute service is reported as not alive, and notify_host_update is
    recorded to raise. processUpdates() is then expected to return None.
    """
    vmHost = VmHost()
    vmHost.set_id('1')
    vmHost.set_connectionState(Constants.VMHOST_CONNECTED)
    InventoryCacheManager.update_object_in_cache('1', vmHost)

    try:
        self.mock.StubOutWithMock(api, 'vm_host_save')
        api.vm_host_save(
            mox.IgnoreArg(), mox.IgnoreArg()).MultipleTimes().AndReturn(None)

        self.mock.StubOutWithMock(
            InventoryCacheManager, 'get_compute_conn_driver')
        InventoryCacheManager.get_compute_conn_driver(
            self.libvirtVmHost.compute_id,
            Constants.VmHost).AndReturn(fake.get_connection())

        fake_computes = [{'id': '1',
                          'service': {'created_at': 'created',
                                      'updated_at': 'updated'}}]
        self.mock.StubOutWithMock(novadb, 'compute_node_get_all')
        novadb.compute_node_get_all(
            mox.IgnoreArg()).AndReturn(fake_computes)

        # The compute service is reported as stopped.
        self.mock.StubOutWithMock(hnm_utils, 'is_service_alive')
        hnm_utils.is_service_alive(
            mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(False)

        # Publishing the host update blows up.
        self.mock.StubOutWithMock(event_api, 'notify_host_update')
        event_api.notify_host_update(
            mox.IgnoreArg(), mox.IgnoreArg()).AndRaise(Exception())

        self.mock.ReplayAll()
        self.assertEquals(self.libvirtVmHost.processUpdates(), None)
    finally:
        # Restore the stubbed attributes even when the assertion (or the
        # call under test) fails, so the stubs cannot leak into other tests.
        self.mock.stubs.UnsetAll()
开发者ID:rakrup,项目名称:healthnmon,代码行数:32,代码来源:test_Inventory.py
示例10: test_host_removed_event
def test_host_removed_event(self):
    """Refreshing from an empty compute-node list removes the cached host.

    'compute1' is known to the healthnmon API and to the inventory cache
    but is absent from the (stubbed) nova compute-node list. After
    _refresh_from_db the compute inventory must be empty and exactly one
    INFO notification of the host-removed event type must name the host.
    """
    self.__mock_service_get_all_by_topic()

    deleted_host = VmHost()
    deleted_host.set_id('compute1')
    deleted_host.set_name('compute1')

    # healthnmon still knows the host; all other listings are empty.
    self.mox.StubOutWithMock(api, 'vm_host_get_all')
    api.vm_host_get_all(mox.IgnoreArg()).AndReturn([deleted_host])
    for getter_name in ('vm_get_all', 'storage_volume_get_all',
                        'subnet_get_all'):
        self.mox.StubOutWithMock(api, getter_name)
        getattr(api, getter_name)(mox.IgnoreArg()).AndReturn([])
    self.mox.StubOutWithMock(nova_db, 'compute_node_get_all')
    nova_db.compute_node_get_all(mox.IgnoreArg()).AndReturn([])

    self.mox.StubOutWithMock(api, 'vm_host_delete_by_ids')
    delete_call = api.vm_host_delete_by_ids(mox.IgnoreArg(), mox.IgnoreArg())
    delete_call.MultipleTimes().AndReturn(None)

    self.mox.StubOutWithMock(
        InventoryCacheManager, 'get_compute_conn_driver')
    driver_call = InventoryCacheManager.get_compute_conn_driver(
        'compute1', Constants.VmHost)
    driver_call.AndReturn(fake.get_connection())

    self.mox.ReplayAll()

    # Seed the compute inventory with a single entry for 'compute1'.
    compute_service = {'host': 'host1'}
    compute = {'id': 'compute1', 'hypervisor_type': 'fake',
               'service': compute_service}
    rm_context = rmcontext.ComputeRMContext(
        rmType=compute['hypervisor_type'],
        rmIpAddress=compute_service['host'],
        rmUserName='ubuntu164',
        rmPassword='password')
    InventoryCacheManager.get_all_compute_inventory().clear()
    InventoryCacheManager.get_all_compute_inventory()['compute1'] = (
        ComputeInventory(rm_context))
    InventoryCacheManager.get_compute_inventory(
        'compute1').update_compute_info(rm_context, deleted_host)
    self.assertEquals(
        len(InventoryCacheManager.get_all_compute_inventory()), 1)

    InventoryManager()._refresh_from_db(None)

    self.assertEquals(
        len(InventoryCacheManager.get_all_compute_inventory()), 0)
    self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)

    msg = test_notifier.NOTIFICATIONS[0]
    self.assertEquals(msg['priority'], notifier_api.INFO)
    event_type = event_metadata.get_EventMetaData(
        event_metadata.EVENT_TYPE_HOST_REMOVED)
    self.assertEquals(msg['event_type'],
                      event_type.get_event_fully_qal_name())

    payload = msg['payload']
    self.assertEquals(payload['entity_type'], 'VmHost')
    self.assertEquals(payload['entity_id'], deleted_host.id)
开发者ID:rakrup,项目名称:healthnmon,代码行数:59,代码来源:test_host_events.py
示例11: test_get_all
def test_get_all(self):
    """ComputeNodeList.get_all returns one object per row from the DB API.

    'stats' and 'host_ip' are compared with the JSON and string comparators
    of the test case.
    """
    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    db.compute_node_get_all(self.context).AndReturn([fake_compute_node])
    self.mox.ReplayAll()

    node_list = compute_node.ComputeNodeList.get_all(self.context)

    self.assertEqual(1, len(node_list))
    self.compare_obj(
        node_list[0],
        fake_compute_node,
        comparators={
            'stats': self.json_comparator,
            'host_ip': self.str_comparator,
        },
    )
开发者ID:AsherBond,项目名称:nova,代码行数:9,代码来源:test_compute_node.py
示例12: test_get_all_host_states
def test_get_all_host_states(self):
    """The fake compute nodes must yield four entries in host_state_map."""
    ctxt = 'fake_context'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    db.compute_node_get_all(ctxt).AndReturn(fakes.COMPUTE_NODES)
    self.mox.ReplayAll()

    self.host_manager.get_all_host_states(ctxt)

    state_count = len(self.host_manager.host_state_map)
    self.assertEqual(state_count, 4)
开发者ID:EliseCheng,项目名称:nova,代码行数:10,代码来源:test_host_manager.py
示例13: test_get_all_host_states
def test_get_all_host_states(self):
    """Check host states and the warnings logged while building them.

    The DB stub returns fakes.COMPUTE_NODES. Two warnings are expected on
    host_manager.LOG: one about node3 having more disk space than the
    database value, one about compute ID 5 having no service. The four
    resulting states are then checked for service, free RAM, free disk and
    (for node3) NUMA topology.
    """
    context = 'fake_context'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    # Stub the same method the expectations below are recorded on.
    # Stubbing 'warn' while recording on 'warning' would leave the real
    # logger in place and the expected warnings would never be verified.
    self.mox.StubOutWithMock(host_manager.LOG, 'warning')

    db.compute_node_get_all(context).AndReturn(fakes.COMPUTE_NODES)
    # node 3 host physical disk space is greater than database
    host_manager.LOG.warning(_LW("Host %(hostname)s has more disk space "
                                 "than database expected (%(physical)sgb >"
                                 " %(database)sgb)"),
                             {'physical': 3333, 'database': 3072,
                              'hostname': 'node3'})
    # Invalid service
    host_manager.LOG.warning(_LW("No service for compute ID %s"), 5)
    self.mox.ReplayAll()

    self.host_manager.get_all_host_states(context)
    host_states_map = self.host_manager.host_state_map
    self.assertEqual(len(host_states_map), 4)

    # Check that .service is set properly.
    # range() is used because xrange() does not exist on Python 3.
    for i in range(4):
        compute_node = fakes.COMPUTE_NODES[i]
        host = compute_node['service']['host']
        node = compute_node['hypervisor_hostname']
        state_key = (host, node)
        self.assertEqual(host_states_map[state_key].service,
                         compute_node['service'])

    self.assertEqual(host_states_map[('host1', 'node1')].free_ram_mb,
                     512)
    # 524288 MB == 512 GB
    self.assertEqual(host_states_map[('host1', 'node1')].free_disk_mb,
                     524288)
    self.assertEqual(host_states_map[('host2', 'node2')].free_ram_mb,
                     1024)
    # 1048576 MB == 1024 GB
    self.assertEqual(host_states_map[('host2', 'node2')].free_disk_mb,
                     1048576)
    self.assertEqual(host_states_map[('host3', 'node3')].free_ram_mb,
                     3072)
    # 3145728 MB == 3072 GB
    self.assertEqual(host_states_map[('host3', 'node3')].free_disk_mb,
                     3145728)
    self.assertThat(
        objects.NUMATopology.obj_from_db_obj(
            host_states_map[('host3', 'node3')].numa_topology
        )._to_dict(),
        matchers.DictMatches(fakes.NUMA_TOPOLOGY._to_dict()))
    self.assertEqual(host_states_map[('host4', 'node4')].free_ram_mb,
                     8192)
    # 8388608 MB == 8192 GB
    self.assertEqual(host_states_map[('host4', 'node4')].free_disk_mb,
                     8388608)
开发者ID:absolutarin,项目名称:nova,代码行数:55,代码来源:test_host_manager.py
示例14: test_host_removed_event_none_host
def test_host_removed_event_none_host(self):
    """Host removal still works when the cached VmHost entry is None.

    Same scenario as a regular host removal, except that the inventory
    cache slot for the host is overwritten with None before the refresh.
    The compute inventory must end up empty and exactly one notification
    must have been recorded.
    """
    deleted_host = VmHost()
    deleted_host.set_id('compute1')
    deleted_host.set_name('compute1')

    # healthnmon still knows the host; all other listings are empty.
    self.mox.StubOutWithMock(api, 'vm_host_get_all')
    api.vm_host_get_all(mox.IgnoreArg()).AndReturn([deleted_host])
    for getter_name in ('vm_get_all', 'storage_volume_get_all',
                        'subnet_get_all'):
        self.mox.StubOutWithMock(api, getter_name)
        getattr(api, getter_name)(mox.IgnoreArg()).AndReturn([])
    self.mox.StubOutWithMock(nova_db, 'compute_node_get_all')
    nova_db.compute_node_get_all(mox.IgnoreArg()).AndReturn([])

    self.mox.StubOutWithMock(api, 'vm_host_delete_by_ids')
    delete_call = api.vm_host_delete_by_ids(mox.IgnoreArg(), mox.IgnoreArg())
    delete_call.MultipleTimes().AndReturn(None)

    self.mox.StubOutWithMock(
        InventoryCacheManager, 'get_compute_conn_driver')
    driver_call = InventoryCacheManager.get_compute_conn_driver(
        'compute1', Constants.VmHost)
    driver_call.AndReturn(fake.get_connection())

    self.mox.ReplayAll()

    # Seed the compute inventory with a single entry for 'compute1'.
    compute_service = {'host': 'host1'}
    compute = {'id': 'compute1', 'hypervisor_type': 'fake',
               'service': compute_service}
    rm_context = rmcontext.ComputeRMContext(
        rmType=compute['hypervisor_type'],
        rmIpAddress=compute_service['host'],
        rmUserName='ubuntu164',
        rmPassword='password')
    InventoryCacheManager.get_all_compute_inventory().clear()
    InventoryCacheManager.get_all_compute_inventory()['compute1'] = (
        ComputeInventory(rm_context))
    InventoryCacheManager.get_compute_inventory(
        'compute1').update_compute_info(rm_context, deleted_host)
    self.assertEquals(
        len(InventoryCacheManager.get_all_compute_inventory()), 1)

    # Blank out the cached VmHost object for the host being removed.
    vm_host_cache = InventoryCacheManager.get_inventory_cache()[
        Constants.VmHost]
    vm_host_cache[deleted_host.get_id()] = None

    InventoryManager()._refresh_from_db(None)

    self.assertEquals(
        len(InventoryCacheManager.get_all_compute_inventory()), 0)
    self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
开发者ID:rakrup,项目名称:healthnmon,代码行数:53,代码来源:test_host_events.py
示例15: test_get_all_host_states_after_delete_all
def test_get_all_host_states_after_delete_all(self):
    """host_state_map must be empty once the DB stops reporting any node."""
    ctxt = 'fake_context'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    # First lookup sees every fake node, the second one sees none.
    for db_answer in (fakes.COMPUTE_NODES, []):
        db.compute_node_get_all(ctxt).AndReturn(db_answer)
    self.mox.ReplayAll()

    # One refresh per recorded DB answer.
    self.host_manager.get_all_host_states(ctxt)
    self.host_manager.get_all_host_states(ctxt)

    remaining = self.host_manager.host_state_map
    self.assertEqual(len(remaining), 0)
开发者ID:EliseCheng,项目名称:nova,代码行数:14,代码来源:test_host_manager.py
示例16: test_get_all_host_states_after_delete_all
def test_get_all_host_states_after_delete_all(self):
    """host_state_map must be empty once the DB stops reporting any node."""
    ctxt = "fake_context"

    self.mox.StubOutWithMock(db, "compute_node_get_all")
    # First lookup sees every fake ironic node, the second one sees none.
    for db_answer in (ironic_fakes.COMPUTE_NODES, []):
        db.compute_node_get_all(ctxt).AndReturn(db_answer)
    self.mox.ReplayAll()

    self.host_manager.service_states = ironic_fakes.IRONIC_SERVICE_STATE

    # One refresh per recorded DB answer.
    self.host_manager.get_all_host_states(ctxt)
    self.host_manager.get_all_host_states(ctxt)

    remaining = self.host_manager.host_state_map
    self.assertEqual(0, len(remaining))
开发者ID:rahulgopan,项目名称:ironic,代码行数:15,代码来源:test_ironic_host_manager.py
示例17: test_get_all_host_states_after_delete_one
def test_get_all_host_states_after_delete_one(self):
    """Dropping node4uuid from the DB answer leaves three host states."""
    ctxt = "fake_context"

    self.mox.StubOutWithMock(db, "compute_node_get_all")
    # First lookup: every fake node is reported.
    db.compute_node_get_all(ctxt).AndReturn(ironic_fakes.COMPUTE_NODES)

    # Second lookup: the same nodes minus the one named "node4uuid".
    surviving_nodes = []
    for node_record in ironic_fakes.COMPUTE_NODES:
        if node_record.get("hypervisor_hostname") != "node4uuid":
            surviving_nodes.append(node_record)
    db.compute_node_get_all(ctxt).AndReturn(surviving_nodes)
    self.mox.ReplayAll()

    # One refresh per recorded DB answer.
    self.host_manager.get_all_host_states(ctxt)
    self.host_manager.get_all_host_states(ctxt)

    remaining = self.host_manager.host_state_map
    self.assertEqual(3, len(remaining))
开发者ID:keiichishima,项目名称:nova,代码行数:15,代码来源:test_ironic_host_manager.py
示例18: _refresh_from_db
def _refresh_from_db(self, context):
"""Make our compute_node inventory map match the db.

Reads all compute nodes through db.compute_node_get_all(context). For a
node with a service entry it checks liveness with
hnm_utils.is_service_alive, may add the node to the inventory through
self._add_compute_to_inventory, and calls update_compute_Id on the cached
entry. For a node without a service entry it logs a warning. Finally the
collected ids (db_keys) are handed to self._clean_deleted_computes.

NOTE(review): this listing has lost its indentation, so the exact nesting
of the statements inside the loop (in particular which ``if`` the
update_compute_Id call and the ``else`` belong to) cannot be confirmed
from here -- verify against the upstream file.
"""
# Add/update existing compute_nodes ...
computes = db.compute_node_get_all(context)
# Keys of the compute inventory currently held by InventoryCacheManager.
existing = InventoryCacheManager.get_all_compute_inventory().keys()
# Compute ids collected during the loop; passed to the cleanup step below.
db_keys = []
for compute in computes:
# Ids are normalised to str before being compared with the cache keys.
compute_id = str(compute['id'])
service = compute['service']
if service is not None:
# Liveness is derived from the service's update/creation timestamps.
compute_alive = hnm_utils.is_service_alive(
service['updated_at'], service['created_at'])
db_keys.append(compute_id)
if not compute_alive:
LOG.warn(_('Service %s for host %s is not active') % (
service['binary'], service['host']))
# The 'continue' below is disabled in the original source.
# continue
if compute_id not in existing:
# Node is in the db but not yet in the cached inventory: add it.
self._add_compute_to_inventory(compute[
'hypervisor_type'],
compute_id, service['host'])
LOG.audit(_(
'New Host with compute_id %s is \
obtained') % (compute_id))
InventoryCacheManager.get_all_compute_inventory()[
compute_id].update_compute_Id(compute_id)
else:
# presumably pairs with "if service is not None" (the message reports a
# missing services entry) -- TODO confirm against upstream.
LOG.warn(_(
' No services entry found for compute id \
%s') % compute_id)
# Cleanup compute_nodes removed from db ...
self._clean_deleted_computes(db_keys)
开发者ID:jessegonzalez,项目名称:healthnmon,代码行数:35,代码来源:inventory_manager.py
示例19: hosts
def hosts(self, req):
    """Report capacity and usage figures for every compute node.

    The request context is read from ``req.environ['nova.context']`` and
    passed to ``authorize`` before any data is fetched.

    Returns a dict keyed by ``hypervisor_hostname``; each value is a dict
    with ECU, disk, memory, network QoS, server and vCPU figures taken from
    the node. Missing network fields default to 0.
    """
    context = req.environ['nova.context']
    authorize(context)

    usage_by_host = {}
    for node in db.compute_node_get_all(context):
        private_total = node.get('total_private_network_mbps', 0)
        public_total = node.get('total_public_network_mbps', 0)
        private_used = node.get('private_network_mbps_used', 0)
        public_used = node.get('public_network_mbps_used', 0)

        usage_by_host[node.hypervisor_hostname] = {
            "ecus": self._get_host_ecu(req, node).get('capacity'),
            "ecus_used": self._get_host_ecu(req, node).get('ecus_used'),
            "disk_gb": node.local_gb,
            "local_gb_used": node.local_gb_used,
            "memory_mb": node.memory_mb,
            "memory_mb_used": node.memory_mb_used,
            "public_network_qos_mbps": public_total,
            "private_network_qos_mbps": private_total,
            "public_qos_used": public_used,
            "private_qos_used": private_used,
            "servers_used": node.running_vms,
            "vcpus_used": node.vcpus_used,
        }
    return usage_by_host
开发者ID:xww,项目名称:nova-old,代码行数:25,代码来源:quotas_usage.py
示例20: test_get_all_host_states_after_delete_one
def test_get_all_host_states_after_delete_one(self):
    """Dropping node4 from the DB answer leaves three host states."""
    ctxt = 'fake_context'

    self.mox.StubOutWithMock(db, 'compute_node_get_all')
    # First lookup: every fake node is reported.
    db.compute_node_get_all(ctxt).AndReturn(fakes.COMPUTE_NODES)

    # Second lookup: the same nodes minus the one named 'node4'.
    surviving_nodes = []
    for node_record in fakes.COMPUTE_NODES:
        if node_record.get('hypervisor_hostname') != 'node4':
            surviving_nodes.append(node_record)
    db.compute_node_get_all(ctxt).AndReturn(surviving_nodes)
    self.mox.ReplayAll()

    # One refresh per recorded DB answer.
    self.host_manager.get_all_host_states(ctxt)
    self.host_manager.get_all_host_states(ctxt)

    remaining = self.host_manager.host_state_map
    self.assertEqual(len(remaining), 3)
开发者ID:EliseCheng,项目名称:nova,代码行数:16,代码来源:test_host_manager.py
注:本文中的nova.db.compute_node_get_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论