• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python l3_test_common.prepare_router_data函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.tests.common.l3_test_common.prepare_router_data函数的典型用法代码示例。如果您正苦于以下问题：Python prepare_router_data函数的具体用法？Python prepare_router_data怎么用？Python prepare_router_data使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了prepare_router_data函数的10个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: generate_router_info

    def generate_router_info(self, enable_ha,
                             ip_version=constants.IP_VERSION_4,
                             extra_routes=True,
                             enable_fip=True, enable_snat=True,
                             num_internal_ports=1,
                             dual_stack=False, enable_gw=True,
                             v6_ext_gw_with_sub=True,
                             enable_pf_floating_ip=False,
                             qos_policy_id=None):
        """Build router data via ``l3_test_common.prepare_router_data``.

        When ``ip_version`` equals ``constants.IP_VERSION_6`` and
        ``dual_stack`` is false, SNAT, floating IPs and extra routes are
        switched off regardless of the values passed in. All other arguments
        are forwarded unchanged; ``enable_fip`` is forwarded under the
        keyword ``enable_floating_ip``.

        :returns: whatever ``prepare_router_data`` returns.
        """
        v6_only = ip_version == constants.IP_VERSION_6 and not dual_stack
        if v6_only:
            enable_snat = enable_fip = extra_routes = False

        router_options = {
            'ip_version': ip_version,
            'enable_snat': enable_snat,
            'num_internal_ports': num_internal_ports,
            'enable_floating_ip': enable_fip,
            'enable_ha': enable_ha,
            'extra_routes': extra_routes,
            'dual_stack': dual_stack,
            'enable_gw': enable_gw,
            'v6_ext_gw_with_sub': v6_ext_gw_with_sub,
            'enable_pf_floating_ip': enable_pf_floating_ip,
            'qos_policy_id': qos_policy_id,
        }
        return l3_test_common.prepare_router_data(**router_options)
开发者ID:openstack,项目名称:neutron,代码行数:29,代码来源:framework.py


示例2: test_add_centralized_floatingip

    def test_add_centralized_floatingip(self,
                                        super_add_centralized_floatingip):
        # Exercise DvrEdgeHaRouter.add_centralized_floatingip twice: once
        # with is_router_master() returning False and once returning True.
        # In both cases _add_vip must receive the FIP CIDR and the SNAT
        # external device name; super_add_centralized_floatingip must only
        # be called in the master case.
        # NOTE(review): super_add_centralized_floatingip is presumably a
        # mock injected by a patch decorator that is not visible here.
        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
        agent.conf.agent_mode = lib_constants.L3_AGENT_MODE_DVR_SNAT
        router_data = l3_test_common.prepare_router_data(num_internal_ports=2)
        router_data['gw_port_host'] = HOSTNAME
        self.mock_driver.unplug.reset_mock()
        self._set_ri_kwargs(agent, router_data['id'], router_data)
        fip = {'id': _uuid()}
        fip_cidr = '11.22.33.44/24'

        def add_fip_on_router(is_master):
            # Build a fresh HA edge router in the requested state, add the
            # floating IP and check the VIP registration.
            ha_router = dvr_edge_ha_rtr.DvrEdgeHaRouter(
                HOSTNAME, [], **self.ri_kwargs)
            ha_router.is_router_master = mock.Mock(return_value=is_master)
            ha_router._add_vip = mock.Mock()
            gw_device = ha_router.get_snat_external_device_interface_name(
                ha_router.get_ex_gw_port())
            ha_router.add_centralized_floatingip(fip, fip_cidr)
            ha_router._add_vip.assert_called_once_with(fip_cidr, gw_device)
            return ha_router

        # Keep references so both routers stay alive for the whole test.
        backup_router = add_fip_on_router(is_master=False)  # noqa: F841
        super_add_centralized_floatingip.assert_not_called()

        master_router = add_fip_on_router(is_master=True)  # noqa: F841
        super_add_centralized_floatingip.assert_called_once_with(fip,
                                                                 fip_cidr)
开发者ID:cubeek,项目名称:neutron,代码行数:29,代码来源:test_dvr_local_router.py


示例3: test__set_subnet_arp_info

    def test__set_subnet_arp_info(self):
        # Build a DvrLocalRouter from router data with two internal ports
        # and 'distributed' set to True, then check which neighbour entries
        # _set_subnet_arp_info() adds for the subnet of the first port.
        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
        router = l3_test_common.prepare_router_data(num_internal_ports=2)
        router['distributed'] = True
        ri = dvr_router.DvrLocalRouter(
            agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
        ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
        subnet_id = l3_test_common.get_subnet_id(ports[0])
        # The only port the plugin reports on that subnet: a DHCP-owned
        # port with a single fixed IP.
        test_ports = [{'mac_address': '00:11:22:33:44:55',
                      'device_owner': 'network:dhcp',
                      'fixed_ips': [{'ip_address': '1.2.3.4',
                                     'prefixlen': 24,
                                     'subnet_id': subnet_id}]}]

        self.plugin_api.get_ports_by_subnet.return_value = test_ports

        # Basic case: exactly one neighbour entry is added, using the IP
        # and MAC address of the port returned above.
        ports[0]['subnets'] = [{'id': subnet_id,
                                'cidr': '1.2.3.0/24'}]
        ri._set_subnet_arp_info(subnet_id)
        self.mock_ip_dev.neigh.add.assert_called_once_with(
            '1.2.3.4', '00:11:22:33:44:55')

        # Negative case: the router is flagged as not distributed.
        # NOTE(review): never_called() is not a Mock assertion method; on a
        # Mock it is just an auto-created child attribute, so the line below
        # verifies nothing and can never fail. A real check would be
        # neigh.add.reset_mock() before the call and
        # neigh.add.assert_not_called() after it. TODO: first confirm that
        # _set_subnet_arp_info() really consults router['distributed'];
        # that is not visible from this test.
        router['distributed'] = False
        ri._set_subnet_arp_info(subnet_id)
        self.mock_ip_dev.neigh.add.never_called()
开发者ID:dhanunjaya,项目名称:neutron,代码行数:27,代码来源:test_dvr_local_router.py


示例4: generate_dvr_router_info

    def generate_dvr_router_info(self,
                                 enable_ha=False,
                                 enable_snat=False,
                                 agent=None,
                                 extra_routes=False,
                                 **kwargs):
        """Return router data describing a distributed router.

        The data comes from ``l3_test_common.prepare_router_data`` with two
        internal ports and floating IPs enabled; any extra ``kwargs`` are
        forwarded to it. The router is then marked as distributed, its
        gateway port is bound to the agent's host, and its first floating
        IP is attached to the first internal port with status ``ACTIVE``.
        SNAT port info and FIP agent gateway port info are added last.

        :param agent: agent whose ``conf.host`` is used for the bindings;
            ``self.agent`` is used when this is falsy.
        :returns: the populated router dict.
        """
        agent = agent or self.agent
        host = agent.conf.host

        router = l3_test_common.prepare_router_data(
            num_internal_ports=2,
            enable_floating_ip=True,
            enable_snat=enable_snat,
            enable_ha=enable_ha,
            extra_routes=extra_routes,
            **kwargs)
        internal_ports = router.get(l3_constants.INTERFACE_KEY, [])

        router['distributed'] = True
        router['gw_port_host'] = host
        router['gw_port'][portbindings.HOST_ID] = host

        # Bind the first floating IP to the first internal port on this host.
        router['_floatingips'][0].update({
            'floating_network_id': router['gw_port']['network_id'],
            'host': host,
            'port_id': internal_ports[0]['id'],
            'status': 'ACTIVE',
        })

        self._add_snat_port_info_to_router(router, internal_ports)
        # FIP has a dependency on external gateway. So we need to create
        # the snat_port info and fip_agent_gw_port_info irrespective of
        # the agent type the dvr supports. The namespace creation is
        # dependent on the agent_type.
        self._add_fip_agent_gw_port_info_to_router(router, router['gw_port'])
        return router
开发者ID:manjeetbhatia,项目名称:test_l3,代码行数:33,代码来源:test_dvr_router.py


示例5: test_external_gateway_removed_ext_gw_port_and_fip

    def test_external_gateway_removed_ext_gw_port_and_fip(self):
        # Check that external_gateway_removed() on a DvrLocalRouter that
        # still tracks one floating IP calls remove_floating_ip() exactly
        # once, with the mocked IP device and that floating IP's /32 CIDR.
        self.conf.set_override('state_path', '/tmp')

        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
        agent.conf.agent_mode = 'dvr'
        router = l3_test_common.prepare_router_data(num_internal_ports=2)
        router['gw_port_host'] = HOSTNAME
        self.mock_driver.unplug.reset_mock()

        external_net_id = router['gw_port']['network_id']
        ri = dvr_router.DvrLocalRouter(
            agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
        ri.remove_floating_ip = mock.Mock()
        agent._fetch_external_net_id = mock.Mock(return_value=external_net_id)
        # Remember the gateway port on the router info, then drop it from
        # the router data so that only ri.ex_gw_port still refers to it.
        ri.ex_gw_port = ri.router['gw_port']
        del ri.router['gw_port']
        # NOTE(review): fip_ns is reassigned a few lines below before any
        # visible use, so this reset looks redundant - confirm.
        ri.fip_ns = None
        # Stub out the NAT table operations of the router's iptables manager.
        nat = ri.iptables_manager.ipv4['nat']
        nat.clear_rules_by_tag = mock.Mock()
        nat.add_rule = mock.Mock()

        # Attach the FIP namespace for the external network and give it a
        # hand-built agent gateway port.
        ri.fip_ns = agent.get_fip_ns(external_net_id)
        subnet_id = _uuid()
        ri.fip_ns.agent_gateway_port = {
            'fixed_ips': [{
                            'ip_address': '20.0.0.30',
                            'prefixlen': 24,
                            'subnet_id': subnet_id
                         }],
            'subnets': [{'id': subnet_id,
                         'cidr': '20.0.0.0/24',
                         'gateway_ip': '20.0.0.1'}],
            'id': _uuid(),
            'network_id': external_net_id,
            'mac_address': 'ca:fe:de:ad:be:ef'}

        # Register a single floating IP on the router.
        vm_floating_ip = '19.4.4.2'
        ri.floating_ips_dict[vm_floating_ip] = FIP_PRI
        ri.dist_fip_count = 1
        ri.rtr_fip_subnet = ri.fip_ns.local_subnets.allocate(ri.router_id)
        # NOTE(review): fip_to_rtr is not used anywhere below.
        _, fip_to_rtr = ri.rtr_fip_subnet.get_pair()
        # The mocked device lookups report one external device carrying the
        # floating IP as a /32 plus one /24 address.
        self.mock_ip.get_devices.return_value = [
            l3_test_common.FakeDev(ri.fip_ns.get_ext_device_name(_uuid()))]
        self.mock_ip_dev.addr.list.return_value = [
            {'cidr': vm_floating_ip + '/32'},
            {'cidr': '19.4.4.1/24'}]
        self.device_exists.return_value = True

        ri.external_gateway_removed(
            ri.ex_gw_port,
            ri.get_external_device_name(ri.ex_gw_port['id']))

        # Only the /32 floating IP address is expected to be removed.
        ri.remove_floating_ip.assert_called_once_with(self.mock_ip_dev,
                                                      '19.4.4.2/32')
开发者ID:Blahhhhh,项目名称:neutron,代码行数:54,代码来源:test_dvr_local_router.py


示例6: test_ipv6_router_advts_after_router_state_change

 def test_ipv6_router_advts_after_router_state_change(self):
     # Schedule an HA dual-stack router without a gateway to the l3 agent,
     # wait for it to become master, then add the router gateway and
     # verify that the gw interface is configured to receive Router Advts.
     router_data = l3_test_common.prepare_router_data(
         enable_gw=False, enable_ha=True, enable_snat=True, dual_stack=True)
     ha_router = self.manage_router(self.agent, router_data)

     def is_master():
         return ha_router.ha_state == 'master'

     common_utils.wait_until_true(is_master)

     # Only the gateway port is needed; the device name is discarded.
     _, gw_port = l3_test_common.prepare_ext_gw_test(mock.Mock(), ha_router)
     router_data['gw_port'] = gw_port
     ha_router.process(self.agent)
     self._assert_ipv6_accept_ra(ha_router)
开发者ID:gotostack,项目名称:neutron,代码行数:12,代码来源:test_ha_router.py


示例7: generate_router_info

    def generate_router_info(self, enable_ha, ip_version=4, extra_routes=True,
                             enable_fip=True, enable_snat=True,
                             dual_stack=False, v6_ext_gw_with_sub=True):
        """Build router data via ``l3_test_common.prepare_router_data``.

        When ``ip_version`` is 6 and ``dual_stack`` is false, SNAT, floating
        IPs and extra routes are switched off regardless of the values
        passed in. All other arguments are forwarded unchanged;
        ``enable_fip`` is forwarded under the keyword ``enable_floating_ip``.

        :returns: whatever ``prepare_router_data`` returns.
        """
        v6_only = ip_version == 6 and not dual_stack
        if v6_only:
            enable_snat = enable_fip = extra_routes = False

        router_options = {
            'ip_version': ip_version,
            'enable_snat': enable_snat,
            'enable_floating_ip': enable_fip,
            'enable_ha': enable_ha,
            'extra_routes': extra_routes,
            'dual_stack': dual_stack,
            'v6_ext_gw_with_sub': v6_ext_gw_with_sub,
        }
        return l3_test_common.prepare_router_data(**router_options)
开发者ID:dlundquist,项目名称:neutron,代码行数:16,代码来源:framework.py


示例8: test_handle_snat_rule_for_centralized_fip

    def test_handle_snat_rule_for_centralized_fip(
            self, _add_snat_rules, _handle_router_snat_rules):
        # With the agent in DVR SNAT mode and floating IPs enabled on the
        # router, _handle_router_snat_rules() on a DvrEdgeRouter must add
        # exactly one rule to the SNAT namespace NAT table: the jump from
        # 'snat' to '$float-snat'.
        # NOTE(review): the two extra parameters are presumably mocks
        # injected by patch decorators that are not visible here.
        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
        agent.conf.agent_mode = lib_constants.L3_AGENT_MODE_DVR_SNAT
        self.mock_driver.unplug.reset_mock()

        router_data = l3_test_common.prepare_router_data(
            enable_floating_ip=True)
        router_data['gw_port_host'] = HOSTNAME
        self._set_ri_kwargs(agent, router_data['id'], router_data)

        edge_router = dvr_edge_rtr.DvrEdgeRouter(HOSTNAME, **self.ri_kwargs)
        edge_router.snat_iptables_manager = mock.MagicMock()
        nat_table = edge_router.snat_iptables_manager.ipv4['nat']

        gw_test_data = l3_test_common.prepare_ext_gw_test(self, edge_router)
        gw_device_name, gw_port = gw_test_data
        edge_router._handle_router_snat_rules(gw_port, gw_device_name)

        nat_table.add_rule.assert_called_once_with('snat', '-j $float-snat')
开发者ID:cubeek,项目名称:neutron,代码行数:16,代码来源:test_dvr_local_router.py


示例9: test_dvr_router_add_internal_network_set_arp_cache

 def test_dvr_router_add_internal_network_set_arp_cache(self):
     # Check that, when the router is set up and there are existing ports
     # on the uplinked subnet (as reported by
     # plugin_rpc.get_ports_by_subnet), the ARP cache is properly
     # populated: the first IPv4 neighbour listed on the internal device
     # must be the address of that existing port.
     self.agent.conf.agent_mode = "dvr_snat"

     neighbor_ip = "35.4.1.10"
     existing_port = {
         "fixed_ips": [{"ip_address": neighbor_ip}],
         "mac_address": "fa:3e:aa:bb:cc:dd",
         "device_owner": DEVICE_OWNER_COMPUTE,
     }

     router_data = l3_test_common.prepare_router_data()
     router_data["distributed"] = True
     self.agent.plugin_rpc.get_ports_by_subnet.return_value = [
         existing_port]
     managed_router = self.manage_router(self.agent, router_data)

     # Look up the neighbour table of the first internal interface inside
     # the router namespace.
     first_port_id = router_data["_interfaces"][0]["id"]
     device_name = managed_router.get_internal_device_name(first_port_id)
     neigh = ip_lib.IPDevice(device_name, managed_router.ns_name).neigh
     first_neighbor = neigh.show(ip_version=4).split()[0]
     self.assertEqual(neighbor_ip, first_neighbor)
开发者ID:klmitch,项目名称:neutron,代码行数:18,代码来源:test_dvr_router.py


示例10: _test_ipv6_router_advts_and_fwd_helper

 def _test_ipv6_router_advts_and_fwd_helper(self, state, enable_v6_gw,
                                            expected_ra,
                                            expected_forwarding):
     # Schedule an HA dual-stack router without a gateway to the l3 agent,
     # optionally fail it over to 'backup', then add the router gateway.
     # Verify that the router gw interface's Router Advts acceptance and
     # IPv6 forwarding settings match expected_ra / expected_forwarding.
     router_data = l3_test_common.prepare_router_data(
         enable_gw=False, enable_ha=True, enable_snat=True, dual_stack=True)
     ha_router = self.manage_router(self.agent, router_data)

     def ha_state_is(wanted):
         # Predicate factory for wait_until_true().
         return lambda: ha_router.ha_state == wanted

     common_utils.wait_until_true(ha_state_is('master'))
     if state == 'backup':
         self.fail_ha_router(ha_router)
         common_utils.wait_until_true(ha_state_is('backup'))

     # Only the gateway port is needed; the device name is discarded.
     _, gw_port = l3_test_common.prepare_ext_gw_test(
         mock.Mock(), ha_router, dual_stack=enable_v6_gw)
     router_data['gw_port'] = gw_port
     ha_router.process()

     self._assert_ipv6_accept_ra(ha_router, expected_ra)
     self._assert_ipv6_forwarding(ha_router, expected_forwarding)
开发者ID:cubeek,项目名称:neutron,代码行数:19,代码来源:test_ha_router.py



注:本文中的neutron.tests.common.l3_test_common.prepare_router_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python net_helpers.assert_no_ping函数代码示例发布时间:2022-05-27
下一篇:
Python l3_test_common.get_subnet_id函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap