• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python v2_0.find_resourceid_by_name_or_id函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutronclient.neutron.v2_0.find_resourceid_by_name_or_id函数的典型用法代码示例。如果您正苦于以下问题:Python find_resourceid_by_name_or_id函数的具体用法?Python find_resourceid_by_name_or_id怎么用?Python find_resourceid_by_name_or_id使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了find_resourceid_by_name_or_id函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_ip_address_is_cidr

    def test_ip_address_is_cidr(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'abcd1234'
        ).MultipleTimes().AndReturn('abcd1234')
        neutronclient.Client.create_port({'port': {
            'network_id': u'abcd1234',
            'allowed_address_pairs': [{
                'ip_address': u'10.0.3.0/24',
                'mac_address': u'00-B0-D0-86-BB-F7'
            }],
            'name': utils.PhysName('test_stack', 'port'),
            'admin_state_up': True}}
        ).AndReturn({'port': {
            "status": "BUILD",
            "id": "2e00180a-ff9d-42c4-b701-a0606b243447"
        }})
        neutronclient.Client.show_port(
            '2e00180a-ff9d-42c4-b701-a0606b243447'
        ).AndReturn({'port': {
            "status": "ACTIVE",
            "id": "2e00180a-ff9d-42c4-b701-a0606b243447"
        }})

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_with_address_pair_template)
        t['resources']['port']['properties'][
            'allowed_address_pairs'][0]['ip_address'] = '10.0.3.0/24'
        stack = utils.parse_stack(t)

        port = stack['port']
        scheduler.TaskRunner(port.create)()
        self.m.VerifyAll()
开发者ID:noako,项目名称:heat,代码行数:35,代码来源:test_neutron_port.py


示例2: validate

    def validate(self):
        """Validate the provided params."""
        super(RBACPolicy, self).validate()

        action = self.properties[self.ACTION]
        obj_type = self.properties[self.OBJECT_TYPE]
        obj_id_or_name = self.properties[self.OBJECT_ID]

        # Validate obj_type and action per SUPPORTED_TYPES_ACTIONS.
        if obj_type not in self.SUPPORTED_TYPES_ACTIONS:
            msg = (_("Invalid object_type: %(obj_type)s. "
                     "Valid object_type :%(value)s") %
                   {'obj_type': obj_type,
                    'value': self.SUPPORTED_TYPES_ACTIONS.keys()})
            raise exception.StackValidationFailed(message=msg)
        if action not in self.SUPPORTED_TYPES_ACTIONS[obj_type]:
            msg = (_("Invalid action %(action)s for object type "
                   "%(obj_type)s. Valid actions :%(value)s") %
                   {'action': action, 'obj_type': obj_type,
                    'value': self.SUPPORTED_TYPES_ACTIONS[obj_type]})
            raise exception.StackValidationFailed(message=msg)

        # Make sure the value of object_id is correct.
        neutronV20.find_resourceid_by_name_or_id(self.client(),
                                                 obj_type,
                                                 obj_id_or_name)
开发者ID:Hopebaytech,项目名称:heat,代码行数:26,代码来源:rbac_policy.py


示例3: test_port_security_enabled

    def test_port_security_enabled(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client), "network", "abcd1234"
        ).MultipleTimes().AndReturn("abcd1234")

        neutronclient.Client.create_port(
            {
                "port": {
                    "network_id": u"abcd1234",
                    "port_security_enabled": False,
                    "name": utils.PhysName("test_stack", "port"),
                    "admin_state_up": True,
                }
            }
        ).AndReturn({"port": {"status": "BUILD", "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"}})

        neutronclient.Client.show_port("fc68ea2c-b60b-4b4f-bd82-94ec81110766").AndReturn(
            {"port": {"status": "ACTIVE", "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"}}
        )

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_security_template)
        stack = utils.parse_stack(t)

        port = stack["port"]
        scheduler.TaskRunner(port.create)()
        self.m.VerifyAll()
开发者ID:ricolin,项目名称:heat,代码行数:28,代码来源:test_neutron_port.py


示例4: create_vpnservice

    def create_vpnservice(self, resolve_neutron=True, resolve_router=True):
        self.stub_SubnetConstraint_validate()
        self.stub_RouterConstraint_validate()
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'subnet',
            'sub123',
            cmd_resource=None,
        ).AndReturn('sub123')
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'router',
            'rou123',
            cmd_resource=None,
        ).AndReturn('rou123')
        if resolve_neutron:
            snippet = template_format.parse(vpnservice_template)
        else:
            snippet = template_format.parse(vpnservice_template_deprecated)
        if resolve_router:
            props = snippet['resources']['VPNService']['properties']
            props['router'] = 'rou123'
            del props['router_id']
        neutronclient.Client.create_vpnservice(
            self.VPN_SERVICE_CONF).AndReturn({'vpnservice': {'id': 'vpn123'}})

        self.stack = utils.parse_stack(snippet)
        resource_defns = self.stack.t.resource_definitions(self.stack)
        return vpnservice.VPNService('vpnservice',
                                     resource_defns['VPNService'],
                                     self.stack)
开发者ID:MatMaul,项目名称:heat,代码行数:31,代码来源:test_neutron_vpnservice.py


示例5: test_ip_address_is_cidr

    def test_ip_address_is_cidr(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client), "network", "abcd1234", cmd_resource=None
        ).MultipleTimes().AndReturn("abcd1234")
        neutronclient.Client.create_port(
            {
                "port": {
                    "network_id": u"abcd1234",
                    "allowed_address_pairs": [{"ip_address": u"10.0.3.0/24", "mac_address": u"00-B0-D0-86-BB-F7"}],
                    "name": utils.PhysName("test_stack", "port"),
                    "admin_state_up": True,
                }
            }
        ).AndReturn({"port": {"status": "BUILD", "id": "2e00180a-ff9d-42c4-b701-a0606b243447"}})
        neutronclient.Client.show_port("2e00180a-ff9d-42c4-b701-a0606b243447").AndReturn(
            {"port": {"status": "ACTIVE", "id": "2e00180a-ff9d-42c4-b701-a0606b243447"}}
        )

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_with_address_pair_template)
        t["resources"]["port"]["properties"]["allowed_address_pairs"][0]["ip_address"] = "10.0.3.0/24"
        stack = utils.parse_stack(t)

        port = stack["port"]
        scheduler.TaskRunner(port.create)()
        self.m.VerifyAll()
开发者ID:zhengxiaochuan-3,项目名称:heat,代码行数:27,代码来源:test_neutron_port.py


示例6: test_port_security_enabled

    def test_port_security_enabled(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'abcd1234'
        ).MultipleTimes().AndReturn('abcd1234')

        neutronclient.Client.create_port({'port': {
            'network_id': u'abcd1234',
            'port_security_enabled': False,
            'name': utils.PhysName('test_stack', 'port'),
            'admin_state_up': True}}
        ).AndReturn({'port': {
            "status": "BUILD",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})

        neutronclient.Client.show_port(
            'fc68ea2c-b60b-4b4f-bd82-94ec81110766'
        ).AndReturn({'port': {
            "status": "ACTIVE",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766",
        }})

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_security_template)
        stack = utils.parse_stack(t)

        port = stack['port']
        scheduler.TaskRunner(port.create)()
        self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:32,代码来源:test_neutron_port.py


示例7: args2body

 def args2body(self, parsed_args):
     _security_group_id = neutronV20.find_resourceid_by_name_or_id(
         self.get_client(), "security_group", parsed_args.security_group_id
     )
     body = {
         "security_group_rule": {
             "security_group_id": _security_group_id,
             "direction": parsed_args.direction,
             "ethertype": parsed_args.ethertype,
         }
     }
     if parsed_args.protocol:
         body["security_group_rule"].update({"protocol": parsed_args.protocol})
     if parsed_args.port_range_min:
         body["security_group_rule"].update({"port_range_min": parsed_args.port_range_min})
     if parsed_args.port_range_max:
         body["security_group_rule"].update({"port_range_max": parsed_args.port_range_max})
     if parsed_args.remote_ip_prefix:
         body["security_group_rule"].update({"remote_ip_prefix": parsed_args.remote_ip_prefix})
     if parsed_args.remote_group_id:
         _remote_group_id = neutronV20.find_resourceid_by_name_or_id(
             self.get_client(), "security_group", parsed_args.remote_group_id
         )
         body["security_group_rule"].update({"remote_group_id": _remote_group_id})
     if parsed_args.tenant_id:
         body["security_group_rule"].update({"tenant_id": parsed_args.tenant_id})
     return body
开发者ID:Cerberus98,项目名称:python-neutronclient,代码行数:27,代码来源:securitygroup.py


示例8: test_update_router_gateway_as_property

    def test_update_router_gateway_as_property(self):
        self._create_router_with_gateway()
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'other_public',
            cmd_resource=None,
        ).AndReturn('91e47a57-7508-46fe-afc9-fc454e8580e1')
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'subnet',
            'sub1234',
            cmd_resource=None,
        ).AndReturn('sub1234')
        neutronclient.Client.update_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8',
            {'router': {
                "external_gateway_info": {
                    'network_id': '91e47a57-7508-46fe-afc9-fc454e8580e1',
                    'enable_snat': False
                },
                }}
        ).AndReturn(None)

        neutronclient.Client.show_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8').MultipleTimes().AndReturn({
                "router": {
                    "status": "ACTIVE",
                    "external_gateway_info": {
                        "network_id": "91e47a57-7508-46fe-afc9-fc454e8580e1",
                        "enable_snat": False
                    },
                    "name": "Test Router",
                    "admin_state_up": True,
                    "tenant_id": "3e21026f2dc94372b105808c0e721661",
                    "routes": [],
                    "id": "3e46229d-8fce-4733-819a-b5fe630550f8"
                }
            })

        self.m.ReplayAll()
        t = template_format.parse(neutron_external_gateway_template)
        stack = utils.parse_stack(t)
        rsrc = self.create_router(t, stack, 'router')

        props = t['resources']['router']['properties'].copy()
        props['external_gateway_info'] = {
            "network": "other_public",
            "enable_snat": False
        }
        update_template = rsrc.t.freeze(properties=props)
        scheduler.TaskRunner(rsrc.update, update_template)()
        self.assertEqual((rsrc.UPDATE, rsrc.COMPLETE), rsrc.state)

        gateway_info = rsrc.FnGetAtt('external_gateway_info')
        self.assertEqual('91e47a57-7508-46fe-afc9-fc454e8580e1',
                         gateway_info.get('network_id'))
        self.assertFalse(gateway_info.get('enable_snat'))

        self.m.VerifyAll()
开发者ID:hongbin,项目名称:heat,代码行数:60,代码来源:test_neutron_router.py


示例9: run

 def run(self, parsed_args):
     self.log.debug('run(%s)' % parsed_args)
     neutron_client = self.get_client()
     neutron_client.format = parsed_args.request_format
     _router_id = neutronV20.find_resourceid_by_name_or_id(
         neutron_client, self.resource, parsed_args.router)
     _ext_net_id = neutronV20.find_resourceid_by_name_or_id(
         neutron_client, 'network', parsed_args.external_network)
     router_dict = {'network_id': _ext_net_id}
     if parsed_args.disable_snat:
         router_dict['enable_snat'] = False
     if parsed_args.fixed_ip:
         ips = []
         for ip_spec in parsed_args.fixed_ip:
             ip_dict = utils.str2dict(ip_spec)
             subnet_name_id = ip_dict.get('subnet_id')
             if subnet_name_id:
                 subnet_id = neutronV20.find_resourceid_by_name_or_id(
                     neutron_client, 'subnet', subnet_name_id)
                 ip_dict['subnet_id'] = subnet_id
             ips.append(ip_dict)
         router_dict['external_fixed_ips'] = ips
     neutron_client.add_gateway_router(_router_id, router_dict)
     print(_('Set gateway for router %s') % parsed_args.router,
           file=self.app.stdout)
开发者ID:Gvkrishna,项目名称:python-neutronclient,代码行数:25,代码来源:router.py


示例10: args2body

    def args2body(self, parsed_args):
        _network_id = neutronV20.find_resourceid_by_name_or_id(self.get_client(), "network", parsed_args.network_id)
        body = {"port": {"admin_state_up": parsed_args.admin_state, "network_id": _network_id}}
        if parsed_args.mac_address:
            body["port"].update({"mac_address": parsed_args.mac_address})
        if parsed_args.device_id:
            body["port"].update({"device_id": parsed_args.device_id})
        if parsed_args.tenant_id:
            body["port"].update({"tenant_id": parsed_args.tenant_id})
        if parsed_args.name:
            body["port"].update({"name": parsed_args.name})
        ips = []
        if parsed_args.fixed_ip:
            for ip_spec in parsed_args.fixed_ip:
                ip_dict = utils.str2dict(ip_spec)
                if "subnet_id" in ip_dict:
                    subnet_name_id = ip_dict["subnet_id"]
                    _subnet_id = neutronV20.find_resourceid_by_name_or_id(self.get_client(), "subnet", subnet_name_id)
                    ip_dict["subnet_id"] = _subnet_id
                ips.append(ip_dict)
        if ips:
            body["port"].update({"fixed_ips": ips})

        self.args2body_secgroup(parsed_args, body["port"])
        self.args2body_extradhcpopt(parsed_args, body["port"])

        return body
开发者ID:kbijon,项目名称:OpenStack-CVRM,代码行数:27,代码来源:port.py


示例11: _test_network_gateway_create

    def _test_network_gateway_create(self, resolve_neutron=True):
        rsrc = self.prepare_create_network_gateway(resolve_neutron)
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            '6af055d3-26f6-48dd-a597-7611d7e58d35'
        ).MultipleTimes().AndReturn(
            '6af055d3-26f6-48dd-a597-7611d7e58d35')

        neutronclient.Client.disconnect_network_gateway(
            'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37', {
                'network_id': u'6af055d3-26f6-48dd-a597-7611d7e58d35',
                'segmentation_id': 10,
                'segmentation_type': u'vlan'
            }
        ).AndReturn(None)

        neutronclient.Client.disconnect_network_gateway(
            u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37', {
                'network_id': u'6af055d3-26f6-48dd-a597-7611d7e58d35',
                'segmentation_id': 10,
                'segmentation_type': u'vlan'
            }
        ).AndReturn(qe.NeutronClientException(status_code=404))

        neutronclient.Client.delete_network_gateway(
            u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37'
        ).AndReturn(None)

        neutronclient.Client.show_network_gateway(
            u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37'
        ).AndReturn(sng)

        neutronclient.Client.show_network_gateway(
            u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37'
        ).AndRaise(qe.NeutronClientException(status_code=404))

        neutronclient.Client.delete_network_gateway(
            u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37'
        ).AndRaise(qe.NeutronClientException(status_code=404))

        self.m.ReplayAll()

        rsrc.validate()
        scheduler.TaskRunner(rsrc.create)()

        self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)

        ref_id = rsrc.FnGetRefId()
        self.assertEqual(u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37', ref_id)

        self.assertRaises(
            exception.InvalidTemplateAttribute, rsrc.FnGetAtt, 'Foo')

        self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
        self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
        rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
        scheduler.TaskRunner(rsrc.delete)()
        self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
        self.m.VerifyAll()
开发者ID:ricolin,项目名称:heat,代码行数:60,代码来源:test_neutron_network_gateway.py


示例12: args2body

    def args2body(self, parsed_args):
        _network_id = neutronV20.find_resourceid_by_name_or_id(
            self.get_client(), 'network', parsed_args.network_id)
        body = {'subnet': {'network_id': _network_id,
                           'ip_version': parsed_args.ip_version, }, }

        if parsed_args.cidr:
            # With subnetpool, cidr is now optional for creating subnet.
            cidr = parsed_args.cidr
            body['subnet'].update({'cidr': cidr})
            unusable_cidr = '/32' if parsed_args.ip_version == 4 else '/128'
            if cidr.endswith(unusable_cidr):
                self.log.warning(_("An IPv%(ip)d subnet with a %(cidr)s CIDR "
                                   "will have only one usable IP address so "
                                   "the device attached to it will not have "
                                   "any IP connectivity.")
                                 % {"ip": parsed_args.ip_version,
                                    "cidr": unusable_cidr})

        if parsed_args.prefixlen:
            body['subnet'].update({'prefixlen': parsed_args.prefixlen})
        if parsed_args.subnetpool:
            if parsed_args.subnetpool == 'None':
                _subnetpool_id = None
            else:
                _subnetpool_id = neutronV20.find_resourceid_by_name_or_id(
                    self.get_client(), 'subnetpool', parsed_args.subnetpool)
            body['subnet'].update({'subnetpool_id': _subnetpool_id})

        updatable_args2body(parsed_args, body)
        if parsed_args.tenant_id:
            body['subnet'].update({'tenant_id': parsed_args.tenant_id})

        return body
开发者ID:annp,项目名称:python-neutronclient,代码行数:34,代码来源:subnet.py


示例13: find

 def find(self, id_or_name, ref_only=False):
     service = None
     ref = None
     if isinstance(self, Key):
         service = self._agent.client.compute.keypairs
     if isinstance(self, Image):
         service = self._agent.client.compute.images
     elif isinstance(self, Server):
         service = self._agent.client.compute.servers
     elif isinstance(self, Flavor):
         service = self._agent.client.compute.flavors
     elif isinstance(self, Volume):
         service = self._agent.client.volume.volumes
     elif isinstance(self, SecurityGroup):
         service = self._agent.client.compute.security_groups
     elif isinstance(self, Network):
         service = self._agent.client.compute.networks
     elif isinstance(self, SubNet):
         sid = neutronV20.find_resourceid_by_name_or_id(
             self._agent.clientneutron, "subnet", id_or_name)
         ref = self._agent.clientneutron.show_subnet(sid)
     elif isinstance(self, Router):
         rid = neutronV20.find_resourceid_by_name_or_id(
             self._agent.clientneutron, "router", id_or_name)
         ref = self._agent.clientneutron.show_router(rid)
         
     if service:
         ref = utils.find_resource(service, id_or_name)
     if ref_only:
         return ref
     self._ref = ref
     return self
开发者ID:bharat109puri,项目名称:warm,代码行数:32,代码来源:__init__.py


示例14: _mock_create_with_security_groups

    def _mock_create_with_security_groups(self, port_prop):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'net1234',
            cmd_resource=None,
        ).MultipleTimes().AndReturn('net1234')
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'subnet',
            'sub1234',
            cmd_resource=None,
        ).MultipleTimes().AndReturn('sub1234')
        neutronclient.Client.create_port({'port': port_prop}).AndReturn(
            {'port': {
                "status": "BUILD",
                "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"}})
        neutronclient.Client.show_port(
            'fc68ea2c-b60b-4b4f-bd82-94ec81110766'
        ).AndReturn({'port': {
            "status": "ACTIVE",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})

        self.m.ReplayAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:25,代码来源:test_neutron_port.py


示例15: args2body

 def args2body(self, parsed_args):
     _security_group_id = neutronV20.find_resourceid_by_name_or_id(
         self.get_client(), 'security_group', parsed_args.security_group_id)
     body = {'security_group_rule': {
         'security_group_id': _security_group_id,
         'direction': parsed_args.direction,
         'ethertype': parsed_args.ethertype}}
     if parsed_args.protocol:
         body['security_group_rule'].update(
             {'protocol': parsed_args.protocol})
     if parsed_args.port_range_min:
         body['security_group_rule'].update(
             {'port_range_min': parsed_args.port_range_min})
     if parsed_args.port_range_max:
         body['security_group_rule'].update(
             {'port_range_max': parsed_args.port_range_max})
     if parsed_args.remote_ip_prefix:
         body['security_group_rule'].update(
             {'remote_ip_prefix': parsed_args.remote_ip_prefix})
     if parsed_args.remote_group_id:
         _remote_group_id = neutronV20.find_resourceid_by_name_or_id(
             self.get_client(), 'security_group',
             parsed_args.remote_group_id)
         body['security_group_rule'].update(
             {'remote_group_id': _remote_group_id})
     if parsed_args.tenant_id:
         body['security_group_rule'].update(
             {'tenant_id': parsed_args.tenant_id})
     return body
开发者ID:yuvarajan07,项目名称:python-neutronclient,代码行数:29,代码来源:securitygroup.py


示例16: test_subnet

    def test_subnet(self):
        update_props = {'subnet': {
            'dns_nameservers': ['8.8.8.8', '192.168.1.254'],
            'name': 'mysubnet',
            'enable_dhcp': True,
            'host_routes': [{'destination': '192.168.1.0/24',
                             'nexthop': '194.168.1.2'}],
            'gateway_ip': '10.0.3.105',
            'allocation_pools': [
                {'start': '10.0.3.20', 'end': '10.0.3.100'},
                {'start': '10.0.3.110', 'end': '10.0.3.200'}]}}

        t = self._test_subnet(u_props=update_props)
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'None',
            cmd_resource=None,
        ).AndReturn('None')
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'router',
            'None',
            cmd_resource=None,
        ).AndReturn('None')
        neutronclient.Client.update_subnet(
            '91e47a57-7508-46fe-afc9-fc454e8580e1', update_props)
        stack = utils.parse_stack(t)
        rsrc = self.create_subnet(t, stack, 'sub_net')
        self.m.ReplayAll()
        scheduler.TaskRunner(rsrc.create)()
        self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
        rsrc.validate()
        ref_id = rsrc.FnGetRefId()
        self.assertEqual('91e47a57-7508-46fe-afc9-fc454e8580e1', ref_id)
        self.assertIsNone(rsrc.FnGetAtt('network_id'))
        self.assertEqual('fc68ea2c-b60b-4b4f-bd82-94ec81110766',
                         rsrc.FnGetAtt('network_id'))
        self.assertEqual('8.8.8.8', rsrc.FnGetAtt('dns_nameservers')[0])

        # assert the dependency (implicit or explicit) between the ports
        # and the subnet

        self.assertIn(stack['port'], stack.dependencies[stack['sub_net']])
        self.assertIn(stack['port2'], stack.dependencies[stack['sub_net']])
        update_snippet = rsrc_defn.ResourceDefinition(rsrc.name, rsrc.type(),
                                                      update_props['subnet'])
        rsrc.handle_update(update_snippet, {}, update_props['subnet'])

        # with name None
        del update_props['subnet']['name']
        rsrc.handle_update(update_snippet, {}, update_props['subnet'])

        # with no prop_diff
        rsrc.handle_update(update_snippet, {}, {})

        self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
        rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
        self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
        self.m.VerifyAll()
开发者ID:TonyChengTW,项目名称:OpenStack_Liberty_Control,代码行数:60,代码来源:test_neutron_subnet.py


示例17: test_allowed_address_pair

    def test_allowed_address_pair(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'abcd1234'
        ).MultipleTimes().AndReturn('abcd1234')
        neutronclient.Client.create_port({'port': {
            'network_id': u'abcd1234',
            'allowed_address_pairs': [{
                'ip_address': u'10.0.3.21',
                'mac_address': u'00-B0-D0-86-BB-F7'
            }],
            'name': utils.PhysName('test_stack', 'port'),
            'admin_state_up': True}}
        ).AndReturn({'port': {
            "status": "BUILD",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})
        neutronclient.Client.show_port(
            'fc68ea2c-b60b-4b4f-bd82-94ec81110766'
        ).AndReturn({'port': {
            "status": "ACTIVE",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_with_address_pair_template)
        stack = utils.parse_stack(t)

        port = stack['port']
        scheduler.TaskRunner(port.create)()
        self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:33,代码来源:test_neutron_port.py


示例18: test_gateway_router

    def test_gateway_router(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'fc68ea2c-b60b-4b4f-bd82-94ec81110766',
            cmd_resource=None,
        ).MultipleTimes().AndReturn('fc68ea2c-b60b-4b4f-bd82-94ec81110766')
        neutronclient.Client.add_gateway_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8',
            {'network_id': 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'}
        ).AndReturn(None)
        neutronclient.Client.remove_gateway_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8'
        ).AndReturn(None)
        neutronclient.Client.remove_gateway_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8'
        ).AndRaise(qe.NeutronClientException(status_code=404))
        self.stub_RouterConstraint_validate()

        self.m.ReplayAll()
        t = template_format.parse(neutron_template)
        stack = utils.parse_stack(t)

        rsrc = self.create_gateway_router(
            t, stack, 'gateway', properties={
                'router_id': '3e46229d-8fce-4733-819a-b5fe630550f8',
                'network': 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'
            })

        scheduler.TaskRunner(rsrc.delete)()
        rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
        scheduler.TaskRunner(rsrc.delete)()
        self.m.VerifyAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:33,代码来源:test_neutron_router.py


示例19: test_missing_subnet_id

    def test_missing_subnet_id(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'net1234'
        ).MultipleTimes().AndReturn('net1234')
        neutronclient.Client.create_port({'port': {
            'network_id': u'net1234',
            'fixed_ips': [
                {'ip_address': u'10.0.3.21'}
            ],
            'name': utils.PhysName('test_stack', 'port'),
            'admin_state_up': True,
            'device_owner': u'network:dhcp'}}
        ).AndReturn({'port': {
            "status": "BUILD",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})
        neutronclient.Client.show_port(
            'fc68ea2c-b60b-4b4f-bd82-94ec81110766'
        ).AndReturn({'port': {
            "status": "ACTIVE",
            "id": "fc68ea2c-b60b-4b4f-bd82-94ec81110766"
        }})

        self.m.ReplayAll()

        t = template_format.parse(neutron_port_template)
        t['resources']['port']['properties']['fixed_ips'][0].pop('subnet')
        stack = utils.parse_stack(t)

        port = stack['port']
        scheduler.TaskRunner(port.create)()

        self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:35,代码来源:test_neutron_port.py


示例20: test_create_router_gateway_enable_snat

    def test_create_router_gateway_enable_snat(self):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            'network',
            'public',
            cmd_resource=None,
        ).AndReturn('fc68ea2c-b60b-4b4f-bd82-94ec81110766')
        neutronclient.Client.create_router({
            "router": {
                "name": "Test Router",
                "external_gateway_info": {
                    'network_id': 'fc68ea2c-b60b-4b4f-bd82-94ec81110766',
                },
                "admin_state_up": True,
            }
        }).AndReturn({
            "router": {
                "status": "BUILD",
                "external_gateway_info": None,
                "name": "Test Router",
                "admin_state_up": True,
                "tenant_id": "3e21026f2dc94372b105808c0e721661",
                "id": "3e46229d-8fce-4733-819a-b5fe630550f8",
            }
        })

        neutronclient.Client.show_router(
            '3e46229d-8fce-4733-819a-b5fe630550f8').MultipleTimes().AndReturn({
                "router": {
                    "status": "ACTIVE",
                    "external_gateway_info": {
                        "network_id":
                        "fc68ea2c-b60b-4b4f-bd82-94ec81110766",
                        "enable_snat": True
                    },
                    "name": "Test Router",
                    "admin_state_up": True,
                    "tenant_id": "3e21026f2dc94372b105808c0e721661",
                    "routes": [],
                    "id": "3e46229d-8fce-4733-819a-b5fe630550f8"
                }
            })

        self.m.ReplayAll()
        t = template_format.parse(neutron_external_gateway_template)
        t["resources"]["router"]["properties"]["external_gateway_info"].pop(
            "enable_snat")
        t["resources"]["router"]["properties"]["external_gateway_info"].pop(
            "external_fixed_ips")
        stack = utils.parse_stack(t)
        rsrc = self.create_router(t, stack, 'router')

        rsrc.validate()

        ref_id = rsrc.FnGetRefId()
        self.assertEqual('3e46229d-8fce-4733-819a-b5fe630550f8', ref_id)
        gateway_info = rsrc.FnGetAtt('external_gateway_info')
        self.assertEqual('fc68ea2c-b60b-4b4f-bd82-94ec81110766',
                         gateway_info.get('network_id'))
        self.m.VerifyAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:60,代码来源:test_neutron_router.py



注:本文中的neutronclient.neutron.v2_0.find_resourceid_by_name_or_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python v2_0.parse_args_to_dict函数代码示例发布时间:2022-05-27
下一篇:
Python i18n._函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap