• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api.get_engine函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.db.api.get_engine函数的典型用法代码示例。如果您正苦于以下问题：Python get_engine函数的具体用法？Python get_engine怎么用？Python get_engine使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_engine函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: start

 def start(self):
     """Spawn the service's ``_run`` callable on the service pool.

     The object returned by ``pool.spawn`` is stored on ``self._server``.
     """
     # This may be running right after a fork from the parent process.
     # Dropping the pooled SQL connections immediately avoids 500 errors
     # later on, when those connections would be found to be broken.
     if CONF.database.connection:
         engine = api.get_engine()
         engine.pool.dispose()
     service = self._service
     self._server = service.pool.spawn(
         service._run, self._application, service._socket)
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:7,代码来源:wsgi.py


示例2: serve_rpc

def serve_rpc():
    """Start RPC handling for the active plugin.

    Returns the started ``RpcWorker`` when ``cfg.CONF.rpc_workers`` is below
    one, otherwise the ``ProcessLauncher`` that the worker was launched on.

    Raises ``NotImplementedError`` when the plugin reports that RPC workers
    are not supported.  Any exception raised while creating or launching the
    worker is logged and then re-raised.
    """
    plugin = manager.NeutronManager.get_plugin()

    # With rpc_workers > 0, start_rpc_listeners would run in a subprocess,
    # where a NotImplementedError could not simply be caught.  Asking the
    # plugin up front whether it supports multiple RPC workers is simpler.
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn't implement start_rpc_listeners")
        if cfg.CONF.rpc_workers > 0:
            LOG.error(
                _LE("'rpc_workers = %d' ignored because "
                    "start_rpc_listeners is not implemented."),
                cfg.CONF.rpc_workers,
            )
        raise NotImplementedError()

    try:
        worker = RpcWorker(plugin)

        if cfg.CONF.rpc_workers < 1:
            # No dedicated workers requested: start in this process.
            worker.start()
            return worker

        # Dispose of the whole pool before os.fork; otherwise the child
        # processes would share DB connections, which may cause DB errors.
        session.get_engine().pool.dispose()
        process_launcher = common_service.ProcessLauncher(wait_interval=1.0)
        process_launcher.launch_service(worker,
                                        workers=cfg.CONF.rpc_workers)
        return process_launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE("Unrecoverable error: please check log for "
                              "details."))
开发者ID:kongseokhwan,项目名称:kulcloud-iitp-neutron,代码行数:33,代码来源:service.py


示例3: setUp

    def setUp(self):
        """Set up the flavors plugin, a patched provider list and the engine.

        Overrides the core/service plugin options with the dummy test
        plugins, patches ``NeutronModule.service_providers`` to return a
        single provider entry, registers a provider configuration for it and
        finally calls ``dbapi.get_engine()``.
        """
        super(FlavorPluginTestCase, self).setUp()

        self.config_parse()
        plugin_overrides = (
            ('core_plugin',
             'neutron.tests.unit.extensions.test_flavors.DummyCorePlugin'),
            ('service_plugins',
             ['neutron.tests.unit.extensions.test_flavors.DummyServicePlugin']),
        )
        for option, value in plugin_overrides:
            cfg.CONF.set_override(option, value)

        manager_instance_patch = fixtures.MonkeyPatch(
            'neutron.manager.NeutronManager._instance')
        self.useFixture(manager_instance_patch)

        self.plugin = flavors_plugin.FlavorsPlugin()
        self.ctx = context.get_admin_context()

        # Single provider entry of the form "<service type>:<name>:<driver>".
        provider_entry = (DummyServiceDriver.get_service_type() +
                          ":" + _provider + ":" + _driver)
        providers = [provider_entry]
        self.service_manager = servicetype_db.ServiceTypeManager.get_instance()
        providers_patcher = mock.patch.object(
            provconf.NeutronModule, 'service_providers')
        self.service_providers = providers_patcher.start()
        self.service_providers.return_value = providers
        for entry in providers:
            service_type = entry.split(':')[0]
            self.service_manager.add_provider_configuration(
                service_type, provconf.ProviderConfiguration())

        dbapi.get_engine()
开发者ID:21atlas,项目名称:neutron,代码行数:28,代码来源:test_flavors.py


示例4: setUp

 def setUp(self):
     """Build a non-admin context and create the model and quota tables."""
     super(MySqlBaseFunctionalTest, self).setUp()
     self.context = context.Context('fake', 'fake', is_admin=False)
     configure_mappers()
     db_engine = neutron_db_api.get_engine()
     # Both metadata collections are created on the same engine, in order.
     for metadata in (models.BASEV2.metadata, quota_driver.Quota.metadata):
         metadata.create_all(db_engine)
开发者ID:Anonymike,项目名称:quark,代码行数:7,代码来源:base.py


示例5: register_models

 def register_models(self):
     """Create the tables described by ``PowerVCMapping``'s metadata.

     An ``OperationalError`` raised while doing so is logged at info level
     and not propagated.
     """
     try:
         db_engine = db_api.get_engine()
         mapping_metadata = model.PowerVCMapping.metadata
         mapping_metadata.create_all(db_engine)
     except sql.exc.OperationalError as exc:
         LOG.info(_("Database registration exception: %s"), exc)
开发者ID:openstack,项目名称:powervc-driver,代码行数:7,代码来源:powervc_db_v2.py


示例6: setUp

 def setUp(self, policy_drivers=None,
           core_plugin=n_test_plugin.PLUGIN_NAME, ml2_options=None,
           sc_plugin=None):
     """Configure drivers, run the parent setUp and cache plugin handles.

     :param policy_drivers: policy driver names; ``['neutron_resources']``
         is used when this is empty or None.
     :param core_plugin: forwarded to the parent ``setUp``.
     :param ml2_options: forwarded to the parent ``setUp``.
     :param sc_plugin: forwarded to the parent ``setUp``.
     """
     if not policy_drivers:
         policy_drivers = ['neutron_resources']
     config.cfg.CONF.set_override(
         'policy_drivers', policy_drivers, group='group_policy')
     sc_cfg.cfg.CONF.set_override(
         'servicechain_drivers', ['dummy'], group='servicechain')
     config.cfg.CONF.set_override('allow_overlapping_ips', True)
     super(CommonNeutronBaseTestCase, self).setUp(
         core_plugin=core_plugin,
         ml2_options=ml2_options,
         sc_plugin=sc_plugin)

     db_engine = db_api.get_engine()
     model_base.BASEV2.metadata.create_all(db_engine)

     # Replace the router rescheduling check with a mock returning None.
     reschedule_check = mock.patch(
         'neutron.db.l3_db.L3_NAT_dbonly_mixin.'
         '_check_router_needs_rescheduling').start()
     reschedule_check.return_value = None

     self._plugin = manager.NeutronManager.get_plugin()
     self._plugin.remove_networks_from_down_agents = mock.Mock()
     self._plugin.is_agent_down = mock.Mock(return_value=False)
     self._context = nctx.get_admin_context()

     service_plugins = manager.NeutronManager.get_service_plugins()
     self._gbp_plugin = service_plugins.get(pconst.GROUP_POLICY)
     self._l3_plugin = service_plugins.get(pconst.L3_ROUTER_NAT)

     for logging_flag in ('debug', 'verbose'):
         config.cfg.CONF.set_override(logging_flag, True)
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:27,代码来源:test_neutron_resources_driver.py


示例7: setUp

 def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
           node_plumber=None):
     """Configure the node composition plugin and set each quota to 1.

     :param core_plugin: core plugin; ``CORE_PLUGIN`` when falsy.
     :param gp_plugin: group policy plugin; ``GP_PLUGIN_KLASS`` when falsy.
     :param node_drivers: optional node driver list; only overridden in
         the configuration when truthy.
     :param node_plumber: node plumber name; ``'dummy_plumber'`` when falsy.
     """
     ncp_group = 'node_composition_plugin'
     if node_drivers:
         cfg.CONF.set_override('node_drivers', node_drivers,
                               group=ncp_group)
     plumber = node_plumber if node_plumber else 'dummy_plumber'
     cfg.CONF.set_override('node_plumber', plumber, group=ncp_group)
     config.cfg.CONF.set_override(
         'policy_drivers',
         ['implicit_policy', 'resource_mapping', 'chain_mapping'],
         group='group_policy')

     super(TestQuotasForServiceChain, self).setUp(
         core_plugin=core_plugin or CORE_PLUGIN,
         gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
         sc_plugin=SC_PLUGIN_KLASS)

     db_engine = db_api.get_engine()
     model_base.BASEV2.metadata.create_all(db_engine)
     first_driver = self.sc_plugin.driver_manager.ordered_drivers[0]
     self.driver = first_driver.obj

     # Every servicechain-related quota is limited to a single resource.
     quota_options = (
         'quota_servicechain_node',
         'quota_servicechain_spec',
         'quota_servicechain_instance',
         'quota_service_profile',
     )
     for quota_option in quota_options:
         cfg.CONF.set_override(quota_option, 1, group='QUOTAS')
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:26,代码来源:test_ncp_plugin.py


示例8: setUp

    def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
              sc_plugin=None, **kwargs):
        """Set up the AIM test case: plugins, contexts and AIM tables.

        :param policy_drivers: policy drivers; ``['aim_mapping']`` when falsy.
        :param core_plugin: core plugin; ``ML2PLUS_PLUGIN`` when falsy.
        :param ml2_options: ML2 options dict; an opflex/apic_aim default is
            used when falsy.
        :param sc_plugin: forwarded to the parent ``setUp``.
        :param kwargs: only ``tenant_id`` is read here; it overrides the
            default tenant of the non-admin context.
        """
        if not core_plugin:
            core_plugin = ML2PLUS_PLUGIN
        if not policy_drivers:
            policy_drivers = ['aim_mapping']
        if ml2_options:
            ml2_opts = ml2_options
        else:
            ml2_opts = {
                'mechanism_drivers': ['logger', 'apic_aim'],
                'extension_drivers': ['apic_aim'],
                'type_drivers': ['opflex', 'local', 'vlan'],
                'tenant_network_types': ['opflex'],
            }
        super(AIMBaseTestCase, self).setUp(
            policy_drivers=policy_drivers,
            core_plugin=core_plugin,
            ml2_options=ml2_opts,
            sc_plugin=sc_plugin)
        config.cfg.CONF.set_override(
            'network_vlan_ranges', ['physnet1:1000:1099'],
            group='ml2_type_vlan')

        # Remember the real keystone client class, then swap in the fake.
        self.saved_keystone_client = ksc_client.Client
        ksc_client.Client = test_aim_md.FakeKeystoneClient

        self._tenant_id = 'test-tenant'
        tenant_id = kwargs.get('tenant_id', self._tenant_id)
        self._neutron_context = nctx.Context(
            '', tenant_id, is_admin_context=False)
        self._neutron_admin_context = nctx.get_admin_context()

        db_engine = db_api.get_engine()
        aim_model_base.Base.metadata.create_all(db_engine)
        self._aim_mgr = None
        neutron_session = self._neutron_context.session
        self._aim_context = aim_context.AimContext(neutron_session)
        self._db = model.DbModel()
        self._name_mapper = None
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:31,代码来源:test_aim_mapping_driver.py


示例9: setUp

    def setUp(self):
        """Configure ML2 for apic_aim, create AIM tables and grab the driver."""
        # Enable the test mechanism driver so that calls can go all the way
        # through to every mechanism driver API.
        ml2_overrides = (
            ('mechanism_drivers', ['logger', 'apic_aim']),
            ('extension_drivers', ['apic_aim']),
            ('type_drivers', ['opflex', 'local', 'vlan']),
            ('tenant_network_types', ['opflex']),
        )
        for option, value in ml2_overrides:
            config.cfg.CONF.set_override(option, value, 'ml2')
        config.cfg.CONF.set_override(
            'network_vlan_ranges', ['physnet1:1000:1099'],
            group='ml2_type_vlan')

        super(ApicAimTestCase, self).setUp(PLUGIN_NAME)
        self.port_create_status = 'DOWN'

        # Remember the real keystone client class, then swap in the fake.
        self.saved_keystone_client = ksc_client.Client
        ksc_client.Client = FakeKeystoneClient

        db_engine = db_api.get_engine()
        aim_model_base.Base.metadata.create_all(db_engine)

        self.plugin = manager.NeutronManager.get_plugin()
        self.plugin.start_rpc_listeners()
        mech_drivers = self.plugin.mechanism_manager.mech_drivers
        self.driver = mech_drivers['apic_aim'].obj
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:33,代码来源:test_apic_aim.py


示例10: setUp

 def setUp(self):
     """Point the database at ``sqlite://`` and create all tables.

     The engine is kept on ``self.engine``.
     """
     super(BaseFunctionalTest, self).setUp()
     self.context = context.Context('fake', 'fake', is_admin=False)
     cfg.CONF.set_override('connection', 'sqlite://', 'database')
     configure_mappers()
     self.engine = neutron_db_api.get_engine()
     # Both metadata collections are created on the same engine, in order.
     for metadata in (models.BASEV2.metadata, quota_driver.Quota.metadata):
         metadata.create_all(self.engine)
开发者ID:Anonymike,项目名称:quark,代码行数:8,代码来源:base.py


示例11: _launch

 def _launch(self, application, workers=0):
     """Run ``application`` in-process or on a pool of worker processes.

     :param application: application handed to ``WorkerService``.
     :param workers: number of worker processes; values below 1 start
         the service in the current process.
     """
     service = WorkerService(self, application)
     if workers < 1:
         # The API service should run in the current process.
         self._server = service
         service.start()
         systemd.notify_once()
         return

     # Dispose of the whole pool before os.fork; otherwise the child
     # processes would share DB connections, which may cause DB errors.
     if CONF.database.connection:
         engine = api.get_engine()
         engine.pool.dispose()
     # The API service runs in a number of child processes.  Extending the
     # wait interval past the default of 0.01s keeps the cost of checking
     # for child exit low.
     process_launcher = common_service.ProcessLauncher(wait_interval=1.0)
     self._server = process_launcher
     self._server.launch_service(service, workers=workers)
开发者ID:kongseokhwan,项目名称:kulcloud-iitp-neutron,代码行数:18,代码来源:wsgi.py


示例12: setUp

    def setUp(self):
        """Set up a ``FlavorManager`` backed by the dummy test plugins."""
        super(FlavorManagerTestCase, self).setUp()

        self.config_parse()
        plugin_overrides = (
            ('core_plugin',
             'neutron.tests.unit.extensions.test_flavors.DummyCorePlugin'),
            ('service_plugins',
             ['neutron.tests.unit.extensions.test_flavors.DummyServicePlugin']),
        )
        for option, value in plugin_overrides:
            cfg.CONF.set_override(option, value)

        manager_instance_patch = fixtures.MonkeyPatch(
            'neutron.manager.NeutronManager._instance')
        self.useFixture(manager_instance_patch)

        neutron_manager = manager.NeutronManager().get_instance()
        self.plugin = flavors_db.FlavorManager(neutron_manager)
        self.ctx = context.get_admin_context()
        dbapi.get_engine()
开发者ID:dhanunjaya,项目名称:neutron,代码行数:18,代码来源:test_flavors.py


示例13: setUp

 def setUp(self):
     """Reset neutron's DB facade, then create all tables on ``sqlite://``.

     The engine is kept on ``self.engine``.
     """
     super(BaseFunctionalTest, self).setUp()
     self.context = context.Context('fake', 'fake', is_admin=False)
     cfg.CONF.set_override('connection', 'sqlite://', 'database')
     configure_mappers()
     # Neutron's facade has to be cleared before every test; otherwise
     # data would be shared between tests.
     neutron_db_api._FACADE = None
     self.engine = neutron_db_api.get_engine()
     for metadata in (models.BASEV2.metadata, quota_driver.Quota.metadata):
         metadata.create_all(self.engine)
开发者ID:Cerberus98,项目名称:quark,代码行数:11,代码来源:base.py


示例14: setUp

    def setUp(self):
        """Create every model table and schedule dropping them on cleanup."""
        super(SqlTestCase, self).setUp()
        # Register all data models.
        db_engine = db_api.get_engine()
        model_base.BASEV2.metadata.create_all(db_engine)

        def _drop_all_tables():
            """Drop every table registered on the model metadata."""
            model_base.BASEV2.metadata.drop_all(db_engine)

        self.addCleanup(_drop_all_tables)
开发者ID:AsherBond,项目名称:quantum,代码行数:11,代码来源:testlib_api.py


示例15: setUp

 def setUp(self, core_plugin=None, gp_plugin=None, service_plugins=None):
     """Run the parent setUp with default plugins, then create the tables.

     :param core_plugin: forwarded to the parent ``setUp``.
     :param gp_plugin: group policy plugin; ``DB_GP_PLUGIN_KLASS`` when
         falsy.
     :param service_plugins: service plugin mapping; a default with the
         router, group policy and servicechain plugins is used when falsy.
     """
     testlib_api.SqlTestCase._TABLES_ESTABLISHED = False
     gp_plugin = gp_plugin or DB_GP_PLUGIN_KLASS
     service_plugins = service_plugins or {
         'l3_plugin_name': "router",
         'gp_plugin_name': gp_plugin,
         'servicechain_plugin': SC_PLUGIN_KLASS,
     }
     super(GroupPolicyMappingDbTestCase, self).setUp(
         core_plugin=core_plugin,
         gp_plugin=gp_plugin,
         service_plugins=service_plugins)
     db_engine = db_api.get_engine()
     model_base.BASEV2.metadata.create_all(db_engine)
开发者ID:tbachman,项目名称:group-based-policy,代码行数:14,代码来源:test_group_policy_mapping_db.py


示例16: _setUp

    def _setUp(self):
        """Create the tables once, and empty them after each use.

        Table creation is guarded by the ``SqlFixture._TABLES_ESTABLISHED``
        class flag; the registered cleanup deletes all rows from every
        table, walking ``sorted_tables`` in reverse order.
        """
        # Register all data models.
        db_engine = db_api.get_engine()
        if not SqlFixture._TABLES_ESTABLISHED:
            model_base.BASEV2.metadata.create_all(db_engine)
            SqlFixture._TABLES_ESTABLISHED = True

        def _delete_all_rows():
            with db_engine.begin() as connection:
                tables = model_base.BASEV2.metadata.sorted_tables
                for table in reversed(tables):
                    connection.execute(table.delete())

        self.addCleanup(_delete_all_rows)
开发者ID:21atlas,项目名称:neutron,代码行数:14,代码来源:testlib_api.py


示例17: setUp

 def setUp(self):
     """Open an engine connection and cache the quark tables used in tests."""
     super(QuarkIpAvailabilityBaseFunctionalTest, self).setUp()
     self.connection = neutron_db_api.get_engine().connect()

     # Table objects are looked up by name in the shared model metadata.
     tables = models.BASEV2.metadata.tables
     self.networks = tables["quark_networks"]
     self.subnets = tables["quark_subnets"]
     self.ip_policy = tables["quark_ip_policy"]
     self.ip_policy_cidr = tables["quark_ip_policy_cidrs"]
     self.ip_addresses = tables["quark_ip_addresses"]
     self.locks = tables["quark_locks"]

     self.default_kwargs = {
         "network_id": "00000000-0000-0000-0000-000000000000",
         "ip_version": 4,
     }
开发者ID:Anonymike,项目名称:quark,代码行数:16,代码来源:test_ip_availability.py


示例18: setUp

 def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
           node_plumber=None):
     """Configure the node composition plugin and cache its first driver.

     :param core_plugin: core plugin; ``CORE_PLUGIN`` when falsy.
     :param gp_plugin: group policy plugin; ``GP_PLUGIN_KLASS`` when falsy.
     :param node_drivers: optional node driver list; only overridden in
         the configuration when truthy.
     :param node_plumber: node plumber name; ``'dummy_plumber'`` when falsy.
     """
     ncp_group = 'node_composition_plugin'
     if node_drivers:
         cfg.CONF.set_override('node_drivers', node_drivers,
                               group=ncp_group)
     plumber = node_plumber if node_plumber else 'dummy_plumber'
     cfg.CONF.set_override('node_plumber', plumber, group=ncp_group)
     config.cfg.CONF.set_override(
         'policy_drivers',
         ['implicit_policy', 'resource_mapping'],
         group='group_policy')

     super(NodeCompositionPluginTestCase, self).setUp(
         core_plugin=core_plugin or CORE_PLUGIN,
         gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
         sc_plugin=SC_PLUGIN_KLASS)

     db_engine = db_api.get_engine()
     model_base.BASEV2.metadata.create_all(db_engine)
     first_driver = self.sc_plugin.driver_manager.ordered_drivers[0]
     self.driver = first_driver.obj
开发者ID:sriharshabarkuru,项目名称:group-based-policy,代码行数:17,代码来源:test_ncp_plugin.py


示例19: setUp

    def setUp(self, core_plugin=None, sc_plugin=None, service_plugins=None,
              ext_mgr=None, gp_plugin=None):
        """Set up the servicechain plugin, extension API and tables.

        :param core_plugin: passed to the parent ``setUp`` as ``plugin``.
        :param sc_plugin: servicechain plugin path; ``DB_GP_PLUGIN_KLASS``
            when falsy.
        :param service_plugins: service plugin mapping; a default with the
            router, group policy and servicechain plugins is used when falsy.
        :param ext_mgr: extension manager; when falsy the plugin-aware
            manager instance is used and ``self.ext_api`` is built from it.
        :param gp_plugin: group policy plugin for the default mapping;
            ``GP_PLUGIN_KLASS`` when falsy.
        """
        sc_plugin = sc_plugin or DB_GP_PLUGIN_KLASS
        service_plugins = service_plugins or {
            'l3_plugin_name': 'router',
            'gp_plugin_name': gp_plugin or GP_PLUGIN_KLASS,
            'sc_plugin_name': sc_plugin,
        }

        super(ServiceChainDbTestCase, self).setUp(
            plugin=core_plugin,
            ext_mgr=ext_mgr,
            service_plugins=service_plugins)
        self.plugin = importutils.import_object(sc_plugin)
        if not ext_mgr:
            # No manager supplied: use the shared plugin-aware instance.
            manager_cls = extensions.PluginAwareExtensionManager
            ext_mgr = manager_cls.get_instance()
            self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
        db_engine = db_api.get_engine()
        model_base.BASEV2.metadata.create_all(db_engine)
开发者ID:tnahak,项目名称:group-based-policy,代码行数:20,代码来源:test_servicechain_db.py


示例20: setUp

    def setUp(self):
        """Configure the NFP node driver stack and create the tables."""
        conf = config.cfg.CONF
        ncp_group = 'node_composition_plugin'

        conf.set_override('service_delete_timeout', SERVICE_DELETE_TIMEOUT,
                          group='nfp_node_driver')
        conf.set_override('extension_drivers', ['proxy_group'],
                          group='group_policy')
        conf.set_override('node_drivers', ['nfp_node_driver'],
                          group=ncp_group)
        conf.set_override('node_plumber', 'stitching_plumber',
                          group=ncp_group)
        conf.set_override(
            'policy_drivers',
            ['implicit_policy', 'resource_mapping', 'chain_mapping'],
            group='group_policy')

        super(NFPNodeDriverTestCase, self).setUp(
            core_plugin=CORE_PLUGIN,
            gp_plugin=GP_PLUGIN_KLASS,
            sc_plugin=SC_PLUGIN_KLASS)

        db_engine = db_api.get_engine()
        model_base.BASEV2.metadata.create_all(db_engine)
开发者ID:ashutosh-mishra,项目名称:group-based-policy,代码行数:21,代码来源:test_nfp_node_driver.py



注:本文中的neutron.db.api.get_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.get_session函数代码示例发布时间:2022-05-27
下一篇:
Python api.configure_db函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap