本文整理汇总了Python中nova.db.fixed_ip_get_by_address函数的典型用法代码示例。如果您正苦于以下问题:Python fixed_ip_get_by_address函数的具体用法?Python fixed_ip_get_by_address怎么用?Python fixed_ip_get_by_address使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了fixed_ip_get_by_address函数的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_validate_networks
def test_validate_networks(self):
self.mox.StubOutWithMock(db, "network_get_all_by_uuids")
self.mox.StubOutWithMock(db, "fixed_ip_get_by_address")
requested_networks = [("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb", "192.168.1.100")]
db.network_get_all_by_uuids(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(networks)
fixed_ips[1]["network"] = FakeModel(**networks[1])
fixed_ips[1]["instance"] = None
db.fixed_ip_get_by_address(mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(fixed_ips[1])
self.mox.ReplayAll()
self.network.validate_networks(self.context, requested_networks)
开发者ID:nicoleLiu,项目名称:nova,代码行数:13,代码来源:test_network.py
示例2: test_fixed_ip_associate_succeeds_and_sets_network
def test_fixed_ip_associate_succeeds_and_sets_network(self):
address = self.create_fixed_ip()
db.fixed_ip_associate(self.ctxt, address, self.instance.id,
network_id=self.network.id)
fixed_ip = db.fixed_ip_get_by_address(self.ctxt, address)
self.assertEqual(fixed_ip.instance_id, self.instance.id)
self.assertEqual(fixed_ip.network_id, self.network.id)
开发者ID:matiu2,项目名称:nova,代码行数:7,代码来源:test_db_api.py
示例3: index
def index(self, req, filesystem_id):
"""Return a list of attachments to the specified file share."""
fs_name = filesystem_id
context = req.environ['nova.context']
try:
ips = self.fs_driver.list_attachments(fs_name)
except KeyError:
msg = _("Filesystem %s does not exist.") % fs_name
raise webob.exc.HTTPNotFound(msg)
if 'localhost' in ips:
ips.remove('localhost')
instances = []
for ip in ips:
try:
fixed_ip = db.fixed_ip_get_by_address(context, ip)
instance_id = fixed_ip.instance_id
instance = db.instance_get(context, instance_id)
instances.append(instance.uuid)
except exception.InstanceNotFound:
LOG.warning(_("Attached to a most-likely defunct "
"instance with IP address %s") % ip)
return _translate_instances_view(instances)
开发者ID:andrewbogott,项目名称:novawikiplugins,代码行数:26,代码来源:api.py
示例4: get_by_address
def get_by_address(cls, context, address, expected_attrs=None):
if expected_attrs is None:
expected_attrs = []
db_fixedip = db.fixed_ip_get_by_address(context, str(address),
columns_to_join=expected_attrs)
return cls._from_db_object(context, cls(context), db_fixedip,
expected_attrs)
开发者ID:csdhome,项目名称:nova,代码行数:7,代码来源:fixed_ip.py
示例5: test_unreserve
def test_unreserve(self):
db.fixed_ip_update(context.get_admin_context(), '10.0.0.100',
{'reserved': True})
self.commands.unreserve('10.0.0.100')
address = db.fixed_ip_get_by_address(context.get_admin_context(),
'10.0.0.100')
self.assertEqual(address['reserved'], False)
开发者ID:lovocas,项目名称:nova,代码行数:7,代码来源:test_nova_manage.py
示例6: _set_reserved
def _set_reserved(self, context, address, reserved):
try:
fixed_ip = db.fixed_ip_get_by_address(context, address)
db.fixed_ip_update(context, fixed_ip["address"], {"reserved": reserved})
except exception.FixedIpNotFoundForAddress:
msg = _("Fixed IP %s not found") % address
raise webob.exc.HTTPNotFound(explanation=msg)
return webob.exc.HTTPAccepted()
开发者ID:sdague,项目名称:nova,代码行数:9,代码来源:fixed_ips.py
示例7: _set_reserved
def _set_reserved(self, address, reserved):
ctxt = context.get_admin_context()
try:
fixed_ip = db.fixed_ip_get_by_address(ctxt, address)
if fixed_ip is None:
raise exception.NotFound("Could not find address")
db.fixed_ip_update(ctxt, fixed_ip["address"], {"reserved": reserved})
except exception.NotFound as ex:
print _("error: %s") % ex
return 2
开发者ID:comstud,项目名称:nova,代码行数:11,代码来源:manage.py
示例8: _set_reserved
def _set_reserved(self, address, reserved):
ctxt = context.get_admin_context()
try:
fixed_ip = db.fixed_ip_get_by_address(ctxt, address)
if fixed_ip is None:
raise exception.NotFound('Could not find address')
db.fixed_ip_update(ctxt, fixed_ip['address'],
{'reserved': reserved})
except exception.NotFound as ex:
print(_("error: %s") % ex)
return(2)
开发者ID:RibeiroAna,项目名称:nova,代码行数:12,代码来源:manage.py
示例9: test_network_delete_safe
def test_network_delete_safe(self):
ctxt = context.get_admin_context()
values = {"host": "localhost", "project_id": "project1"}
network = db.network_create_safe(ctxt, values)
db_network = db.network_get(ctxt, network.id)
values = {"network_id": network["id"], "address": "fake1"}
address1 = db.fixed_ip_create(ctxt, values)
values = {"network_id": network["id"], "address": "fake2", "allocated": True}
address2 = db.fixed_ip_create(ctxt, values)
self.assertRaises(exception.NetworkInUse, db.network_delete_safe, ctxt, network["id"])
db.fixed_ip_update(ctxt, address2, {"allocated": False})
network = db.network_delete_safe(ctxt, network["id"])
ctxt = ctxt.elevated(read_deleted="yes")
fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
self.assertTrue(fixed_ip["deleted"])
开发者ID:hiteshwadekar,项目名称:nova,代码行数:15,代码来源:test_db_api.py
示例10: _setup_networking
def _setup_networking(instance_id, ip="1.2.3.4", flo_addr="1.2.1.2"):
ctxt = context.get_admin_context()
network_ref = db.project_get_networks(ctxt, "fake", associate=True)[0]
vif = {"address": "56:12:12:12:12:12", "network_id": network_ref["id"], "instance_id": instance_id}
vif_ref = db.virtual_interface_create(ctxt, vif)
fixed_ip = {
"address": ip,
"network_id": network_ref["id"],
"virtual_interface_id": vif_ref["id"],
"allocated": True,
"instance_id": instance_id,
}
db.fixed_ip_create(ctxt, fixed_ip)
fix_ref = db.fixed_ip_get_by_address(ctxt, ip)
db.floating_ip_create(ctxt, {"address": flo_addr, "fixed_ip_id": fix_ref["id"]})
开发者ID:hiteshwadekar,项目名称:nova,代码行数:16,代码来源:test_db_api.py
示例11: test_network_delete_safe
def test_network_delete_safe(self):
ctxt = context.get_admin_context()
values = {'host': 'localhost', 'project_id': 'project1'}
network = db.network_create_safe(ctxt, values)
db_network = db.network_get(ctxt, network.id)
values = {'network_id': network['id'], 'address': 'fake1'}
address1 = db.fixed_ip_create(ctxt, values)
values = {'network_id': network['id'],
'address': 'fake2',
'allocated': True}
address2 = db.fixed_ip_create(ctxt, values)
self.assertRaises(exception.NetworkInUse,
db.network_delete_safe, ctxt, network['id'])
db.fixed_ip_update(ctxt, address2, {'allocated': False})
network = db.network_delete_safe(ctxt, network['id'])
ctxt = ctxt.elevated(read_deleted='yes')
fixed_ip = db.fixed_ip_get_by_address(ctxt, address1)
self.assertTrue(fixed_ip['deleted'])
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:18,代码来源:test_db_api.py
示例12: _setup_networking
def _setup_networking(instance_id, ip='1.2.3.4', flo_addr='1.2.1.2'):
ctxt = context.get_admin_context()
network_ref = db.project_get_networks(ctxt,
'fake',
associate=True)[0]
vif = {'address': '56:12:12:12:12:12',
'network_id': network_ref['id'],
'instance_id': instance_id}
vif_ref = db.virtual_interface_create(ctxt, vif)
fixed_ip = {'address': ip,
'network_id': network_ref['id'],
'virtual_interface_id': vif_ref['id'],
'allocated': True,
'instance_id': instance_id}
db.fixed_ip_create(ctxt, fixed_ip)
fix_ref = db.fixed_ip_get_by_address(ctxt, ip)
db.floating_ip_create(ctxt, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
开发者ID:matiu2,项目名称:nova,代码行数:19,代码来源:test_db_api.py
示例13: test_post_live_migration_working_correctly
def test_post_live_migration_working_correctly(self):
"""Confirm post_live_migration() works as expected correctly."""
dest = 'desthost'
flo_addr = '1.2.1.2'
# Preparing datas
c = context.get_admin_context()
instance_id = self._create_instance()
i_ref = db.instance_get(c, instance_id)
db.instance_update(c, i_ref['id'], {'state_description': 'migrating',
'state': power_state.PAUSED})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
'instance_id': instance_id})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
flo_ref = db.floating_ip_create(c, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
# reload is necessary before setting mocks
i_ref = db.instance_get(c, instance_id)
# Preparing mocks
self.mox.StubOutWithMock(self.compute.volume_manager,
'remove_compute_volume')
for v in i_ref['volumes']:
self.compute.volume_manager.remove_compute_volume(c, v['id'])
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref, [])
# executing
self.mox.ReplayAll()
ret = self.compute.post_live_migration(c, i_ref, dest)
# make sure every data is rewritten to dest
i_ref = db.instance_get(c, i_ref['id'])
c1 = (i_ref['host'] == dest)
flo_refs = db.floating_ip_get_all_by_host(c, dest)
c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr)
# post operaton
self.assertTrue(c1 and c2)
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
开发者ID:cp16net,项目名称:reddwarf,代码行数:43,代码来源:test_compute.py
示例14: update_vm_stat
def update_vm_stat(params, remote_address):
"""
NOTE(hzyangtk):
params:
state: 1 means live, others mean error,
"""
ctxt = context.get_admin_context()
try:
fixed_ip = db.fixed_ip_get_by_address(ctxt, remote_address)
except exception.FixedIpNotFoundForAddress:
LOG.exception(_("fixed ip not found with address %s") % remote_address)
raise
instance_uuid = fixed_ip['instance_uuid']
# NOTE(hzyangtk): determine if let all VMs store state in DB
# when only HA vm record state, check db table
# instance_metadata has key 'HA' or not.
HA_key = 'HA'
record_state = False
if not FLAGS.record_all_vms_heartbeat:
instance_metadata = db.instance_metadata_get(ctxt, instance_uuid)
record_state = HA_key in instance_metadata
else:
record_state = True
if record_state:
cache_key = str(instance_uuid + '_heart')
cache_value = timeutils.utcnow().strftime('%Y-%m-%d %H:%M:%S')
memcache_client = get_memcache_client()
if memcache_client is not None:
result = memcache_client.set(cache_key, cache_value)
if not result:
LOG.exception(_("Memcache insert error. Key: %(cache_key)s, "
"value: %(cache_value)s"),
{'cache_key': cache_key,
'cache_value': cache_value})
raise exception.NovaException()
return json.dumps({cache_key: cache_value})
else:
LOG.exception(_("Memcache connection failed"))
raise exception.NovaException()
raise exception.NotFound()
开发者ID:xww,项目名称:nova-old,代码行数:41,代码来源:base.py
示例15: test_unreserve
def test_unreserve(self):
self.commands.unreserve('192.168.0.100')
address = db.fixed_ip_get_by_address(context.get_admin_context(),
'192.168.0.100')
self.assertEqual(address['reserved'], False)
开发者ID:Francis-Liu,项目名称:animated-broccoli,代码行数:5,代码来源:test_nova_manage.py
示例16: test_reserve
def test_reserve(self):
self.commands.reserve('192.168.0.100')
address = db.fixed_ip_get_by_address(context.get_admin_context(),
'192.168.0.100')
self.assertTrue(address['reserved'])
开发者ID:LoveAndKilling,项目名称:nova,代码行数:5,代码来源:test_nova_manage.py
示例17: test_unreserve
def test_unreserve(self):
self.commands.unreserve("192.168.0.100")
address = db.fixed_ip_get_by_address(context.get_admin_context(), "192.168.0.100")
self.assertEqual(address["reserved"], False)
开发者ID:jcalonsoh,项目名称:openstack-nova,代码行数:4,代码来源:test_nova_manage.py
注:本文中的nova.db.fixed_ip_get_by_address函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论