本文整理汇总了Python中netaddr.IPNetwork类的典型用法代码示例。如果您正苦于以下问题：Python IPNetwork类的具体用法？Python IPNetwork怎么用？Python IPNetwork使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了IPNetwork类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _create_subnets
def _create_subnets(self):
    """Create one subnet per availability zone inside this meta network.

    The network CIDR is read from the VPC config key ``<name>_cidr`` and
    divided into equally sized blocks by lengthening its prefix by
    ``ceil(log2(number of zones))`` bits, which yields at least one block
    per zone. Each zone is paired with the next block; the resulting subnet
    is associated with ``self.route_table`` and handed to
    ``self._tag_resource`` together with the zone name.

    :returns: list of the subnets returned by ``create_subnet``, in zone order.
    """
    logging.debug("creating subnets")
    zones = self.vpc.vpc.connection.get_all_zones()
    logging.debug("zones: %s", zones)

    # Number of extra prefix bits needed so that the split produces at
    # least len(zones) blocks.
    extra_prefix_bits = ceil(log(len(zones), 2))
    logging.debug("zone_offset: %s", extra_prefix_bits)

    parent_network = IPNetwork(
        self.vpc.get_config("{0}_cidr".format(self.name))
    )
    per_zone_prefixlen = int(parent_network.prefixlen + extra_prefix_bits)
    zone_blocks = parent_network.subnet(per_zone_prefixlen)

    created = []
    for zone, block in zip(zones, zone_blocks):
        logging.debug("%s %s", zone, block)
        subnet = self.vpc.vpc.connection.create_subnet(
            self.vpc.vpc.id, block, zone.name
        )
        self.vpc.vpc.connection.associate_route_table(
            self.route_table.id, subnet.id
        )
        self._tag_resource(subnet, zone.name)
        created.append(subnet)
        logging.debug("%s subnet: %s", self.name, subnet)
    return created
开发者ID:Angakkuit,项目名称:asiaq-aws,代码行数:25,代码来源:disco_metanetwork.py
示例2: create_address_pools
def create_address_pools(interfaceorder, networks_pools):
    """Build the address-pool description for every interface name.

    :param interfaceorder: iterable of interface/network names.
    :param networks_pools: mapping of name -> sequence of strings; the
        strings are joined with ``':'`` to form the ``'net'`` value. For the
        ``'public'`` entry the first item is parsed as a network and the
        second as the prefix length of the subnet used for sizing.
    :returns: dict keyed by interface name. Every entry reserves one
        address each for ``gateway`` and ``l2_network_device`` and gets a
        ``default`` range of ``[2, -2]``. When ``'public'`` is present its
        ``default`` range covers the lower half of the first subnet and a
        ``floating`` range covers the upper half.
    """
    address_pools = {}
    for iname in interfaceorder:
        address_pools[iname] = {
            'net': ':'.join(networks_pools[iname]),
            'params': {
                'ip_reserved': {
                    # Gateway will be used for configure OpenStack networks
                    'gateway': 1,
                    # l2_network_device will be used for configure local bridge
                    'l2_network_device': 1,
                },
                'ip_ranges': {
                    'default': [2, -2],
                },
            },
        }

    if 'public' not in interfaceorder:
        return address_pools

    # Size the floating range from the first subnet of the requested prefix.
    public_net = IPNetwork(networks_pools['public'][0])
    public_prefix = int(networks_pools['public'][1])
    first_subnet = next(public_net.subnet(prefixlen=public_prefix))
    half = first_subnet.size // 2

    public_ranges = address_pools['public']['params']['ip_ranges']
    public_ranges['default'] = [2, half - 1]
    public_ranges['floating'] = [half, -2]
    return address_pools
开发者ID:loles,项目名称:fuel-devops,代码行数:35,代码来源:templates.py
示例3: allocateInterconnect
def allocateInterconnect(self, interConnectPrefix, spines, leafs):
    """Carve /31 point-to-point subnets for every spine interface that has a peer.

    A block large enough for ``len(spines) * len(leafs)`` two-address links
    is built from ``interConnectPrefix`` and recorded on the pod of the
    first spine. For each peered interface of each spine the next /31 is
    taken: its first address goes to a new logical unit ``.0`` on the spine
    side and its second address to a logical unit ``.0`` on the peer side.
    All created logical interfaces are passed to ``self.dao.createObjects``.
    """
    ipsPerLink = 2
    linkCount = len(spines) * len(leafs)
    # no need to add +2 for network and broadcast, as junos supports /31
    # TODO: it should be configurable and come from property file
    hostBits = int(math.ceil(math.log(ipsPerLink, 2)))  # value is 1
    linkPrefixLen = 32 - hostBits  # value is 31 as junos supports /31
    totalIps = linkCount * ipsPerLink  # no need to add +2 for network and broadcast
    blockPrefixLen = 32 - int(math.ceil(math.log(totalIps, 2)))

    block = IPNetwork(interConnectPrefix + "/" + str(blockPrefixLen))
    freeSubnets = list(block.subnet(linkPrefixLen))
    createdIfls = []
    spines[0].pod.allocatedInterConnectBlock = str(block.cidr)

    for spine in spines:
        query = self.dao.Session().query(InterfaceDefinition)
        query = query.filter(InterfaceDefinition.device_id == spine.id)
        # NOTE(review): "!= None" is kept as written; presumably this is an
        # ORM column expression where "is not None" would not build a filter.
        query = query.filter(InterfaceDefinition.peer != None)
        peeredIfds = query.order_by(InterfaceDefinition.name_order_num).all()

        for spineIfd in peeredIfds:
            linkAddresses = list(freeSubnets.pop(0))

            spineIfl = InterfaceLogical(
                spineIfd.name + '.0', spine,
                str(linkAddresses[0]) + '/' + str(linkPrefixLen))
            spineIfd.layerAboves.append(spineIfl)
            createdIfls.append(spineIfl)

            leafIfd = spineIfd.peer
            leafIfl = InterfaceLogical(
                leafIfd.name + '.0', leafIfd.device,
                str(linkAddresses[1]) + '/' + str(linkPrefixLen))
            leafIfd.layerAboves.append(leafIfl)
            createdIfls.append(leafIfl)

    self.dao.createObjects(createdIfls)
开发者ID:bgp44,项目名称:juniper,代码行数:32,代码来源:l3Clos.py
示例4: createNewVlanRange
def createNewVlanRange(cls):
    """ Increment current cidr of vlan range present in network
        and create new range

        The first existing public IP range supplies start IP, end IP,
        gateway and netmask (stored on ``cls``). Its network is shifted by a
        random number of network sizes, and a new range is created there
        with gateway at offset +1 and addresses from offset +3 to +10.
    """
    existingRange = PublicIpRange.list(cls.api_client)[0]
    cls.startIp = existingRange.startip
    cls.endIp = existingRange.endip
    cls.gateway = existingRange.gateway
    cls.netmask = existingRange.netmask

    # Pass ip address and mask length to IPNetwork to find out the CIDR
    shiftedNetwork = IPNetwork(cls.startIp + "/" + cls.netmask)
    # Take random increment factor to avoid adding the same vlan ip range
    # in each test case
    shiftedNetwork += random.randint(1, 255)
    baseAddress = IPAddress(IPNetwork(shiftedNetwork).network)

    # Populating services with the new IP range inside the shifted CIDR
    vlanIpRange = cls.testdata["vlan_ip_range"]
    vlanIpRange["startip"] = baseAddress + 3
    vlanIpRange["endip"] = baseAddress + 10
    vlanIpRange["gateway"] = baseAddress + 1
    vlanIpRange["netmask"] = cls.netmask
    vlanIpRange["zoneid"] = cls.zone.id
    vlanIpRange["podid"] = cls.pod.id

    return PublicIpRange.create(
        cls.api_client,
        cls.testdata["vlan_ip_range"])
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:33,代码来源:test_multiple_ip_ranges.py
示例5: validate_ip
def validate_ip(self, request, remote_ip):
    """Return True when ``remote_ip`` is acceptable for the request's session.

    The check passes immediately when IP restriction is switched off via
    ``RESTRICTEDSESSIONS_RESTRICT_IP`` or when the session has no stored IP
    yet. Without a remote IP, the result is True only if the stored value
    is empty as well. Otherwise the stored IP is widened to a network using
    ``RESTRICTEDSESSIONS_IPV4_LENGTH`` (default 32) or
    ``RESTRICTEDSESSIONS_IPV6_LENGTH`` (default 64) and the remote address
    must fall inside it.
    """
    restriction_enabled = getattr(settings, 'RESTRICTEDSESSIONS_RESTRICT_IP', True)
    if not restriction_enabled or SESSION_IP_KEY not in request.session:
        return True

    session_ip = request.session[SESSION_IP_KEY]
    if not remote_ip:
        # No remote IP: valid only when the session did not record one either.
        return not session_ip

    # Compute fuzzy IP compare based on settings on compare sensitivity
    network = IPNetwork(session_ip)
    address = IPAddress(remote_ip)
    try:
        network = network.ipv4()
        address = address.ipv4()
        network.prefixlen = getattr(settings, 'RESTRICTEDSESSIONS_IPV4_LENGTH', 32)
    except AddrConversionError:
        # At least one side could not be expressed as IPv4; fall back to
        # the IPv6 prefix length.
        try:
            network.prefixlen = getattr(settings, 'RESTRICTEDSESSIONS_IPV6_LENGTH', 64)
        except AddrFormatError:
            # session network must be IPv4, but remote address is IPv6
            return False
    return address in network
开发者ID:erikr,项目名称:django-restricted-sessions,代码行数:29,代码来源:middleware.py
示例6: reserve_ip_addr
def reserve_ip_addr(self):
    """Picks first available IP address from weave network.

    If there's no available IP address anymore, ``IndexError``
    will be raised. To prevent this error, catch the exception
    or checks the value ``GluuCluster.ip_addr_available`` first
    before trying to call this method.

    The chosen address is appended to ``self.reserved_ip_addrs`` so that
    subsequent calls skip it.

    :returns: A 2-elements tuple consists of IP address and network prefix,
              e.g. ``("10.10.10.1", 24)``.
    :raises IndexError: when every host address is already taken.
    """
    # represents a pool of IP addresses
    pool = IPNetwork(self.weave_ip_network)

    # Addresses that must never be handed out: everything already reserved
    # plus the exposed weave IP.
    unavailable = IPSet(self.reserved_ip_addrs) | IPSet([self.exposed_weave_ip[0]])

    # Use set difference, not symmetric difference (``^``): with ``^`` an
    # address present in both the reserved list and the exposed IP would be
    # toggled back into the result, and a reserved address outside the pool
    # would be added to it.
    available = IPSet(pool.iter_hosts()) - unavailable

    # retrieves first available IP address; raises IndexError when empty
    ip_addr = list(itertools.islice(available, 1))[0]

    # register the IP address so it will be excluded
    # from possible IP range in subsequent requests
    self.reserved_ip_addrs.append(str(ip_addr))

    # weave IP address for container expects a traditional CIDR,
    # e.g. 10.10.10.1/24, hence we return the actual IP and
    # its prefix length
    return str(ip_addr), pool.prefixlen
开发者ID:hasanmehmood,项目名称:gluu-flask,代码行数:28,代码来源:gluu_cluster.py
示例7: split_subnets
def split_subnets(cidr, split):
    """
    Splits a CIDR into a power-of-two number of sub CIDRs large enough for
    the desired number of subnets, and returns the array.

    The prefix is lengthened by the bit length of ``split``, so 3 yields 4
    subnets and 5 yields 8. NOTE(review): exact powers of two are rounded
    up to the *next* power (2 yields 4, 4 yields 8); this existing
    behaviour is preserved here.

    :param cidr: CIDR for entire range
    :type cidr: str.
    :param split: Number to split into
    :type split: int.
    :raises: :class:`pmcf.exceptions.PropertyException`
    :returns: list.
    """
    # A single subnet is the original range itself, returned unparsed.
    if split == 1:
        return [cidr]

    # Count how many bits are needed to represent ``split``.
    power = 0
    while split > 0:
        split >>= 1
        power += 1

    try:
        ipaddr = IPNetwork(cidr)
        return list(ipaddr.subnet(ipaddr.prefixlen + power))
    except Exception as exc:
        # "except X as e" is valid on both Python 2.6+ and Python 3; the
        # previous "except X, e" form is a SyntaxError on Python 3.
        raise PropertyException(str(exc))
开发者ID:pikselpalette,项目名称:pmcf,代码行数:31,代码来源:__init__.py
示例8: _add_search_filters
def _add_search_filters(filters, query):
    """
        Add Search Service response to filters

        The query is sent to the search service and the returned report ids
        are stored as a single ``{'id': [...]}`` entry in
        ``filters['where']['in']``. Non-empty ``{'id': [...]}`` entries that
        are already present are merged into that entry. ``filters`` is
        modified in place and nothing is returned. When the search service
        raises ``SearchServiceException`` the filters are left untouched.
    """
    search_query = query

    # Try to parse IP/CIDR search: small networks are expanded to the list
    # of their host addresses.
    if IP_CIDR_RE.match(query):
        try:
            network = IPNetwork(query)
            if network.size <= 4096:
                expanded = ' '.join(str(host) for host in network.iter_hosts())
                search_query = expanded if expanded else query
        except (AttributeError, IndexError, AddrFormatError, AddrConversionError):
            # Best effort only: fall back to searching the raw query.
            pass

    try:
        reports = ImplementationFactory.instance.get_singleton_of('SearchServiceBase').search_reports(search_query)
        if not reports:
            reports = [None]
    except SearchServiceException:
        return

    if 'in' in filters['where']:
        in_clauses = filters['where']['in']
        # Iterate over a snapshot: removing from the list that is being
        # iterated would skip the entry following each removed one.
        for field in list(in_clauses):
            for key, values in field.items():
                if key == 'id' and len(values):
                    reports.extend(values)
                    in_clauses.remove({key: values})
        in_clauses.append({'id': list(set(reports))})
    else:
        filters['where']['in'] = [{'id': reports}]
开发者ID:ovh,项目名称:cerberus-core,代码行数:31,代码来源:ReportsController.py
示例9: populateDhcpGlobalSettings
def populateDhcpGlobalSettings(self):
    """Assemble the global DHCP settings dict from the ``ztp`` section of the clos definition.

    Network, netmask and broadcast come from ``dhcpSubnet``. The default
    route, range start and range end are taken from the configuration when
    set; when missing or empty they default to the first, second and last
    host address of the subnet respectively.
    """
    ztpGlobalSettings = util.loadClosDefinition()['ztp']
    dhcpBlock = IPNetwork(ztpGlobalSettings['dhcpSubnet'])
    hostIps = list(dhcpBlock.iter_hosts())

    def configuredOrHost(optionName, hostIndex):
        # Prefer the configured value; fall back to a host of the subnet.
        configured = ztpGlobalSettings.get(optionName)
        if configured is None or configured == '':
            return str(hostIps[hostIndex])
        return configured

    return {
        'network': str(dhcpBlock.network),
        'netmask': str(dhcpBlock.netmask),
        'defaultRoute': configuredOrHost('dhcpOptionRoute', 0),
        'rangeStart': configuredOrHost('dhcpOptionRangeStart', 1),
        'rangeEnd': configuredOrHost('dhcpOptionRangeEnd', -1),
        'broadcast': str(dhcpBlock.broadcast),
        'httpServerIp': self.conf['httpServer']['ipAddr'],
        'imageUrl': ztpGlobalSettings.get('junosImage'),
    }
开发者ID:rgiyer,项目名称:OpenClos,代码行数:26,代码来源:ztp.py
示例10: add
def add(caller_id, address, mask):
    """Register a new available network unless it overlaps an existing one.

    :param caller_id: not used by this function.
    :param address: network address, formatted with ``%s``.
    :param mask: prefix length, formatted with ``%d``.
    :raises CMException: ``'network_exists'`` when the new network is
        contained in, contains, or equals an already registered network.

    Private networks are stored in state ``ok``, all others in ``locked``.
    """
    ipnet = IPNetwork('%s/%d' % (address, mask))

    # Find duplicate / overlapping networks. Two CIDR blocks overlap exactly
    # when one contains the other, so a containment test in both directions
    # also covers identical networks. The earlier previous()/next()
    # comparison missed exact duplicates and relied on the Python 2-only
    # ``xrange``.
    for existing in AvailableNetwork.objects.all():
        known = existing.to_ipnetwork()
        if ipnet in known or known in ipnet:
            raise CMException('network_exists')

    # Add new network
    new_net = AvailableNetwork()
    new_net.address = ipnet.network
    new_net.mask = mask
    if ipnet.is_private():
        new_net.state = available_network_states['ok']
    else:
        new_net.state = available_network_states['locked']
    new_net.save()
开发者ID:cc1-cloud,项目名称:cc1,代码行数:25,代码来源:network.py
示例11: calculate_vip_start
def calculate_vip_start(self, vip_start, mgmtip, selfip_external):
    """Work out the first virtual-server address.

    Returns None (after logging) when neither ``selfip_external`` nor
    ``vip_start`` is given, or when the management subnet is not listed in
    ``SUBNET_MAP``. An explicit ``vip_start`` is returned as ``IPAddress``.
    Otherwise the address is derived from the external self IP network plus
    an offset computed from the management subnet index and host id.

    A ``selfip_external`` with a /32 prefix is treated as having no prefix
    and is changed to /16.
    """
    if not (selfip_external or vip_start):
        LOG.info('Skipping auto VIP generation.')
        return

    if selfip_external:
        if not isinstance(selfip_external, IPNetwork):
            selfip_external = IPNetwork(selfip_external)

        if selfip_external.prefixlen == 32:
            LOG.info('Self IP external has no prefix. Assuming /16.')
            selfip_external.prefixlen = 16

    if vip_start is not None:
        return IPAddress(vip_start)

    # '1.1.1.1/24' -> '1.1.1.0/24'
    mgmt_cidr = "{0.network}/{0.prefixlen}".format(mgmtip)
    host_id = mgmtip.ip.value - mgmtip.network.value
    subnet_index = SUBNET_MAP.get(mgmt_cidr)
    if subnet_index is None:
        LOG.warning('The %s subnet was not found! '
                    'Skipping Virtual Servers.' % mgmt_cidr)
        return

    # Start from 10.11.50.0 - 10.11.147.240 (planned for 10 subnets, 8 VIPs each)
    offset = 1 + 50 * 256 + DEFAULT_VIPS * (256 * subnet_index + host_id)
    return selfip_external.network + offset
开发者ID:pombredanne,项目名称:f5test2,代码行数:31,代码来源:placer.py
示例12: _validateLoopbackPrefix
def _validateLoopbackPrefix(self, pod, podDict, inventoryData):
    """Ensure ``loopbackPrefix`` offers at least one host address per device.

    The required count is the number of spines plus the number of leafs in
    ``inventoryData``.

    :raises ValueError: when the prefix has fewer host addresses than devices.
    """
    requiredIps = len(inventoryData['spines']) + len(inventoryData['leafs'])
    loopbackHosts = list(IPNetwork(podDict['loopbackPrefix']).iter_hosts())
    availableIps = len(loopbackHosts)
    if availableIps >= requiredIps:
        return
    raise ValueError("Pod[id='%s', name='%s']: loopbackPrefix available IPs %d not enough: required %d" % (pod.id, pod.name, availableIps, requiredIps))
开发者ID:codyrat,项目名称:OpenClos,代码行数:7,代码来源:l3Clos.py
示例13: __init__
def __init__(self):
    """Set up two networks with endlessly cycling host-address pools and reset the counters."""
    first_network = IPNetwork(NETWORK_1)
    self.net1 = first_network
    self.net1_ip_pool = cycle(first_network.iter_hosts())

    second_network = IPNetwork(NETWORK_2)
    self.net2 = second_network
    self.net2_ip_pool = cycle(second_network.iter_hosts())

    self.mcounter = {}
    self.mac_counter = 0
开发者ID:SmartInfrastructures,项目名称:fuel-web-dev,代码行数:8,代码来源:fake_generator.py
示例14: _validateLoopbackPrefix
def _validateLoopbackPrefix(self, pod, podDict, inventoryData):
    """Ensure ``loopbackPrefix`` offers at least one host address per device.

    The required count is the number of spines plus the number of leafs in
    ``inventoryData``.

    :raises InsufficientLoopbackIp: when the prefix has fewer host
        addresses than devices; the message suggests a prefix length.
    """
    inventoryDeviceCount = len(inventoryData['spines']) + len(inventoryData['leafs'])
    lo0Block = IPNetwork(podDict['loopbackPrefix'])
    availableIps = len(list(lo0Block.iter_hosts()))
    if availableIps >= inventoryDeviceCount:
        return

    # Compute the suggested prefix only on failure. Here the device count
    # is at least 1, so math.log never sees 0; computing it up front raised
    # "math domain error" for an empty inventory.
    # NOTE(review): the suggestion does not add room for network and
    # broadcast addresses - confirm whether that is intended.
    cidr = 32 - int(math.ceil(math.log(inventoryDeviceCount, 2)))
    raise InsufficientLoopbackIp("Pod[id='%s', name='%s']: loopbackPrefix minimum required: %s/%d" % (pod.id, pod.name, lo0Block.ip, cidr))
开发者ID:Juniper,项目名称:OpenClos,代码行数:8,代码来源:l3Clos.py
示例15: test_ip_v4_to_ipv6_compatible
def test_ip_v4_to_ipv6_compatible():
    """Check IPv4 -> IPv6 conversion in IPv4-compatible and IPv4-mapped form for addresses and networks."""
    # Single address: keyword and positional forms of the flag must agree.
    compatible_address = IPAddress('::192.0.2.15')
    by_keyword = IPAddress('192.0.2.15').ipv6(ipv4_compatible=True)
    assert by_keyword == compatible_address
    assert IPAddress('192.0.2.15').ipv6(ipv4_compatible=True).is_ipv4_compat()
    by_position = IPAddress('192.0.2.15').ipv6(True)
    assert by_position == compatible_address

    # Network: the prefix length is carried over into the 128-bit space.
    network = IPNetwork('192.0.2.1/23')
    assert network.ipv4() == IPNetwork('192.0.2.1/23')
    assert network.ipv6() == IPNetwork('::ffff:192.0.2.1/119')
    assert network.ipv6(ipv4_compatible=True) == IPNetwork('::192.0.2.1/119')
开发者ID:drkjam,项目名称:netaddr,代码行数:9,代码来源:test_ip_v4_v6_conversions.py
示例16: allocate_tap_ips
def allocate_tap_ips(self):
    """Assign tap addresses from 172.16.0.0/16 to the host, the tunnel VM and every L3 device.

    Addresses are handed out in host order: the first goes to
    ``tap_host``, the second to ``tap_vm``, and the following ones to each
    node returned by ``self.nidb.nodes("is_l3device", host=self.host)``.
    """
    #TODO: take tap subnet parameter
    from netaddr import IPNetwork

    lab_topology = self.nidb.topology[self.host]
    address_block = IPNetwork("172.16.0.0/16").iter_hosts()

    # The built-in next() works on Python 2.6+ and Python 3; the generator
    # method ".next()" exists on Python 2 only.
    lab_topology.tap_host = next(address_block)
    lab_topology.tap_vm = next(address_block)  # for tunnel host

    for node in self.nidb.nodes("is_l3device", host=self.host):
        #TODO: check this works for switches
        node.tap.ip = next(address_block)
开发者ID:sk2,项目名称:ANK-NG,代码行数:10,代码来源:compiler.py
示例17: is_cidr_reserved
def is_cidr_reserved(cidr, vpccidr):
    """ Check if CIDR is in the first 2 x /22 in the VPC Range

    :param cidr: candidate network (anything ``str()`` turns into a CIDR).
    :param vpccidr: CIDR of the whole VPC.
    :returns: True when the candidate lies inside, equals, or spans one of
        the first two /22 blocks of the VPC; False otherwise, including
        when the VPC is too small to contain a /22.
    """
    candidate = IPNetwork(str(cidr))

    # Take only the first two /22 blocks instead of materialising every
    # subnet of the VPC (twice); a VPC smaller than /21 simply yields fewer
    # than two blocks rather than raising IndexError.
    reserved_blocks = []
    for block in IPNetwork(vpccidr).subnet(22):
        reserved_blocks.append(block)
        if len(reserved_blocks) == 2:
            break

    # Overlap test in both directions: also valid for candidates of /22 or
    # larger, for which supernet(22) has no first element.
    for block in reserved_blocks:
        if candidate in block or block in candidate:
            return True
    return False
开发者ID:bcosta12,项目名称:AdvancedCloudFormation,代码行数:10,代码来源:autosubnet.py
示例18: _allocateLoopback
def _allocateLoopback(self, session, pod, loopbackPrefix, devices):
    """Size a loopback block for ``devices`` and hand its host addresses out.

    The block starts at the network address of ``loopbackPrefix`` and is
    just large enough for one address per device plus two. Its CIDR is
    stored on ``pod.allocatedLoopbackBlock`` and the host list is passed to
    ``self._assignAllocatedLoopbackToDevices``.
    """
    baseAddress = IPNetwork(loopbackPrefix).network
    # +2 for network and broadcast
    requiredBits = int(math.ceil(math.log(len(devices) + 2, 2)))
    prefixLen = 32 - requiredBits

    loopbackBlock = IPNetwork(str(baseAddress) + "/" + str(prefixLen))
    hostAddresses = list(loopbackBlock.iter_hosts())

    pod.allocatedLoopbackBlock = str(loopbackBlock.cidr)
    self._assignAllocatedLoopbackToDevices(session, devices, hostAddresses)
开发者ID:Juniper,项目名称:OpenClos,代码行数:10,代码来源:l3Clos.py
示例19: split_network
def split_network(request):
    """Render the two halves of the network given in the ``network`` GET parameter.

    Responds with HTTP 400 when the parameter is missing, cannot be parsed
    as a network, or already has the longest possible prefix (/32 for IPv4,
    /128 for IPv6) and therefore cannot be split.
    """
    try:
        network = IPNetwork(request.GET['network'])
    except (KeyError, AddrFormatError):
        return HttpResponseBadRequest()

    # ``subnet`` is lazy, so an out-of-range prefix would only fail while
    # the template iterates it; reject unsplittable networks up front.
    max_prefixlen = 32 if network.version == 4 else 128
    if network.prefixlen >= max_prefixlen:
        return HttpResponseBadRequest()

    return render(request, 'pammy/split_network.html', {
        'networks': network.subnet(network.prefixlen + 1),
    })
开发者ID:ocadotechnology,项目名称:pammy,代码行数:10,代码来源:ui.py
示例20: get_admin_ip_w_prefix
def get_admin_ip_w_prefix(node):
    """Return the node's admin IP as a string carrying the admin network's prefix length."""
    manager = objects.Node.get_network_manager(node)
    node_admin_ip = IPNetwork(manager.get_admin_ip_for_node(node))

    # Assign prefix from admin network
    admin_cidr = manager.get_admin_network_group().cidr
    node_admin_ip.prefixlen = IPNetwork(admin_cidr).prefixlen
    return str(node_admin_ip)
开发者ID:sk4lf,项目名称:fuel-web,代码行数:11,代码来源:deployment_serializers.py
注：本文中的netaddr.IPNetwork类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论