• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.get_ip_version函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.common.utils.get_ip_version函数的典型用法代码示例。如果您正苦于以下问题:Python get_ip_version函数的具体用法?Python get_ip_version怎么用?Python get_ip_version使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_ip_version函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: create_flows_from_rule_and_port

def create_flows_from_rule_and_port(rule, port):
    """Build the flows for one rule applied to one port.

    :param rule: mapping with mandatory 'ethertype' and 'direction' keys and
                 optional 'dest_ip_prefix' / 'source_ip_prefix' keys.
    :param port: object providing an ``ofport`` attribute.
    :returns: the result of create_protocol_flows() for the built template.
    """
    rule_ethertype = rule['ethertype']
    rule_direction = rule['direction']
    dst_prefix = rule.get('dest_ip_prefix')
    src_prefix = rule.get('source_ip_prefix')

    template = {
        'priority': 70,
        'dl_type': ovsfw_consts.ethertype_to_dl_type_map[rule_ethertype],
        'reg_port': port.ofport,
    }

    # The match field name depends on both the IP version of the prefix and
    # the direction associated with it (destination -> egress key).
    if is_valid_prefix(dst_prefix):
        dst_key = (utils.get_ip_version(dst_prefix),
                   firewall.EGRESS_DIRECTION)
        template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[dst_key]] = dst_prefix

    # Same lookup for the source prefix, using the ingress key.
    if is_valid_prefix(src_prefix):
        src_key = (utils.get_ip_version(src_prefix),
                   firewall.INGRESS_DIRECTION)
        template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[src_key]] = src_prefix

    return create_protocol_flows(rule_direction, template, port, rule)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:25,代码来源:rules.py


示例2: create_flows_from_rule_and_port

def create_flows_from_rule_and_port(rule, port, conjunction=False):
    """Build the flows for one rule applied to one port.

    For description of the optional conjunction arg, see flow_priority_offset.

    :param rule: mapping with mandatory 'ethertype' and 'direction' keys and
                 optional 'dest_ip_prefix' / 'source_ip_prefix' keys.
    :param port: object providing an ``ofport`` attribute.
    :param conjunction: forwarded to flow_priority_offset().
    :returns: the result of create_protocol_flows() for the built template.
    """
    rule_ethertype = rule['ethertype']
    rule_direction = rule['direction']
    dst_prefix = rule.get('dest_ip_prefix')
    src_prefix = rule.get('source_ip_prefix')

    # Base priority 70, shifted by the offset computed for this rule.
    template = {
        'priority': 70 + flow_priority_offset(rule, conjunction),
        'dl_type': ovsfw_consts.ethertype_to_dl_type_map[rule_ethertype],
        'reg_port': port.ofport,
    }

    # The match field name depends on both the IP version of the prefix and
    # the direction associated with it (destination -> egress key).
    if is_valid_prefix(dst_prefix):
        dst_key = (utils.get_ip_version(dst_prefix),
                   n_consts.EGRESS_DIRECTION)
        template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[dst_key]] = dst_prefix

    # Same lookup for the source prefix, using the ingress key.
    if is_valid_prefix(src_prefix):
        src_key = (utils.get_ip_version(src_prefix),
                   n_consts.INGRESS_DIRECTION)
        template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[src_key]] = src_prefix

    return create_protocol_flows(rule_direction, template, port, rule)
开发者ID:igordcard,项目名称:neutron,代码行数:28,代码来源:rules.py


示例3: create_flows_for_ip_address

def create_flows_for_ip_address(ip_address, direction, ethertype,
                                vlan_tag, conj_ids):
    """Create flows from a rule and an ip_address derived from
    remote_group_id

    :param ip_address: address normalised to CIDR form via netaddr.
    :param direction: compared against the firewall egress/ingress constants
                      to select the rules table.
    :param ethertype: key into ovsfw_consts.ethertype_to_dl_type_map.
    :param vlan_tag: stored in the 'reg_net' field of the flow.
    :param conj_ids: forwarded to substitute_conjunction_actions().
    :returns: the result of substitute_conjunction_actions().
    """
    cidr = str(netaddr.IPNetwork(ip_address).cidr)

    template = {
        'priority': 70,
        'dl_type': ovsfw_consts.ethertype_to_dl_type_map[ethertype],
        'reg_net': vlan_tag,  # needed for project separation
    }

    version = utils.get_ip_version(cidr)

    # No table is set when the direction matches neither constant.
    if direction == firewall.EGRESS_DIRECTION:
        template['table'] = ovs_consts.RULES_EGRESS_TABLE
    elif direction == firewall.INGRESS_DIRECTION:
        template['table'] = ovs_consts.RULES_INGRESS_TABLE

    field_key = (version, direction)
    template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[field_key]] = cidr

    return substitute_conjunction_actions([template], 1, conj_ids)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:25,代码来源:rules.py


示例4: delete_gateway

 def delete_gateway(self, gateway, table=None):
     """Issue a 'del default via <gateway>' command for this device.

     :param gateway: gateway IP address; its IP version is passed along
                     with the command.
     :param table: value forwarded to ``_table_args``.
     """
     version = common_utils.get_ip_version(gateway)
     command = ['del', 'default', 'via', gateway]
     command.extend(self._dev_args())
     command.extend(self._table_args(table))
     self._run_as_root_detect_device_not_found([version], command)
开发者ID:huntxu,项目名称:neutron,代码行数:7,代码来源:ip_lib.py


示例5: add_gateway

 def add_gateway(self, gateway, metric=None, table=None):
     """Issue a 'replace default via <gateway>' command for this device.

     :param gateway: gateway IP address; its IP version is passed along
                     with the command.
     :param metric: appended as 'metric <value>' only when truthy.
     :param table: value forwarded to ``_table_args``.
     """
     version = common_utils.get_ip_version(gateway)
     command = ['replace', 'default', 'via', gateway]
     if metric:
         command.extend(['metric', metric])
     command.extend(self._dev_args())
     command.extend(self._table_args(table))
     self._as_root([version], tuple(command))
开发者ID:huntxu,项目名称:neutron,代码行数:8,代码来源:ip_lib.py


示例6: get_gateway_ips

 def get_gateway_ips(gateway_port):
     """Collect the gateway IPs of a port's subnets, keyed by IP version.

     :param gateway_port: mapping with an optional 'subnets' list, or a
                          falsy value.
     :returns: dict of IP version -> gateway IP. When several subnets share
               an IP version, the last one seen wins. Empty when
               ``gateway_port`` is falsy.
     """
     ips_by_version = {}
     if not gateway_port:
         return ips_by_version
     for subnet in gateway_port.get('subnets', []):
         gw_ip = subnet.get('gateway_ip')
         if not gw_ip:
             # Subnets without a gateway contribute nothing.
             continue
         ips_by_version[common_utils.get_ip_version(gw_ip)] = gw_ip
     return ips_by_version
开发者ID:igordcard,项目名称:neutron,代码行数:9,代码来源:dvr_fip_ns.py


示例7: delete

    def delete(self, ip, **kwargs):
        """Run a 'del' command built from the canonicalised keyword args.

        :param ip: used only to determine the IP version of the command.
        :param kwargs: options passed through ``_make_canonical``.
        """
        version = common_utils.get_ip_version(ip)

        # TODO(Carl) ip ignored in delete, okay in general?

        canonical = self._make_canonical(version, kwargs)
        flat_args = self._make__flat_args_tuple('del', **canonical)
        self._as_root([version], flat_args)
开发者ID:eayunstack,项目名称:neutron,代码行数:9,代码来源:ip_lib.py


示例8: _add_default_gateway_for_fip

 def _add_default_gateway_for_fip(self, gw_ip, ip_device, tbl_index):
     """Adds default gateway for fip based on the tbl_index passed.

     :param gw_ip: gateway IP address to add.
     :param ip_device: device whose ``route.add_gateway`` is called.
     :param tbl_index: table to add the gateway to; when None, the gateway
                       is added to every table returned by
                       ``get_fip_table_indexes`` for the gateway's IP
                       version.
     """
     if tbl_index is not None:
         ip_device.route.add_gateway(gw_ip, table=tbl_index)
         return

     version = common_utils.get_ip_version(gw_ip)
     for index in self.get_fip_table_indexes(version):
         ip_device.route.add_gateway(gw_ip, table=index)
开发者ID:igordcard,项目名称:neutron,代码行数:9,代码来源:dvr_fip_ns.py


示例9: delete_route

 def delete_route(self, cidr, via=None, table=None, **kwargs):
     """Issue a 'del <cidr>' command for this device.

     :param cidr: route destination; also determines the IP version.
     :param via: appended as 'via <value>' only when truthy.
     :param table: value forwarded to ``_table_args``.
     :param kwargs: every key/value pair is appended to the command as two
                    consecutive arguments.
     """
     version = common_utils.get_ip_version(cidr)
     command = ['del', cidr]
     if via:
         command.extend(['via', via])
     command.extend(self._dev_args())
     command.extend(self._table_args(table))
     for key, value in kwargs.items():
         command.extend((key, value))
     self._run_as_root_detect_device_not_found([version], command)
开发者ID:huntxu,项目名称:neutron,代码行数:10,代码来源:ip_lib.py


示例10: _test_icmp_connectivity

 def _test_icmp_connectivity(self, direction, protocol, src_port, dst_port):
     """Ping the address resolved for ``direction`` from its namespace.

     ``protocol``, ``src_port`` and ``dst_port`` are accepted but not used
     by this method.

     :raises ConnectionTesterException: when assert_ping raises
                                        RuntimeError.
     """
     namespace, address = self._get_namespace_and_address(direction)
     # The ping timeout is chosen per IP version.
     timeout = ICMP_VERSION_TIMEOUTS[common_utils.get_ip_version(address)]
     try:
         net_helpers.assert_ping(namespace, address, timeout=timeout)
     except RuntimeError:
         message = ("ICMP packets can't get from %s namespace to %s address"
                    % (namespace, address))
         raise ConnectionTesterException(message)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:11,代码来源:conn_testers.py


示例11: start

 def start(self):
     """Build the ping command line and create its RootHelperProcess.

     :raises RuntimeError: if a process is already set and running.
     """
     if self.proc and self.proc.is_running:
         raise RuntimeError("This pinger has already a running process")

     # IPv4 addresses use 'ping'; anything else uses 'ping6'.
     if common_utils.get_ip_version(self.address) == n_const.IP_VERSION_4:
         executable = 'ping'
     else:
         executable = 'ping6'

     command = [executable, self.address, '-W', str(self.timeout)]
     if self.count:
         command += ['-c', str(self.count)]
     if self.interval:
         command += ['-i', str(self.interval)]
     self.proc = RootHelperProcess(command, namespace=self.namespace)
开发者ID:openstack,项目名称:neutron,代码行数:11,代码来源:net_helpers.py


示例12: add

    def add(self, ip, **kwargs):
        """Run an 'add' command unless ``_exists`` reports a match.

        :param ip: determines the IP version; also used as the 'from' value
                   when no 'iif' option is given.
        :param kwargs: options passed through ``_make_canonical``.
        """
        version = common_utils.get_ip_version(ip)

        # A rule keyed on the incoming interface does not need the ip, so
        # 'from' is only filled in when 'iif' is absent or falsy.
        if not kwargs.get('iif'):
            kwargs['from'] = ip
        canonical = self._make_canonical(version, kwargs)

        if self._exists(version, **canonical):
            return
        flat_args = self._make__flat_args_tuple('add', **canonical)
        self._as_root([version], flat_args)
开发者ID:SUSE-Cloud,项目名称:neutron,代码行数:12,代码来源:ip_lib.py


示例13: delete

    def delete(self, ip, **kwargs):
        """Run a 'del' command built from the canonicalised keyword args.

        :param ip: determines the IP version; also used as the 'from' value
                   when no 'iif' option is given.
        :param kwargs: options passed through ``_make_canonical``.
        """
        version = common_utils.get_ip_version(ip)

        # When deleting a rule keyed on an incoming interface, 'from' is the
        # "any" address of that IP version (for example, 0.0.0.0/0);
        # otherwise it is the given IP.
        kwargs['from'] = (constants.IP_ANY[version] if kwargs.get('iif')
                          else ip)
        canonical = self._make_canonical(version, kwargs)

        flat_args = self._make__flat_args_tuple('del', **canonical)
        self._as_root([version], flat_args)
开发者ID:huntxu,项目名称:neutron,代码行数:14,代码来源:ip_lib.py


示例14: add_neigh_entry

def add_neigh_entry(ip_address, mac_address, device, namespace=None, **kwargs):
    """Add a neighbour entry.

    Delegates to ``privileged.add_neigh_entry``, prepending the IP version
    derived from ``ip_address``.

    :param ip_address: IP address of entry to add
    :param mac_address: MAC address of entry to add
    :param device: Device name to use in adding entry
    :param namespace: The name of the namespace in which to add the entry
    :param kwargs: forwarded unchanged to the privileged call
    """
    privileged.add_neigh_entry(
        common_utils.get_ip_version(ip_address),
        ip_address, mac_address, device, namespace, **kwargs)
开发者ID:huntxu,项目名称:neutron,代码行数:15,代码来源:ip_lib.py


示例15: test_add_rule_ip

    def test_add_rule_ip(self):
        """Add a "from" IP rule, check it is listed, delete it, check again.

        Runs once with an IPv4 literal and once with an IPv6 literal.
        """
        for address in ('192.168.200.250', '2001::250'):
            version = common_utils.get_ip_version(address)
            prefix_len = common_utils.get_network_length(version)
            family = common_utils.get_socket_address_family(version)

            priv_ip_lib.add_ip_rule(self.namespace, src=address,
                                    src_len=prefix_len, family=family)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            self._check_rules(listed, ['from'], [address],
                              '"from" IP address %s' % address)

            priv_ip_lib.delete_ip_rule(self.namespace, family=family,
                                       src=address, src_len=prefix_len)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            # After deletion the rule must no longer be found.
            still_present = self._check_rules(listed, ['from'], [address],
                                              raise_exception=False)
            self.assertFalse(still_present)
开发者ID:noironetworks,项目名称:neutron,代码行数:18,代码来源:test_ip_lib.py


示例16: test_add_rule_table

    def test_add_rule_table(self):
        """Add a rule with a table and "from" IP, check, delete, check again.

        Runs once with an IPv4 literal and once with an IPv6 literal.
        """
        table = 212
        for address in ('192.168.200.251', '2001::251'):
            version = common_utils.get_ip_version(address)
            prefix_len = common_utils.get_network_length(version)
            family = common_utils.get_socket_address_family(version)

            priv_ip_lib.add_ip_rule(self.namespace, table=table,
                                    src=address, src_len=prefix_len,
                                    family=family)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            self._check_rules(
                listed, ['table', 'from'], [str(table), address],
                'table %s and "from" IP address %s' % (table, address))

            priv_ip_lib.delete_ip_rule(self.namespace, table=table,
                                       src=address, src_len=prefix_len,
                                       family=family)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            # After deletion the rule must no longer be found.
            still_present = self._check_rules(
                listed, ['table', 'from'], [str(table), address],
                raise_exception=False)
            self.assertFalse(still_present)
开发者ID:noironetworks,项目名称:neutron,代码行数:23,代码来源:test_ip_lib.py


示例17: create_flows_for_ip_address

def create_flows_for_ip_address(ip_address, direction, ethertype,
                                vlan_tag, conj_ids):
    """Create flows from a rule and an ip_address derived from
    remote_group_id

    The conj_ids are bucketed by the offset returned from
    _flow_priority_offset_from_conj_id(); each non-empty bucket produces
    flows at priority 70 + offset.

    :param ip_address: address normalised to CIDR form via netaddr.
    :param direction: compared against the n_consts egress/ingress constants
                      to select the rules table.
    :param ethertype: key into ovsfw_consts.ethertype_to_dl_type_map.
    :param vlan_tag: stored in the 'reg_net' field of the flow.
    :param conj_ids: iterable of conjunction ids to distribute.
    :returns: list of flows gathered from substitute_conjunction_actions().
    """
    # Group conj_ids per priority.
    buckets = [[] for _ in range(4)]
    for conj_id in conj_ids:
        bucket_index = _flow_priority_offset_from_conj_id(conj_id)
        buckets[bucket_index].append(conj_id)

    cidr = str(netaddr.IPNetwork(ip_address).cidr)

    template = {
        'dl_type': ovsfw_consts.ethertype_to_dl_type_map[ethertype],
        'reg_net': vlan_tag,  # needed for project separation
    }

    version = utils.get_ip_version(cidr)

    # No table is set when the direction matches neither constant.
    if direction == n_consts.EGRESS_DIRECTION:
        template['table'] = ovs_consts.RULES_EGRESS_TABLE
    elif direction == n_consts.INGRESS_DIRECTION:
        template['table'] = ovs_consts.RULES_INGRESS_TABLE

    field_key = (version, direction)
    template[FLOW_FIELD_FOR_IPVER_AND_DIRECTION[field_key]] = cidr

    flows = []
    for offset, bucket in enumerate(buckets):
        if bucket:
            # The same template dict is reused; only its priority changes
            # between buckets.
            template['priority'] = 70 + offset
            flows += substitute_conjunction_actions([template], 1, bucket)
    return flows
开发者ID:igordcard,项目名称:neutron,代码行数:37,代码来源:rules.py


示例18: test_add_rule_priority

    def test_add_rule_priority(self):
        """Add a rule with a priority and "from" IP, check, delete, recheck.

        Runs once with an IPv4 literal and once with an IPv6 literal.
        """
        priority = 12345
        for address in ('192.168.200.252', '2001::252'):
            version = common_utils.get_ip_version(address)
            prefix_len = common_utils.get_network_length(version)
            family = common_utils.get_socket_address_family(version)

            priv_ip_lib.add_ip_rule(self.namespace, priority=priority,
                                    src=address, src_len=prefix_len,
                                    family=family)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            self._check_rules(
                listed, ['priority', 'from'], [str(priority), address],
                'priority %s and "from" IP address %s' %
                (priority, address))

            priv_ip_lib.delete_ip_rule(self.namespace, priority=priority,
                                       src=address, src_len=prefix_len,
                                       family=family)
            listed = ip_lib.list_ip_rules(self.namespace, version)
            # After deletion the rule must no longer be found.
            still_present = self._check_rules(
                listed, ['priority', 'from'], [str(priority), address],
                raise_exception=False)
            self.assertFalse(still_present)
开发者ID:noironetworks,项目名称:neutron,代码行数:24,代码来源:test_ip_lib.py


示例19: has_expected_arp_entry

def has_expected_arp_entry(device_name, namespace, ip, mac):
    """Tell whether dumping neighbour entries for ip/mac yields anything.

    :returns: True unless ``ip_lib.dump_neigh_entries`` returns a value
              equal to the empty list.
    """
    found = ip_lib.dump_neigh_entries(
        utils.get_ip_version(ip), device_name, namespace,
        dst=ip, lladdr=mac)
    return found != []
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:5,代码来源:test_keepalived_state_change.py


示例20: fdb_ip_entry_exists

 def fdb_ip_entry_exists(mac, ip, interface):
     """Tell whether dumping neighbour entries for ip/mac yields anything.

     :returns: True unless ``ip_lib.dump_neigh_entries`` returns a value
               equal to the empty list.
     """
     found = ip_lib.dump_neigh_entries(
         utils.get_ip_version(ip), interface, dst=ip, lladdr=mac)
     return found != []
开发者ID:noironetworks,项目名称:neutron,代码行数:5,代码来源:linuxbridge_neutron_agent.py



注:本文中的neutron.common.utils.get_ip_version函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.get_other_dvr_serviced_device_owners函数代码示例发布时间:2022-05-27
下一篇:
Python utils.get_hostname函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap