
Python sqlalchemy.engine_from_config Function Code Examples


This article collects typical usage examples of the sqlalchemy.engine_from_config function in Python. If you are wondering how engine_from_config is used in practice, or are looking for real-world calls to it, the code examples selected here may help.



The 20 code examples of engine_from_config shown below are sorted by popularity by default.
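
Before the collected examples, a minimal sketch of the call itself may help. As the SQLAlchemy documentation describes it, engine_from_config selects the entries whose keys start with the given prefix, strips the prefix, and hands the remaining options to create_engine, converting string values such as "false" to the expected types. The settings dictionary and the in-memory SQLite URL below are illustrative assumptions and are not taken from any of the listed projects.

```python
from sqlalchemy import engine_from_config, text

# Hypothetical settings, shaped like a parsed .ini section.
settings = {
    "sqlalchemy.url": "sqlite://",   # in-memory SQLite database
    "sqlalchemy.echo": "false",      # string value, coerced to a boolean
    "unrelated.key": "ignored",      # does not match the prefix
}

# Only keys starting with "sqlalchemy." are used to build the engine.
engine = engine_from_config(settings, prefix="sqlalchemy.")

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
```

Keys that do not start with the prefix are simply skipped, which is why several engines can be configured from one settings mapping by using different prefixes.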

Example 1: load_environment

def load_environment(global_conf, app_conf):
    """Configure the Pylons environment via the ``pylons.config``
    object
    """
    # Pylons paths
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    paths = dict(root=root,
                 controllers=os.path.join(root, 'controllers'),
                 static_files=os.path.join(root, 'public'),
                 templates=[os.path.join(root, 'templates')])

    # Initialize config with the basic options
    config.init_app(global_conf, app_conf, package='mapclient', paths=paths)

    config['routes.map'] = make_map()
    config['pylons.app_globals'] = app_globals.Globals()
    config['pylons.h'] = mapclient.lib.helpers

    # Create the Jinja2 Environment
    config['pylons.app_globals'].jinja2_env = Environment(loader=ChoiceLoader(
            [FileSystemLoader(path) for path in paths['templates']]))
    # Jinja2's unable to request c's attributes without strict_c
    config['pylons.strict_c'] = True

    # Setup the SQLAlchemy database engine
    engine = engine_from_config(config, 'mapclient.sqlalchemy.')
    engine_ckan_data = engine_from_config(config, 'vectorstore.sqlalchemy.')
    init_model(engine, engine_ckan_data)
Developer: PublicaMundi, Project: MapClient, Lines of code: 28, Source: environment.py


Example 2: on_celeryd_init

def on_celeryd_init(**kw):
    """
    Initializes application resources when the Celery daemon starts
    """
    settings = app.settings

    redisurl = six.moves.urllib.parse.urlparse(settings['redis.url'])
    app.redis = redis.StrictRedis(
        host=redisurl.hostname,
        port=redisurl.port,
        db=os.path.basename(redisurl.path)
    )

    app.userid = settings['celery.blame']

    # Attempt to add the user via raw engine connection, using the scoped
    # session leaves it in a dangerous non-thread-local state as we're
    # still in the parent setup process
    throw_away_engine = sa.engine_from_config(settings, 'occams.db.')
    with throw_away_engine.begin() as connection:
        try:
            connection.execute(models.User.__table__.insert(),  key=app.userid)
        except sa.exc.IntegrityError:
            pass
    throw_away_engine.dispose()

    # Configure the session with an untainted engine
    engine = sa.engine_from_config(settings, 'occams.db.')
    Session.configure(bind=engine)
Developer: davidmote, Project: occams, Lines of code: 29, Source: celery.py


Example 3: setup_sqlalchemy

    def setup_sqlalchemy(self):
        """Setup SQLAlchemy database engine.

        The most common reason for modifying this method is to add
        multiple database support.  To do this you might modify your
        app_cfg.py file in the following manner::

            from tg.configuration import AppConfig, config
            from myapp.model import init_model

            # add this before base_config =
            class MultiDBAppConfig(AppConfig):
                def setup_sqlalchemy(self):
                    '''Setup SQLAlchemy database engine(s)'''
                    from sqlalchemy import engine_from_config
                    engine1 = engine_from_config(config, 'sqlalchemy.first.')
                    engine2 = engine_from_config(config, 'sqlalchemy.second.')
                    # engine1 should be assigned to sa_engine as well as your first engine's name
                    config['tg.app_globals'].sa_engine = engine1
                    config['tg.app_globals'].sa_engine_first = engine1
                    config['tg.app_globals'].sa_engine_second = engine2
                    # Pass the engines to init_model, to be able to introspect tables
                    init_model(engine1, engine2)

            #base_config = AppConfig()
            base_config = MultiDBAppConfig()

        This will pull the config settings from your .ini files to create the necessary
        engines for use within your application.  Make sure you have a look at :ref:`multidatabase`
        for more information.

        """
        from sqlalchemy import engine_from_config

        balanced_master = config.get('sqlalchemy.master.url')
        if not balanced_master:
            engine = engine_from_config(config, 'sqlalchemy.')
        else:
            engine = engine_from_config(config, 'sqlalchemy.master.')
            config['balanced_engines'] = {'master':engine,
                                          'slaves':{},
                                          'all':{'master':engine}}

            all_engines = config['balanced_engines']['all']
            slaves = config['balanced_engines']['slaves']
            for entry in config.keys():
                if entry.startswith('sqlalchemy.slaves.'):
                    slave_path = entry.split('.')
                    slave_name = slave_path[2]
                    if slave_name == 'master':
                        raise TGConfigError('A slave node cannot be named master')
                    slave_config = '.'.join(slave_path[:3])
                    all_engines[slave_name] = slaves[slave_name] = engine_from_config(config, slave_config+'.')

            if not config['balanced_engines']['slaves']:
                raise TGConfigError('When running in balanced mode your must specify at least a slave node')

        # Pass the engine to initmodel, to be able to introspect tables
        config['tg.app_globals'].sa_engine = engine
        self.package.model.init_model(engine)
Developer: mabodo, Project: MapReduceWords, Lines of code: 60, Source: app_config.py


Example 4: main

def main(global_config, **settings):
    """ This function returns a Pyramid WSGI application.
    """
    if 'DATABASE_URL' in os.environ:
        settings['sqlalchemy.url'] = os.environ['DATABASE_URL']
    engine = engine_from_config(settings, 'sqlalchemy.')
    auth_secret = os.environ.get('AUTH_SECRET', 'secret')
    authn_policy = AuthTktAuthenticationPolicy(secret=auth_secret,
                                               hashalg='sha256')
    authz_policy = ACLAuthorizationPolicy()
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine
    config = Configurator(
        settings=settings,
        root_factory=MyRoot)
    config.set_session_factory(SignedCookieSessionFactory('seekrit'))
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)
    config.include('pyramid_jinja2')
    config.add_static_view('static', 'static', cache_max_age=3600)
    config.add_route('index', '/')
    config.add_route('add', '/add')
    config.add_route('entry', r'/entries/{id:\d+}')
    config.add_route('edit', r'/entries/{id:\d+}/edit')
    config.add_route('login', '/login')
    config.add_route('logout', '/logout')
    # config.add_route('add_json', '/add_json')
    # config.add_route('entry_json', '/entry_json')
    config.scan()
    return config.make_wsgi_app()
Developer: cewing, Project: learning-journal-1, Lines of code: 31, Source: __init__.py
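
The function above replaces the configured URL whenever DATABASE_URL is set in the environment. A minimal sketch of that pattern as a standalone helper, using only the standard library; the helper name apply_database_url and the sample values are hypothetical and do not come from the original project:

```python
import os


def apply_database_url(settings, environ=None, key="sqlalchemy.url",
                       env_var="DATABASE_URL"):
    """Return a copy of settings whose URL is taken from the environment.

    The input mapping is left untouched, so the function is easy to test.
    """
    environ = os.environ if environ is None else environ
    merged = dict(settings)
    if env_var in environ:
        merged[key] = environ[env_var]
    return merged


ini_settings = {"sqlalchemy.url": "sqlite:///dev.db"}

# With the variable set, the environment wins over the .ini value.
overridden = apply_database_url(
    ini_settings, environ={"DATABASE_URL": "postgresql://app@db/journal"})

# Without it, the settings are passed through unchanged.
unchanged = apply_database_url(ini_settings, environ={})
```

The merged mapping can then be passed to engine_from_config with the 'sqlalchemy.' prefix, as the original function does.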


Example 5: create_sqlalchemy_engine

    def create_sqlalchemy_engine(conf):
        from sqlalchemy import engine_from_config
        balanced_master = conf.get('sqlalchemy.master.url')
        if not balanced_master:
            engine = engine_from_config(conf, 'sqlalchemy.')
        else:
            engine = engine_from_config(conf, 'sqlalchemy.master.')
            conf['balanced_engines'] = {'master': engine,
                                        'slaves': {},
                                        'all': {'master': engine}}

            all_engines = conf['balanced_engines']['all']
            slaves = conf['balanced_engines']['slaves']
            for entry in conf.keys():
                if entry.startswith('sqlalchemy.slaves.'):
                    slave_path = entry.split('.')
                    slave_name = slave_path[2]
                    if slave_name == 'master':
                        raise TGConfigError('A slave node cannot be named master')
                    slave_config = '.'.join(slave_path[:3])
                    all_engines[slave_name] = slaves[slave_name] = engine_from_config(conf, slave_config + '.')

            if not conf['balanced_engines']['slaves']:
                raise TGConfigError('When running in balanced mode your must specify at least a slave node')

        return engine
Developer: TurboGears, Project: tg2, Lines of code: 26, Source: sqlalchemy.py
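
The loop in the balanced-mode branch derives one prefix per replica from keys of the form sqlalchemy.slaves.<name>.<option>. The grouping step on its own can be sketched without SQLAlchemy; collect_slave_prefixes and the sample keys are hypothetical names used only for illustration:

```python
def collect_slave_prefixes(conf):
    """Map each replica name to the config prefix that selects its options."""
    prefixes = {}
    for key in conf:
        if key.startswith("sqlalchemy.slaves."):
            parts = key.split(".")
            name = parts[2]
            if name == "master":
                raise ValueError("A slave node cannot be named master")
            # First three components plus a trailing dot form the prefix.
            prefixes[name] = ".".join(parts[:3]) + "."
    return prefixes


conf = {
    "sqlalchemy.master.url": "sqlite:///master.db",
    "sqlalchemy.slaves.replica1.url": "sqlite:///replica1.db",
    "sqlalchemy.slaves.replica1.echo": "false",
    "sqlalchemy.slaves.replica2.url": "sqlite:///replica2.db",
}
prefixes = collect_slave_prefixes(conf)
```

Each resulting prefix can be passed to engine_from_config once. The original loop instead builds an engine for every matching key, so a replica with two options is created twice and the later engine replaces the earlier one.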


Example 6: main

def main(argv=sys.argv):
    settings_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'migration.ini')
    settings = get_appsettings(settings_file)

    engine_target = engine_from_config(settings, 'sqlalchemy_target.')
    engine_source = engine_from_config(settings, 'sqlalchemy_source.')

    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)

    # create a fresh schema on the target database
    Session = sessionmaker(extension=ZopeTransactionExtension())  # noqa
    session = Session(bind=engine_target)
    Base.metadata.drop_all(engine_target, checkfirst=True)
    setup_db(engine_target, session)

    connection_source = engine_source.connect()

    batch_size = 1000
    MigrateUsers(connection_source, session, batch_size).migrate()
    MigrateSummits(connection_source, session, batch_size).migrate()
    MigrateParkings(connection_source, session, batch_size).migrate()
    MigrateSites(connection_source, session, batch_size).migrate()
    MigrateProducts(connection_source, session, batch_size).migrate()
    MigrateHuts(connection_source, session, batch_size).migrate()
    MigrateRoutes(connection_source, session, batch_size).migrate()
    MigrateVersions(connection_source, session, batch_size).migrate()
    UpdateSequences(connection_source, session, batch_size).migrate()
Developer: mfournier, Project: v6_api, Lines of code: 29, Source: migrate.py


Example 7: run_migrations_online

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    if 'DB_URL' in os.environ:
        engine = engine_from_config(
            config.get_section("app:pyramidapp"),
            prefix='sqlalchemy.',
            url=os.environ['DB_URL'],
            poolclass=pool.NullPool)
    else:
        engine = engine_from_config(
            config.get_section("app:pyramidapp"),
            prefix='sqlalchemy.',
            poolclass=pool.NullPool)

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
Developer: SabatierBoris, Project: CecileWebSite, Lines of code: 30, Source: env.py
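
The function above relies on keyword arguments taking precedence over values read from the configuration: url and poolclass are applied after the prefixed keys have been collected. A small sketch of that behaviour with an in-memory SQLite database; the file name in the dictionary is a made-up placeholder and is never opened:

```python
from sqlalchemy import engine_from_config, pool

# Hypothetical section, as config.get_section(...) might return it.
section = {"sqlalchemy.url": "sqlite:///from_ini_file.db"}

engine = engine_from_config(
    section,
    prefix="sqlalchemy.",
    url="sqlite://",            # keyword overrides the value in the section
    poolclass=pool.NullPool,    # no connection pooling, as in migration scripts
)

effective_url = str(engine.url)
uses_null_pool = isinstance(engine.pool, pool.NullPool)
engine.dispose()
```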


Example 8: run_migrations_online

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    # FIX for Postgres updates
    url = config.get_section(config.config_ini_section).get("sqlalchemy.url")
    driver = url.split(":")[0]

    if driver == "postgresql+psycopg2":
        engine = engine_from_config(
                    config.get_section(config.config_ini_section),
                    prefix='sqlalchemy.',
                    isolation_level="AUTOCOMMIT",
                    poolclass=pool.NullPool)
    else:
        engine = engine_from_config(
                    config.get_section(config.config_ini_section),
                    prefix='sqlalchemy.',
                    poolclass=pool.NullPool)

    connection = engine.connect()
    context.configure(
                connection=connection,
                target_metadata=target_metadata
                )

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
Developer: Andrew8305, Project: privacyidea, Lines of code: 34, Source: env.py
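
The function above finds the driver by splitting the URL string on the first colon. SQLAlchemy's own URL parser exposes the same information; the sketch below uses make_url with a made-up connection string. Parsing the URL does not import the database driver, so psycopg2 is not needed to run it:

```python
from sqlalchemy.engine.url import make_url

# Hypothetical connection string; only its structure matters here.
url = make_url("postgresql+psycopg2://user:secret@localhost/privacyidea")

driver = url.drivername            # dialect and driver, as written in the URL
backend = url.get_backend_name()   # dialect part only

# Same decision as the string comparison in the original function.
needs_autocommit = driver == "postgresql+psycopg2"
```

Comparing the backend name instead of the full driver name would also match URLs that omit the driver or name a different one, which may or may not be what the original author intended.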


Example 9: update_config

    def update_config(self, config_):
        #toolkit.add_template_directory(config_, 'templates')
        #toolkit.add_public_directory(config_, 'public')
        #toolkit.add_resource('fanstatic', 'jsondatastore')

        if 'ckanext.jsondatastore.write.url' not in config_:
            error_message = 'ckanext.jsondatastore.write.url not found in config'
            raise JsonDatastoreError(error_message)

        ckan_db = config_['sqlalchemy.url']
        self.write_url = config_['ckanext.jsondatastore.write.url']

        if self.write_url == ckan_db:
            raise JsonDatastoreError('The json datastore url cannot be the '
                                     'same as the main ckan database')


        self.write_engine = sqlalchemy.engine_from_config(
            config_,
            prefix='ckanext.jsondatastore.write.',
            client_encoding='utf8',
        ) 

        self.read_engine = sqlalchemy.engine_from_config(
            config_,
            prefix='ckanext.jsondatastore.read.',
            client_encoding='utf8',
        ) 
Developer: joetsoi, Project: ckanext-jsondatastore, Lines of code: 28, Source: plugin.py


Example 10: setup_sqlalchemy

    def setup_sqlalchemy(self):
        from sqlalchemy import engine_from_config
        engine1 = engine_from_config(pylons_config, 'sqlalchemy.first.')
        engine2 = engine_from_config(pylons_config, 'sqlalchemy.second.')
        config['pylons.app_globals'].sa_engine = engine1
        config['pylons.app_globals'].sa_engine_first = engine1
        config['pylons.app_globals'].sa_engine_second = engine2
        init_model(engine1, engine2)
Developer: LamCiuLoeng, Project: vat, Lines of code: 8, Source: app_cfg.py


Example 11: load_environment

def load_environment(global_conf, app_conf):
    """Configure the Pylons environment via the ``pylons.config`` object

    """

    # Pylons paths
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    paths = dict(root=root,
                 controllers=os.path.join(root, 'controllers'),
                 static_files=os.path.join(root, 'public'),
                 templates=[os.path.join(root, 'templates')])

    # Initialize config with the basic options
    config.init_app(
        global_conf, app_conf, package='onlinelinguisticdatabase', paths=paths
    )

    config['routes.map'] = make_map()
    config['pylons.app_globals'] = app_globals.Globals()
    config['pylons.h'] = onlinelinguisticdatabase.lib.helpers

    # Create the Mako TemplateLookup, with the default auto-escaping
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
        directories=paths['templates'],
        error_handler=handle_mako_error,
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
        input_encoding='utf-8', default_filters=['escape'],
        imports=['from webhelpers.html import escape'])

    # Setup the SQLAlchemy database engine
    #  Modification: check if SQLite is RDBMS and, if so,
    #  give the engine a SQLiteSetup listener which
    #  provides the regexp function missing from the SQLite dbapi
    #  (cf. http://groups.google.com/group/pylons-discuss/browse_thread/thread/8c82699e6b6a400c/5c5237c86202e2b8)
    SQLAlchemyURL = config['sqlalchemy.url']
    rdbms = SQLAlchemyURL.split(':')[0]
    if rdbms == 'sqlite':
        engine = engine_from_config(
            config, 'sqlalchemy.', listeners=[SQLiteSetup()])
    else:
        engine = engine_from_config(config, 'sqlalchemy.')
    init_model(engine)

    # Put the application settings into the app_globals object
    #  This has the effect that when the app is restarted the globals like
    #  objectLanguageName, metalanguageName, etc. have the correct values
    #  Do the same for the variable app_globals attributes, e.g., sources list
    #  I HAD TO DISABLE THE FOLLOWING TWO COMMANDS BECAUSE IT WAS CAUSING
    #  setup-app TO CRASH BECAUSE application_settings WAS REQUESTED BEFORE THE
    #  TABLES EXISTED!  FIND ANOTHER WAY TO FIX THIS PROBLEM ...
    #applicationSettingsToAppGlobals(config['pylons.app_globals'])
    #updateSecondaryObjectsInAppGlobals(config['pylons.app_globals'])

    # CONFIGURATION OPTIONS HERE (note: all config options will override
    # any Pylons config options)
    config['pylons.strict_c'] = True
Developer: jrwdunham, Project: old-webapp, Lines of code: 56, Source: environment.py


Example 12: run_migrations_online

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """

    # for the direct-to-DB use case, start a transaction on all
    # engines, then run all migrations, then commit all transactions.

    engines = {'': {'engine': engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool)}}
    for name in bind_names:
        engines[name] = rec = {}
        rec['engine'] = engine_from_config(
            context.config.get_section(name),
            prefix='sqlalchemy.',
            poolclass=pool.NullPool)

    for name, rec in engines.items():
        engine = rec['engine']
        rec['connection'] = conn = engine.connect()

        if USE_TWOPHASE:
            rec['transaction'] = conn.begin_twophase()
        else:
            rec['transaction'] = conn.begin()

    try:
        for name, rec in engines.items():
            logger.info("Migrating database %s" % (name or '<default>'))
            context.configure(
                connection=rec['connection'],
                upgrade_token="%s_upgrades" % name,
                downgrade_token="%s_downgrades" % name,
                user_module_prefix="application.models._external_types.",
                target_metadata=get_metadata(name)
            )
            context.run_migrations(engine_name=name)

        if USE_TWOPHASE:
            for rec in engines.values():
                rec['transaction'].prepare()

        for rec in engines.values():
            rec['transaction'].commit()
    except:
        for rec in engines.values():
            rec['transaction'].rollback()
        raise
    finally:
        for rec in engines.values():
            rec['connection'].close()
Developer: minhoryang, Project: marrybird-api, Lines of code: 56, Source: env.py


Example 13: __init__

    def __init__(self, *args):
        super(TestDBActions, self).__init__(*args)
        pconfig['pylons.g'] = Mock()
        pconfig['pylons.g'].sa_engine = engine_from_config(
            config, prefix='sqlalchemy.reflect.')
        self.memengine = engine_from_config(
            config, prefix='sqlalchemy.default.')
        from masterapp import model
        self.model = model
        model.metadata.bind = self.memengine
        model.Session.configure(bind=self.memengine)
Developer: JustinTulloss, Project: harmonize.fm, Lines of code: 12, Source: test_actions.py


Example 14: setup_db_engine

def setup_db_engine(settings):
    cachedir = settings.get("db.cachedir")
    regions = []
    for region in settings.get("db.cacheregions", "").split(" "):
        regions.append(region.split(":"))
    if cachedir:
        init_cache(cachedir, regions)
    if settings.get("app.mode") == "testing":
        return engine_from_config(settings, 'sqlalchemy.',
                                  poolclass=StaticPool)
    else:
        return engine_from_config(settings, 'sqlalchemy.')
Developer: mjsorribas, Project: ringo, Lines of code: 12, Source: db.py


Example 15: main

def main(argv=sys.argv):
    if len(argv) < 2:
        usage(argv)
    config_uri = argv[1]
    options = parse_vars(argv[2:])
    setup_logging(config_uri)
    settings = get_appsettings(config_uri, options=options)
    if 'DATABASE_URL' in os.environ:
        settings['sqlalchemy.url'] = os.environ['DATABASE_URL']
    engine = engine_from_config(settings, 'sqlalchemy.')
    DBSession.configure(bind=engine)
    Base.metadata.create_all(engine)
Developer: scotist, Project: learning-journal-1, Lines of code: 13, Source: initializedb.py


Example 16: main

def main(argv=sys.argv):
    if len(argv) != 2:
        usage(argv)
    config_uri = argv[1]
    setup_logging(config_uri)
    settings = get_appsettings(config_uri)
    os.environ['PYJASPER_SERVLET_URL'] = settings['jasper_url'] 
    bootstrap(config_uri)
    engine = engine_from_config(settings, 'sqlalchemy.')
    other_engine = engine_from_config(settings, 'othersql.')
    Base.metadata.bind = engine
    OtherBase.metadata.bind = other_engine
    from ..models.esppt_models import (
        esNopModel,
        esRegModel,
        spptModel,
        )
    from ..views.es_reports import GenerateSppt
    DBSession.configure(bind=engine)
    q = DBSession.query(esNopModel, esRegModel).filter(
            esNopModel.es_reg_id == esRegModel.id)
    q = q.filter(esNopModel.email_sent == 0)
    for r_nop, r_reg in q:
        nop = get_nop(r_nop)
        q = spptModel.get_by_nop_thn(nop, r_nop.tahun)
        sppt = q.first()
        if not sppt:
            continue
        nilai = thousand(sppt.pbb_yg_harus_dibayar_sppt)
        g = GenerateSppt(nop, r_nop.tahun, r_reg.kode)
        # USER_ID) updated to use the password of the user in reg.kode (aagusti)
        sppt_file = g.sppt_file
        e_filename = os.path.split(sppt_file)[-1]
        # Read as bytes and close the file even if reading fails
        with open(sppt_file, 'rb') as f:
            content = f.read()
        # base64.encodestring was removed in Python 3.9; encodebytes replaces it
        e_content = base64.encodebytes(content)
        e_subject = EMAIL_SUBJECT.format(nop=nop, tahun=r_nop.tahun)
        e_body = EMAIL_BODY.format(nama_wp=sppt.nm_wp_sppt, nop=nop,
                    tahun=r_nop.tahun, nilai=nilai)
        files = [(e_filename, e_content)]
        print('To: {name} <{email}>'.format(name=sppt.nm_wp_sppt,
                email=r_reg.email))
        print('Subject: {s}'.format(s=e_subject))
        print('Body: {s}'.format(s=e_body))
        print('File: {s}'.format(s=e_filename))
        r_nop.email_sent = 1
        flush(r_nop)
        send(r_reg.email, sppt.nm_wp_sppt, e_subject, e_body, files,
                settings['email_pengirim'])
    transaction.commit()
Developer: aagusti, Project: e-sppt, Lines of code: 51, Source: send_email.py


Example 17: main

def main(argv=sys.argv):
    alembic_configfile = os.path.realpath(os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        '../../../alembic.ini'))
    alembic_config = Config(alembic_configfile)

    settings_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'migration.ini')
    settings = get_appsettings(settings_file)

    engine_target = engine_from_config(settings, 'sqlalchemy_target.')
    engine_source = engine_from_config(settings, 'sqlalchemy_source.')

    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)

    Session = sessionmaker(extension=ZopeTransactionExtension())  # noqa
    session = Session(bind=engine_target)

    # set up the target database
    setup_db(alembic_config, session)

    connection_source = engine_source.connect()

    batch_size = 1000
    MigrateAreas(connection_source, session, batch_size).migrate()
    MigrateUserProfiles(connection_source, session, batch_size).migrate()
    MigrateUsers(connection_source, session, batch_size).migrate()
    MigrateSummits(connection_source, session, batch_size).migrate()
    MigrateParkings(connection_source, session, batch_size).migrate()
    MigrateSites(connection_source, session, batch_size).migrate()
    MigrateProducts(connection_source, session, batch_size).migrate()
    MigrateHuts(connection_source, session, batch_size).migrate()
    MigrateRoutes(connection_source, session, batch_size).migrate()
    MigrateMaps(connection_source, session, batch_size).migrate()
    MigrateOutings(connection_source, session, batch_size).migrate()
    MigrateImages(connection_source, session, batch_size).migrate()
    MigrateXreports(connection_source, session, batch_size).migrate()
    MigrateArticles(connection_source, session, batch_size).migrate()
    MigrateBooks(connection_source, session, batch_size).migrate()
    MigrateVersions(connection_source, session, batch_size).migrate()
    MigrateAssociations(connection_source, session, batch_size).migrate()
    CreateClimbingSiteRoutes(connection_source, session, batch_size).migrate()
    SetRouteTitlePrefix(connection_source, session, batch_size).migrate()
    SetDefaultGeometries(connection_source, session, batch_size).migrate()
    MigrateAreaAssociations(connection_source, session, batch_size).migrate()
    MigrateMapAssociations(connection_source, session, batch_size).migrate()
    MigrateMailinglists(connection_source, session, batch_size).migrate()
    UpdateSequences(connection_source, session, batch_size).migrate()
    InitFeed(connection_source, session, batch_size).migrate()
    AnalyzeAllTables(connection_source, session, batch_size).migrate()
Developer: c2corg, Project: v6_api, Lines of code: 51, Source: migrate.py


Example 18: __init__

    def __init__(self, base_conf={}, master_url=None, slaves_url=[], **kwargs):
        self.engine = engine_from_config(base_conf, prefix="sqlalchemy.", url=master_url, **kwargs)
        # self.engine = create_engine(master, **kwargs)
        self._master_session = _create_session(self.engine)
        self._slaves_session = []
        for slave in slaves_url:
            # slave = create_engine(slave, **kwargs)
            slave_engine = engine_from_config(base_conf, prefix="sqlalchemy.", url=slave, **kwargs)
            self._slaves_session.append(_create_session(slave_engine))

        if "pool_recycle" in kwargs:
            # ping db, so that mysql won't goaway
            PeriodicCallback(self._ping_db, kwargs["pool_recycle"] * 1000).start()

        signals.call_finished.connect(self._remove)  # register the signal: call _remove after the request finishes
Developer: Nilom, Project: torngas, Lines of code: 15, Source: dbalchemy.py


Example 19: setUpClass

    def setUpClass(cls):
        from sqlalchemy import engine_from_config

        engine = engine_from_config({'url': 'sqlite://'}, prefix='')

        qry = open('monasca_api/tests/sqlite_alarm.sql', 'r').read()
        sconn = engine.raw_connection()
        c = sconn.cursor()
        c.executescript(qry)
        sconn.commit()
        c.close()
        cls.engine = engine

        def _fake_engine_from_config(*args, **kw):
            return cls.engine
        cls.fixture = fixtures.MonkeyPatch(
            'sqlalchemy.create_engine', _fake_engine_from_config)
        cls.fixture.setUp()

        metadata = MetaData()
        cls.nm = models.create_nm_model(metadata)
        cls._delete_nm_query = delete(cls.nm)
        cls._insert_nm_query = (insert(cls.nm)
                                .values(
                                    id=bindparam('id'),
                                    tenant_id=bindparam('tenant_id'),
                                    name=bindparam('name'),
                                    type=bindparam('type'),
                                    address=bindparam('address'),
                                    created_at=bindparam('created_at'),
                                    updated_at=bindparam('updated_at')))
Developer: Missxiaoguo, Project: monasca-api, Lines of code: 31, Source: test_nm_repository.py
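
The test setup above passes an empty prefix, so the dictionary keys are used as option names directly, and then loads a SQL script through the raw DBAPI connection. A compact sketch of the same idea follows; the table and rows are invented for illustration, whereas the original reads its script from a file. Note that executescript is specific to the sqlite3 driver:

```python
from sqlalchemy import engine_from_config, text

# With prefix="", the key "url" is read as the engine URL itself.
engine = engine_from_config({"url": "sqlite://"}, prefix="")

# Hypothetical schema and data standing in for the SQL file.
script = """
CREATE TABLE notification_method (id TEXT PRIMARY KEY, name TEXT);
INSERT INTO notification_method VALUES ('1', 'email');
INSERT INTO notification_method VALUES ('2', 'webhook');
"""

# raw_connection() exposes the sqlite3 connection, which supports
# running several statements at once via executescript().
raw = engine.raw_connection()
try:
    cursor = raw.cursor()
    cursor.executescript(script)
    raw.commit()
    cursor.close()
finally:
    raw.close()

# The in-memory database is reused for later connections in this thread.
with engine.connect() as conn:
    row_count = conn.execute(
        text("SELECT COUNT(*) FROM notification_method")).scalar()
```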


Example 20: run_migrations_online

def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.readthedocs.org/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                directives[:] = []
                logger.info('No changes in schema detected.')

    engine = engine_from_config(config.get_section(config.config_ini_section),
                                prefix='sqlalchemy.',
                                poolclass=pool.NullPool)

    connection = engine.connect()
    context.configure(connection=connection,
                      target_metadata=target_metadata,
                      process_revision_directives=process_revision_directives,
                      **current_app.extensions['migrate'].configure_args)

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
Developer: yehiaa, Project: clinic, Lines of code: 33, Source: env.py



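Note: The sqlalchemy.engine_from_config examples in this article were compiled by 纯净天空 from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.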

