• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python directory.get_plugin函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron_lib.plugins.directory.get_plugin函数的典型用法代码示例。如果您正苦于以下问题：Python get_plugin函数的具体用法？Python get_plugin怎么用？Python get_plugin使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_plugin函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _extract

 def _extract(self, resource_type, resource_id, field):
     """Read one field of a resource through the plugin that serves it.

     The plugin's ``get_<resource_type>`` method is invoked with an admin
     context and ``fields=[field]``; the entry stored under ``field`` in
     its result is returned.

     :param resource_type: resource name, used both to build the getter
         name and to look for a dedicated plugin in
         ``EXT_PARENT_RESOURCE_MAPPING``.
     :param resource_id: identifier handed to the getter.
     :param field: key to read from the fetched resource.
     :raises db_exc.RetryRequest: when the getter raises
         ``exceptions.NotFound``.
     """
     # NOTE(salv-orlando): This check currently assumes the parent
     # resource is handled by the core plugin. It might be worth
     # having a way to map resources to plugins so to make this
     # check more general
     plugin = directory.get_plugin()
     parent_plugins = service_const.EXT_PARENT_RESOURCE_MAPPING
     if resource_type in parent_plugins:
         plugin = directory.get_plugin(parent_plugins[resource_type])
     # The getter *must* exist; if it does not, it is better to let
     # neutron explode. The lookup itself runs with an admin context.
     getter = getattr(plugin, 'get_%s' % resource_type)
     try:
         resource = getter(context.get_admin_context(),
                           resource_id,
                           fields=[field])
     except exceptions.NotFound as not_found:
         # NOTE(kevinbenton): a NotFound exception can occur if a
         # list operation is happening at the same time as one of
         # the parents and its children being deleted. So we issue
         # a RetryRequest so the API will redo the lookup and the
         # problem items will be gone.
         raise db_exc.RetryRequest(not_found)
     except Exception:
         with excutils.save_and_reraise_exception():
             LOG.exception('Policy check error while calling %s!', getter)
     return resource[field]
开发者ID:openstack,项目名称:neutron,代码行数:27,代码来源:policy.py


示例2: add_router_interface

    def add_router_interface(self, context, router_id, interface_info):
        """Add a router interface in the DB and on the network controller.

        Inside a writer transaction the interface is created through the
        parent class, its details are looked up by subnet id and pushed to
        the controller. After the transaction the interface port is
        updated to status ``ACTIVE``.

        :param context: request context used for every DB/plugin call.
        :param router_id: id of the router receiving the interface.
        :param interface_info: interface description forwarded unchanged
            to the parent implementation.
        :returns: the interface info returned by the parent implementation.
        """
        # Looking the router up first also validates the arguments.
        tenant_id = self._get_router(context, router_id)['tenant_id']

        with db.context_manager.writer.using(context):
            # TODO(wolverineav): hack until fixed at right place
            context.GUARD_TRANSACTION = False
            # Create the interface in the DB.
            added = super(L3RestProxy, self).add_router_interface(
                context, router_id, interface_info)
            port = self._get_port(context, added['port_id'])
            # The port's subnet id doubles as the interface's id.
            details = self._get_router_intf_details(context,
                                                    added['subnet_id'])

            # Prefer the port's first fixed IP as the interface address.
            fixed_ips = port.get("fixed_ips")
            if fixed_ips:
                details['ip_address'] = fixed_ips[0]['ip_address']

            # Create the interface on the network controller.
            self.servers.rest_add_router_interface(tenant_id, router_id,
                                                   details)
        core_plugin = directory.get_plugin()
        core_plugin.update_port(
            context, port['id'], {'port': {'status': 'ACTIVE'}})
        return added
开发者ID:sarath-kumar,项目名称:networking-bigswitch,代码行数:29,代码来源:l3_router_plugin.py


示例3: get_routers_and_interfaces

    def get_routers_and_interfaces(self):
        """Collect every router and one enriched record per interface port.

        Routers come from the L3 plugin; for each router the core plugin
        is queried for ports owned by it as router interfaces. Each such
        port yields a copy of the router dict extended with ``seg_id``,
        ``cidr``, ``gip``, ``ip_version`` and ``subnet_id``.

        :returns: tuple ``(routers, router_interfaces)``.
        """
        core_plugin = directory.get_plugin()
        admin_ctx = nctx.get_admin_context()
        l3_plugin = directory.get_plugin(plugin_constants.L3)
        routers = l3_plugin.get_routers(admin_ctx)
        interfaces = []
        for router in routers:
            port_filters = {
                'device_id': [router['id']],
                'device_owner': [n_const.DEVICE_OWNER_ROUTER_INTF],
            }
            intf_ports = core_plugin.get_ports(
                admin_ctx, filters=port_filters) or []
            for port in intf_ports:
                entry = router.copy()
                network_id = port['network_id']
                # Only the first fixed IP of the port is considered.
                subnet_id = port['fixed_ips'][0]['subnet_id']
                subnet = core_plugin.get_subnet(admin_ctx, subnet_id)
                net_ctx = NetworkContext(self, admin_ctx, {'id': network_id})
                segmentation_id = (
                    net_ctx.network_segments[0]['segmentation_id'])

                entry.update({
                    'seg_id': segmentation_id,
                    'cidr': subnet['cidr'],
                    'gip': subnet['gateway_ip'],
                    'ip_version': subnet['ip_version'],
                    'subnet_id': subnet_id,
                })
                interfaces.append(entry)
        return routers, interfaces
开发者ID:openstack,项目名称:networking-arista,代码行数:26,代码来源:l3_arista.py


示例4: setUp

 def setUp(self):
     """Configure ML2 with the OpenDaylight v2 drivers for L3 tests.

     Sets the core plugin, mechanism drivers and service plugin through
     the config fixture, installs the OpenDaylight fixtures, replaces the
     mechanism driver's journal/sync hooks with mocks, and finally keeps
     references to the core plugin, the L3 plugin and a journal thread.
     """
     self.cfg = self.useFixture(config_fixture.Config())
     self.cfg.config(core_plugin='neutron.plugins.ml2.plugin.Ml2Plugin')
     self.cfg.config(mechanism_drivers=['logger', 'opendaylight_v2'],
                     group='ml2')
     self.useFixture(odl_base.OpenDaylightRestClientFixture())
     self.cfg.config(service_plugins=['odl-router_v2'])
     core_plugin = cfg.CONF.core_plugin
     service_plugins = {'l3_plugin_name': 'odl-router_v2'}
     self.useFixture(odl_base.OpenDaylightJournalThreadFixture())

     # Replace the mechanism driver's journal and sync hooks with mocks.
     driver_cls = mech_driver_v2.OpenDaylightMechanismDriver
     for hook_name in ('_record_in_journal',
                       'sync_from_callback_precommit',
                       'sync_from_callback_postcommit'):
         mock.patch.object(driver_cls, hook_name).start()

     for fixture_cls in (odl_base.OpenDaylightPeriodicTaskFixture,
                         odl_base.OpenDaylightFeaturesFixture,
                         odl_base.OpenDaylightPseudoAgentPrePopulateFixture):
         self.useFixture(fixture_cls())

     super(OpenDaylightL3TestCase, self).setUp(
         plugin=core_plugin, service_plugins=service_plugins)
     self.plugin = directory.get_plugin()
     # Every network is reported as external for these tests.
     self.plugin._network_is_external = mock.Mock(return_value=True)
     self.driver = directory.get_plugin(constants.L3)
     self.thread = journal.OpenDaylightJournalThread()
开发者ID:openstack,项目名称:networking-odl,代码行数:25,代码来源:test_l3_odl_v2.py


示例5: setUp

 def setUp(self):
     """Enable the test's ML2 extension drivers and cache plugin handles.

     The RPC QoS notification driver is patched out before the parent
     ``setUp`` runs; afterwards the core plugin, the L3 plugin and an
     admin context are stored on the test case.
     """
     config.cfg.CONF.set_override(
         "extension_drivers", self._extension_drivers, group="ml2")
     qos_notifier_path = (
         "neutron.services.qos.notification_drivers.message_queue"
         ".RpcQosServiceNotificationDriver")
     mock.patch(qos_notifier_path).start()
     super(TestRevisionPlugin, self).setUp()
     self.cp = directory.get_plugin()
     self.l3p = directory.get_plugin(constants.L3)
     self.ctx = nctx.get_admin_context()
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:test_revision_plugin.py


示例6: __init__

 def __init__(self, sc_plugin, context, service_chain_instance,
              service_chain_specs, current_service_chain_node, position,
              current_service_profile, provider_group, consumer_group=None,
              management_group=None, original_service_chain_node=None,
              original_service_profile=None, service_targets=None,
              classifier=None, is_consumer_external=False):
     """Store the arguments and resolve the plugins used by this context.

     Every argument is kept on a same-named private attribute. The GBP
     plugin is obtained via ``get_gbp_plugin()``; the core and L3 plugins
     come from the plugin directory. ``_admin_context`` and
     ``_relevant_specs`` start out as ``None``.
     """
     # Plugin handles.
     self._gbp_plugin = get_gbp_plugin()
     self._sc_plugin = sc_plugin
     self._core_plugin = directory.get_plugin()
     self._l3_plugin = directory.get_plugin(constants.L3)

     # Request contexts.
     self._plugin_context = context
     self._admin_context = None

     # Service chain instance, its specs and this node's position.
     self._service_chain_instance = service_chain_instance
     self._service_chain_specs = service_chain_specs
     self._relevant_specs = None
     self._position = position

     # Current and original node/profile.
     self._current_service_chain_node = current_service_chain_node
     self._original_service_chain_node = original_service_chain_node
     self._current_service_profile = current_service_profile
     self._original_service_profile = original_service_profile

     # Groups, targets and classifier involved in the chain.
     self._provider_group = provider_group
     self._consumer_group = consumer_group
     self._management_group = management_group
     self._service_targets = service_targets
     self._classifier = classifier
     self._is_consumer_external = is_consumer_external
开发者ID:openstack,项目名称:group-based-policy,代码行数:26,代码来源:context.py


示例7: create_port_postcommit

    def create_port_postcommit(self, context):
        """Push a newly created port to the network controller.

        Ports with an unsupported VNIC type and SR-IOV ports are skipped.
        When the BSN L3 plugin is in use, router gateway ports are marked
        ACTIVE through the core plugin. Ports whose tenant cannot be
        resolved, and vhost-user ports, are not sent to the controller.

        :param context: port context; ``context.current`` is the port
            being created.
        """
        current = context.current

        if not self._is_port_supported(current):
            LOG.debug("Ignoring unsupported vnic type")
            return
        if self._is_port_sriov(current):
            LOG.debug("SR-IOV port, nothing to do")
            return

        # If bsn_l3 plugin and it is a gateway port, bind to ivs.
        if (self.l3_bsn_plugin and
                current['device_owner'] == ROUTER_GATEWAY_PORT_OWNER):
            core_plugin = directory.get_plugin()
            core_plugin.update_port_status(
                context._plugin_context, current['id'],
                const.PORT_STATUS_ACTIVE)

        try:
            # Build the port representation the controller expects.
            controller_port = self._prepare_port_for_controller(context)
        except servermanager.TenantIDNotFound as exc:
            LOG.warning("Skipping create port %(port)s as %(exp)s",
                        {'port': current.get('id'), 'exp': exc})
            return

        if not controller_port:
            return

        # For vhostuser type ports, membership rule and endpoint was
        # created during bind_port, so skip this
        vif_type = controller_port[portbindings.VIF_TYPE]
        if vif_type == portbindings.VIF_TYPE_VHOST_USER:
            return

        network = controller_port["network"]
        self.async_port_create(network["tenant_id"], network["id"],
                               controller_port)
开发者ID:sarath-kumar,项目名称:networking-bigswitch,代码行数:32,代码来源:driver.py


示例8: setUp

 def setUp(self, policy_drivers=None, core_plugin=None, l3_plugin=None,
           ml2_options=None, sc_plugin=None, qos_plugin=None,
           trunk_plugin=None):
     """Set configuration overrides and cache plugin handles for tests.

     ``core_plugin`` falls back to ``ML2PLUS_PLUGIN`` and
     ``policy_drivers`` to ``['neutron_resources']`` when not given; the
     remaining arguments are forwarded to the parent ``setUp`` unchanged.
     """
     if not core_plugin:
         core_plugin = ML2PLUS_PLUGIN
     if not policy_drivers:
         policy_drivers = ['neutron_resources']

     config.cfg.CONF.set_override(
         'policy_drivers', policy_drivers, group='group_policy')
     sc_cfg.cfg.CONF.set_override(
         'node_drivers', ['dummy_driver'], group='node_composition_plugin')
     sc_cfg.cfg.CONF.set_override(
         'node_plumber', 'dummy_plumber', group='node_composition_plugin')
     config.cfg.CONF.set_override('allow_overlapping_ips', True)

     super(CommonNeutronBaseTestCase, self).setUp(
         core_plugin=core_plugin,
         l3_plugin=l3_plugin,
         ml2_options=ml2_options,
         sc_plugin=sc_plugin,
         qos_plugin=qos_plugin,
         trunk_plugin=trunk_plugin)

     # The rescheduling check is replaced by a mock that returns None.
     rescheduling_check = mock.patch(
         'neutron.db.l3_db.L3_NAT_dbonly_mixin.'
         '_check_router_needs_rescheduling').start()
     rescheduling_check.return_value = None

     self._plugin = directory.get_plugin()
     self._plugin.remove_networks_from_down_agents = mock.Mock()
     self._plugin.is_agent_down = mock.Mock(return_value=False)
     self._context = nctx.get_admin_context()
     self._gbp_plugin = directory.get_plugin(pconst.GROUP_POLICY)
     self._l3_plugin = directory.get_plugin(constants.L3)
     config.cfg.CONF.set_override('debug', True)
开发者ID:openstack,项目名称:group-based-policy,代码行数:30,代码来源:test_neutron_resources_driver.py


示例9: setUp

 def setUp(self):
     """Enable the test's ML2 extension drivers and cache plugin handles.

     After the parent ``setUp`` the core plugin, the L3 plugin and an
     admin context are stored on the test case.
     """
     cfg.CONF.set_override(
         'extension_drivers', self._extension_drivers, group='ml2')
     super(TestRevisionPlugin, self).setUp()
     self.cp = directory.get_plugin()
     self.l3p = directory.get_plugin(constants.L3)
     self._ctx = nctx.get_admin_context()
开发者ID:igordcard,项目名称:neutron,代码行数:8,代码来源:test_revision_plugin.py


示例10: get_active_networks_info

    def get_active_networks_info(self, context, **kwargs):
        """Return the active networks together with their subnets and ports.

        Each network dict returned by ``_get_active_networks`` is extended
        in place with three keys: ``subnets``, ``non_local_subnets`` and
        ``ports``.

        :param context: request context passed to every plugin call.
        :param kwargs: forwarded to ``_get_active_networks``. ``host`` is
            read for logging and segment lookup; ``enable_dhcp_filter``
            (default ``True``) restricts subnets to those with
            ``enable_dhcp`` set.
        :returns: the list of network dicts, augmented as described above.
        """
        host = kwargs.get('host')
        LOG.debug('get_active_networks_info from %s', host)
        networks = self._get_active_networks(context, **kwargs)
        plugin = directory.get_plugin()
        filters = {'network_id': [network['id'] for network in networks]}
        # Ports are fetched with the network_id filter only; the same dict
        # is reused (and possibly extended) for the subnet query below.
        ports = plugin.get_ports(context, filters=filters)
        # default is to filter subnets based on 'enable_dhcp' flag
        if kwargs.get('enable_dhcp_filter', True):
            filters['enable_dhcp'] = [True]
        # NOTE(kevinbenton): we sort these because the agent builds tags
        # based on position in the list and has to restart the process if
        # the order changes.
        subnets = sorted(plugin.get_subnets(context, filters=filters),
                         key=operator.itemgetter('id'))
        # Handle the possibility that the dhcp agent(s) only has connectivity
        # inside a segment.  If the segment service plugin is loaded and
        # there are active dhcp enabled subnets, then filter out the subnets
        # that are not on the host's segment.
        seg_plug = directory.get_plugin(
            segment_ext.SegmentPluginBase.get_plugin_type())
        # Subnets that carry a (truthy) segment_id.
        seg_subnets = [subnet for subnet in subnets
                       if subnet.get('segment_id')]
        nonlocal_subnets = []
        if seg_plug and seg_subnets:
            host_segment_ids = seg_plug.get_segments_by_hosts(context, [host])
            # Gather the ids of all the subnets that are on a segment that
            # this host touches
            seg_subnet_ids = {subnet['id'] for subnet in seg_subnets
                              if subnet['segment_id'] in host_segment_ids}
            # Gather the ids of all the networks that are routed
            routed_net_ids = {seg_subnet['network_id']
                              for seg_subnet in seg_subnets}
            # Remove the subnets with segments that are not in the same
            # segments as the host.  Do this only for the networks that are
            # routed because we want non-routed networks to work as
            # before.
            nonlocal_subnets = [subnet for subnet in seg_subnets
                                if subnet['id'] not in seg_subnet_ids]
            subnets = [subnet for subnet in subnets
                       if subnet['network_id'] not in routed_net_ids or
                       subnet['id'] in seg_subnet_ids]

        # Index subnets and ports by network id, then attach them to each
        # network; networks without entries get empty lists.
        grouped_subnets = self._group_by_network_id(subnets)
        grouped_nonlocal_subnets = self._group_by_network_id(nonlocal_subnets)
        grouped_ports = self._group_by_network_id(ports)
        for network in networks:
            network['subnets'] = grouped_subnets.get(network['id'], [])
            network['non_local_subnets'] = (
                grouped_nonlocal_subnets.get(network['id'], []))
            network['ports'] = grouped_ports.get(network['id'], [])

        return networks
开发者ID:igordcard,项目名称:neutron,代码行数:54,代码来源:dhcp_rpc.py


示例11: test_net_tag_bumps_net_revision

 def test_net_tag_bumps_net_revision(self):
     """Adding and then deleting a network tag each bump the revision."""
     with self.network() as network:
         net_id = network["network"]["id"]
         rev = network["network"]["revision_number"]
         tag_plugin = directory.get_plugin("TAG")

         # Tagging the network must increase its revision number.
         tag_plugin.update_tag(self.ctx, "networks", net_id, "mytag")
         after_add = directory.get_plugin().get_network(self.ctx, net_id)
         self.assertGreater(after_add["revision_number"], rev)

         # Removing the tag must increase it once more.
         tag_plugin.delete_tag(self.ctx, "networks", net_id, "mytag")
         rev = after_add["revision_number"]
         after_delete = directory.get_plugin().get_network(
             self.ctx, net_id)
         self.assertGreater(after_delete["revision_number"], rev)
开发者ID:openstack,项目名称:neutron,代码行数:11,代码来源:test_revision_plugin.py


示例12: __init__

 def __init__(self):
     """Resolve the core plugin and the drivers derived from it.

     ``md`` is the object behind the ``apic_aim`` mechanism driver and
     ``pd`` its ``gbp_driver``. ``sfcd`` is the object behind the ``aim``
     driver of the ``sfc`` plugin, or ``None`` when either the plugin or
     that driver is not available.
     """
     # REVISIT: Defer until after validating config? Or pass in PD
     # & MD?
     self.core_plugin = directory.get_plugin()
     mech_drivers = self.core_plugin.mechanism_manager.mech_drivers
     self.md = mech_drivers['apic_aim'].obj
     self.pd = self.md.gbp_driver
     self.sfcd = None

     sfc_plugin = directory.get_plugin('sfc')
     if not sfc_plugin:
         return
     aim_driver = sfc_plugin.driver_manager.drivers.get('aim')
     if aim_driver:
         self.sfcd = aim_driver.obj
开发者ID:openstack,项目名称:group-based-policy,代码行数:13,代码来源:aim_validation.py


示例13: test_net_tag_bumps_net_revision

 def test_net_tag_bumps_net_revision(self):
     """Adding and then deleting a network tag each bump the revision."""
     def current_revision(net_id):
         # Re-read the network through the core plugin.
         net = directory.get_plugin().get_network(self.ctx, net_id)
         return net['revision_number']

     with self.network() as network:
         initial_rev = network['network']['revision_number']
         net_id = network['network']['id']
         tag_plugin = directory.get_plugin('TAG')

         tag_plugin.update_tag(self.ctx, 'networks', net_id, 'mytag')
         tagged_rev = current_revision(net_id)
         self.assertGreater(tagged_rev, initial_rev)

         tag_plugin.delete_tag(self.ctx, 'networks', net_id, 'mytag')
         untagged_rev = current_revision(net_id)
         self.assertGreater(untagged_rev, tagged_rev)
开发者ID:igordcard,项目名称:neutron,代码行数:15,代码来源:test_revision_plugin.py


示例14: get_resources

 def get_resources(cls):
     """Build the extension resources for IP addresses and their ports.

     :returns: a two-element list: the IP-address resource registered
         under the extension alias, followed by a ``ports`` resource
         nested beneath the ``ip_addresses`` collection.
     """
     ip_controller = IpAddressesController(directory.get_plugin())
     ip_port_controller = IpAddressPortController(directory.get_plugin())

     ip_addresses_ext = extensions.ResourceExtension(
         Ip_addresses.get_alias(), ip_controller)
     ports_parent = {'collection_name': 'ip_addresses',
                     'member_name': 'ip_address'}
     ports_ext = extensions.ResourceExtension(
         'ports', ip_port_controller, parent=ports_parent)
     return [ip_addresses_ext, ports_ext]
开发者ID:openstack,项目名称:quark,代码行数:15,代码来源:ip_addresses.py


示例15: tearDown

    def tearDown(self):
        """Undo the API-replay configuration applied for these tests."""
        # Turn api_replay_mode back off.
        cfg.CONF.set_override('api_replay_mode', False)

        # Drop the api-replay alias from the plugin's supported extensions.
        plugin_aliases = directory.get_plugin().supported_extension_aliases
        plugin_aliases.remove(api_replay.ALIAS)

        # Restore the attribute map: 'id' may no longer be set on POST.
        replayed_resources = ('ports', 'networks', 'security_groups',
                              'security_group_rules', 'routers', 'policies')
        for resource_name in replayed_resources:
            attributes.RESOURCES[resource_name]['id']['allow_post'] = False

        super(TestApiReplay, self).tearDown()
开发者ID:openstack,项目名称:vmware-nsx,代码行数:15,代码来源:test_api_replay.py


示例16: test_validate_port_has_binding_host

 def test_validate_port_has_binding_host(self):
     """A port updated with a binding host is reported as bound."""
     with self.port() as port:
         plugin = directory.get_plugin()
         port_body = port['port']
         port_body['binding:host_id'] = 'host'
         port_id = port_body['id']
         plugin.update_port(self.context, port_id, port)
         trunk_validator = rules.TrunkPortValidator(port_id)
         self.assertTrue(trunk_validator.is_bound(self.context))
开发者ID:igordcard,项目名称:neutron,代码行数:7,代码来源:test_rules.py


示例17: index

 def index(self, request, **kwargs):
     """List the DHCP agents hosting the network named in ``kwargs``.

     The ``get_<DHCP_AGENTS>`` policy is enforced on the request context
     before the core plugin is queried.

     :param request: incoming request; its ``context`` is used for both
         the policy check and the plugin call.
     :param kwargs: must contain ``network_id``.
     """
     plugin = directory.get_plugin()
     required_action = "get_%s" % DHCP_AGENTS
     policy.enforce(request.context, required_action, {})
     network_id = kwargs['network_id']
     return plugin.list_dhcp_agents_hosting_network(
         request.context, network_id)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:7,代码来源:dhcpagentscheduler.py


示例18: _add_az_to_response

 def _add_az_to_response(router_res, router_db):
     """Add ``availability_zones`` to a router response when supported.

     Nothing is changed unless the L3 plugin supports the
     ``router_availability_zone`` extension.

     :param router_res: router response dict, modified in place.
     :param router_db: router DB object handed to the L3 plugin.
     """
     l3_plugin = directory.get_plugin(constants.L3)
     az_supported = extensions.is_extension_supported(
         l3_plugin, 'router_availability_zone')
     if az_supported:
         zones = l3_plugin.get_router_availability_zones(router_db)
         router_res['availability_zones'] = zones
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:router.py


示例19: _get_net_id_by_portchain_id

 def _get_net_id_by_portchain_id(self, context, portchain_id):
     """Return the network id of a port chain's first ingress port.

     :param context: request context for the plugin calls.
     :param portchain_id: id of the port chain to resolve.
     :raises n_exceptions.PortChainNotFound: no such port chain.
     :raises n_exceptions.PortPairsNotFoundForPortPairGroup: the chain's
         port pair groups yield no port pairs.
     :raises n_exceptions.PortNotFound: the ingress port of the first
         port pair cannot be fetched.
     """
     sfc_plugin = directory.get_plugin('sfc')
     chain = sfc_plugin.get_port_chain(context, portchain_id)
     if not chain:
         raise n_exceptions.PortChainNotFound(portchain_id=portchain_id)

     pair_filters = {'portpairgroup_id': chain['port_pair_groups']}
     pairs = sfc_plugin.get_port_pairs(context, pair_filters)
     if not pairs:
         raise n_exceptions.PortPairsNotFoundForPortPairGroup(
             portpairgroup_id=chain['port_pair_groups'])

     core_plugin = directory.get_plugin()
     ingress_port_id = pairs[0]['ingress']
     # Call get_port of TricirclePlugin's parent class on the core plugin.
     parent_view = super(central_plugin.TricirclePlugin, core_plugin)
     ingress_port = parent_view.get_port(context, ingress_port_id)
     if not ingress_port:
         raise n_exceptions.PortNotFound(port_id=pairs[0]['ingress'])
     return ingress_port['network_id']
开发者ID:openstack,项目名称:tricircle,代码行数:16,代码来源:central_fc_driver.py


示例20: _update_routed_network_host_routes

    def _update_routed_network_host_routes(self, context, network_id,
                                           deleted_cidr=None):
        """Update host routes on subnets on a routed network after event

        Host routes on the subnets on a routed network may need updates after
        any CREATE or DELETE event.

        For every subnet of the network the current host routes are compared
        with the freshly calculated ones; the subnet is updated through the
        core plugin only when ``_host_routes_need_update`` reports a
        difference.

        :param context: Request context used for the lookups and the update.
        :param network_id: Network ID
        :param deleted_cidr: The cidr of a deleted subnet.
        """
        for subnet in self._get_subnets(context, network_id):
            host_routes = [{'destination': str(route.destination),
                            'nexthop': route.nexthop}
                           for route in subnet.host_routes]
            # A deep copy is handed over so the list built above stays
            # untouched for the comparison that follows.
            calc_host_routes = self._calculate_routed_network_host_routes(
                context=context,
                ip_version=subnet.ip_version,
                network_id=subnet.network_id,
                subnet_id=subnet.id,
                segment_id=subnet.segment_id,
                host_routes=copy.deepcopy(host_routes),
                gateway_ip=subnet.gateway_ip,
                deleted_cidr=deleted_cidr)
            if self._host_routes_need_update(host_routes, calc_host_routes):
                # Each %s placeholder needs its own positional argument;
                # logging does not unpack a tuple passed as a single arg.
                LOG.debug(
                    "Updating host routes for subnet %s on routed network %s",
                    subnet.id, subnet.network_id)
                plugin = directory.get_plugin()
                plugin.update_subnet(context, subnet.id,
                                     {'subnet': {
                                         'host_routes': calc_host_routes}})
开发者ID:igordcard,项目名称:neutron,代码行数:31,代码来源:plugin.py



注：本文中的neutron_lib.plugins.directory.get_plugin函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python _i18n._函数代码示例发布时间:2022-05-27
下一篇:
Python directory.add_plugin函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap