• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python importutils.import_class函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quantum.openstack.common.importutils.import_class函数的典型用法代码示例。如果您正苦于以下问题:Python import_class函数的具体用法?Python import_class怎么用?Python import_class使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了import_class函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentianally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.debug(_("Plugin location: %s"), plugin_provider)
        # If the plugin can't be found let them know gracefully
        try:
            LOG.info(_("Loading Plugin: %s"), plugin_provider)
            plugin_klass = importutils.import_class(plugin_provider)
        except ClassNotFound:
            LOG.exception(_("Error loading plugin"))
            raise Exception(_("Plugin not found.  You can install a "
                            "plugin with: pip install <plugin-name>\n"
                            "Example: pip install quantum-sample-plugin"))
        self.plugin = plugin_klass()

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO (enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
开发者ID:FreescaleSemiconductor,项目名称:quantum,代码行数:28,代码来源:manager.py


示例2: __init__

    def __init__(self):
        # NOTE(jkoelker) Register the event on all models that have ids
        for _name, klass in inspect.getmembers(models, inspect.isclass):
            if klass is models.HasId:
                continue

            if models.HasId in klass.mro():
                event.listen(klass, "init", perhaps_generate_id)

        quantum_db_api.configure_db()
        self._initDBMaker()
        self.net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
        self.net_driver.load_config(CONF.QUARK.net_driver_cfg)
        self.ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
        self.ipam_reuse_after = CONF.QUARK.ipam_reuse_after
        models.BASEV2.metadata.create_all(quantum_db_api._ENGINE)
开发者ID:ugoring,项目名称:quark,代码行数:16,代码来源:plugin.py


示例3: _load_service_plugins

    def _load_service_plugins(self):
        LOG.debug("***** in _load_service_plugins")
        plugin_providers = cfg.CONF.service_plugins
        LOG.debug(_("Loading Service Plugins: %s"), plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue
            try:
                provider = provider.strip()
                LOG.info(_("Loading Plugin: %s"), provider)
                plugin_class = importutils.import_class(provider)
            except ClassNotFound:
	        LOG.exception(_("Plugin not found.")) 
            plugin_inst = plugin_class()
            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise Exception(_("Multiple plugins for service "
                                "%s were configured"),
                                plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            LOG.debug(_("Successfully loaded %(type)s plugin. "
                        "Description: %(desc)s"),
                      {"type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description()})
开发者ID:kumarcv,项目名称:openstack-nf,代码行数:28,代码来源:manager.py


示例4: setUp

    def setUp(self):
        super(QuantumPolicyTestCase, self).setUp()
        policy.reset()
        policy.init()
        self.addCleanup(policy.reset)
        self.rules = dict(
            (k, common_policy.parse_rule(v))
            for k, v in {
                "admin_or_network_owner": "role:admin or " "tenant_id:%(network_tenant_id)s",
                "admin_or_owner": "role:admin or tenant_id:%(tenant_id)s",
                "admin_only": "role:admin",
                "regular_user": "role:user",
                "shared": "field:networks:shared=True",
                "external": "field:networks:router:external=True",
                "default": "@",
                "create_network": "rule:admin_or_owner",
                "create_network:shared": "rule:admin_only",
                "update_network": "@",
                "update_network:shared": "rule:admin_only",
                "get_network": "rule:admin_or_owner or " "rule:shared or " "rule:external",
                "create_port:mac": "rule:admin_or_network_owner",
            }.items()
        )

        def fakepolicyinit():
            common_policy.set_rules(common_policy.Rules(self.rules))

        self.patcher = mock.patch.object(quantum.policy, "init", new=fakepolicyinit)
        self.patcher.start()
        self.addCleanup(self.patcher.stop)
        self.context = context.Context("fake", "fake", roles=["user"])
        plugin_klass = importutils.import_class("quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
        self.plugin = plugin_klass()
开发者ID:NCU-PDCLAB,项目名称:quantum,代码行数:33,代码来源:test_policy.py


示例5: __init__

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}
	if cfg.CONF.core_plugin is None:
	   msg = _('Quantum core_plugin not configured!')
           LOG.critical(msg)
           raise Exception(msg) 
        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentianally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.debug("Plugin location:%s", plugin_provider)
        # If the plugin can't be found let them know gracefully
        try:
            LOG.info("Loading Plugin: %s" % plugin_provider)
            plugin_klass = importutils.import_class(plugin_provider)
        except ClassNotFound:
            LOG.exception("Error loading plugin")
            raise Exception("Plugin not found.  You can install a "
                            "plugin with: pip install <plugin-name>\n"
                            "Example: pip install quantum-sample-plugin")
        self.plugin = plugin_klass()
        LOG.debug("Plugin => %s", str(self.plugin))
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins() 
        self._load_slb_scheduler_driver()
        self._load_nws_scheduler_driver()
开发者ID:kumarcv,项目名称:openstack-nf,代码行数:29,代码来源:manager.py


示例6: __init__

 def __init__(self, conf):
     self.conf = conf
     self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
     self.db = None
     self.polling_interval = conf.polling_interval
     self.reconnect_interval = conf.reconnect_interval
     self._run = True
     self.prev_state = State(set(), set(), set())
开发者ID:t-lin,项目名称:quantum,代码行数:8,代码来源:dhcp_agent.py


示例7: _load_nws_scheduler_driver

 def _load_nws_scheduler_driver(self):
     plugin_provider = cfg.CONF.NWSDRIVER.nwservice_driver
     try:
         LOG.debug(_("Loading Network Service Driver: %s"), cfg.CONF.NWSDRIVER.nwservice_driver)
         plugin_class = importutils.import_class(plugin_provider)
     except ClassNotFound:
         LOG.exception("Error loading NW Service Scheduler Driver")
         raise Exception("Unable to import NW Service Scheduler Driver")
     self.nws_scheduler_driver = plugin_class()
开发者ID:kumarcv,项目名称:openstack-nf,代码行数:9,代码来源:manager.py


示例8: _load_slb_scheduler_driver

 def _load_slb_scheduler_driver(self):
     plugin_provider = cfg.CONF.DRIVER.loadbalancer_driver
     try:
         LOG.debug(_("Loading SLB Scheduler Driver: %s"), cfg.CONF.DRIVER.loadbalancer_driver)
         plugin_class = importutils.import_class(plugin_provider)
     except ClassNotFound:
         LOG.exception("Error loading SLB Scheduler Driver")
         raise Exception("Unable to import SLB Scheduler Driver")
     self.slb_scheduler_driver = plugin_class()
开发者ID:kumarcv,项目名称:openstack-nf,代码行数:9,代码来源:manager.py


示例9: get_instance

 def get_instance(cls):
     if cls._instance is None:
         plugin_provider = cfg.CONF.core_plugin
         if plugin_provider in ENABLED_EXTS:
             for model in ENABLED_EXTS[plugin_provider]["ext_db_models"]:
                 LOG.debug("loading model %s", model)
                 model_class = importutils.import_class(model)
         cls._instance = cls(get_extensions_path(), QuantumManager.get_plugin())
     return cls._instance
开发者ID:fmanco,项目名称:quantum,代码行数:9,代码来源:extensions.py


示例10: __init__

    def __init__(self, conf):
        self.conf = conf
        self.cache = NetworkCache()

        self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
        ctx = context.RequestContext('quantum', 'quantum', is_admin=True)
        self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)

        self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
        self.notifications = agent_rpc.NotificationDispatcher()
        self.lease_relay = DhcpLeaseRelay(self.update_lease)
开发者ID:r-mibu,项目名称:quantum,代码行数:11,代码来源:dhcp_agent.py


示例11: __init__

 def __init__(self, host=None):
     super(DhcpAgent, self).__init__(host=host)
     self.needs_resync = False
     self.conf = cfg.CONF
     self.cache = NetworkCache()
     self.root_helper = config.get_root_helper(self.conf)
     self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
     ctx = context.get_admin_context_without_session()
     self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
     self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
     self.lease_relay = DhcpLeaseRelay(self.update_lease)
开发者ID:Frostman,项目名称:quantum,代码行数:11,代码来源:dhcp_agent.py


示例12: get_plugin

def get_plugin(plugin_provider):
    # If the plugin can't be found let them know gracefully
    try:
        LOG.info("Loading Plugin: %s" % plugin_provider)
        plugin_klass = importutils.import_class(plugin_provider)
    except ClassNotFound:
        LOG.exception("Error loading plugin")
        raise Exception("Plugin not found.  You can install a "
                        "plugin with: pip install <plugin-name>\n"
                        "Example: pip install quantum-sample-plugin")
    return plugin_klass()
开发者ID:LuizOz,项目名称:quantum,代码行数:11,代码来源:manager.py


示例13: __init__

    def __init__(self, conf):
        self.needs_resync = False
        self.conf = conf
        self.cache = NetworkCache()

        self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
        ctx = context.get_admin_context_without_session()
        self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)

        self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
        self.notifications = agent_rpc.NotificationDispatcher()
        self.lease_relay = DhcpLeaseRelay(self.update_lease)
开发者ID:ntoll,项目名称:quantum,代码行数:12,代码来源:dhcp_agent.py


示例14: _load_driver

 def _load_driver(self, driver_provider):
     LOG.debug("Driver location:%s", driver_provider)
     # If the plugin can't be found let them know gracefully
     try:
         LOG.info("Loading Driver: %s" % driver_provider)
         plugin_klass = importutils.import_class(driver_provider)
     except ClassNotFound:
         LOG.exception("Error loading driver")
         raise Exception("driver_provider not found.  You can install a "
                         "Driver with: pip install <plugin-name>\n"
                         "Example: pip install quantum-sample-driver")
     return plugin_klass(self.conf)
开发者ID:riverbedstingray,项目名称:quantum,代码行数:12,代码来源:interface.py


示例15: __init__

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 *args, **kwargs):

        self.binary = binary
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(host=host, *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        super(Service, self).__init__(host, topic, manager=self.manager)
开发者ID:Apsu,项目名称:quantum,代码行数:14,代码来源:service.py


示例16: setUp

    def setUp(self):
        super(QuantumPolicyTestCase, self).setUp()
        policy.reset()
        policy.init()
        self.addCleanup(policy.reset)
        self.admin_only_legacy = "role:admin"
        self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
        self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
            "context_is_admin": "role:admin",
            "admin_or_network_owner": "rule:context_is_admin or "
                                      "tenant_id:%(network:tenant_id)s",
            "admin_or_owner": ("rule:context_is_admin or "
                               "tenant_id:%(tenant_id)s"),
            "admin_only": "rule:context_is_admin",
            "regular_user": "role:user",
            "shared": "field:networks:shared=True",
            "external": "field:networks:router:external=True",
            "default": '@',

            "create_network": "rule:admin_or_owner",
            "create_network:shared": "rule:admin_only",
            "update_network": '@',
            "update_network:shared": "rule:admin_only",

            "get_network": "rule:admin_or_owner or "
                           "rule:shared or "
                           "rule:external",
            "create_port:mac": "rule:admin_or_network_owner",
        }.items())

        def fakepolicyinit():
            common_policy.set_rules(common_policy.Rules(self.rules))

        self.patcher = mock.patch.object(quantum.policy,
                                         'init',
                                         new=fakepolicyinit)
        self.patcher.start()
        self.addCleanup(self.patcher.stop)
        self.context = context.Context('fake', 'fake', roles=['user'])
        plugin_klass = importutils.import_class(
            "quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
        self.manager_patcher = mock.patch('quantum.manager.QuantumManager')
        fake_manager = self.manager_patcher.start()
        fake_manager_instance = fake_manager.return_value
        fake_manager_instance.plugin = plugin_klass()
        self.addCleanup(self.manager_patcher.stop)
开发者ID:XULI,项目名称:quantum,代码行数:46,代码来源:test_policy.py


示例17: setUp

    def setUp(self):
        super(QuantumPolicyTestCase, self).setUp()
        policy.reset()
        policy.init()
        self.rules = {
            "admin_or_network_owner": [["role:admin"],
                                       ["tenant_id:%(network_tenant_id)s"]],
            "admin_only": [["role:admin"]],
            "regular_user": [["role:user"]],
            "default": [],

            "networks:private:read": [["rule:admin_only"]],
            "networks:private:write": [["rule:admin_only"]],
            "networks:shared:read": [["rule:regular_user"]],
            "networks:shared:write": [["rule:admin_only"]],

            "create_network": [],
            "create_network:shared": [["rule:admin_only"]],
            "update_network": [],
            "update_network:shared": [["rule:admin_only"]],

            "get_network": [],
            "create_port:mac": [["rule:admin_or_network_owner"]],
        }

        def fakepolicyinit():
            common_policy.set_brain(common_policy.Brain(self.rules))

        self.patcher = mock.patch.object(quantum.policy,
                                         'init',
                                         new=fakepolicyinit)
        self.patcher.start()
        self.context = context.Context('fake', 'fake', roles=['user'])
        plugin_klass = importutils.import_class(
            "quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
        self.plugin = plugin_klass()
开发者ID:missall,项目名称:quantum,代码行数:36,代码来源:test_policy.py


示例18: _load_service_plugins

    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # load services from the core plugin first
        self._load_services_from_core_plugin()

        plugin_providers = cfg.CONF.service_plugins
        LOG.debug(_("Loading service plugins: %s"), plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue
            try:
                LOG.info(_("Loading Plugin: %s"), provider)
                plugin_class = importutils.import_class(provider)
            except ClassNotFound:
                LOG.exception(_("Error loading plugin"))
                raise Exception(_("Plugin not found."))
            plugin_inst = plugin_class()

            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise Exception(_("Multiple plugins for service "
                                "%s were configured"),
                                plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            LOG.debug(_("Successfully loaded %(type)s plugin. "
                        "Description: %(desc)s"),
                      {"type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description()})
开发者ID:hobbytp,项目名称:quantum,代码行数:36,代码来源:manager.py


示例19: __init__

 def __init__(self, plugin):
     self._resource_name = RESOURCE_NAME
     self._plugin = plugin
     self._driver = importutils.import_class(DB_QUOTA_DRIVER)
开发者ID:lucian,项目名称:quantum,代码行数:4,代码来源:quotasv2.py


示例20: fileConfig

from quantum.db import model_base
from quantum.openstack.common import importutils


DATABASE_QUOTA_DRIVER = 'quantum.extensions._quotav2_driver.DbQuotaDriver'

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
quantum_config = config.quantum_config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

plugin_klass = importutils.import_class(quantum_config.core_plugin)

# set the target for 'autogenerate' support
target_metadata = model_base.BASEV2.metadata


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
开发者ID:AmirolAhmad,项目名称:quantum,代码行数:31,代码来源:env.py



注:本文中的quantum.openstack.common.importutils.import_class函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python importutils.import_object函数代码示例发布时间:2022-05-26
下一篇:
Python gettextutils._函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap