• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python netaddr.IPNetwork类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 netaddr.IPNetwork 的典型用法代码示例。如果您正苦于以下问题：Python IPNetwork 类的具体用法？Python IPNetwork 怎么用？Python IPNetwork 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了IPNetwork类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _create_subnets

    def _create_subnets(self):
        """Create one subnet per availability zone and return them as a list.

        The network CIDR is read from the ``<name>_cidr`` config entry and
        divided into equally sized pieces; each zone returned by the
        connection gets the next piece. Every new subnet is associated with
        this network's route table and tagged with its zone name.
        """
        logging.debug("creating subnets")
        availability_zones = self.vpc.vpc.connection.get_all_zones()
        logging.debug("zones: %s", availability_zones)

        # Number of extra prefix bits required so that the network splits
        # into at least len(availability_zones) pieces.
        extra_prefix_bits = ceil(log(len(availability_zones), 2))
        logging.debug("zone_offset: %s", extra_prefix_bits)

        config_key = "{0}_cidr".format(self.name)
        parent_network = IPNetwork(self.vpc.get_config(config_key))
        per_zone_prefixlen = int(parent_network.prefixlen + extra_prefix_bits)
        candidate_cidrs = parent_network.subnet(per_zone_prefixlen)

        created = []
        # zip() stops at the shorter input, so surplus pieces stay unused.
        for zone, cidr in zip(availability_zones, candidate_cidrs):
            logging.debug("%s %s", zone, cidr)
            new_subnet = self.vpc.vpc.connection.create_subnet(self.vpc.vpc.id, cidr, zone.name)
            self.vpc.vpc.connection.associate_route_table(self.route_table.id, new_subnet.id)
            self._tag_resource(new_subnet, zone.name)
            created.append(new_subnet)
            logging.debug("%s subnet: %s", self.name, new_subnet)

        return created
开发者ID:Angakkuit,项目名称:asiaq-aws,代码行数:25,代码来源:disco_metanetwork.py


示例2: create_address_pools

def create_address_pools(interfaceorder, networks_pools):
    """Build the address-pool description for every interface name.

    Each entry of ``networks_pools`` is a sequence of strings that is joined
    with ``:`` to form the pool's ``net`` value. Every pool reserves one
    address for the gateway and one for the l2 network device and gets a
    ``default`` range of ``[2, -2]``.

    When a ``public`` interface is present, its range is cut in half: the
    lower half stays ``default`` and the upper half becomes ``floating``.
    The size is taken from the first subnet of the requested prefix length
    (``networks_pools['public'][1]``) inside ``networks_pools['public'][0]``.
    """
    address_pools = {}
    for iname in interfaceorder:
        reserved = {
            # Gateway will be used to configure OpenStack networks
            'gateway': 1,
            # l2_network_device will be used to configure the local bridge
            'l2_network_device': 1,
        }
        ranges = {'default': [2, -2]}
        address_pools[iname] = {
            'net': ':'.join(networks_pools[iname]),
            'params': {'ip_reserved': reserved, 'ip_ranges': ranges},
        }

    if 'public' not in interfaceorder:
        return address_pools

    public_spec = networks_pools['public']
    # Size of the first subnet with the requested prefix length
    first_subnet = next(IPNetwork(public_spec[0]).subnet(prefixlen=int(public_spec[1])))
    half = first_subnet.size // 2

    public_ranges = address_pools['public']['params']['ip_ranges']
    public_ranges['default'] = [2, half - 1]
    public_ranges['floating'] = [half, -2]

    return address_pools
开发者ID:loles,项目名称:fuel-devops,代码行数:35,代码来源:templates.py


示例3: allocateInterconnect

    def allocateInterconnect(self, interConnectPrefix, spines, leafs):
        """Allocate a two-address subnet for every spine interface that has a peer.

        A block large enough for ``len(spines) * len(leafs)`` links is built
        from ``interConnectPrefix`` and split into /31 subnets. For each
        peered interface of each spine, the first address of the next subnet
        goes to the spine side and the second to the peer (leaf) side. The
        allocated block is recorded on the pod of the first spine and all
        created logical interfaces are persisted through ``self.dao``.
        """
        ipsPerLink = 2
        linkCount = len(spines) * len(leafs)
        # no need to add +2 for network and broadcast, as junos supports /31
        # TODO: it should be configurable and come from property file
        hostBits = int(math.ceil(math.log(ipsPerLink, 2)))    # value is 1
        linkPrefixLen = 32 - hostBits  # value is 31 as junos supports /31

        # no need to add +2 for network and broadcast here either
        totalIps = linkCount * ipsPerLink
        blockBits = int(math.ceil(math.log(totalIps, 2)))
        blockPrefixLen = 32 - blockBits
        block = IPNetwork(interConnectPrefix + "/" + str(blockPrefixLen))
        freeSubnets = list(block.subnet(linkPrefixLen))

        logicalInterfaces = []
        spines[0].pod.allocatedInterConnectBlock = str(block.cidr)

        for spine in spines:
            query = self.dao.Session().query(InterfaceDefinition)
            query = query.filter(InterfaceDefinition.device_id == spine.id)
            # "!= None" is kept on purpose: presumably this is an ORM column
            # expression, where "is not None" would not build a query filter.
            query = query.filter(InterfaceDefinition.peer != None)
            peeredIfds = query.order_by(InterfaceDefinition.name_order_num).all()

            for spineIfd in peeredIfds:
                linkSubnet = freeSubnets.pop(0)
                addresses = list(linkSubnet)

                spineAddress = str(addresses[0]) + '/' + str(linkPrefixLen)
                spineIfl = InterfaceLogical(spineIfd.name + '.0', spine, spineAddress)
                spineIfd.layerAboves.append(spineIfl)
                logicalInterfaces.append(spineIfl)

                leafIfd = spineIfd.peer
                leafAddress = str(addresses[1]) + '/' + str(linkPrefixLen)
                leafIfl = InterfaceLogical(leafIfd.name + '.0', leafIfd.device, leafAddress)
                leafIfd.layerAboves.append(leafIfl)
                logicalInterfaces.append(leafIfl)

        self.dao.createObjects(logicalInterfaces)
开发者ID:bgp44,项目名称:juniper,代码行数:32,代码来源:l3Clos.py


示例4: createNewVlanRange

    def createNewVlanRange(cls):
        """Create a new public IP range in a network near the existing one.

        The first public IP range known to the API client is read and its
        start IP, end IP, gateway and netmask are stored on ``cls``. The
        network of that range is then shifted forward by a random number of
        same-sized networks (1..255), and a new range inside the shifted
        network is written into ``cls.testdata["vlan_ip_range"]``.

        :returns: the result of ``PublicIpRange.create`` for the new range.
        """
        publicIpRange = PublicIpRange.list(cls.api_client)
        cls.startIp = publicIpRange[0].startip
        cls.endIp = publicIpRange[0].endip
        cls.gateway = publicIpRange[0].gateway
        cls.netmask = publicIpRange[0].netmask

        # Pass IP address and netmask to IPNetwork to find out the CIDR
        network = IPNetwork(cls.startIp + "/" + cls.netmask)

        # Take a random increment factor to avoid adding the same vlan ip
        # range in each test case. "+=" on an IPNetwork moves it forward by
        # that many networks of the same size.
        networkIncrementFactor = random.randint(1, 255)
        network += networkIncrementFactor

        # Network address of the shifted CIDR; offsets below are relative to it
        base = network.network
        test_gateway = base + 1
        # The new range spans offsets +3 .. +10 (8 addresses)
        test_startIp = base + 3
        test_endIp = base + 10

        # Populating services with new IP range
        vlan_ip_range = cls.testdata["vlan_ip_range"]
        vlan_ip_range["startip"] = test_startIp
        vlan_ip_range["endip"] = test_endIp
        vlan_ip_range["gateway"] = test_gateway
        vlan_ip_range["netmask"] = cls.netmask
        vlan_ip_range["zoneid"] = cls.zone.id
        vlan_ip_range["podid"] = cls.pod.id

        return PublicIpRange.create(
                cls.api_client,
                cls.testdata["vlan_ip_range"])
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:33,代码来源:test_multiple_ip_ranges.py


示例5: validate_ip

    def validate_ip(self, request, remote_ip):
        """Return True when ``remote_ip`` is acceptable for the request's session.

        The check passes unconditionally when IP restriction is switched off
        in settings or when the session has no stored IP entry yet. Without a
        remote IP, the check passes only if the stored value is empty too.
        Otherwise the stored IP is widened to a network whose prefix length
        comes from settings (32 for IPv4, 64 for IPv6 by default) and the
        remote IP must fall inside it.
        """
        restriction_enabled = getattr(settings, 'RESTRICTEDSESSIONS_RESTRICT_IP', True)
        if not restriction_enabled or SESSION_IP_KEY not in request.session:
            return True

        stored_ip = request.session[SESSION_IP_KEY]
        if not remote_ip:
            # No remote IP: valid only when the session has none either
            return not stored_ip

        # Fuzzy comparison: widen the stored address to a configurable network
        allowed_network = IPNetwork(stored_ip)
        client_address = IPAddress(remote_ip)
        try:
            allowed_network = allowed_network.ipv4()
            client_address = client_address.ipv4()
            allowed_network.prefixlen = getattr(settings, 'RESTRICTEDSESSIONS_IPV4_LENGTH', 32)
        except AddrConversionError:
            # At least one side could not be expressed as IPv4
            try:
                allowed_network.prefixlen = getattr(settings, 'RESTRICTEDSESSIONS_IPV6_LENGTH', 64)
            except AddrFormatError:
                # session network must be IPv4, but remote_ip is IPv6
                return False
        return client_address in allowed_network
开发者ID:erikr,项目名称:django-restricted-sessions,代码行数:29,代码来源:middleware.py


示例6: reserve_ip_addr

    def reserve_ip_addr(self):
        """Picks first available IP address from weave network.

        If there's no available IP address anymore, ``IndexError``
        will be raised. To prevent this error, catch the exception
        or check the value ``GluuCluster.ip_addr_available`` first
        before trying to call this method.

        :returns: A 2-elements tuple consists of IP address and network prefix,
                  e.g. ``("10.10.10.1", 24)``.
        """
        # represents a pool of IP addresses
        pool = IPNetwork(self.weave_ip_network)

        # Candidate addresses: every host of the pool except the ones already
        # reserved and the exposed weave IP. Set *difference* is required
        # here; symmetric difference (``^``) would put an address back into
        # the candidates when it is both reserved and exposed, and would add
        # reserved addresses that lie outside the pool.
        available = (
            IPSet(pool.iter_hosts())
            - IPSet(self.reserved_ip_addrs)
            - IPSet([self.exposed_weave_ip[0]])
        )

        # take the first candidate; indexing the empty list raises the
        # documented IndexError when nothing is left
        ip_addr = list(itertools.islice(available, 1))[0]

        # register the IP address so it will be excluded
        # from possible IP range in subsequent requests
        self.reserved_ip_addrs.append(str(ip_addr))

        # weave IP address for container expects a traditional CIDR,
        # e.g. 10.10.10.1/24, hence we return the actual IP and
        # its prefix length
        return str(ip_addr), pool.prefixlen
开发者ID:hasanmehmood,项目名称:gluu-flask,代码行数:28,代码来源:gluu_cluster.py


示例7: split_subnets

def split_subnets(cidr, split):
    """
    Splits a CIDR into a power-of-two number of sub CIDRs that is large
    enough for the number of desired subnets, and returns the array.

    A ``split`` of 1 returns the input unchanged as a one-element list.

    :param cidr: CIDR for entire range
    :type cidr: str.
    :param split: Number to split into
    :type split: int.
    :raises: :class:`pmcf.exceptions.PropertyException`
    :returns: list.
    """

    if split == 1:
        return [cidr]

    # Count the bits needed to represent ``split``; that many bits are added
    # to the prefix length. For 3 this yields 4 subnets, for 5 it yields 8.
    # NOTE(review): an exact power of two is rounded up as well (2 -> 4,
    # 4 -> 8). Kept as-is because callers may rely on the resulting layout.
    power = 0
    while split > 0:
        split >>= 1
        power += 1

    try:
        ipaddr = IPNetwork(cidr)
        newprefix = ipaddr.prefixlen + power
        return list(ipaddr.subnet(newprefix))
    except Exception as exc:
        # "as" form parses on Python 2.6+ and Python 3 alike
        raise PropertyException(str(exc))
开发者ID:pikselpalette,项目名称:pmcf,代码行数:31,代码来源:__init__.py


示例8: _add_search_filters

def _add_search_filters(filters, query):
    """
        Add Search Service response to filters

        When ``query`` looks like an IP/CIDR and covers at most 4096
        addresses, it is expanded to the space-separated list of its host
        addresses before being sent to the search service.

        The report ids returned by the service are merged into
        ``filters['where']['in']`` as a single ``{'id': [...]}`` entry.
        Non-empty ``id`` values already present there are folded into that
        entry; all other entries are kept. If the search service raises
        ``SearchServiceException``, ``filters`` is left untouched.
    """
    search_query = query

    # Try to parse IP/CIDR search
    if IP_CIDR_RE.match(query):
        try:
            network = IPNetwork(query)
            if network.size <= 4096:
                hosts = ' '.join([str(host) for host in network.iter_hosts()])
                # fall back to the raw query when the network has no hosts
                search_query = hosts if hosts else query
        except (AttributeError, IndexError, AddrFormatError, AddrConversionError):
            # not a usable network after all: search with the raw query
            pass
    try:
        reports = ImplementationFactory.instance.get_singleton_of('SearchServiceBase').search_reports(search_query)
        if not reports:
            reports = [None]
    except SearchServiceException:
        return

    where = filters['where']
    if 'in' not in where:
        where['in'] = [{'id': reports}]
        return

    # Build the new list separately instead of removing from / appending to
    # the list while iterating over it, which skipped entries and could
    # leave several 'id' entries behind.
    merged_ids = list(reports)
    kept_fields = []
    for field in where['in']:
        existing_ids = field.get('id')
        if not existing_ids:
            kept_fields.append(field)
            continue
        merged_ids.extend(existing_ids)
        # keep any other keys that shared a dict with 'id'
        others = dict((key, values) for key, values in field.items() if key != 'id')
        if others:
            kept_fields.append(others)

    # Always add exactly one id filter, even when 'in' was an empty list
    kept_fields.append({'id': list(set(merged_ids))})
    where['in'][:] = kept_fields
开发者ID:ovh,项目名称:cerberus-core,代码行数:31,代码来源:ReportsController.py


示例9: populateDhcpGlobalSettings

    def populateDhcpGlobalSettings(self):
        """Build and return the global DHCP settings dict for ZTP.

        Network, netmask and broadcast come from the ``dhcpSubnet`` entry of
        the ``ztp`` section of the clos definition. Default route, range start
        and range end are taken from that section when set to a non-empty
        value; otherwise they fall back to the first, second and last host
        address of the subnet respectively.
        """
        globalSettings = util.loadClosDefinition()['ztp']
        dhcpBlock = IPNetwork(globalSettings['dhcpSubnet'])
        hosts = list(dhcpBlock.iter_hosts())

        ztp = {
            'network': str(dhcpBlock.network),
            'netmask': str(dhcpBlock.netmask),
        }

        # (result key, settings key, index of the fallback host address)
        optionalValues = (
            ('defaultRoute', 'dhcpOptionRoute', 0),
            ('rangeStart', 'dhcpOptionRangeStart', 1),
            ('rangeEnd', 'dhcpOptionRangeEnd', -1),
        )
        for resultKey, settingKey, fallbackIndex in optionalValues:
            configured = globalSettings.get(settingKey)
            if configured is None or configured == '':
                configured = str(hosts[fallbackIndex])
            ztp[resultKey] = configured

        ztp['broadcast'] = str(dhcpBlock.broadcast)
        ztp['httpServerIp'] = self.conf['httpServer']['ipAddr']
        ztp['imageUrl'] = globalSettings.get('junosImage')

        return ztp
开发者ID:rgiyer,项目名称:OpenClos,代码行数:26,代码来源:ztp.py


示例10: add

def add(caller_id, address, mask):
    """Register ``address/mask`` as a new available network.

    The new network is compared with every existing ``AvailableNetwork``;
    if it lies inside one, contains one, or covers the same range as one,
    ``CMException('network_exists')`` is raised. Otherwise the network is
    saved, in state ``ok`` when it is private and ``locked`` when it is not.

    ``caller_id`` is not used by this function.
    """
    networks = [net.to_ipnetwork() for net in AvailableNetwork.objects.all()]
    networks.sort()

    # Find duplicate
    ipnet = IPNetwork('%s/%d' % (address, mask))
    for existing in networks:
        # new network is smaller and falls between the neighbours of an existing one
        if ipnet.prefixlen > existing.prefixlen and ipnet > existing.previous() and ipnet < existing.next():
            raise CMException('network_exists')
        # new network is larger and an existing one falls between its neighbours
        elif ipnet.prefixlen < existing.prefixlen and ipnet.previous() < existing and ipnet.next() > existing:
            raise CMException('network_exists')
        # same size: the two branches above never match, so an identical
        # network has to be detected explicitly
        elif ipnet.prefixlen == existing.prefixlen and ipnet == existing:
            raise CMException('network_exists')

    # Add new network
    new_net = AvailableNetwork()
    new_net.address = ipnet.network
    new_net.mask = mask

    if ipnet.is_private():
        new_net.state = available_network_states['ok']
    else:
        new_net.state = available_network_states['locked']

    new_net.save()
开发者ID:cc1-cloud,项目名称:cc1,代码行数:25,代码来源:network.py


示例11: calculate_vip_start

    def calculate_vip_start(self, vip_start, mgmtip, selfip_external):
        """Return the first VIP address, or None when it cannot be determined.

        An explicit ``vip_start`` is simply converted to an ``IPAddress``.
        When ``vip_start`` is None, the address is derived from the network of
        ``selfip_external`` plus an offset computed from the management
        subnet's index in ``SUBNET_MAP`` and the host part of ``mgmtip``.
        None is returned when neither ``selfip_external`` nor ``vip_start`` is
        given, or when the management subnet is not in ``SUBNET_MAP``.
        """
        if not (selfip_external or vip_start):
            LOG.info('Skipping auto VIP generation.')
            return

        if selfip_external:
            if not isinstance(selfip_external, IPNetwork):
                selfip_external = IPNetwork(selfip_external)

            if selfip_external.prefixlen == 32:
                LOG.info('Self IP external has no prefix. Assuming /16.')
                selfip_external.prefixlen = 16

        if vip_start is not None:
            return IPAddress(vip_start)

        # '1.1.1.1/24' -> '1.1.1.0/24'
        cidr = "{0.network}/{0.prefixlen}".format(mgmtip)
        host_id = mgmtip.ip.value - mgmtip.network.value
        subnet_index = SUBNET_MAP.get(cidr)

        if subnet_index is None:
            LOG.warning('The %s subnet was not found! '
                        'Skipping Virtual Servers.' % cidr)
            return

        # Start from 10.11.50.0 - 10.11.147.240 (planned for 10 subnets, 8 VIPs each)
        vip_slot = 256 * subnet_index + host_id
        offset = 1 + 50 * 256 + DEFAULT_VIPS * vip_slot

        return selfip_external.network + offset
开发者ID:pombredanne,项目名称:f5test2,代码行数:31,代码来源:placer.py


示例12: _validateLoopbackPrefix

 def _validateLoopbackPrefix(self, pod, podDict, inventoryData):
     """Raise ValueError when the loopback prefix has too few host addresses.

     The number of host addresses in ``podDict['loopbackPrefix']`` must be
     at least the number of spines plus leafs in ``inventoryData``.
     """
     requiredIps = len(inventoryData['spines']) + len(inventoryData['leafs'])
     hostAddresses = list(IPNetwork(podDict['loopbackPrefix']).iter_hosts())
     usableIps = len(hostAddresses)
     if usableIps >= requiredIps:
         return
     raise ValueError("Pod[id='%s', name='%s']: loopbackPrefix available IPs %d not enough: required %d" % (pod.id, pod.name, usableIps, requiredIps))
开发者ID:codyrat,项目名称:OpenClos,代码行数:7,代码来源:l3Clos.py


示例13: __init__

    def __init__(self):
        """Set up two endlessly repeating host-address pools and the counters.

        One pool is built from ``NETWORK_1`` and one from ``NETWORK_2``; each
        cycles over the host addresses of its network.
        """
        first_network = IPNetwork(NETWORK_1)
        self.net1 = first_network
        self.net1_ip_pool = cycle(first_network.iter_hosts())

        second_network = IPNetwork(NETWORK_2)
        self.net2 = second_network
        self.net2_ip_pool = cycle(second_network.iter_hosts())

        self.mcounter = {}
        self.mac_counter = 0
开发者ID:SmartInfrastructures,项目名称:fuel-web-dev,代码行数:8,代码来源:fake_generator.py


示例14: _validateLoopbackPrefix

 def _validateLoopbackPrefix(self, pod, podDict, inventoryData):
     """Raise InsufficientLoopbackIp when the loopback prefix is too small.

     The number of host addresses in ``podDict['loopbackPrefix']`` must be
     at least the number of spines plus leafs in ``inventoryData``. The
     error message names the smallest prefix that would be large enough.
     """
     inventoryDeviceCount = len(inventoryData['spines']) + len(inventoryData['leafs'])
     lo0Block = IPNetwork(podDict['loopbackPrefix'])
     availableIps = len(list(lo0Block.iter_hosts()))
     if availableIps >= inventoryDeviceCount:
         return

     # Computed only on failure: here inventoryDeviceCount is at least 1, so
     # an empty inventory no longer triggers math.log(0).
     # +2 accounts for the network and broadcast addresses, which are not
     # part of the host addresses counted above.
     requiredBits = int(math.ceil(math.log(inventoryDeviceCount + 2, 2)))
     cidr = 32 - requiredBits
     raise InsufficientLoopbackIp("Pod[id='%s', name='%s']: loopbackPrefix minimum required: %s/%d" % (pod.id, pod.name, lo0Block.ip, cidr))
开发者ID:Juniper,项目名称:OpenClos,代码行数:8,代码来源:l3Clos.py


示例15: test_ip_v4_to_ipv6_compatible

def test_ip_v4_to_ipv6_compatible():
    """Check IPv4 -> IPv6 conversion in compatible and mapped form."""
    # Single address: keyword and positional form of the flag agree
    v4_address = '192.0.2.15'
    compat_address = IPAddress('::192.0.2.15')
    assert IPAddress(v4_address).ipv6(ipv4_compatible=True) == compat_address
    assert IPAddress(v4_address).ipv6(ipv4_compatible=True).is_ipv4_compat()
    assert IPAddress(v4_address).ipv6(True) == compat_address

    # Network: the prefix length is shifted by 96 bits (23 -> 119)
    network = IPNetwork('192.0.2.1/23')
    expected_by_conversion = (
        (network.ipv4(), '192.0.2.1/23'),
        (network.ipv6(), '::ffff:192.0.2.1/119'),
        (network.ipv6(ipv4_compatible=True), '::192.0.2.1/119'),
    )
    for converted, expected in expected_by_conversion:
        assert converted == IPNetwork(expected)
开发者ID:drkjam,项目名称:netaddr,代码行数:9,代码来源:test_ip_v4_v6_conversions.py


示例16: allocate_tap_ips

 def allocate_tap_ips(self):
     """Hand out tap addresses from 172.16.0.0/16 for this host's lab topology.

     The first host address goes to ``tap_host``, the second to ``tap_vm``,
     and each node returned by ``self.nidb.nodes("is_l3device", ...)`` for
     this host gets the next one.
     """
     #TODO: take tap subnet parameter
     lab_topology = self.nidb.topology[self.host]
     from netaddr import IPNetwork
     address_block = IPNetwork("172.16.0.0/16").iter_hosts()
     # next() works on Python 2.6+ and 3; the generator's .next() method
     # exists on Python 2 only
     lab_topology.tap_host = next(address_block)
     lab_topology.tap_vm = next(address_block) # for tunnel host
     for node in self.nidb.nodes("is_l3device", host = self.host):
         #TODO: check this works for switches
         node.tap.ip = next(address_block)
开发者ID:sk2,项目名称:ANK-NG,代码行数:10,代码来源:compiler.py


示例17: is_cidr_reserved

def is_cidr_reserved(cidr, vpccidr):
    """ Check if CIDR is in the first 2 x /22 in the VPC Range

    Returns True when ``cidr`` lies inside, or itself contains, one of the
    first two /22 blocks of ``vpccidr``, and False otherwise. A VPC range
    that holds fewer than two /22 blocks has correspondingly fewer reserved
    blocks (none at all when it is smaller than a /22).
    """
    # local import so the block does not depend on a file-level import
    from itertools import islice

    vpc = IPNetwork(vpccidr)
    # Take only the first two /22 blocks instead of materializing every
    # subnet twice; islice also copes with ranges that have fewer than two.
    reserved_blocks = list(islice(vpc.subnet(22), 2))

    candidate = IPNetwork(str(cidr))
    for block in reserved_blocks:
        # Containment in either direction also covers a candidate of /22 or
        # larger, for which supernet(22) would be empty.
        if candidate in block or block in candidate:
            return True
    return False
开发者ID:bcosta12,项目名称:AdvancedCloudFormation,代码行数:10,代码来源:autosubnet.py


示例18: _allocateLoopback

 def _allocateLoopback(self, session, pod, loopbackPrefix, devices):
     """Size a loopback block for ``devices`` and assign its host addresses.

     The block starts at the network address of ``loopbackPrefix`` and is
     just large enough for one address per device plus the network and
     broadcast addresses. Its CIDR is recorded on ``pod`` and its host
     addresses are passed on to ``_assignAllocatedLoopbackToDevices``.
     """
     baseAddress = IPNetwork(loopbackPrefix).network
     requiredIps = len(devices) + 2 # +2 for network and broadcast
     hostBits = int(math.ceil(math.log(requiredIps, 2)))
     prefixLen = 32 - hostBits
     block = IPNetwork("/".join([str(baseAddress), str(prefixLen)]))
     hostAddresses = list(block.iter_hosts())

     pod.allocatedLoopbackBlock = str(block.cidr)
     self._assignAllocatedLoopbackToDevices(session, devices, hostAddresses)
开发者ID:Juniper,项目名称:OpenClos,代码行数:10,代码来源:l3Clos.py


示例19: split_network

def split_network(request):
    """Render the two halves of the network given in the ``network`` GET parameter.

    Responds with ``HttpResponseBadRequest`` when the parameter is missing
    or is not a valid network.
    """
    try:
        requested = request.GET['network']
        network = IPNetwork(requested)
    except (KeyError, AddrFormatError):
        return HttpResponseBadRequest()

    # One extra prefix bit splits the network into its two halves
    halves = network.subnet(network.prefixlen + 1)
    context = {'networks': halves}
    return render(request, 'pammy/split_network.html', context)
开发者ID:ocadotechnology,项目名称:pammy,代码行数:10,代码来源:ui.py


示例20: get_admin_ip_w_prefix

    def get_admin_ip_w_prefix(node):
        """Return the node's admin IP as a string, with the admin network's prefix length."""
        manager = objects.Node.get_network_manager(node)
        node_address = IPNetwork(manager.get_admin_ip_for_node(node))

        # Borrow the prefix length from the admin network group's CIDR
        admin_cidr = manager.get_admin_network_group().cidr
        node_address.prefixlen = IPNetwork(admin_cidr).prefixlen

        return str(node_address)
开发者ID:sk4lf,项目名称:fuel-web,代码行数:11,代码来源:deployment_serializers.py



注:本文中的netaddr.IPNetwork类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python netaddr.IPSet类代码示例发布时间:2022-05-27
下一篇:
Python netaddr.IPAddress类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap