本文整理汇总了Python中netaddr.all_matching_cidrs函数的典型用法代码示例。如果您正苦于以下问题：Python all_matching_cidrs函数的具体用法？Python all_matching_cidrs怎么用？Python all_matching_cidrs使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了all_matching_cidrs函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _check_for_dup_router_subnet
def _check_for_dup_router_subnet(self, context, router_id,
                                 network_id, subnet_id, subnet_cidr):
    """Reject a subnet that is already on, or clashes with, the router.

    Every port whose ``device_id`` equals ``router_id`` is inspected, one
    fixed IP at a time.  ``q_exc.BadRequest`` is raised when a fixed IP
    already uses ``subnet_id``, or when matching ``subnet_cidr`` against
    that fixed IP's subnet CIDR (in either direction) yields a hit.

    ``exc.NoResultFound`` raised anywhere in the check is ignored.
    ``network_id`` is accepted but not used by this check.
    """
    try:
        router_ports = (context.session.query(models_v2.Port)
                        .filter_by(device_id=router_id)
                        .all())
        # Ports can share a network yet sit on different subnets, so the
        # comparison is made per fixed IP rather than per port.
        candidate_net = netaddr.IPNetwork(subnet_cidr)
        for port in router_ports:
            for fixed_ip in port['fixed_ips']:
                existing_id = fixed_ip['subnet_id']
                if existing_id == subnet_id:
                    msg = (_("Router already has a port on subnet %s")
                           % subnet_id)
                    raise q_exc.BadRequest(resource='router', msg=msg)
                existing_cidr = self._get_subnet(context.elevated(),
                                                 existing_id)['cidr']
                existing_net = netaddr.IPNetwork(existing_cidr)
                # Match in both directions; a hit in either one is
                # reported as an overlap.
                new_vs_existing = netaddr.all_matching_cidrs(
                    candidate_net, [existing_cidr])
                existing_vs_new = netaddr.all_matching_cidrs(
                    existing_net, [subnet_cidr])
                if new_vs_existing or existing_vs_new:
                    data = {'subnet_cidr': subnet_cidr,
                            'subnet_id': subnet_id,
                            'cidr': existing_cidr,
                            'sub_id': existing_id}
                    msg = (_("Cidr %(subnet_cidr)s of subnet "
                             "%(subnet_id)s overlaps with cidr %(cidr)s "
                             "of subnet %(sub_id)s") % data)
                    raise q_exc.BadRequest(resource='router', msg=msg)
    except exc.NoResultFound:
        pass
开发者ID:CiscoAS,项目名称:quantum,代码行数:32,代码来源:l3_db.py
示例2: _check_for_dup_router_subnet
def _check_for_dup_router_subnet(self, context, router_id,
                                 network_id, subnet_id):
    """Reject a subnet that is already on, or clashes with, the router.

    Only ports with ``device_id == router_id`` and
    ``device_owner == DEVICE_OWNER_ROUTER_INTF`` are inspected.  The CIDR
    of ``subnet_id`` is looked up and compared with the subnet CIDR of
    every fixed IP on those ports; ``q_exc.BadRequest`` is raised when the
    subnet is already attached or when matching the two CIDRs (in either
    direction) yields a hit.

    ``exc.NoResultFound`` raised anywhere in the check is ignored.
    ``network_id`` is accepted but not used by this check.
    """
    try:
        interface_ports = (context.session.query(models_v2.Port)
                           .filter_by(device_id=router_id,
                                      device_owner=DEVICE_OWNER_ROUTER_INTF)
                           .all())
        # Ports can share a network yet sit on different subnets, so the
        # comparison is made per fixed IP rather than per port.
        candidate_cidr = self._get_subnet(context, subnet_id)['cidr']
        candidate_net = netaddr.IPNetwork(candidate_cidr)
        for port in interface_ports:
            for fixed_ip in port['fixed_ips']:
                if fixed_ip['subnet_id'] == subnet_id:
                    msg = ("Router already has a port on subnet %s"
                           % subnet_id)
                    raise q_exc.BadRequest(resource='router', msg=msg)
                existing_cidr = self._get_subnet(
                    context, fixed_ip['subnet_id'])['cidr']
                existing_net = netaddr.IPNetwork(existing_cidr)
                # Match in both directions; a hit in either one is
                # reported as an overlap.
                new_vs_existing = netaddr.all_matching_cidrs(
                    candidate_net, [existing_cidr])
                existing_vs_new = netaddr.all_matching_cidrs(
                    existing_net, [candidate_cidr])
                if new_vs_existing or existing_vs_new:
                    msg = ("Cidr %s of subnet %s is overlapped "
                           "with cidr %s of subnet %s"
                           % (candidate_cidr, subnet_id,
                              existing_cidr, fixed_ip['subnet_id']))
                    raise q_exc.BadRequest(resource='router', msg=msg)
    except exc.NoResultFound:
        pass
开发者ID:ddutta,项目名称:quantum,代码行数:28,代码来源:l3_db.py
示例3: _check_for_dup_router_subnet
def _check_for_dup_router_subnet(self, router_obj, subnet_id,
                                 subnet_cidr):
    """Reject a subnet that is already on, or clashes with, the router.

    The router's virtual machine interfaces are listed (when it has any
    refs) and the fixed IPs of each one are examined.  A 'BadRequest'
    contrail exception is raised through ``_raise_contrail_exception``
    when a fixed IP already uses ``subnet_id``, or when matching
    ``subnet_cidr`` against that fixed IP's subnet CIDR (in either
    direction) yields a hit.

    ``vnc_exc.NoIdError`` raised anywhere in the check is ignored.
    """
    try:
        vmi_objs = []
        if router_obj.get_virtual_machine_interface_refs():
            vmi_uuids = [ref['uuid'] for ref in
                         router_obj.virtual_machine_interface_refs]
            vmi_objs = self._vnc_lib.virtual_machine_interfaces_list(
                obj_uuids=vmi_uuids, detail=True,
                fields=['instance_ip_back_refs'])

        # Router ports may share a network and still be on different
        # subnets, hence the per-fixed-IP comparison below.
        candidate_net = netaddr.IPNetwork(subnet_cidr)
        # One memo dict is handed to every get_vmi_ip_dict() call.
        memo = {'virtual-machines': {},
                'instance-ips': {},
                'subnets': {}}
        for vmi_obj in vmi_objs:
            net_id = self._vmi_handler.get_vmi_net_id(vmi_obj)
            vn_obj = self._vnc_lib.virtual_network_read(id=net_id)
            fixed_ips = self._vmi_handler.get_vmi_ip_dict(
                vmi_obj, vn_obj, memo)
            vn_subnets = subnet_handler.SubnetHandler.get_vn_subnets(vn_obj)
            for fixed_ip in fixed_ips:
                if fixed_ip['subnet_id'] == subnet_id:
                    msg = ("Router %s already has a port on subnet %s"
                           % (router_obj.uuid, subnet_id))
                    self._raise_contrail_exception(
                        'BadRequest', resource='router', msg=msg)
                existing_id = fixed_ip['subnet_id']
                existing_cidr = self._get_subnet_cidr(existing_id,
                                                      vn_subnets)
                existing_net = netaddr.IPNetwork(existing_cidr)
                # Match in both directions; a hit in either one is
                # reported as an overlap.
                new_vs_existing = netaddr.all_matching_cidrs(
                    candidate_net, [existing_cidr])
                existing_vs_new = netaddr.all_matching_cidrs(
                    existing_net, [subnet_cidr])
                if new_vs_existing or existing_vs_new:
                    data = {'subnet_cidr': subnet_cidr,
                            'subnet_id': subnet_id,
                            'cidr': existing_cidr,
                            'sub_id': existing_id}
                    msg = ("Cidr %(subnet_cidr)s of subnet "
                           "%(subnet_id)s overlaps with cidr %(cidr)s "
                           "of subnet %(sub_id)s") % data
                    self._raise_contrail_exception(
                        'BadRequest', resource='router', msg=msg)
    except vnc_exc.NoIdError:
        pass
开发者ID:morganwang010,项目名称:contrail-neutron-plugin,代码行数:48,代码来源:router_res_handler.py
示例4: isin
def isin(self, ip):
    """Return True when ``ip`` matches at least one entry of ``self.cidrs``.

    An empty or unset ``self.cidrs`` always gives False without consulting
    netaddr.
    """
    if not self.cidrs:
        return False
    return bool(netaddr.all_matching_cidrs(ip, self.cidrs))
开发者ID:RS-liuyang,项目名称:rs-pymod,代码行数:7,代码来源:nxdomain.py
示例5: finisher
def finisher(the_record):
    """
    POST the finished record to the callback URL stored on it.

    Before posting, the host of ``the_record.url`` is resolved and, when
    ``IP_BLACKLISTING`` is enabled, checked against the comma-separated
    ``IP_BLACKLISTING_RANGE`` setting.  A match is recorded by prefixing
    ``the_record.capture_status``; the callback is still sent.  The
    blacklist check is best-effort: a failure in it is logged and the
    callback proceeds.

    Raises whatever ``post`` raises, plus the error raised by
    ``raise_for_status()`` on a 4xx/5xx answer from the callback.
    """
    import logging

    db.session.add(the_record)
    verify_ssl = app.config['SSL_HOST_VALIDATION']
    # Set the correct headers for the postback
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain', 'Connection': 'close'}

    try:
        # Blacklist IP addresses
        ip_addr = socket.gethostbyname(grab_domain(the_record.url))
        if app.config['IP_BLACKLISTING']:
            if netaddr.all_matching_cidrs(ip_addr, app.config['IP_BLACKLISTING_RANGE'].split(',')):
                the_record.capture_status = "IP BLACKLISTED:{} - ".format(ip_addr) + the_record.capture_status
    except Exception:
        # Still best-effort, but no longer a bare ``except``: interpreter
        # exits and Ctrl-C propagate, and the failure is visible in the
        # logs instead of vanishing.
        logging.getLogger(__name__).warning(
            "IP blacklist check failed for %r",
            getattr(the_record, 'url', None), exc_info=True)

    req = post(the_record.callback, verify=verify_ssl, data=json.dumps(the_record.as_dict()), headers=headers)

    # If a 4xx or 5xx status is received, raise an exception
    req.raise_for_status()

    # Update capture_record and save to database.  capture_status is left
    # untouched on purpose so that a blacklist message is propagated.
    the_record.job_status = 'COMPLETED'
    db.session.add(the_record)
    db.session.commit()
开发者ID:LiaoHongzheng,项目名称:sketchy,代码行数:31,代码来源:tasks.py
示例6: _validate_signature
def _validate_signature(self, project, request):
    """Check the ``key`` query parameter and origin IP of a hook request.

    Returns False when the request carries no ``key``, when the project
    has no usable modules configuration, or when no bitbucket secret is
    configured.  If a non-empty list of valid origin IPs is configured
    (per project, falling back to ``settings.BITBUCKET_VALID_ORIGIN_IPS``)
    the request's IP must match one of them; an unparsable address counts
    as a mismatch.  Otherwise the result is whether the configured secret
    equals the supplied key.
    """
    provided_key = request.GET.get("key", None)
    if provided_key is None or not hasattr(project, "modules_config"):
        return False

    module_config = project.modules_config.config
    if module_config is None:
        return False

    bitbucket_config = module_config.get("bitbucket", {})
    expected_secret = bitbucket_config.get("secret", "")
    if not expected_secret:
        return False

    allowed_origins = bitbucket_config.get(
        "valid_origin_ips", settings.BITBUCKET_VALID_ORIGIN_IPS)
    origin_ip = get_ip(request)
    if allowed_origins:
        try:
            origin_allowed = len(
                all_matching_cidrs(origin_ip, allowed_origins)) > 0
        except (AddrFormatError, ValueError):
            origin_allowed = False
        if not origin_allowed:
            return False

    return expected_secret == provided_key
开发者ID:shreeshreee,项目名称:taiga-back,代码行数:33,代码来源:api.py
示例7: test_051_check_instance_info
def test_051_check_instance_info(self):
    """Fetch the test instance and verify the fields of the response.

    Name, description, status, machine type and the single network
    interface are asserted.  The network-related assertions are soft: a
    failure is only logged.  The response body is stored on
    ``self.ctx.instance`` at the end.
    """
    status, body = self.gce.zone_get("/instances/",
                                     self.ctx.instance_name)
    self.assertEqual(200, status, message=body.get("message"))
    self.assertEqual(self.ctx.instance_name, body["name"])
    self.assertEqual("test instance", body["description"])
    self.assertEqual("RUNNING", body["status"])
    self.assertEqual("ACTIVE", body["statusMessage"])
    self.assertEqual(self.ctx.machine_type["selfLink"],
                     body["machineType"])

    interfaces = body.get("networkInterfaces")
    self.assertEqual(1, len(interfaces))
    interface = interfaces[0]
    try:
        self.assertEqual(self.ctx.network_name, interface["name"])
    except Exception:
        LOG.exception("Is nova network here?")
        # NOTE(apavlov): change network name for future usage in this case
        self.ctx.network_name = interface["name"]

    matched = netaddr.all_matching_cidrs(interface["networkIP"],
                                         [self.ctx.network_cidr])
    try:
        self.assertEqual(1, len(matched))
    except Exception:
        LOG.exception("Is nova network here?")
    try:
        # An empty match list raises IndexError here, which is logged
        # like a failed assertion.
        self.assertEqual(self.ctx.network_cidr, str(matched[0]))
    except Exception:
        LOG.exception("Is nova network here?")

    self.verify_zone_resource_uri(body["selfLink"], "/instances",
                                  self.ctx.instance_name)
    self.ctx.instance = body
开发者ID:pojoba02,项目名称:gce-api,代码行数:35,代码来源:test_gce_scenario.py
示例8: auto_tags
def auto_tags(self):
    """Build the list of automatically derived tags for ``self.context``.

    Tags produced, in order: ``state:<state>`` (when set), one
    ``arch:<value>`` per architecture entry, ``virt_type:<backend>`` plus
    ``virt:yes`` or else ``virt:no``, and one ``env:<tag>`` for every
    entry of the ``netenv-tags`` config section whose comma-separated
    networks match the context's IPv4 address.
    """
    tags = [u'state:' + self.context.state] if self.context.state else []
    if self.context.architecture:
        tags.extend(u'arch:' + arch for arch in self.context.architecture)

    from opennode.knot.model.virtualizationcontainer import IVirtualizationContainer

    compute = sudo(self.context)
    is_virtual = (IVirtualCompute.providedBy(compute) and
                  IVirtualizationContainer.providedBy(compute.__parent__))
    if is_virtual:
        tags.append(u'virt_type:' + compute.__parent__.backend)
        tags.append(u'virt:yes')
    else:
        tags.append(u'virt:no')

    config = get_config()
    if config.has_section('netenv-tags'):
        for tag, nets in config.items('netenv-tags'):
            try:
                address = self.context.ipv4_address
                if address is None:
                    continue
                # Drop any "/prefix" suffix before matching.
                if netaddr.all_matching_cidrs(address.split('/')[0],
                                              nets.split(',')):
                    tags.append(u'env:' + tag)
            except ValueError:
                # graceful ignoring of incorrect ips
                pass
    return tags
开发者ID:murisfurder,项目名称:opennode-knot,代码行数:27,代码来源:compute.py
示例9: _confirm_router_interface_not_in_use
def _confirm_router_interface_not_in_use(self, context, router_id, subnet_id):
    """Refuse interface removal while an extra route still uses the subnet.

    Runs the parent class check first, then raises
    ``extraroute.RouterInterfaceInUseByRoute`` if the nexthop of any of
    the router's extra routes matches the CIDR of ``subnet_id``.
    """
    super(ExtraRoute_db_mixin, self)._confirm_router_interface_not_in_use(context, router_id, subnet_id)
    subnet_net = netaddr.IPNetwork(self._get_subnet(context, subnet_id)["cidr"])
    routes = self._get_extra_routes_by_router_id(context, router_id)
    # any() stops at the first matching route.
    nexthop_in_subnet = any(
        netaddr.all_matching_cidrs(route["nexthop"], [subnet_net])
        for route in routes)
    if nexthop_in_subnet:
        raise extraroute.RouterInterfaceInUseByRoute(router_id=router_id, subnet_id=subnet_id)
开发者ID:mygoda,项目名称:openstack,代码行数:8,代码来源:extraroute_db.py
示例10: get_reverse_net_dns
def get_reverse_net_dns(ip, ip_ranges):
    """Return ``(rev_net, rev_dns)`` for ``ip``.

    ``rev_net`` is the string form of the single entry of ``ip_ranges``
    that matches ``ip``; ``rev_dns`` is ``netaddr.IPAddress(ip).reverse_dns``.

    Exits the process (``SystemExit``) with a non-zero status when more
    than one range matches.  Raises ``IndexError`` when no range matches.
    """
    rev_cidr = all_matching_cidrs(ip, ip_ranges)
    if len(rev_cidr) > 1:
        # sys.exit() with a string writes it to stderr and exits with
        # status 1; a bare sys.exit() would report success (status 0).
        sys.exit("ERROR: overlapping CIDRs")
    if not rev_cidr:
        # Same exception type as indexing the empty list, but with a
        # message that says what actually went wrong.
        raise IndexError("no CIDR in ip_ranges matches %s" % (ip,))
    rev_net = str(rev_cidr[0])
    rev_dns = netaddr.IPAddress(ip).reverse_dns
    return rev_net, rev_dns
开发者ID:douglin,项目名称:cloud_ddns,代码行数:8,代码来源:ddns.py
示例11: check_allowed_ip
def check_allowed_ip(self, request):
    """ Raise a 404 unless the request's REMOTE_ADDR is allowed.

    The address is allowed when it is listed in ``self.allowed_IPs`` or
    matches one of ``self.allowed_cidr_blocks``.
    """
    remote_addr = request.META['REMOTE_ADDR']
    listed = remote_addr in self.allowed_IPs
    # The CIDR lookup only runs when the exact-match list did not hit.
    if listed or all_matching_cidrs(remote_addr, self.allowed_cidr_blocks):
        return
    raise Http404
开发者ID:wcirillo,项目名称:ten,代码行数:8,代码来源:connector.py
示例12: _validate_routes_nexthop
def _validate_routes_nexthop(self, cidrs, ips, routes, nexthop):
    """Raise ``extraroute.InvalidRoutes`` for an unusable ``nexthop``.

    The nexthop must match one of ``cidrs`` (the CIDRs of the router
    ports) and must not be one of ``ips`` (the router's fixed IPs).
    """
    reason = None
    if not netaddr.all_matching_cidrs(nexthop, cidrs):
        # Note(nati): Nexthop should be connected, so it has to belong
        # to one of the cidrs of the router ports.
        reason = _("the nexthop is not connected with router")
    elif nexthop in ips:
        # Note(nati) nexthop should not be same as fixed_ips
        reason = _("the nexthop is used by router")
    if reason is not None:
        raise extraroute.InvalidRoutes(routes=routes, reason=reason)
开发者ID:cisco-openstack,项目名称:neutron,代码行数:9,代码来源:extraroute_db.py
示例13: comprobar_entorno
def comprobar_entorno(ip):
    """Return "pro" when ``ip`` matches a range of the FE "pro" list.

    The ranges are read, one CIDR per line, from
    ``conf_dir + "listado_ips/listado_rangos_FE_pro"``.  Any other IP
    yields "no-pro".
    """
    listado_pro = conf_dir + "listado_ips/listado_rangos_FE_pro"
    # The context manager closes the file even if reading fails.
    with open(listado_pro) as fichero:
        lineas = fichero.read().splitlines()
    # Blank lines are not valid CIDRs; skip them instead of letting
    # netaddr reject the whole list.
    vlans = [linea.strip() for linea in lineas if linea.strip()]
    if netaddr.all_matching_cidrs(ip, vlans):
        return "pro"
    return "no-pro"
开发者ID:salvapinyol,项目名称:nss,代码行数:9,代码来源:NSSapp.py
示例14: comprobar_nombre_con_IP
def comprobar_nombre_con_IP(nombre_host, ip):
    """Check that ``ip`` lies in the ranges implied by the host name.

    The lower-cased host name is tried against the FE, BE and console
    patterns, in that order.  The first pattern that matches selects the
    family of range lists, and its ``entorno_srv`` group selects the list
    within that family (console hosts use one list for every
    environment).

    Exits with status 13 when no pattern matches the name, and with
    status 14 when ``ip`` matches none of the ranges in the selected
    list.  Raises ``ValueError`` for an ``entorno_srv`` value that has no
    list assigned.  Returns None on success.
    """
    nombre = nombre_host.lower()

    # Patterns are tried lazily in the original precedence order.
    familias = (
        (re_servidor_fe, "FE"),
        (re_servidor_be, "BE"),
        (re_servidor_consola, "C"),
    )
    for patron, familia in familias:
        coincidencia = re.match(patron, nombre)
        if coincidencia:
            break
    else:
        print("ERROR - La IP indicada corresponde a un rango de otro entorno al indicado: %s y %s" % (nombre, ip))
        print("Solución: verificar el documento 'Estandar - Nomenclatura HW y Servicios.pdf' v1.10")
        sys.exit(13)

    entorno_servidor = coincidencia.group('entorno_srv')

    # Environment letter -> suffix of the range-list file.  'y' shares
    # the "pro" list and 'f' shares the "integracion" list.
    sufijos = {
        'p': "pro",
        'y': "pro",
        'r': "pre",
        'i': "integracion",
        'd': "desarrollo",
        'f': "integracion",
        'm': "maqueta",
    }
    if entorno_servidor not in sufijos:
        # Previously this fell through every branch and failed later on
        # an unbound local variable.
        raise ValueError(
            "unknown server environment code: %r" % (entorno_servidor,))

    if familia == "C":
        nombre_listado = "listado_rangos_C"
    else:
        nombre_listado = "listado_rangos_%s_%s" % (
            familia, sufijos[entorno_servidor])

    # The context manager closes the file even if reading fails.
    with open(conf_dir + "listado_ips/" + nombre_listado) as fichero:
        vlans = fichero.read().splitlines()

    match = netaddr.all_matching_cidrs(ip, vlans)
    if len(match) == 0:
        print("ERROR - El entorno del servidor no coincide con el direccionamiento indicado.")
        print("Solución: verificar que la IP %s corresponde al entorno '%s'" % (ip, entorno_servidor))
        sys.exit(14)
    return
开发者ID:salvapinyol,项目名称:nss,代码行数:57,代码来源:NSSapp.py
示例15: in_known_cidr_block
def in_known_cidr_block(ip_address):
    """Return True when ``ip_address`` matches a CIDR stored in Redis.

    The CIDRs are read from the ``cidrs`` key as one comma-separated
    value; a missing or empty value gives False.
    """
    connection = redis.StrictRedis(host=settings.REDIS_HOST,
                                   port=settings.REDIS_PORT,
                                   db=settings.REDIS_DB)
    stored = connection.get('cidrs')
    if stored:
        matches = netaddr.all_matching_cidrs(ip_address, stored.split(','))
        return len(matches) > 0
    return False
开发者ID:marklit,项目名称:mass-ipv4-whois,代码行数:10,代码来源:get_ips_from_coordinator.py
示例16: accept
def accept(ip):
    """Return True when ``ip`` matches one of the hard-coded networks."""
    allowed_networks = ["127.0.0.1/32",
                        "192.168.0.0/16"]
    return len(all_matching_cidrs(ip, allowed_networks)) > 0
开发者ID:gsilos,项目名称:YamlWebEditor,代码行数:10,代码来源:web.py
示例17: isinsubnet
def isinsubnet(x):
    """Return 'Yes' when ``x`` matches any entry of ``masterlist``, else 'No'.

    ``netaddr.all_matching_cidrs`` returns a (possibly empty) list, never
    ``False``, so the result has to be tested for emptiness.  The former
    ``is not False`` test was always true and answered 'Yes' for the
    first entry regardless of the address.  An empty ``masterlist`` now
    yields 'No' instead of failing on an unbound variable.
    """
    for item in masterlist:
        # A non-empty result means ``item`` matches the given ip.
        if netaddr.all_matching_cidrs(x, [item]):
            return 'Yes'
    return 'No'
开发者ID:bassclarinetl2,项目名称:SPFEval,代码行数:10,代码来源:spfeval.py
示例18: is_whitelisted
def is_whitelisted(self, ident):
    """Return True when ``ident`` is covered by the throttle whitelist.

    Integer whitelist entries are compared to ``ident`` for equality;
    string entries are treated as CIDRs that ``ident`` is matched
    against.  Entries that cannot be evaluated as addresses are skipped.
    """
    whitelist = settings.REST_FRAMEWORK['DEFAULT_THROTTLE_WHITELIST']
    for entry in whitelist:
        if isinstance(entry, int):
            if entry == ident:
                return True
        elif isinstance(entry, str):
            try:
                matched = all_matching_cidrs(ident, [entry])
            except (AddrFormatError, ValueError):
                continue
            if matched != []:
                return True
    return False
开发者ID:shreeshreee,项目名称:taiga-back,代码行数:11,代码来源:throttling.py
示例19: check_for_dup_router_subnet
def check_for_dup_router_subnet(self, context, router_id,
                                subnet_id, subnet_cidr):
    """Reject a subnet that clashes with an openvpn peer CIDR of the router.

    Called from the l3 db code.  For every openvpn connection of
    ``router_id``, ``subnet_cidr`` and the connection's ``peer_cidr`` are
    matched against each other; a hit in either direction raises
    ``n_exc.BadRequest``.
    """
    connections = self.get_openvpnconnections(
        context, filters={'router_id': [router_id]})
    candidate_net = netaddr.IPNetwork(subnet_cidr)
    for connection in connections:
        peer_cidr = connection['peer_cidr']
        peer_net = netaddr.IPNetwork(peer_cidr)
        new_vs_peer = netaddr.all_matching_cidrs(candidate_net, [peer_cidr])
        peer_vs_new = netaddr.all_matching_cidrs(peer_net, [subnet_cidr])
        if not (new_vs_peer or peer_vs_new):
            continue
        data = {'subnet_cidr': subnet_cidr,
                'subnet_id': subnet_id,
                'cidr': peer_cidr,
                'vpn_id': connection['id']}
        msg = (_("Cidr %(subnet_cidr)s of subnet "
                 "%(subnet_id)s overlaps with cidr %(cidr)s "
                 "of openvpn %(vpn_id)s") % data)
        raise n_exc.BadRequest(resource='router', msg=msg)
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:20,代码来源:openvpn_db.py
示例20: ignore_ip_addr
def ignore_ip_addr(vm_, ip):
    '''
    Return True if the specified IP matches the CIDR configured to be
    ignored, False otherwise. Compatible with IPv4.

    The CIDR comes from the VM's ``ip_ignore`` setting, falling back to
    the ``OPENSTACK.ignore_cidr`` option; an empty value disables the
    check. When netaddr is not available an error string is returned
    instead of a boolean (note that this string is truthy).
    '''
    if HAS_NETADDR is False:
        return 'Error: netaddr is not installed'

    cidr = vm_.get('ip_ignore', __opts__.get('OPENSTACK.ignore_cidr', ''))
    return bool(cidr != '' and all_matching_cidrs(ip, [cidr]))
开发者ID:dhmsbr,项目名称:salt-stack-remedy,代码行数:12,代码来源:openstack.py
注：本文中的netaddr.all_matching_cidrs函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论