• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api.ip_address_find函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quark.db.api.ip_address_find函数的典型用法代码示例。如果您正苦于以下问题:Python ip_address_find函数的具体用法?Python ip_address_find怎么用?Python ip_address_find使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了ip_address_find函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_find_reallocatable_ips_does_not_raise

    def test_find_reallocatable_ips_does_not_raise(self):
        """Guard against a regression in the reallocatable-IP lookup.

        A patch once introduced a bug wherein addresses could not be
        returned to the ip_address_find call made in
        attempt_to_reallocate_ip. This test issues a lookup for a
        deallocated, reusable address and only requires that the call
        does not raise.
        """
        cidr = "0.0.0.0/24"
        # IPv6 form of the network's first address, offset by two.
        next_ip = netaddr.IPNetwork(cidr).ipv6().first + 2
        network = {"name": "public", "tenant_id": "fake"}
        subnet = {"id": 1, "cidr": cidr, "next_auto_assign_ip": next_ip,
                  "ip_policy": None, "tenant_id": "fake"}

        with self._stubs(network, subnet) as net:
            find_kwargs = dict(
                network_id=net["id"], reuse_after=14400,
                deallocated=True, scope=db_api.ONE,
                lock_mode=True, version=4,
                order_by="address")

            try:
                db_api.ip_address_find(self.context, **find_kwargs)
            except Exception:
                self.fail("This should not have raised")
开发者ID:mohanraj1311,项目名称:quark,代码行数:26,代码来源:test_ipam.py


示例2: test_ip_address_find_ip_address_list

 def test_ip_address_find_ip_address_list(self):
     """ip_address_find must accept a list passed as ``ip_address``."""
     addresses = [netaddr.IPAddress("192.168.10.1")]
     try:
         db_api.ip_address_find(self.context, scope=db_api.ONE,
                                ip_address=addresses)
     except Exception as e:
         self.fail("Expected no exceptions: %s" % e)
开发者ID:Cerberus98,项目名称:quark,代码行数:7,代码来源:test_db_api.py


示例3: allocate_ip_address

    def allocate_ip_address(self, context, net_id, port_id, reuse_after,
                            version=None, ip_address=None):
        """Return an allocated IP address record for the given network.

        A deallocated record matching the lookup is reused when one is
        found. Otherwise a subnet is chosen and a new record is created,
        either for the requested ``ip_address`` or for the next free
        address taken from the subnet's ``next_auto_assign_ip`` counter.

        :param context: request context; its elevated form is used for
            every database call
        :param net_id: network to allocate from
        :param port_id: not used by this implementation
        :param reuse_after: passed through to the reuse lookup
        :param version: IP version hint passed to subnet selection
        :param ip_address: optional specific address to allocate
        :returns: the updated (reused) or newly created address record
        :raises exceptions.IpAddressGenerationFailure: the candidate equals
            the subnet's last address or lies outside the subnet CIDR
        """
        admin_ctx = context.elevated()
        if ip_address:
            ip_address = netaddr.IPAddress(ip_address)

        reusable = db_api.ip_address_find(
            admin_ctx, network_id=net_id, reuse_after=reuse_after,
            deallocated=True, scope=db_api.ONE, ip_address=ip_address)
        if reusable:
            return db_api.ip_address_update(
                admin_ctx, reusable, deallocated=False, deallocated_at=None)

        subnet = self._choose_available_subnet(
            admin_ctx, net_id, ip_address=ip_address, version=version)

        # Creating this IP for the first time
        if ip_address:
            candidate = ip_address
        else:
            # Advance the subnet's counter until an address with no
            # existing record is found.
            while True:
                cursor = int(subnet["next_auto_assign_ip"])
                candidate = netaddr.IPAddress(cursor)
                if subnet["ip_version"] == 4:
                    candidate = candidate.ipv4()
                subnet["next_auto_assign_ip"] = cursor + 1
                taken = db_api.ip_address_find(
                    admin_ctx,
                    network_id=net_id,
                    ip_address=candidate,
                    tenant_id=admin_ctx.tenant_id,
                    scope=db_api.ONE)
                if not taken:
                    break

        # TODO(mdietz): this is a hack until we have IP policies
        candidate_int = int(candidate)
        lower = netaddr.IPAddress(int(subnet["first_ip"]))
        upper = netaddr.IPAddress(int(subnet["last_ip"]))
        if subnet["ip_version"] == 4:
            lower, upper = lower.ipv4(), upper.ipv4()
        lower_int, upper_int = int(lower), int(upper)

        # Candidates within the first two addresses of the subnet are
        # pushed up to first_ip + 2.
        offset = candidate_int - lower_int
        if offset < 2:
            candidate = netaddr.IPAddress(candidate_int + (2 - offset))

        # The last-address check uses the pre-adjustment integer and is
        # evaluated before the CIDR membership test.
        if (candidate_int == upper_int or
                candidate not in netaddr.IPNetwork(subnet["cidr"])):
            raise exceptions.IpAddressGenerationFailure(net_id=net_id)

        return db_api.ip_address_create(
            admin_ctx, address=candidate, subnet_id=subnet["id"],
            version=subnet["ip_version"], network_id=net_id)
开发者ID:ugoring,项目名称:quark,代码行数:58,代码来源:ipam.py


示例4: test_ip_address_find_device_id

    def test_ip_address_find_device_id(self):
        """Finding by device_id issues exactly one ``filter`` call."""
        query_result = mock.Mock()
        # session.query(...) hands back query_result, so filter calls made
        # on the query can be counted there.
        self.context.session.query = mock.Mock(return_value=query_result)

        db_api.ip_address_find(self.context, device_id="foo")
        self.assertEqual(query_result.filter.call_count, 1)
开发者ID:Cerberus98,项目名称:quark,代码行数:9,代码来源:test_db_api.py


示例5: post_update_port

    def post_update_port(self, context, id, port):
        """Attach the fixed IPs named in a request body to an existing port.

        Each entry of ``port["port"]["fixed_ips"]`` is resolved to an
        address record: by ``ip_id``, by ``ip_address`` (allocating it
        through the IPAM driver when no record exists yet), or - for an
        empty entry - by allocating a fresh address. Every resolved address
        is flagged as allocated and appended to the port unless the port
        already holds it.

        :param context: request context; supplies ``tenant_id``
        :param id: identifier of the port to update
        :param port: request body of the form ``{"port": {...}}``
        :returns: the result of ``self._make_port_dict`` for the port
        :raises exceptions.PortNotFound: no port matches ``id``
        :raises exceptions.BadRequest: the body has no usable ``"port"``
            entry, or a fixed IP entry cannot be resolved to an address
        """
        LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise exceptions.PortNotFound(port_id=id, net_id="")

        if "port" not in port or not port["port"]:
            raise exceptions.BadRequest()
        port = port["port"]

        if "fixed_ips" in port and port["fixed_ips"]:
            for ip in port["fixed_ips"]:
                address = None
                if ip:
                    if "ip_id" in ip:
                        ip_id = ip["ip_id"]
                        address = db_api.ip_address_find(
                            context,
                            id=ip_id,
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE)
                    elif "ip_address" in ip:
                        ip_address = ip["ip_address"]
                        net_address = netaddr.IPAddress(ip_address)
                        address = db_api.ip_address_find(
                            context,
                            ip_address=net_address,
                            network_id=port_db["network_id"],
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE)
                        if not address:
                            address = self.ipam_driver.allocate_ip_address(
                                context,
                                port_db["network_id"],
                                id,
                                self.ipam_reuse_after,
                                ip_address=ip_address)
                else:
                    address = self.ipam_driver.allocate_ip_address(
                        context,
                        port_db["network_id"],
                        id,
                        self.ipam_reuse_after)

                # An unknown ip_id, or an entry with neither key, leaves no
                # address to attach. Report it as a bad request rather than
                # failing with a TypeError on None below.
                if not address:
                    raise exceptions.BadRequest()

                # The association happens inside the loop so that every
                # requested fixed IP is attached, not only the last one.
                address["deallocated"] = 0

                already_contained = any(
                    address["id"] == port_address["id"]
                    for port_address in port_db["ip_addresses"])
                if not already_contained:
                    port_db["ip_addresses"].append(address)
        return self._make_port_dict(port_db)
开发者ID:ugoring,项目名称:quark,代码行数:55,代码来源:plugin.py


示例6: test_ip_address_find_port_id_as_admin

    def test_ip_address_find_port_id_as_admin(self):
        """An elevated find by port_id filters on ``ports.any(id == ...)``."""
        session_query = mock.MagicMock()
        self.context.session.query = session_query
        query_result = session_query.return_value

        db_api.ip_address_find(self.context.elevated(), port_id="foo")

        # NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
        # when using SQLAlchemy models in expressions.
        expected_filter = models.IPAddress.ports.any(models.Port.id == "foo")
        filter_args = query_result.filter.call_args[0]
        self.assertEqual(len(filter_args), 1)
        # The expressions are compared through their string (SQL) form.
        self.assertEqual(str(expected_filter), str(filter_args[0]))
开发者ID:Cerberus98,项目名称:quark,代码行数:11,代码来源:test_db_api.py


示例7: post_update_port

def post_update_port(context, id, port):
    """Attach the fixed IPs named in a request body to an existing port.

    Runs inside a single ``context.session.begin()`` block. Each entry of
    ``port["port"]["fixed_ips"]`` is resolved to an address record: by
    ``ip_id``, by ``ip_address`` (allocating it through the network's IPAM
    strategy when no record exists yet), or - for an empty entry - by
    allocating a fresh address. Every resolved address is flagged as
    allocated and appended to the port unless the port already holds it.

    :param context: request context; supplies ``tenant_id`` and ``session``
    :param id: identifier of the port to update
    :param port: request body of the form ``{"port": {...}}``
    :returns: the result of ``v._make_port_dict`` for the port
    :raises exceptions.BadRequest: the body has no ``"port"`` entry, or a
        fixed IP entry cannot be resolved to an address
    :raises exceptions.PortNotFound: no port matches ``id``
    """
    LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
    if not port.get("port"):
        raise exceptions.BadRequest(resource="ports", msg="Port body required")

    with context.session.begin():
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise exceptions.PortNotFound(port_id=id, net_id="")

        port = port["port"]
        if "fixed_ips" in port and port["fixed_ips"]:
            # The strategy depends only on the port's network, so it is
            # looked up once rather than once per fixed IP.
            ipam_driver = ipam.IPAM_REGISTRY.get_strategy(
                port_db["network"]["ipam_strategy"])
            for ip in port["fixed_ips"]:
                address = None
                if ip:
                    if "ip_id" in ip:
                        ip_id = ip["ip_id"]
                        address = db_api.ip_address_find(
                            context, id=ip_id, tenant_id=context.tenant_id,
                            scope=db_api.ONE
                        )
                    elif "ip_address" in ip:
                        ip_address = ip["ip_address"]
                        net_address = netaddr.IPAddress(ip_address)
                        address = db_api.ip_address_find(
                            context,
                            ip_address=net_address,
                            network_id=port_db["network_id"],
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE,
                        )
                        if not address:
                            address = ipam_driver.allocate_ip_address(
                                context, port_db["network_id"], id,
                                CONF.QUARK.ipam_reuse_after,
                                ip_address=ip_address
                            )
                else:
                    address = ipam_driver.allocate_ip_address(
                        context, port_db["network_id"], id,
                        CONF.QUARK.ipam_reuse_after
                    )

                # An unknown ip_id, or an entry with neither key, leaves no
                # address to attach. Report it as a bad request rather than
                # failing with a TypeError on None below.
                if not address:
                    raise exceptions.BadRequest(
                        resource="ports",
                        msg="Fixed IP entry could not be resolved to an "
                            "address")

                # The association happens inside the loop so that every
                # requested fixed IP is attached, not only the last one.
                address["deallocated"] = 0

                already_contained = any(
                    address["id"] == port_address["id"]
                    for port_address in port_db["ip_addresses"])
                if not already_contained:
                    port_db["ip_addresses"].append(address)
    return v._make_port_dict(port_db)
开发者ID:blamarvt,项目名称:quark,代码行数:51,代码来源:ports.py


示例8: test_ip_address_find_address_type_as_admin

    def test_ip_address_find_address_type_as_admin(self):
        """An elevated find by address_type filters on that column."""
        session_query = mock.MagicMock()
        self.context.session.query = session_query
        query_result = session_query.return_value

        db_api.ip_address_find(self.context.elevated(), address_type="foo")

        # NOTE(thomasem): Creates sqlalchemy.sql.elements.BinaryExpression
        # when using SQLAlchemy models in expressions.
        expected_filter = models.IPAddress.address_type == "foo"
        filter_args = query_result.filter.call_args[0]
        self.assertEqual(len(filter_args), 1)
        # NOTE(thomasem): Unfortunately BinaryExpression.compare isn't
        # showing to be a reliable comparison, so using the string
        # representation which dumps the associated SQL for the filter.
        self.assertEqual(str(expected_filter), str(filter_args[0]))
开发者ID:Cerberus98,项目名称:quark,代码行数:14,代码来源:test_db_api.py


示例9: test_address_not_in_cidr

    def test_address_not_in_cidr(self):
        """Reallocation must not hand out an address outside its subnet.

        An address (192.168.1.1) is stored against a subnet whose CIDR
        (192.168.0.0/24) does not contain it. The reallocate call is
        expected to report success, the reallocate-find to return nothing,
        and the original record to no longer be found by id afterwards.
        """
        self.network_db = self.insert_network()
        self.subnet_v4_db = self.insert_subnet(
            self.network_db, "192.168.0.0/24")
        self.ip_address_v4 = netaddr.IPAddress("192.168.1.1")
        stored_ip = self.insert_ip_address(
            self.ip_address_v4, self.network_db, self.subnet_v4_db)
        self.transaction = self.insert_transaction()

        reallocated = db_api.ip_address_reallocate(
            self.context,
            dict(transaction_id=self.transaction.id),
            network_id=self.network_db["id"],
            reuse_after=self.REUSE_AFTER,
            deallocated=True,
            version=4)
        self.assertTrue(reallocated)

        self.assertIsNone(db_api.ip_address_reallocate_find(
            self.context, self.transaction.id))

        self.context.session.flush()
        lookup = db_api.ip_address_find(
            self.context, id=stored_ip.id, scope=db_api.ONE)
        self.assertIsNone(lookup)
开发者ID:Anonymike,项目名称:quark,代码行数:29,代码来源:test_db_ip_reallocate.py


示例10: update_port_for_ip_address

def update_port_for_ip_address(context, ip_id, id, port):
    """Update the per-IP values of a port.

    Only the keys whitelisted below (currently just ``service``) are read
    from the request body; the service is then recorded on the address for
    the given port.

    : param context: neutron api request context
    : param ip_id: UUID representing the ip associated with port to update
    : param id: UUID representing the port to update.
    : param port: dictionary with keys indicating fields to update.
        valid keys are those that have a value of True for 'allow_put'
        as listed in the RESOURCE_ATTRIBUTE_MAP object in
        neutron/api/v2/attributes.py.
    """
    LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
    allowed_keys = ('service',)
    with context.session.begin():
        addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
        if not addr:
            raise q_exc.IpAddressNotFound(addr_id=ip_id)

        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise q_exc.PortNotFound(port_id=id)

        # Every whitelisted key must be present in the body.
        body = port['port']
        updates = dict((key, body[key]) for key in allowed_keys)
        service = updates.get('service')

        # The disassociation requirement is switched off here, so the
        # shared-IP check below is never evaluated.
        require_da = False
        if require_da and _shared_ip_and_active(addr, except_port=id):
            raise q_exc.PortRequiresDisassociation()

        addr.set_service_for_port(port_db, service)
        context.session.add(addr)
    return v._make_port_for_ip_dict(addr, port_db)
开发者ID:roaet,项目名称:quark,代码行数:30,代码来源:ip_addresses.py


示例11: test_reserve_ip_non_admin

 def test_reserve_ip_non_admin(self):
     """A reserve update via self.context leaves the IP deallocated.

     The address is first updated with the dealloc body and then with the
     reserve body; after each update the stored record must still report
     ``deallocated`` as True. (Per the test name, self.context is
     presumably a non-admin context - confirm against the fixture.)
     """
     with self._stubs() as ip:
         bodies = (self.ip_address_dealloc, self.ip_address_reserve)
         for body in bodies:
             updated = ip_addr.update_ip_address(
                 self.context, ip["id"], body)
             stored = db_api.ip_address_find(
                 self.context, id=updated["id"], scope=db_api.ONE)
             self.assertEqual(stored["deallocated"], True)
开发者ID:Anonymike,项目名称:quark,代码行数:16,代码来源:test_ip_addresses.py


示例12: attempt_to_reallocate_ip

    def attempt_to_reallocate_ip(self, context, net_id, port_id, reuse_after,
                                 version=None, ip_address=None):
        """Try to reuse a deallocated IP address on the given network.

        Up to three lookups are made. A found address that lies inside its
        subnet's CIDR is marked as allocated and returned. A found address
        outside its CIDR is deleted and the lookup is retried. Any other
        outcome ends the search.

        :param context: request context; its session runs the transactions
            and its elevated form is used for the database calls
        :param net_id: network to search
        :param port_id: not used by this implementation
        :param reuse_after: passed through to the lookup
        :param version: IP version(s) to consider; defaults to ``[4, 6]``
        :param ip_address: optional specific address to look for
        :returns: a one-element list holding the updated address, or an
            empty list when nothing could be reallocated
        """
        version = version or [4, 6]
        elevated = context.elevated()

        # We never want to take the chance of an infinite loop here. Instead,
        # we'll clean up multiple bad IPs if we find them (assuming something
        # is really wrong)
        # range() rather than xrange() so this also runs on Python 3; for
        # three iterations the two behave identically on Python 2.
        for _ in range(3):
            with context.session.begin(subtransactions=True):
                address = db_api.ip_address_find(
                    elevated, network_id=net_id, reuse_after=reuse_after,
                    deallocated=True, scope=db_api.ONE, ip_address=ip_address,
                    lock_mode=True, version=version, order_by="address")

                if address:
                    # NOTE(mdietz): We should always be in the CIDR but we've
                    #               also said that before :-/
                    if address.get("subnet"):
                        cidr = netaddr.IPNetwork(address["subnet"]["cidr"])
                        addr = netaddr.IPAddress(int(address["address"]),
                                                 version=int(cidr.version))
                        if addr in cidr:
                            updated_address = db_api.ip_address_update(
                                elevated, address, deallocated=False,
                                deallocated_at=None,
                                allocated_at=timeutils.utcnow())
                            return [updated_address]
                        else:
                            # Make sure we never find it again
                            context.session.delete(address)
                            continue
                # Nothing found, or the address has no subnet: stop trying.
                break
        return []
开发者ID:blamarvt,项目名称:quark,代码行数:34,代码来源:ipam.py


示例13: _allocate_ips_from_subnets

    def _allocate_ips_from_subnets(self, context, net_id, subnets,
                                   ip_address=None):
        """Create one new address record in each of the given subnets.

        :param context: request context; supplies ``tenant_id`` and
            ``session``
        :param net_id: network the addresses belong to
        :param subnets: iterable of subnets to allocate from
        :param ip_address: optional specific address; when given it is
            used for every subnet and must not already have a record
        :returns: list of the created address records
        :raises exceptions.IpAddressGenerationFailure: the requested
            ``ip_address`` already has a record on the network
        """
        created = []
        for subnet in subnets:
            ip_policy_cidrs = models.IPPolicy.get_ip_policy_cidrs(subnet)

            # Creating this IP for the first time
            if ip_address:
                existing = db_api.ip_address_find(
                    context, network_id=net_id, ip_address=ip_address,
                    used_by_tenant_id=context.tenant_id, scope=db_api.ONE)
                if existing:
                    raise exceptions.IpAddressGenerationFailure(
                        net_id=net_id)
                next_ip = ip_address
            else:
                next_ip = self._iterate_until_available_ip(
                    context, subnet, net_id, ip_policy_cidrs)

            context.session.add(subnet)
            record = db_api.ip_address_create(
                context, address=next_ip, subnet_id=subnet["id"],
                version=subnet["ip_version"], network_id=net_id)
            record["deallocated"] = 0
            created.append(record)
        return created
开发者ID:blamarvt,项目名称:quark-1,代码行数:26,代码来源:ipam.py


示例14: delete_ip_address

def delete_ip_address(context, id):
    """Delete an ip address.

    The address is looked up, validated, detached from its ports (an empty
    port list is passed to ``update_port_associations_for_ip``) and then
    handed to the IPAM driver for deallocation, all within one
    ``context.session.begin()`` block.

    : param context: neutron api request context
    : param id: UUID representing the ip address to delete.
    : raises q_exc.IpAddressNotFound: no address matches ``id`` or it is
        already deallocated
    : raises n_exc.BadRequest: the address is a fixed IP and
        ``CONF.QUARK.ipaddr_allow_fixed_ip`` is not set
    : raises q_exc.PortRequiresDisassociation: the address reports a
        shared owner
    """
    LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id))
    with context.session.begin():
        ip_address = db_api.ip_address_find(
            context, id=id, scope=db_api.ONE)
        if not ip_address or ip_address.deallocated:
            raise q_exc.IpAddressNotFound(addr_id=id)

        iptype = ip_address.address_type
        if iptype == ip_types.FIXED and not CONF.QUARK.ipaddr_allow_fixed_ip:
            # This is the delete path, so the message names deletion (it
            # previously said "updated").
            raise n_exc.BadRequest(
                resource="ip_addresses",
                msg="Fixed ips cannot be deleted using this interface.")

        if ip_address.has_any_shared_owner():
            raise q_exc.PortRequiresDisassociation()

        db_api.update_port_associations_for_ip(context, [], ip_address)

        ipam_driver.deallocate_ip_address(context, ip_address)
开发者ID:roaet,项目名称:quark,代码行数:25,代码来源:ip_addresses.py


示例15: get_ip_addresses

def get_ip_addresses(context, **filters):
    """List IP addresses matching the given filters.

    ``type`` is mirrored into ``address_type``. The public ``deallocated``
    filter is translated into the ``_deallocated`` filter and then removed:
    admins may pass ``'True'``, ``'False'`` or (case-insensitively)
    ``'both'``, where ``'both'`` leaves ``_deallocated`` unset. Every other
    case, including all non-admin requests, forces ``_deallocated`` to
    False.

    :param context: request context; supplies ``tenant_id`` and ``is_admin``
    :param filters: keyword filters passed on to ``db_api.ip_address_find``
    :returns: list of ``v._make_ip_dict`` results, one per address found
    """
    LOG.info("get_ip_addresses for tenant %s" % context.tenant_id)
    filters = filters or {}
    if 'type' in filters:
        filters['address_type'] = filters['type']

    # None means "do not constrain on _deallocated at all".
    deallocated_flag = False
    if context.is_admin and 'deallocated' in filters:
        requested = filters['deallocated']
        if requested == 'True':
            deallocated_flag = True
        elif requested == 'False':
            deallocated_flag = False
        elif requested.lower() == 'both':
            deallocated_flag = None
    if deallocated_flag is not None:
        filters['_deallocated'] = deallocated_flag

    # this needs to be removed or it corrupts the filters passed to the db
    # model. Only _deallocated needs to be present
    filters.pop('deallocated', None)

    addrs = db_api.ip_address_find(context, scope=db_api.ALL, **filters)
    return [v._make_ip_dict(ip, context.is_admin) for ip in addrs]
开发者ID:Anonymike,项目名称:quark,代码行数:28,代码来源:ip_addresses.py


示例16: update_ip_address

def update_ip_address(context, id, ip_address):
    """Replace the set of ports an IP address is associated with.

    When the body carries no ``port_ids`` key the address is returned
    unchanged. Otherwise the address is removed from all of its current
    ports and added to the requested ones; an empty ``port_ids`` list
    marks the address as deallocated instead.

    :param context: request context; supplies ``tenant_id``
    :param id: identifier of the address to update
    :param ip_address: request body of the form
        ``{"ip_address": {"port_ids": [...]}}``
    :returns: the result of ``v._make_ip_dict`` for the address
    :raises exceptions.NotFound: the address does not exist, or the number
        of ports found differs from the number of requested ids
    """
    LOG.info("update_ip_address %s for tenant %s" %
            (id, context.tenant_id))

    address = db_api.ip_address_find(
        context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)

    if not address:
        raise exceptions.NotFound(
            message="No IP address found with id=%s" % id)

    port_ids = ip_address['ip_address'].get('port_ids')
    if port_ids is None:
        return v._make_ip_dict(address)

    # Iterate over a snapshot. If address['ports'] and
    # port['ip_addresses'] are two sides of one ORM relationship (likely,
    # but confirm in the models), each remove() would shrink the
    # collection being iterated and skip ports.
    for port in list(address['ports']):
        port['ip_addresses'].remove(address)

    if port_ids:
        ports = db_api.port_find(
            context, tenant_id=context.tenant_id, id=port_ids,
            scope=db_api.ALL)

        # NOTE: could be considered inefficient because the whole result
        #       is needed just to check its length. Maybe revisit
        if len(ports) != len(port_ids):
            raise exceptions.NotFound(
                message="Ports not found with ids=%s" % port_ids)
        for port in ports:
            port['ip_addresses'].append(address)
    else:
        address["deallocated"] = 1

    return v._make_ip_dict(address)
开发者ID:kilogram,项目名称:quark,代码行数:35,代码来源:ip_addresses.py


示例17: test_ip_address_find_ip_address_object_list_none

 def test_ip_address_find_ip_address_object_list_none(self):
     """Finding by a list with a non-matching address yields no rows."""
     with self._stubs():
         wanted = [netaddr.IPAddress("192.168.10.2")]
         found = db_api.ip_address_find(
             self.context, scope=db_api.ALL, ip_address=wanted)
         self.assertEqual(len(found), 0)
开发者ID:mohanraj1311,项目名称:quark,代码行数:7,代码来源:test_ip_addresses.py


示例18: get_ip_address

def get_ip_address(context, id):
    """Return the dict form of the IP address with the given id.

    :param context: request context; supplies ``tenant_id`` for logging
    :param id: identifier of the address to fetch
    :returns: the result of ``v._make_ip_dict`` for the address
    :raises quark_exceptions.IpAddressNotFound: no address matches ``id``
    """
    LOG.info("get_ip_address %s for tenant %s" %
             (id, context.tenant_id))
    found = db_api.ip_address_find(context, scope=db_api.ONE, id=id)
    if found:
        return v._make_ip_dict(found)
    raise quark_exceptions.IpAddressNotFound(addr_id=id)
开发者ID:sumanthns,项目名称:quark,代码行数:7,代码来源:ip_addresses.py


示例19: allocate_ip_address

    def allocate_ip_address(self, context, net_id, port_id, reuse_after,
                            version=None, ip_address=None):
        """Return an allocated IP address record for the given network.

        A deallocated record matching the lookup is reused when one is
        found. Otherwise a subnet is chosen and a new record is created,
        either for the requested ``ip_address`` or for the next address
        from the subnet's ``next_auto_assign_ip`` counter that is neither
        covered by the subnet's IP policy rules nor already recorded.

        :param context: request context; its elevated form is used for
            every database call
        :param net_id: network to allocate from
        :param port_id: not used by this implementation
        :param reuse_after: passed through to the reuse lookup
        :param version: IP version hint passed to subnet selection
        :param ip_address: optional specific address to allocate
        :returns: the updated (reused) or newly created address record
        :raises exceptions.IpAddressGenerationFailure: the requested
            ``ip_address`` already has a record on the network
        """
        admin_ctx = context.elevated()
        if ip_address:
            ip_address = netaddr.IPAddress(ip_address)

        reusable = db_api.ip_address_find(
            admin_ctx, network_id=net_id, reuse_after=reuse_after,
            deallocated=True, scope=db_api.ONE, ip_address=ip_address)
        if reusable:
            return db_api.ip_address_update(
                admin_ctx, reusable, deallocated=False, deallocated_at=None)

        subnet = self._choose_available_subnet(
            admin_ctx, net_id, ip_address=ip_address, version=version)
        ip_policy_rules = self.get_ip_policy_rule_set(subnet)

        # Creating this IP for the first time
        if ip_address:
            candidate = ip_address
            clash = db_api.ip_address_find(
                admin_ctx, network_id=net_id, ip_address=candidate,
                tenant_id=admin_ctx.tenant_id, scope=db_api.ONE)
            if clash:
                raise exceptions.IpAddressGenerationFailure(net_id=net_id)
        else:
            # Advance the subnet's counter until an address is found that
            # the policy allows and that has no existing record.
            while True:
                cursor = int(subnet["next_auto_assign_ip"])
                candidate = netaddr.IPAddress(cursor)
                if subnet["ip_version"] == 4:
                    candidate = candidate.ipv4()
                subnet["next_auto_assign_ip"] = cursor + 1
                if ip_policy_rules and candidate in ip_policy_rules:
                    continue
                taken = db_api.ip_address_find(
                    admin_ctx, network_id=net_id, ip_address=candidate,
                    tenant_id=admin_ctx.tenant_id, scope=db_api.ONE)
                if not taken:
                    break

        record = db_api.ip_address_create(
            admin_ctx, address=candidate, subnet_id=subnet["id"],
            version=subnet["ip_version"], network_id=net_id)
        record["deallocated"] = 0

        return record
开发者ID:kilogram,项目名称:quark,代码行数:46,代码来源:ipam.py


示例20: get_ip_address

def get_ip_address(context, id):
    """Return the dict form of the IP address with the given id.

    The lookup passes ``_deallocated=False`` as an additional filter.

    :param context: request context; supplies ``tenant_id`` for logging
    :param id: identifier of the address to fetch
    :returns: the result of ``v._make_ip_dict`` for the address
    :raises q_exc.IpAddressNotFound: the lookup returns nothing
    """
    LOG.info("get_ip_address %s for tenant %s" %
             (id, context.tenant_id))
    found = db_api.ip_address_find(
        context, id=id, scope=db_api.ONE, _deallocated=False)
    if found:
        return v._make_ip_dict(found)
    raise q_exc.IpAddressNotFound(addr_id=id)
开发者ID:roaet,项目名称:quark,代码行数:9,代码来源:ip_addresses.py



注:本文中的quark.db.api.ip_address_find函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.network_create函数代码示例(发布时间:2022-05-26)
下一篇:
Python client.Client类代码示例(发布时间:2022-05-26)
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap