本文整理汇总了Python中sqlalchemy.engine_from_config函数的典型用法代码示例。如果您正苦于以下问题：Python engine_from_config函数的具体用法？Python engine_from_config怎么用？Python engine_from_config使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了engine_from_config函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: load_environment
def load_environment(global_conf, app_conf):
    """Populate ``pylons.config`` for the ``mapclient`` package.

    Registers the project paths, route map, application globals, helpers
    and a Jinja2 environment, then builds the two SQLAlchemy engines
    (``mapclient.sqlalchemy.`` and ``vectorstore.sqlalchemy.``) and hands
    them to ``init_model``.
    """
    # Directory layout, resolved relative to the package root
    package_root = os.path.dirname(
        os.path.dirname(os.path.abspath(__file__)))
    paths = {
        'root': package_root,
        'controllers': os.path.join(package_root, 'controllers'),
        'static_files': os.path.join(package_root, 'public'),
        'templates': [os.path.join(package_root, 'templates')],
    }

    # Basic Pylons options
    config.init_app(global_conf, app_conf, package='mapclient', paths=paths)
    config['routes.map'] = make_map()
    config['pylons.app_globals'] = app_globals.Globals()
    config['pylons.h'] = mapclient.lib.helpers

    # Jinja2 environment: one filesystem loader per template directory
    template_loaders = [FileSystemLoader(path) for path in paths['templates']]
    config['pylons.app_globals'].jinja2_env = Environment(
        loader=ChoiceLoader(template_loaders))

    # Jinja2 cannot read attributes of ``c`` unless strict_c is enabled
    config['pylons.strict_c'] = True

    # Database engines: the application's own and the vector store's
    app_engine = engine_from_config(config, 'mapclient.sqlalchemy.')
    vectorstore_engine = engine_from_config(config, 'vectorstore.sqlalchemy.')
    init_model(app_engine, vectorstore_engine)
开发者ID:PublicaMundi,项目名称:MapClient,代码行数:28,代码来源:environment.py
示例2: on_celeryd_init
def on_celeryd_init(**kw):
    """
    Initializes application resources when the Celery daemon starts.

    Sets ``app.redis`` and ``app.userid`` from ``app.settings``, tries to
    insert the blame user through a throw-away engine, then binds
    ``Session`` to a fresh engine.
    """
    settings = app.settings

    # Redis client; the database index is the last path segment of the URL
    parsed_redis_url = six.moves.urllib.parse.urlparse(settings['redis.url'])
    app.redis = redis.StrictRedis(
        host=parsed_redis_url.hostname,
        port=parsed_redis_url.port,
        db=os.path.basename(parsed_redis_url.path),
    )

    app.userid = settings['celery.blame']

    # We are still in the parent setup process, so the scoped session must
    # not be touched here (it would be left in a dangerous
    # non-thread-local state).  Use a raw connection on a disposable engine.
    db_prefix = 'occams.db.'
    bootstrap_engine = sa.engine_from_config(settings, db_prefix)
    with bootstrap_engine.begin() as connection:
        try:
            connection.execute(models.User.__table__.insert(), key=app.userid)
        except sa.exc.IntegrityError:
            # Deliberately ignored; presumably the user row already exists.
            pass
    bootstrap_engine.dispose()

    # Bind the session to an engine that was never used above
    Session.configure(bind=sa.engine_from_config(settings, db_prefix))
开发者ID:davidmote,项目名称:occams,代码行数:29,代码来源:celery.py
示例3: setup_sqlalchemy
def setup_sqlalchemy(self):
    """Setup SQLAlchemy database engine.

    Without a ``sqlalchemy.master.url`` option a single engine is built
    from the ``sqlalchemy.`` prefix.  With it, "balanced" mode is used:
    a master engine plus one engine per ``sqlalchemy.slaves.<name>.``
    prefix, all recorded in ``config['balanced_engines']``.  The
    resulting (master) engine is stored on ``tg.app_globals`` and passed
    to ``self.package.model.init_model``.

    Raises ``TGConfigError`` when a slave is named ``master`` or when
    balanced mode is requested without any slave.

    The most common reason for modifying this method is to add
    multiple database support. To do this you might modify your
    app_cfg.py file in the following manner::

        from tg.configuration import AppConfig, config
        from myapp.model import init_model

        # add this before base_config =
        class MultiDBAppConfig(AppConfig):
            def setup_sqlalchemy(self):
                '''Setup SQLAlchemy database engine(s)'''
                from sqlalchemy import engine_from_config
                engine1 = engine_from_config(config, 'sqlalchemy.first.')
                engine2 = engine_from_config(config, 'sqlalchemy.second.')
                # engine1 should be assigned to sa_engine as well as your first engine's name
                config['tg.app_globals'].sa_engine = engine1
                config['tg.app_globals'].sa_engine_first = engine1
                config['tg.app_globals'].sa_engine_second = engine2
                # Pass the engines to init_model, to be able to introspect tables
                init_model(engine1, engine2)

        #base_config = AppConfig()
        base_config = MultiDBAppConfig()

    This will pull the config settings from your .ini files to create the necessary
    engines for use within your application. Make sure you have a look at :ref:`multidatabase`
    for more information.
    """
    from sqlalchemy import engine_from_config

    balanced_master = config.get('sqlalchemy.master.url')
    if not balanced_master:
        engine = engine_from_config(config, 'sqlalchemy.')
    else:
        engine = engine_from_config(config, 'sqlalchemy.master.')
        config['balanced_engines'] = {'master': engine,
                                      'slaves': {},
                                      'all': {'master': engine}}

        all_engines = config['balanced_engines']['all']
        slaves = config['balanced_engines']['slaves']
        for entry in config.keys():
            if not entry.startswith('sqlalchemy.slaves.'):
                continue
            slave_path = entry.split('.')
            slave_name = slave_path[2]
            if slave_name == 'master':
                raise TGConfigError('A slave node cannot be named master')
            if slave_name in slaves:
                # Several options (url, echo, ...) share one slave prefix;
                # build the engine only once per slave instead of once
                # per option.
                continue
            slave_config = '.'.join(slave_path[:3])
            all_engines[slave_name] = slaves[slave_name] = engine_from_config(
                config, slave_config + '.')

        if not config['balanced_engines']['slaves']:
            raise TGConfigError('When running in balanced mode your must specify at least a slave node')

    # Pass the engine to initmodel, to be able to introspect tables
    config['tg.app_globals'].sa_engine = engine
    self.package.model.init_model(engine)
开发者ID:mabodo,项目名称:MapReduceWords,代码行数:60,代码来源:app_config.py
示例4: main
def main(global_config, **settings):
    """ This function returns a Pyramid WSGI application.

    ``DATABASE_URL`` in the environment, when set, replaces the
    ``sqlalchemy.url`` setting.  ``AUTH_SECRET`` in the environment
    supplies the auth-ticket secret (falling back to ``'secret'``).
    """
    if 'DATABASE_URL' in os.environ:
        settings['sqlalchemy.url'] = os.environ['DATABASE_URL']

    # Build the engine exactly once; a second, identical engine used to be
    # created and the first one silently discarded.
    engine = engine_from_config(settings, 'sqlalchemy.')
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    # NOTE(review): both the 'secret' fallback here and the 'seekrit'
    # session key below are hard-coded -- confirm they are overridden in
    # any real deployment.
    auth_secret = os.environ.get('AUTH_SECRET', 'secret')
    authn_policy = AuthTktAuthenticationPolicy(secret=auth_secret,
                                               hashalg='sha256')
    authz_policy = ACLAuthorizationPolicy()

    config = Configurator(
        settings=settings,
        root_factory=MyRoot)
    config.set_session_factory(SignedCookieSessionFactory('seekrit'))
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)
    config.include('pyramid_jinja2')
    config.add_static_view('static', 'static', cache_max_age=3600)
    config.add_route('index', '/')
    config.add_route('add', '/add')
    # Raw strings: ``\d`` is not a valid escape in a normal string literal
    config.add_route('entry', r'/entries/{id:\d+}')
    config.add_route('edit', r'/entries/{id:\d+}/edit')
    config.add_route('login', '/login')
    config.add_route('logout', '/logout')
    # config.add_route('add_json', '/add_json')
    # config.add_route('entry_json', '/entry_json')
    config.scan()
    return config.make_wsgi_app()
开发者ID:cewing,项目名称:learning-journal-1,代码行数:31,代码来源:__init__.py
示例5: create_sqlalchemy_engine
def create_sqlalchemy_engine(conf):
    """Build the SQLAlchemy engine(s) described by ``conf``.

    Without a ``sqlalchemy.master.url`` option a single engine is built
    from the ``sqlalchemy.`` prefix.  With it, "balanced" mode is used:
    a master engine plus one engine per ``sqlalchemy.slaves.<name>.``
    prefix, all recorded in ``conf['balanced_engines']`` under the keys
    ``master``, ``slaves`` and ``all``.

    Returns the single engine, or the master engine in balanced mode.

    Raises ``TGConfigError`` when a slave is named ``master`` or when
    balanced mode is requested without any slave.
    """
    from sqlalchemy import engine_from_config

    balanced_master = conf.get('sqlalchemy.master.url')
    if not balanced_master:
        engine = engine_from_config(conf, 'sqlalchemy.')
    else:
        engine = engine_from_config(conf, 'sqlalchemy.master.')
        conf['balanced_engines'] = {'master': engine,
                                    'slaves': {},
                                    'all': {'master': engine}}

        all_engines = conf['balanced_engines']['all']
        slaves = conf['balanced_engines']['slaves']
        for entry in conf.keys():
            if not entry.startswith('sqlalchemy.slaves.'):
                continue
            slave_path = entry.split('.')
            slave_name = slave_path[2]
            if slave_name == 'master':
                raise TGConfigError('A slave node cannot be named master')
            if slave_name in slaves:
                # Several options (url, echo, ...) share one slave prefix;
                # build the engine only once per slave instead of once
                # per option.
                continue
            slave_config = '.'.join(slave_path[:3])
            all_engines[slave_name] = slaves[slave_name] = engine_from_config(
                conf, slave_config + '.')

        if not conf['balanced_engines']['slaves']:
            raise TGConfigError('When running in balanced mode your must specify at least a slave node')

    return engine
开发者ID:TurboGears,项目名称:tg2,代码行数:26,代码来源:sqlalchemy.py
示例6: main
def main(argv=sys.argv):
    """Recreate the target schema and run every migration step in order.

    Settings are read from ``migration.ini`` next to this file; the
    ``sqlalchemy_target.`` and ``sqlalchemy_source.`` prefixes select the
    two engines.  ``argv`` is accepted for the script entry-point
    signature but is not read.
    """
    settings_file = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'migration.ini')
    settings = get_appsettings(settings_file)

    engine_target = engine_from_config(settings, 'sqlalchemy_target.')
    engine_source = engine_from_config(settings, 'sqlalchemy_source.')

    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)

    # create a fresh schema on the target database
    Session = sessionmaker(extension=ZopeTransactionExtension())  # noqa
    session = Session(bind=engine_target)
    Base.metadata.drop_all(engine_target, checkfirst=True)
    setup_db(engine_target, session)

    # Steps run strictly in this order
    migration_steps = (
        MigrateUsers,
        MigrateSummits,
        MigrateParkings,
        MigrateSites,
        MigrateProducts,
        MigrateHuts,
        MigrateRoutes,
        MigrateVersions,
        UpdateSequences,
    )

    batch_size = 1000
    connection_source = engine_source.connect()
    try:
        for step in migration_steps:
            step(connection_source, session, batch_size).migrate()
    finally:
        # Release the source connection even when a step fails
        connection_source.close()
开发者ID:mfournier,项目名称:v6_api,代码行数:29,代码来源:migrate.py
示例7: run_migrations_online
def run_migrations_online():
    """Run migrations in 'online' mode.

    An Engine is created from the ``app:pyramidapp`` section and a
    connection from it is associated with the migration context.  When
    ``DB_URL`` is present in the environment it is passed as the ``url``
    keyword to ``engine_from_config``.
    """
    engine_options = {'poolclass': pool.NullPool}
    if 'DB_URL' in os.environ:
        engine_options['url'] = os.environ['DB_URL']

    engine = engine_from_config(
        config.get_section("app:pyramidapp"),
        prefix='sqlalchemy.',
        **engine_options)

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
开发者ID:SabatierBoris,项目名称:CecileWebSite,代码行数:30,代码来源:env.py
示例8: run_migrations_online
def run_migrations_online():
    """Run migrations in 'online' mode.

    An Engine is created from the main ini section and a connection from
    it is associated with the migration context.  For the
    ``postgresql+psycopg2`` driver the engine is created with
    ``isolation_level="AUTOCOMMIT"``.
    """
    # FIX for Postgres updates: pick the isolation level from the driver
    url = config.get_section(config.config_ini_section).get("sqlalchemy.url")
    engine_options = {'poolclass': pool.NullPool}
    if url.split(":")[0] == "postgresql+psycopg2":
        engine_options['isolation_level'] = "AUTOCOMMIT"

    engine = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        **engine_options)

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
开发者ID:Andrew8305,项目名称:privacyidea,代码行数:34,代码来源:env.py
示例9: update_config
def update_config(self, config_):
    """Validate the datastore settings and create the read/write engines.

    Sets ``self.write_url``, ``self.write_engine`` and
    ``self.read_engine`` from ``config_``.

    Raises ``JsonDatastoreError`` when
    ``ckanext.jsondatastore.write.url`` is missing or equals the main
    ``sqlalchemy.url``.
    """
    #toolkit.add_template_directory(config_, 'templates')
    #toolkit.add_public_directory(config_, 'public')
    #toolkit.add_resource('fanstatic', 'jsondatastore')

    if 'ckanext.jsondatastore.write.url' not in config_:
        # The message now names the key that is actually checked
        error_message = 'ckanext.jsondatastore.write.url not found in config'
        raise JsonDatastoreError(error_message)

    ckan_db = config_['sqlalchemy.url']
    self.write_url = config_['ckanext.jsondatastore.write.url']
    if self.write_url == ckan_db:
        raise JsonDatastoreError('The json datastore url cannot be the '
                                 'same as the main ckan database')

    self.write_engine = sqlalchemy.engine_from_config(
        config_,
        prefix='ckanext.jsondatastore.write.',
        client_encoding='utf8',
    )
    self.read_engine = sqlalchemy.engine_from_config(
        config_,
        prefix='ckanext.jsondatastore.read.',
        client_encoding='utf8',
    )
开发者ID:joetsoi,项目名称:ckanext-jsondatastore,代码行数:28,代码来源:plugin.py
示例10: setup_sqlalchemy
def setup_sqlalchemy(self):
    """Create the two configured engines and publish them on the app globals.

    The ``sqlalchemy.first.`` engine is exposed both as ``sa_engine`` and
    ``sa_engine_first``, the ``sqlalchemy.second.`` engine as
    ``sa_engine_second``; both are then passed to ``init_model``.
    """
    from sqlalchemy import engine_from_config

    first_engine = engine_from_config(pylons_config, 'sqlalchemy.first.')
    second_engine = engine_from_config(pylons_config, 'sqlalchemy.second.')

    published = (
        ('sa_engine', first_engine),
        ('sa_engine_first', first_engine),
        ('sa_engine_second', second_engine),
    )
    for attr_name, attr_engine in published:
        setattr(config['pylons.app_globals'], attr_name, attr_engine)

    init_model(first_engine, second_engine)
开发者ID:LamCiuLoeng,项目名称:vat,代码行数:8,代码来源:app_cfg.py
示例11: load_environment
def load_environment(global_conf, app_conf):
    """Populate ``pylons.config`` for the ``onlinelinguisticdatabase`` package.

    Registers the project paths, route map, application globals, helpers
    and the Mako template lookup, then creates the SQLAlchemy engine and
    passes it to ``init_model``.
    """
    # Directory layout, resolved relative to the package root
    package_root = os.path.dirname(
        os.path.dirname(os.path.abspath(__file__)))
    paths = {
        'root': package_root,
        'controllers': os.path.join(package_root, 'controllers'),
        'static_files': os.path.join(package_root, 'public'),
        'templates': [os.path.join(package_root, 'templates')],
    }

    # Basic Pylons options
    config.init_app(
        global_conf, app_conf, package='onlinelinguisticdatabase', paths=paths
    )
    config['routes.map'] = make_map()
    config['pylons.app_globals'] = app_globals.Globals()
    config['pylons.h'] = onlinelinguisticdatabase.lib.helpers

    # Mako TemplateLookup with the default auto-escaping
    compiled_templates_dir = os.path.join(app_conf['cache_dir'], 'templates')
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
        directories=paths['templates'],
        error_handler=handle_mako_error,
        module_directory=compiled_templates_dir,
        input_encoding='utf-8', default_filters=['escape'],
        imports=['from webhelpers.html import escape'])

    # SQLAlchemy engine.  When the RDBMS is SQLite the engine gets a
    # SQLiteSetup listener, which provides the regexp function missing
    # from the SQLite dbapi
    # (cf. http://groups.google.com/group/pylons-discuss/browse_thread/thread/8c82699e6b6a400c/5c5237c86202e2b8)
    sqlalchemy_url = config['sqlalchemy.url']
    engine_options = {}
    if sqlalchemy_url.split(':')[0] == 'sqlite':
        engine_options['listeners'] = [SQLiteSetup()]
    engine = engine_from_config(config, 'sqlalchemy.', **engine_options)
    init_model(engine)

    # The two calls below would copy the application settings (and the
    # variable app_globals attributes, e.g. the sources list) into
    # app_globals so they are correct after a restart.  They are disabled
    # because they made setup-app crash: application_settings was
    # requested before the tables existed.  Another fix is still needed.
    #applicationSettingsToAppGlobals(config['pylons.app_globals'])
    #updateSecondaryObjectsInAppGlobals(config['pylons.app_globals'])

    # Configuration options go here (note: they override any Pylons
    # config options)
    config['pylons.strict_c'] = True
开发者ID:jrwdunham,项目名称:old-webapp,代码行数:56,代码来源:environment.py
示例12: run_migrations_online
def run_migrations_online():
    """Run migrations in 'online' mode.

    One Engine is created for the default ini section (keyed by ``''``)
    and one for each name in ``bind_names``.  A transaction is started on
    every engine, all migrations are run, and then all transactions are
    committed; on any failure every transaction is rolled back and the
    error is re-raised.  Connections are always closed.
    """
    def _make_engine(section):
        # Same options for every database
        return engine_from_config(
            section,
            prefix='sqlalchemy.',
            poolclass=pool.NullPool)

    # for the direct-to-DB use case, start a transaction on all
    # engines, then run all migrations, then commit all transactions.
    engines = {
        '': {'engine': _make_engine(
            config.get_section(config.config_ini_section))},
    }
    for bind_name in bind_names:
        engines[bind_name] = {
            'engine': _make_engine(context.config.get_section(bind_name)),
        }

    for rec in engines.values():
        conn = rec['engine'].connect()
        rec['connection'] = conn
        rec['transaction'] = (
            conn.begin_twophase() if USE_TWOPHASE else conn.begin())

    try:
        for name, rec in engines.items():
            logger.info("Migrating database %s" % (name or '<default>'))
            context.configure(
                connection=rec['connection'],
                upgrade_token="%s_upgrades" % name,
                downgrade_token="%s_downgrades" % name,
                user_module_prefix="application.models._external_types.",
                target_metadata=get_metadata(name),
            )
            context.run_migrations(engine_name=name)

        if USE_TWOPHASE:
            for rec in engines.values():
                rec['transaction'].prepare()

        for rec in engines.values():
            rec['transaction'].commit()
    except:
        # Catch-all on purpose: roll everything back, then re-raise
        for rec in engines.values():
            rec['transaction'].rollback()
        raise
    finally:
        for rec in engines.values():
            rec['connection'].close()
开发者ID:minhoryang,项目名称:marrybird-api,代码行数:56,代码来源:env.py
示例13: __init__
def __init__(self, *args):
    """Prepare the engines and model bindings used by the tests.

    ``pconfig['pylons.g']`` is replaced with a ``Mock`` carrying the
    ``sqlalchemy.reflect.`` engine; the ``sqlalchemy.default.`` engine is
    kept as ``self.memengine`` and bound to the model metadata and
    session.
    """
    super(TestDBActions, self).__init__(*args)

    pconfig['pylons.g'] = Mock()
    reflect_engine = engine_from_config(config, prefix='sqlalchemy.reflect.')
    pconfig['pylons.g'].sa_engine = reflect_engine

    self.memengine = engine_from_config(config, prefix='sqlalchemy.default.')

    # Imported only after the engines above have been configured
    from masterapp import model
    self.model = model
    model.metadata.bind = self.memengine
    model.Session.configure(bind=self.memengine)
开发者ID:JustinTulloss,项目名称:harmonize.fm,代码行数:12,代码来源:test_actions.py
示例14: setup_db_engine
def setup_db_engine(settings):
    """Initialise the optional cache and return the SQLAlchemy engine.

    ``db.cacheregions`` is split on spaces and each entry on ``:``; the
    result is passed to ``init_cache`` only when ``db.cachedir`` is set.
    When ``app.mode`` is ``"testing"`` the engine is created with
    ``StaticPool``.
    """
    cachedir = settings.get("db.cachedir")
    region_specs = settings.get("db.cacheregions", "").split(" ")
    regions = [spec.split(":") for spec in region_specs]
    if cachedir:
        init_cache(cachedir, regions)

    if settings.get("app.mode") != "testing":
        return engine_from_config(settings, 'sqlalchemy.')
    return engine_from_config(settings, 'sqlalchemy.',
                              poolclass=StaticPool)
开发者ID:mjsorribas,项目名称:ringo,代码行数:12,代码来源:db.py
示例15: main
def main(argv=sys.argv):
    """Create all tables described by ``Base.metadata``.

    ``argv[1]`` is the config URI and the remaining arguments are parsed
    as options for ``get_appsettings``.  ``DATABASE_URL`` in the
    environment, when set, replaces the ``sqlalchemy.url`` setting.
    """
    if len(argv) < 2:
        usage(argv)
    config_uri = argv[1]
    options = parse_vars(argv[2:])
    setup_logging(config_uri)
    settings = get_appsettings(config_uri, options=options)

    if 'DATABASE_URL' in os.environ:
        settings['sqlalchemy.url'] = os.environ['DATABASE_URL']

    # Build the engine exactly once; it used to be created twice in a row
    # with the first instance discarded.
    engine = engine_from_config(settings, 'sqlalchemy.')
    DBSession.configure(bind=engine)
    Base.metadata.create_all(engine)
开发者ID:scotist,项目名称:learning-journal-1,代码行数:13,代码来源:initializedb.py
示例16: main
def main(argv=sys.argv):
"""Send the generated SPPT file by e-mail for every row not yet marked as sent.

``argv[1]`` is the config URI.  Rows of ``esNopModel`` joined to
``esRegModel`` with ``email_sent == 0`` are processed; each processed row
gets ``email_sent = 1``.
"""
# NOTE(review): the original indentation of this block was lost, so the
# nesting of the statements after the ``for`` line (in particular the final
# ``transaction.commit()``) must be confirmed against the upstream file.
if len(argv) != 2:
usage(argv)
config_uri = argv[1]
setup_logging(config_uri)
settings = get_appsettings(config_uri)
# Exported through the environment for the report code; presumably read by
# the Jasper client -- confirm.
os.environ['PYJASPER_SERVLET_URL'] = settings['jasper_url']
bootstrap(config_uri)
# Two databases: the main one and the one configured under 'othersql.'
engine = engine_from_config(settings, 'sqlalchemy.')
other_engine = engine_from_config(settings, 'othersql.')
Base.metadata.bind = engine
OtherBase.metadata.bind = other_engine
# Imported here, after the metadata binds above have been set
from ..models.esppt_models import (
esNopModel,
esRegModel,
spptModel,
)
from ..views.es_reports import GenerateSppt
DBSession.configure(bind=engine)
# Registrations joined to their NOP rows, limited to unsent e-mails
q = DBSession.query(esNopModel, esRegModel).filter(
esNopModel.es_reg_id == esRegModel.id)
q = q.filter(esNopModel.email_sent == 0)
for r_nop, r_reg in q:
nop = get_nop(r_nop)
# NOTE(review): ``q`` is rebound here while the loop iterates the outer
# query; the loop keeps using the iterator it already obtained.
q = spptModel.get_by_nop_thn(nop, r_nop.tahun)
sppt = q.first()
# Skip rows for which no SPPT record is found
if not sppt:
continue
nilai = thousand(sppt.pbb_yg_harus_dibayar_sppt)
g = GenerateSppt(nop, r_nop.tahun, r_reg.kode)
# USER_ID) updated to use the password of the user found in reg.kode (aagusti)
sppt_file = g.sppt_file
e_filename = os.path.split(sppt_file)[-1]
# NOTE(review): the file is opened in text mode and closed manually; a
# ``with`` block (and binary mode) would be safer -- confirm before changing.
f = open(sppt_file)
content = f.read()
f.close()
# NOTE(review): ``base64.encodestring`` was removed in Python 3.9.
e_content = base64.encodestring(content)
e_subject = EMAIL_SUBJECT.format(nop=nop, tahun=r_nop.tahun)
e_body = EMAIL_BODY.format(nama_wp=sppt.nm_wp_sppt, nop=nop,
tahun=r_nop.tahun, nilai=nilai)
files = [(e_filename, e_content)]
# Echo the outgoing message to stdout
print('To: {name} <{email}>'.format(name=sppt.nm_wp_sppt,
email=r_reg.email))
print('Subject: {s}'.format(s=e_subject))
print('Body: {s}'.format(s=e_body))
print('File: {s}'.format(s=e_filename))
# The row is flagged as sent before ``send`` is called
r_nop.email_sent = 1
flush(r_nop)
send(r_reg.email, sppt.nm_wp_sppt, e_subject, e_body, files,
settings['email_pengirim'])
transaction.commit()
开发者ID:aagusti,项目名称:e-sppt,代码行数:51,代码来源:send_email.py
示例17: main
def main(argv=sys.argv):
    """Set up the target database and run every migration step in order.

    The Alembic configuration is read from ``../../../alembic.ini``
    relative to this file, the migration settings from ``migration.ini``
    next to it; the ``sqlalchemy_target.`` and ``sqlalchemy_source.``
    prefixes select the two engines.  ``argv`` is accepted for the script
    entry-point signature but is not read.
    """
    here = os.path.dirname(os.path.abspath(__file__))

    alembic_configfile = os.path.realpath(os.path.join(
        here, '../../../alembic.ini'))
    alembic_config = Config(alembic_configfile)

    settings_file = os.path.join(here, 'migration.ini')
    settings = get_appsettings(settings_file)

    engine_target = engine_from_config(settings, 'sqlalchemy_target.')
    engine_source = engine_from_config(settings, 'sqlalchemy_source.')

    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)

    Session = sessionmaker(extension=ZopeTransactionExtension())  # noqa
    session = Session(bind=engine_target)

    # set up the target database
    setup_db(alembic_config, session)

    # Steps run strictly in this order
    migration_steps = (
        MigrateAreas,
        MigrateUserProfiles,
        MigrateUsers,
        MigrateSummits,
        MigrateParkings,
        MigrateSites,
        MigrateProducts,
        MigrateHuts,
        MigrateRoutes,
        MigrateMaps,
        MigrateOutings,
        MigrateImages,
        MigrateXreports,
        MigrateArticles,
        MigrateBooks,
        MigrateVersions,
        MigrateAssociations,
        CreateClimbingSiteRoutes,
        SetRouteTitlePrefix,
        SetDefaultGeometries,
        MigrateAreaAssociations,
        MigrateMapAssociations,
        MigrateMailinglists,
        UpdateSequences,
        InitFeed,
        AnalyzeAllTables,
    )

    batch_size = 1000
    connection_source = engine_source.connect()
    try:
        for step in migration_steps:
            step(connection_source, session, batch_size).migrate()
    finally:
        # Release the source connection even when a step fails
        connection_source.close()
开发者ID:c2corg,项目名称:v6_api,代码行数:51,代码来源:migrate.py
示例18: __init__
def __init__(self, base_conf=None, master_url=None, slaves_url=None, **kwargs):
    """Create the master session and one session per slave URL.

    :param base_conf: mapping with ``sqlalchemy.``-prefixed engine
        options; an empty mapping is used when omitted.
    :param master_url: database URL for the master engine.
    :param slaves_url: iterable of database URLs, one per slave engine;
        no slaves when omitted.
    :param kwargs: extra keyword arguments forwarded to every
        ``engine_from_config`` call.  When ``pool_recycle`` is among
        them, a ``PeriodicCallback`` invoking ``self._ping_db`` is
        started with an interval of ``pool_recycle * 1000``.
    """
    # ``None`` sentinels replace the former mutable ``{}`` / ``[]``
    # defaults, which were shared between all calls.
    if base_conf is None:
        base_conf = {}
    if slaves_url is None:
        slaves_url = ()

    self.engine = engine_from_config(base_conf, prefix="sqlalchemy.", url=master_url, **kwargs)
    self._master_session = _create_session(self.engine)

    self._slaves_session = []
    for slave in slaves_url:
        slave_engine = engine_from_config(base_conf, prefix="sqlalchemy.", url=slave, **kwargs)
        self._slaves_session.append(_create_session(slave_engine))

    if "pool_recycle" in kwargs:
        # ping db, so that mysql won't goaway
        PeriodicCallback(self._ping_db, kwargs["pool_recycle"] * 1000).start()

    # Register the signal handler: remove the session when a request finishes
    signals.call_finished.connect(self._remove)
开发者ID:Nilom,项目名称:torngas,代码行数:15,代码来源:dbalchemy.py
示例19: setUpClass
def setUpClass(cls):
    """Create the SQLite test database and the prepared statements.

    Loads ``monasca_api/tests/sqlite_alarm.sql`` into an engine built for
    ``sqlite://``, patches ``sqlalchemy.create_engine`` so it returns
    that engine, and prepares the delete/insert statements for the model
    returned by ``models.create_nm_model``.
    """
    from sqlalchemy import engine_from_config

    engine = engine_from_config({'url': 'sqlite://'}, prefix='')

    # Read the schema script with a context manager so the file handle is
    # closed deterministically (it used to be left open).
    with open('monasca_api/tests/sqlite_alarm.sql', 'r') as sql_file:
        qry = sql_file.read()

    sconn = engine.raw_connection()
    c = sconn.cursor()
    c.executescript(qry)
    sconn.commit()
    c.close()
    cls.engine = engine

    def _fake_engine_from_config(*args, **kw):
        # Whatever is requested, hand back the prepared engine
        return cls.engine

    cls.fixture = fixtures.MonkeyPatch(
        'sqlalchemy.create_engine', _fake_engine_from_config)
    cls.fixture.setUp()

    metadata = MetaData()
    cls.nm = models.create_nm_model(metadata)
    cls._delete_nm_query = delete(cls.nm)
    cls._insert_nm_query = (insert(cls.nm)
                            .values(
                                id=bindparam('id'),
                                tenant_id=bindparam('tenant_id'),
                                name=bindparam('name'),
                                type=bindparam('type'),
                                address=bindparam('address'),
                                created_at=bindparam('created_at'),
                                updated_at=bindparam('updated_at')))
开发者ID:Missxiaoguo,项目名称:monasca-api,代码行数:31,代码来源:test_nm_repository.py
示例20: run_migrations_online
def run_migrations_online():
    """Run migrations in 'online' mode.

    An Engine is created from the main ini section and a connection from
    it is associated with the migration context, together with the
    ``configure_args`` of the ``migrate`` extension of ``current_app``.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.readthedocs.org/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if not getattr(config.cmd_opts, 'autogenerate', False):
            return
        script = directives[0]
        if not script.upgrade_ops.is_empty():
            return
        directives[:] = []
        logger.info('No changes in schema detected.')

    ini_section = config.get_section(config.config_ini_section)
    engine = engine_from_config(
        ini_section,
        prefix='sqlalchemy.',
        poolclass=pool.NullPool)

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        process_revision_directives=process_revision_directives,
        **current_app.extensions['migrate'].configure_args)

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
开发者ID:yehiaa,项目名称:clinic,代码行数:33,代码来源:env.py
注：本文中的sqlalchemy.engine_from_config函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论