本文整理汇总了Python中quark.db.api.ip_address_find函数的典型用法代码示例。如果您正苦于以下问题:Python ip_address_find函数的具体用法?Python ip_address_find怎么用?Python ip_address_find使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ip_address_find函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_find_reallocatable_ips_does_not_raise
def test_find_reallocatable_ips_does_not_raise(self):
"""Regression testing
A patch recently introduced a bug wherein addressses
could not be returned to the ip_address_find call in
attempt_to_reallocate_ip. Adding this test to prevent
a future regression.
"""
network = dict(name="public", tenant_id="fake")
ipnet = netaddr.IPNetwork("0.0.0.0/24")
next_ip = ipnet.ipv6().first + 2
subnet = dict(id=1, cidr="0.0.0.0/24", next_auto_assign_ip=next_ip,
ip_policy=None, tenant_id="fake")
with self._stubs(network, subnet) as net:
ip_kwargs = {
"network_id": net["id"], "reuse_after": 14400,
"deallocated": True, "scope": db_api.ONE,
"lock_mode": True, "version": 4,
"order_by": "address"}
try:
db_api.ip_address_find(self.context, **ip_kwargs)
except Exception:
self.fail("This should not have raised")
开发者ID:mohanraj1311,项目名称:quark,代码行数:26,代码来源:test_ipam.py
示例2: test_ip_address_find_ip_address_list
def test_ip_address_find_ip_address_list(self):
ip_address = netaddr.IPAddress("192.168.10.1")
try:
db_api.ip_address_find(self.context, ip_address=[ip_address],
scope=db_api.ONE)
except Exception as e:
self.fail("Expected no exceptions: %s" % e)
开发者ID:Cerberus98,项目名称:quark,代码行数:7,代码来源:test_db_api.py
示例3: allocate_ip_address
def allocate_ip_address(self, context, net_id, port_id, reuse_after,
version=None, ip_address=None):
elevated = context.elevated()
if ip_address:
ip_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
elevated, network_id=net_id, reuse_after=reuse_after,
deallocated=True, scope=db_api.ONE, ip_address=ip_address)
if address:
return db_api.ip_address_update(
elevated, address, deallocated=False, deallocated_at=None)
subnet = self._choose_available_subnet(
elevated, net_id, ip_address=ip_address, version=version)
# Creating this IP for the first time
next_ip = None
if ip_address:
next_ip = ip_address
else:
address = True
while address:
next_ip_int = int(subnet["next_auto_assign_ip"])
next_ip = netaddr.IPAddress(next_ip_int)
if subnet["ip_version"] == 4:
next_ip = next_ip.ipv4()
subnet["next_auto_assign_ip"] = next_ip_int + 1
address = db_api.ip_address_find(
elevated,
network_id=net_id,
ip_address=next_ip,
tenant_id=elevated.tenant_id,
scope=db_api.ONE)
# TODO(mdietz): this is a hack until we have IP policies
ip_int = int(next_ip)
first_ip = netaddr.IPAddress(int(subnet["first_ip"]))
last_ip = netaddr.IPAddress(int(subnet["last_ip"]))
if subnet["ip_version"] == 4:
first_ip = first_ip.ipv4()
last_ip = last_ip.ipv4()
first_ip = int(first_ip)
last_ip = int(last_ip)
diff = ip_int - first_ip
if diff < 2:
next_ip = netaddr.IPAddress(ip_int + (2 - diff))
if ip_int == last_ip:
raise exceptions.IpAddressGenerationFailure(net_id=net_id)
if next_ip not in netaddr.IPNetwork(subnet["cidr"]):
raise exceptions.IpAddressGenerationFailure(net_id=net_id)
address = db_api.ip_address_create(
elevated, address=next_ip, subnet_id=subnet["id"],
version=subnet["ip_version"], network_id=net_id)
return address
开发者ID:ugoring,项目名称:quark,代码行数:58,代码来源:ipam.py
示例4: test_ip_address_find_device_id
def test_ip_address_find_device_id(self):
query_mock = mock.Mock()
filter_mock = mock.Mock()
self.context.session.query = query_mock
query_mock.return_value = filter_mock
db_api.ip_address_find(self.context, device_id="foo")
self.assertEqual(filter_mock.filter.call_count, 1)
开发者ID:Cerberus98,项目名称:quark,代码行数:9,代码来源:test_db_api.py
示例5: post_update_port
def post_update_port(self, context, id, port):
LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id, net_id="")
if "port" not in port or not port["port"]:
raise exceptions.BadRequest()
port = port["port"]
if "fixed_ips" in port and port["fixed_ips"]:
for ip in port["fixed_ips"]:
address = None
if ip:
if "ip_id" in ip:
ip_id = ip["ip_id"]
address = db_api.ip_address_find(
context,
id=ip_id,
tenant_id=context.tenant_id,
scope=db_api.ONE)
elif "ip_address" in ip:
ip_address = ip["ip_address"]
net_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
context,
ip_address=net_address,
network_id=port_db["network_id"],
tenant_id=context.tenant_id,
scope=db_api.ONE)
if not address:
address = self.ipam_driver.allocate_ip_address(
context,
port_db["network_id"],
id,
self.ipam_reuse_after,
ip_address=ip_address)
else:
address = self.ipam_driver.allocate_ip_address(
context,
port_db["network_id"],
id,
self.ipam_reuse_after)
address["deallocated"] = 0
already_contained = False
for port_address in port_db["ip_addresses"]:
if address["id"] == port_address["id"]:
already_contained = True
break
if not already_contained:
port_db["ip_addresses"].append(address)
return self._make_port_dict(port_db)
开发者ID:ugoring,项目名称:quark,代码行数:55,代码来源:plugin.py
示例6: test_ip_address_find_port_id_as_admin
def test_ip_address_find_port_id_as_admin(self):
self.context.session.query = mock.MagicMock()
final_query_mock = self.context.session.query.return_value
db_api.ip_address_find(self.context.elevated(), port_id="foo")
# NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
# when using SQLAlchemy models in expressions.
expected_filter = models.IPAddress.ports.any(models.Port.id == "foo")
self.assertEqual(len(final_query_mock.filter.call_args[0]), 1)
self.assertEqual(str(expected_filter), str(
final_query_mock.filter.call_args[0][0]))
开发者ID:Cerberus98,项目名称:quark,代码行数:11,代码来源:test_db_api.py
示例7: post_update_port
def post_update_port(context, id, port):
LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
if not port.get("port"):
raise exceptions.BadRequest(resource="ports", msg="Port body required")
with context.session.begin():
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise exceptions.PortNotFound(port_id=id, net_id="")
port = port["port"]
if "fixed_ips" in port and port["fixed_ips"]:
for ip in port["fixed_ips"]:
address = None
ipam_driver = ipam.IPAM_REGISTRY.get_strategy(port_db["network"]["ipam_strategy"])
if ip:
if "ip_id" in ip:
ip_id = ip["ip_id"]
address = db_api.ip_address_find(
context, id=ip_id, tenant_id=context.tenant_id, scope=db_api.ONE
)
elif "ip_address" in ip:
ip_address = ip["ip_address"]
net_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
context,
ip_address=net_address,
network_id=port_db["network_id"],
tenant_id=context.tenant_id,
scope=db_api.ONE,
)
if not address:
address = ipam_driver.allocate_ip_address(
context, port_db["network_id"], id, CONF.QUARK.ipam_reuse_after, ip_address=ip_address
)
else:
address = ipam_driver.allocate_ip_address(
context, port_db["network_id"], id, CONF.QUARK.ipam_reuse_after
)
address["deallocated"] = 0
already_contained = False
for port_address in port_db["ip_addresses"]:
if address["id"] == port_address["id"]:
already_contained = True
break
if not already_contained:
port_db["ip_addresses"].append(address)
return v._make_port_dict(port_db)
开发者ID:blamarvt,项目名称:quark,代码行数:51,代码来源:ports.py
示例8: test_ip_address_find_address_type_as_admin
def test_ip_address_find_address_type_as_admin(self):
self.context.session.query = mock.MagicMock()
filter_mock = self.context.session.query.return_value
db_api.ip_address_find(self.context.elevated(), address_type="foo")
# NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
# when using SQLAlchemy models in expressions.
expected_filter = models.IPAddress.address_type == "foo"
self.assertEqual(len(filter_mock.filter.call_args[0]), 1)
# NOTE(thomasem): Unfortunately BinaryExpression.compare isn't
# showing to be a reliable comparison, so using the string
# representation which dumps the associated SQL for the filter.
self.assertEqual(str(expected_filter), str(
filter_mock.filter.call_args[0][0]))
开发者ID:Cerberus98,项目名称:quark,代码行数:14,代码来源:test_db_api.py
示例9: test_address_not_in_cidr
def test_address_not_in_cidr(self):
self.network_db = self.insert_network()
self.subnet_v4_db = self.insert_subnet(
self.network_db, "192.168.0.0/24")
self.ip_address_v4 = netaddr.IPAddress("192.168.1.1")
ip_address_db = self.insert_ip_address(self.ip_address_v4,
self.network_db,
self.subnet_v4_db)
self.transaction = self.insert_transaction()
ip_kwargs = {
"network_id": self.network_db["id"],
"reuse_after": self.REUSE_AFTER,
"deallocated": True,
"version": 4,
}
reallocated = db_api.ip_address_reallocate(
self.context,
{"transaction_id": self.transaction.id},
**ip_kwargs)
self.assertTrue(reallocated)
updated_address = db_api.ip_address_reallocate_find(
self.context, self.transaction.id)
self.assertIsNone(updated_address)
self.context.session.flush()
self.assertIsNone(db_api.ip_address_find(self.context,
id=ip_address_db.id,
scope=db_api.ONE))
开发者ID:Anonymike,项目名称:quark,代码行数:29,代码来源:test_db_ip_reallocate.py
示例10: update_port_for_ip_address
def update_port_for_ip_address(context, ip_id, id, port):
"""Update values of a port.
: param context: neutron api request context
: param ip_id: UUID representing the ip associated with port to update
: param id: UUID representing the port to update.
: param port: dictionary with keys indicating fields to update.
valid keys are those that have a value of True for 'allow_put'
as listed in the RESOURCE_ATTRIBUTE_MAP object in
neutron/api/v2/attributes.py.
"""
LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
sanitize_list = ['service']
with context.session.begin():
addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
if not addr:
raise q_exc.IpAddressNotFound(addr_id=ip_id)
port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
if not port_db:
raise q_exc.PortNotFound(port_id=id)
port_dict = {k: port['port'][k] for k in sanitize_list}
require_da = False
service = port_dict.get('service')
if require_da and _shared_ip_and_active(addr, except_port=id):
raise q_exc.PortRequiresDisassociation()
addr.set_service_for_port(port_db, service)
context.session.add(addr)
return v._make_port_for_ip_dict(addr, port_db)
开发者ID:roaet,项目名称:quark,代码行数:30,代码来源:ip_addresses.py
示例11: test_reserve_ip_non_admin
def test_reserve_ip_non_admin(self):
with self._stubs() as ip:
deallocated_ip = ip_addr.update_ip_address(self.context, ip["id"],
self.ip_address_dealloc)
ip_address = db_api.ip_address_find(
self.context,
id=deallocated_ip["id"],
scope=db_api.ONE)
self.assertEqual(ip_address["deallocated"], True)
deallocated_ip = ip_addr.update_ip_address(self.context, ip["id"],
self.ip_address_reserve)
ip_address = db_api.ip_address_find(
self.context,
id=deallocated_ip["id"],
scope=db_api.ONE)
self.assertEqual(ip_address["deallocated"], True)
开发者ID:Anonymike,项目名称:quark,代码行数:16,代码来源:test_ip_addresses.py
示例12: attempt_to_reallocate_ip
def attempt_to_reallocate_ip(self, context, net_id, port_id, reuse_after,
version=None, ip_address=None):
version = version or [4, 6]
elevated = context.elevated()
# We never want to take the chance of an infinite loop here. Instead,
# we'll clean up multiple bad IPs if we find them (assuming something
# is really wrong)
for times in xrange(3):
with context.session.begin(subtransactions=True):
address = db_api.ip_address_find(
elevated, network_id=net_id, reuse_after=reuse_after,
deallocated=True, scope=db_api.ONE, ip_address=ip_address,
lock_mode=True, version=version, order_by="address")
if address:
#NOTE(mdietz): We should always be in the CIDR but we've
# also said that before :-/
if address.get("subnet"):
cidr = netaddr.IPNetwork(address["subnet"]["cidr"])
addr = netaddr.IPAddress(int(address["address"]),
version=int(cidr.version))
if addr in cidr:
updated_address = db_api.ip_address_update(
elevated, address, deallocated=False,
deallocated_at=None,
allocated_at=timeutils.utcnow())
return [updated_address]
else:
# Make sure we never find it again
context.session.delete(address)
continue
break
return []
开发者ID:blamarvt,项目名称:quark,代码行数:34,代码来源:ipam.py
示例13: _allocate_ips_from_subnets
def _allocate_ips_from_subnets(self, context, net_id, subnets,
ip_address=None):
new_addresses = []
for subnet in subnets:
ip_policy_cidrs = models.IPPolicy.get_ip_policy_cidrs(subnet)
# Creating this IP for the first time
next_ip = None
if ip_address:
next_ip = ip_address
address = db_api.ip_address_find(
context, network_id=net_id, ip_address=next_ip,
used_by_tenant_id=context.tenant_id, scope=db_api.ONE)
if address:
raise exceptions.IpAddressGenerationFailure(
net_id=net_id)
else:
next_ip = self._iterate_until_available_ip(
context, subnet, net_id, ip_policy_cidrs)
context.session.add(subnet)
address = db_api.ip_address_create(
context, address=next_ip, subnet_id=subnet["id"],
version=subnet["ip_version"], network_id=net_id)
address["deallocated"] = 0
new_addresses.append(address)
return new_addresses
开发者ID:blamarvt,项目名称:quark-1,代码行数:26,代码来源:ipam.py
示例14: delete_ip_address
def delete_ip_address(context, id):
"""Delete an ip address.
: param context: neutron api request context
: param id: UUID representing the ip address to delete.
"""
LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id))
with context.session.begin():
ip_address = db_api.ip_address_find(
context, id=id, scope=db_api.ONE)
if not ip_address or ip_address.deallocated:
raise q_exc.IpAddressNotFound(addr_id=id)
iptype = ip_address.address_type
if iptype == ip_types.FIXED and not CONF.QUARK.ipaddr_allow_fixed_ip:
raise n_exc.BadRequest(
resource="ip_addresses",
msg="Fixed ips cannot be updated using this interface.")
if ip_address.has_any_shared_owner():
raise q_exc.PortRequiresDisassociation()
db_api.update_port_associations_for_ip(context, [], ip_address)
ipam_driver.deallocate_ip_address(context, ip_address)
开发者ID:roaet,项目名称:quark,代码行数:25,代码来源:ip_addresses.py
示例15: get_ip_addresses
def get_ip_addresses(context, **filters):
LOG.info("get_ip_addresses for tenant %s" % context.tenant_id)
if not filters:
filters = {}
if 'type' in filters:
filters['address_type'] = filters['type']
if context.is_admin:
if 'deallocated' in filters:
if filters['deallocated'] == 'True':
filters["_deallocated"] = True
elif filters['deallocated'] == 'False':
filters["_deallocated"] = False
elif filters['deallocated'].lower() == 'both':
pass
else:
filters['_deallocated'] = False
else:
filters['_deallocated'] = False
else:
filters["_deallocated"] = False
if 'deallocated' in filters:
# this needs to be removed or it corrupts the filters passed to the db
# model. Only _deallocated needs to be present
del filters['deallocated']
addrs = db_api.ip_address_find(context, scope=db_api.ALL, **filters)
return [v._make_ip_dict(ip, context.is_admin) for ip in addrs]
开发者ID:Anonymike,项目名称:quark,代码行数:28,代码来源:ip_addresses.py
示例16: update_ip_address
def update_ip_address(context, id, ip_address):
LOG.info("update_ip_address %s for tenant %s" %
(id, context.tenant_id))
address = db_api.ip_address_find(
context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
if not address:
raise exceptions.NotFound(
message="No IP address found with id=%s" % id)
old_ports = address['ports']
port_ids = ip_address['ip_address'].get('port_ids')
if port_ids is None:
return v._make_ip_dict(address)
for port in old_ports:
port['ip_addresses'].remove(address)
if port_ids:
ports = db_api.port_find(
context, tenant_id=context.tenant_id, id=port_ids,
scope=db_api.ALL)
# NOTE: could be considered inefficient because we're converting
# to a list to check length. Maybe revisit
if len(ports) != len(port_ids):
raise exceptions.NotFound(
message="No ports not found with ids=%s" % port_ids)
for port in ports:
port['ip_addresses'].extend([address])
else:
address["deallocated"] = 1
return v._make_ip_dict(address)
开发者ID:kilogram,项目名称:quark,代码行数:35,代码来源:ip_addresses.py
示例17: test_ip_address_find_ip_address_object_list_none
def test_ip_address_find_ip_address_object_list_none(self):
with self._stubs():
ip_addresses = db_api.ip_address_find(
self.context,
ip_address=[netaddr.IPAddress("192.168.10.2")],
scope=db_api.ALL)
self.assertEqual(len(ip_addresses), 0)
开发者ID:mohanraj1311,项目名称:quark,代码行数:7,代码来源:test_ip_addresses.py
示例18: get_ip_address
def get_ip_address(context, id):
LOG.info("get_ip_address %s for tenant %s" %
(id, context.tenant_id))
addr = db_api.ip_address_find(context, id=id, scope=db_api.ONE)
if not addr:
raise quark_exceptions.IpAddressNotFound(addr_id=id)
return v._make_ip_dict(addr)
开发者ID:sumanthns,项目名称:quark,代码行数:7,代码来源:ip_addresses.py
示例19: allocate_ip_address
def allocate_ip_address(self, context, net_id, port_id, reuse_after,
version=None, ip_address=None):
elevated = context.elevated()
if ip_address:
ip_address = netaddr.IPAddress(ip_address)
address = db_api.ip_address_find(
elevated, network_id=net_id, reuse_after=reuse_after,
deallocated=True, scope=db_api.ONE, ip_address=ip_address)
if address:
return db_api.ip_address_update(
elevated, address, deallocated=False, deallocated_at=None)
subnet = self._choose_available_subnet(
elevated, net_id, ip_address=ip_address, version=version)
ip_policy_rules = self.get_ip_policy_rule_set(subnet)
# Creating this IP for the first time
next_ip = None
if ip_address:
next_ip = ip_address
address = db_api.ip_address_find(
elevated, network_id=net_id, ip_address=next_ip,
tenant_id=elevated.tenant_id, scope=db_api.ONE)
if address:
raise exceptions.IpAddressGenerationFailure(net_id=net_id)
else:
address = True
while address:
next_ip_int = int(subnet["next_auto_assign_ip"])
next_ip = netaddr.IPAddress(next_ip_int)
if subnet["ip_version"] == 4:
next_ip = next_ip.ipv4()
subnet["next_auto_assign_ip"] = next_ip_int + 1
if ip_policy_rules and next_ip in ip_policy_rules:
continue
address = db_api.ip_address_find(
elevated, network_id=net_id, ip_address=next_ip,
tenant_id=elevated.tenant_id, scope=db_api.ONE)
address = db_api.ip_address_create(
elevated, address=next_ip, subnet_id=subnet["id"],
version=subnet["ip_version"], network_id=net_id)
address["deallocated"] = 0
return address
开发者ID:kilogram,项目名称:quark,代码行数:46,代码来源:ipam.py
示例20: get_ip_address
def get_ip_address(context, id):
LOG.info("get_ip_address %s for tenant %s" %
(id, context.tenant_id))
filters = {}
filters["_deallocated"] = False
addr = db_api.ip_address_find(context, id=id, scope=db_api.ONE, **filters)
if not addr:
raise q_exc.IpAddressNotFound(addr_id=id)
return v._make_ip_dict(addr)
开发者ID:roaet,项目名称:quark,代码行数:9,代码来源:ip_addresses.py
注:本文中的quark.db.api.ip_address_find函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论