本文整理汇总了Python中sqlalchemy.orm.sessionmaker函数的典型用法代码示例。如果您正苦于以下问题:Python sessionmaker函数的具体用法?Python sessionmaker怎么用?Python sessionmaker使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sessionmaker函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: convert_all
def convert_all():
"""
Convert all images from SQLite to MySQL
:return:
"""
# setup engines to each database
mysql_engine = create_engine('mysql+pymysql://root:[email protected]:3306/plane_viewer')
sqlite_engine = create_engine('sqlite:///images.sqlite3')
# setup session for each database
mysqlsessionamaker = sessionmaker(bind=mysql_engine)
sqlitesessionmaker = sessionmaker(bind=sqlite_engine)
mysql_session = mysqlsessionamaker()
sqlite_session = sqlitesessionmaker()
# retrieve all images from SQLite
sqlite_images = sqlite_session.query(SqlitePlaneImage).all()
# convert all images from SQLite to MySQL and attempt to load them, ignoring duiplicates
for image in sqlite_images:
mysql_image = convert_image_object(image)
mysql_session.add(mysql_image)
try:
mysql_session.commit()
print('Loaded', image)
except exc.IntegrityError:
mysql_session.rollback()
print("Image {} already loaded".format(image))
mysql_images = mysql_session.query(MySqlPlaneImage).all()
print(len(mysql_images))
开发者ID:willwagner602,项目名称:PlaneViewer,代码行数:34,代码来源:DatabaseMigration.py
示例2: edit_category
def edit_category(category_id):
if request.method == "GET":
DBSession = sessionmaker(bind=engine)
session = DBSession()
try:
result = session.query(Category).filter_by(id=category_id).one()
return render_template("edit_category.html", category=result)
except NoResultFound:
abort(404)
finally:
session.close()
elif request.method == "POST":
DBSession = sessionmaker(bind=engine)
session = DBSession()
try:
result = session.query(Category).filter_by(id=category_id).one()
result.name = request.form["name"]
result.description = request.form["description"]
session.add(result)
session.commit()
return redirect(url_for("get_category", category_id=category_id), code=302)
except NoResultFound:
abort(404)
finally:
session.close()
return "test"
else:
abort(400)
开发者ID:chenlu2015,项目名称:fullstack-nanodegree-vm,代码行数:29,代码来源:application.py
示例3: init_sqlalchemy
def init_sqlalchemy(self, scheme, connection):
try:
import sqlalchemy
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
from torweb.db import CacheQuery
import _mysql_exceptions
from sqlalchemy import event
from sqlalchemy.exc import DisconnectionError
def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
try:
dbapi_conn.cursor().execute('select now()')
except _mysql_exceptions.OperationalError:
raise DisconnectionError
engine = create_engine(
connection,
convert_unicode=True,
encoding="utf-8",
pool_recycle=3600*7,
#echo_pool=True,
echo=False,
)
event.listen(engine, 'checkout', my_on_checkout)
metadata = MetaData(bind=engine)
session = scoped_session(sessionmaker(bind=engine, query_cls=CacheQuery))
sqlalchemy_sessions = [session]
DB_Session = sessionmaker(bind=engine)
return {"metadata":metadata, "session":session, "sqlalchemy_sessions":sqlalchemy_sessions}
#setattr(self.app, 'metadata', metadata)
#setattr(self.app, scheme.get('sqlalchemy', 'session'), session)
#setattr(self.app, 'sqlalchemy_sessions', sqlalchemy_sessions)
except Exception as e:
print e
开发者ID:finalbattle,项目名称:torweb,代码行数:34,代码来源:config.py
示例4: test_all_htmlpopups
def test_all_htmlpopups(self):
from chsdi.models import models_from_name
from chsdi.models.bod import LayersConfig
from sqlalchemy import distinct
from sqlalchemy.orm import scoped_session, sessionmaker
val = True
DBSession = scoped_session(sessionmaker())
query = DBSession.query(distinct(LayersConfig.idBod)).filter(LayersConfig.queryable == val).filter(LayersConfig.staging == 'prod')
# Get a list of all the queryable layers
layers = [q[0] for q in query]
DBSession.close()
# Get a list of feature ids
features = []
for layer in layers:
try:
model = models_from_name(layer)[0]
DBSession = scoped_session(sessionmaker())
query = DBSession.query(model.primary_key_column()).limit(1)
ID = [q[0] for q in query]
# If tables are empty ID is an empty list
if ID:
features.append((layer, str(ID[0])))
DBSession.close()
except Exception as e:
print e
finally:
DBSession.close()
for f in features:
for lang in ('de', 'fr', 'it', 'rm', 'en'):
self.testapp.get('/rest/services/all/MapServer/%s/%s/htmlPopup' % (f[0], f[1]), params={'callback': 'cb', 'lang': '%s' % lang}, status=200)
开发者ID:jesseeichar,项目名称:mf-chsdi3,代码行数:30,代码来源:test_mapservice.py
示例5: test_all_htmlpopups
def test_all_htmlpopups(self):
from chsdi.models import models_from_name
from chsdi.models.bod import LayersConfig
from sqlalchemy import distinct
from sqlalchemy.orm import scoped_session, sessionmaker
val = True
DBSession = scoped_session(sessionmaker())
valnone = None
query = DBSession.query(distinct(LayersConfig.layerBodId)).filter(LayersConfig.staging == 'prod').filter(LayersConfig.queryable == val).filter(LayersConfig.parentLayerId == valnone)
features = []
try:
for layer in getLayers(query):
try:
FeatDBSession = scoped_session(sessionmaker())
models = models_from_name(layer)
self.failUnless(models is not None and len(models) > 0, layer)
model = models[0]
query = FeatDBSession.query(model.primary_key_column()).limit(1)
ID = [q[0] for q in query]
if ID:
features.append((layer, str(ID[0])))
finally:
FeatDBSession.close()
finally:
DBSession.close()
for f in features:
for lang in ('de', 'fr', 'it', 'rm', 'en'):
link = '/rest/services/all/MapServer/' + f[0] + '/' + f[1] + '/htmlPopup?callback=cb&lang=' + lang
resp = self.testapp.get(link)
self.failUnless(resp.status_int == 200, link)
开发者ID:qbns,项目名称:mf-chsdi3,代码行数:31,代码来源:test_mapservice.py
示例6: setup_database
def setup_database(self):
# DB library imports
from sqlalchemy import (create_engine, Table, MetaData, Column, Integer,
String, Unicode, Text, UnicodeText, Date, Numeric,
Time, Float, DateTime, Interval, Binary, Boolean,
PickleType)
from sqlalchemy.orm import sessionmaker, mapper
# Create global name mappings for model()
global column_mapping
column_mapping = {
'string': String, 'str': String,
'integer': Integer, 'int': Integer,
'unicode': Unicode, 'text': Text,
'unicodetext': UnicodeText, 'date': Date,
'numeric': Numeric, 'time': Time,
'float': Float, 'datetime': DateTime,
'interval': Interval, 'binary': Binary,
'boolean': Boolean, 'bool': Boolean,
'pickletype': PickleType,
}
# Add a few SQLAlchemy types to globals() so we can use them in models
globals().update({'Column': Column, 'Table': Table, 'Integer': Integer,
'MetaData': MetaData, 'mapper': mapper})
# Ensures correct slash number for sqlite
if self.config['db_type'] == 'sqlite':
self.config['db_location'] = '/' + self.config['db_location']
eng_name = self.config['db_type'] + '://' + self.config['db_location']
self.config['db_engine'] = create_engine(eng_name)
self.config['db_session'] = sessionmaker(bind=self.config['db_engine'])()
elif self.config['db_type'] == 'postgres':
eng_name = self.config['db_type'] + '://' + self.config['db_location']
self.config['db_engine'] = create_engine(eng_name, encoding="utf8", convert_unicode=True)
self.config['db_session'] = sessionmaker(bind=self.config['db_engine'])()
开发者ID:malaniz,项目名称:webyarara,代码行数:34,代码来源:yarara.py
示例7: __init__
def __init__(self, dbname='xraydata.db', read_only=True):
"connect to an existing database"
if not os.path.exists(dbname):
parent, _ = os.path.split(__file__)
dbname = os.path.join(parent, dbname)
if not os.path.exists(dbname):
raise IOError("Database '%s' not found!" % dbname)
if not isxrayDB(dbname):
raise ValueError("'%s' is not a valid X-ray Database file!" % dbname)
self.dbname = dbname
self.engine = make_engine(dbname)
self.conn = self.engine.connect()
kwargs = {}
if read_only:
kwargs = {'autoflush': True, 'autocommit':False}
def readonly_flush(*args, **kwargs):
return
self.session = sessionmaker(bind=self.engine, **kwargs)()
self.session.flush = readonly_flush
else:
self.session = sessionmaker(bind=self.engine, **kwargs)()
self.metadata = MetaData(self.engine)
self.metadata.reflect()
self.tables = self.metadata.tables
self.atomic_symbols = [e.element for e in self.tables['elements'].select(
).execute().fetchall()]
开发者ID:xraypy,项目名称:xraylarch,代码行数:30,代码来源:xraydb.py
示例8: __init__
def __init__(self, name=None):
if not name:
name = 'default'
data = {}
if hasattr(settings, 'DATABASES') and \
name in settings.DATABASES:
data = settings.DATABASES[name]
if data.get('ENGINE', '') == '':
raise ImproperlyConfigured('''\
The database ORM connection requires, at minimum, an engine type.''')
if '.' in data['ENGINE']:
data['ENGINE'] = data['ENGINE'].rsplit('.', 1)[-1]
# django needs sqlite3 but sqlalchemy references sqlite
if data['ENGINE'] == 'sqlite3':
data['ENGINE'] = 'sqlite'
self.engine = self._create_engine(data)
ro_values = dict([(k[9:], v) for k, v in data.iteritems()
if k.startswith('READONLY_')])
if len(ro_values):
ro_data = dict(data)
ro_data.update(ro_values)
self.readonly_engine = self._create_engine(ro_data)
self.metadata = MetaData(self.engine)
self.Base = declarative_base(metadata=self.metadata)
if hasattr(self, 'readonly_engine'):
self._readonly_sessionmaker = \
scoped_session(sessionmaker(bind=self.readonly_engine))
self._sessionmaker = scoped_session(sessionmaker(bind=self.engine))
开发者ID:saebyn,项目名称:baph,代码行数:30,代码来源:orm.py
示例9: __init__
def __init__(self, dburi):
self.engine = create_engine(dburi, echo=False, pool_recycle=3600, echo_pool=True,
connect_args={'check_same_thread':False})
Session = sessionmaker(bind=self.engine)
self.session = Session()
Base.metadata.create_all(self.engine, checkfirst=True)
self.session_factory = sessionmaker(bind=self.engine)
开发者ID:movb,项目名称:isrecorder,代码行数:7,代码来源:db.py
示例10: create_session
def create_session():
# cloud_usage
engine_csu = create_engine(config.get('main', 'cloud_usage_uri'))
metadata_csu = MetaData(engine_csu)
account_csu = Table('account', metadata_csu, autoload=True)
mapper(AccountCsU, account_csu)
usage_event_csu = Table('usage_event', metadata_csu, autoload=True)
mapper(UsageEventCsU, usage_event_csu)
cloud_usage_csu = Table('cloud_usage', metadata_csu, autoload=True)
mapper(CloudUsageCsU, cloud_usage_csu)
# cloud database
engine_cs = create_engine(config.get('main', 'cloud_uri'))
metadata_cs = MetaData(engine_cs)
account_cs = Table('account', metadata_cs, autoload=True)
mapper(AccountCs, account_cs)
disk_offering_cs = Table('disk_offering', metadata_cs, autoload=True)
mapper(DiskOfferingCs, disk_offering_cs)
template_cs = Table('vm_template', metadata_cs, autoload=True)
mapper(TemplateCs, template_cs)
guest_os_cs = Table('guest_os', metadata_cs, autoload=True)
mapper(GuestOSCs, guest_os_cs)
SessionCs = sessionmaker(bind=engine_cs)
session_cs = SessionCs()
SessionCsU = sessionmaker(bind=engine_csu)
session_csu = SessionCsU()
# setup our dedicated model
engine = create_engine(config.get('main', 'rbc_usage_uri'))
Session = sessionmaker(bind=engine)
session = Session()
return (session_csu, session_cs, session)
开发者ID:redbridge,项目名称:rbc-usage,代码行数:32,代码来源:cloudusage.py
示例11: copy_stations_to_sqlite
def copy_stations_to_sqlite(src_dsn, dest_dsn):
src_sesh = sessionmaker(bind=create_engine(src_dsn))()
dest_sesh = sessionmaker(bind=create_engine(dest_dsn))()
net = Network(name='MoTIe')
dest_sesh.add(net)
dest_sesh.flush()
q = src_sesh.query(Station).join(History).join(
Network).filter(Network.name == 'MoTIe')
for stn in q.all():
histories = [History(station_name=hist.station_name)
for hist in stn.histories]
new_obj = Station(native_id=stn.native_id,
network_id=net.id, histories=histories)
dest_sesh.add(new_obj)
dest_sesh.commit()
q = src_sesh.query(Variable).join(Network).filter(Network.name == 'MoTIe')
for var in q.all():
v = Variable(name=var.name, standard_name=var.standard_name,
cell_method=var.cell_method, network_id=net.id,
unit=var.unit)
dest_sesh.add(v)
dest_sesh.commit()
开发者ID:pacificclimate,项目名称:crmprtd,代码行数:25,代码来源:moti_infill_insert.py
示例12: init_sqlalchemy
def init_sqlalchemy(settings):
# master
master_url = settings['sqlalchemy.url']
connect_kwargs = settings.get('sqlalchemy.connect_kwargs')
kwargs = {}
if connect_kwargs is not None:
if isinstance(connect_kwargs, str):
connect_kwargs = json.loads(connect_kwargs)
for k, v in connect_kwargs.items():
kwargs[k] = v
engine = create_engine(master_url, **kwargs)
sm = orm.sessionmaker(bind=engine, extension=ZopeTransactionExtension())
meta.Session = orm.scoped_session(sm)
meta.metadata.bind = engine
# slaves
slaves_url = settings.get('sqlalchemy.slaves', [])
slaves = []
for url in slaves_url:
slave = create_engine(url, **kwargs)
sm = orm.sessionmaker(bind=slave, extension=ZopeTransactionExtension())
slaves.append(orm.scoped_session(sm))
if slaves:
slave = random.choice(slaves)
meta.BaseObject.query = slave.query_property(orm.Query)
else:
meta.BaseObject.query = meta.Session.query_property(orm.Query)
开发者ID:lxneng,项目名称:easy_sqlalchemy,代码行数:29,代码来源:__init__.py
示例13: initialize_sql
def initialize_sql(settings):
global DBSession
global sqlalchemy_url
global DBSession
global Base
global engine
global Session
global session
global tmp_dir
global extended
sqlalchemy_url = settings['sqlalchemy.url']
DBSession = scoped_session(sessionmaker())
Base = declarative_base()
engine = create_engine(sqlalchemy_url)
Session = sessionmaker(bind=engine)
session = Session()
tmp_dir = settings['tmp_dir']
# Extended attributes
if settings['extended'] == 'true':
extended = True
else:
extended = False
DBSession.configure(bind=engine)
Base.metadata.bind = engine
开发者ID:lightbase,项目名称:WSCServer,代码行数:28,代码来源:__init__.py
示例14: test_merge_load
def test_merge_load(self):
Parent = self.classes.Parent
sess = sessionmaker()()
sess2 = sessionmaker()()
p1 = sess.query(Parent).get(1)
p1.children
# preloading of collection took this down from 1728 to 1192
# using sqlite3 the C extension took it back up to approx. 1257
# (py2.6)
@profiling.function_call_count(variance=0.10,
versions={'2.5':1050, '2.6':1050,
'2.6+cextension':1005,
'2.7':1005,
'3':1050}
)
def go():
p2 = sess2.merge(p1)
go()
# one more time, count the SQL
sess2 = sessionmaker(testing.db)()
self.assert_sql_count(testing.db, go, 2)
开发者ID:sleepsonthefloor,项目名称:sqlalchemy,代码行数:26,代码来源:test_orm.py
示例15: get_data
def get_data():
engine = db_connect()
create_deals_table(engine)
Session = sessionmaker(bind=engine)
sess = Session()
if request.method == 'POST':
search_code = request.form['naics']
if not search_code:
rfps_nasa = sess.query(RFP).all()
else:
rfps_nasa = sess.query(RFP).filter(RFP.NAICS_code == search_code).all()
else:
rfps_nasa = sess.query(RFP).all()
engine = db_connect()
create_deals_table(engine)
Session = sessionmaker(bind=engine)
sess = Session()
if request.method == 'POST':
search_code = request.form['naics']
if not search_code:
rfps_fbo = OpportunityDetail.query.all()
else:
code = NAICSChild.query.filter_by(code=search_code).first()
rfps_fbo = OpportunityDetail.query.filter_by(naics_code=code)\
.all()
else:
rfps_fbo = OpportunityDetail.query.all()
res = []
for r in rfps_nasa:
res.append(r.to_json())
for r in rfps_fbo:
res.append(r.to_json())
return str(res)
开发者ID:Ksynko,项目名称:RFP_spiders,代码行数:35,代码来源:main.py
示例16: test_taxa_to_count_and_push
def test_taxa_to_count_and_push(MergedataToUpload, engine, tbl_taxa, count, tbl_count, count_table):
count.fillna("NA", inplace=True)
uploading = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
uploading.merge_for_datatype_table_upload(
raw_dataframe=count,
formated_dataframe=tbl_count,
formated_dataframe_name="count",
raw_data_taxa_columns=["site", "genus", "species"],
uploaded_taxa_columns=["study_site_table_fkey", "genus", "species"],
)
Session = sessionmaker(bind=engine, autoflush=False)
session = Session()
tbl_count_query = select([count_table])
count_query_stm = session.execute(tbl_count_query)
count_query_df = pd.DataFrame(count_query_stm.fetchall())
count_query_df.columns = count_query_stm.keys()
session.close()
engine.dispose()
print(count_query_df)
assert (
tbl_count["count_observation"].values.tolist() == count_query_df["count_observation"].values.tolist()
) is True
assert (
[str(x) for x in tbl_count["spatial_replication_level_2"].values.tolist()]
== [str(x) for x in count_query_df["spatial_replication_level_2"].values.tolist()]
) is True
开发者ID:bibsian,项目名称:database-development,代码行数:29,代码来源:test_upload_popler3.py
示例17: __init__
def __init__( self, filename ):
echo = False
if self.DebugLevel > 2:
echo = True
engine = create_engine( 'sqlite:///' + filename, echo = echo )
metadata=Base.metadata
metadata.create_all(engine)
self.Session=sessionmaker()
self.Session.configure(bind=engine)
self.SessionInstance = self.Session()
dirname=os.path.dirname(filename)
self.SessionsInstances={}
for file_list in self.SessionInstance.query( FileList ).all():
filename=file_list.filename
if not os.path.isfile(filename):
filename=os.path.join(dirname,os.path.basename(file_list.filename))
engine = create_engine( 'sqlite:///' + filename, echo = echo )
metadata=Base.metadata
metadata.create_all(engine)
self.Session=sessionmaker()
self.Session.configure(bind=engine)
self.SessionsInstances[file_list.type] = self.Session()
if not self.SessionsInstances.has_key('Source'):
self.SessionsInstances['Source']=self.SessionInstance
if not self.SessionsInstances.has_key('Target'):
self.SessionsInstances['Target']=self.SessionInstance
开发者ID:LucaBongiorni,项目名称:DarunGrim,代码行数:33,代码来源:DarunGrimDatabase.py
示例18: test_taxa_to_biomass_and_push
def test_taxa_to_biomass_and_push(MergedataToUpload, engine, tbl_taxa, biomass, tbl_biomass, biomass_table):
biomass.fillna("NA", inplace=True)
uploading = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
uploading.merge_for_datatype_table_upload(
raw_dataframe=biomass,
formated_dataframe=tbl_biomass,
formated_dataframe_name="biomass",
raw_data_taxa_columns=["site", "phylum", "clss", "ordr", "family", "genus", "species"],
uploaded_taxa_columns=["study_site_table_fkey", "phylum", "clss", "ordr", "family", "genus", "species"],
)
Session = sessionmaker(bind=engine, autoflush=False)
session = Session()
tbl_biomass_query = select([biomass_table])
biomass_query_stm = session.execute(tbl_biomass_query)
biomass_query_df = pd.DataFrame(biomass_query_stm.fetchall())
biomass_query_df.columns = biomass_query_stm.keys()
session.close()
engine.dispose()
biomass_true_list = tbl_biomass["biomass_observation"].values.tolist()
biomass_test_list = biomass_query_df["biomass_observation"].values.tolist()
biomass_true_list = [float(x) for x in biomass_true_list]
biomass_test_list = [float(x) for x in biomass_test_list]
assert (biomass_true_list == biomass_test_list) is True
assert (
[str(x) for x in tbl_biomass["spatial_replication_level_2"].values.tolist()]
== [str(x) for x in biomass_query_df["spatial_replication_level_2"].values.tolist()]
) is True
开发者ID:bibsian,项目名称:database-development,代码行数:31,代码来源:test_upload_popler3.py
示例19: construct_maker
def construct_maker(databases, models=None, query_cls=Query, engine_params=None,
session_class=DBSession):
'''
databases - str with db uri or dict.
models - str name of models package (module), default is 'module'
query_cls - additional query class
engine_params - additional engine params
'''
models = models or 'models'
engine_params = engine_params or {}
db_dict = {}
if isinstance(databases, basestring):
engine = create_engine(databases, **engine_params)
return orm.sessionmaker(class_=session_class, query_cls=query_cls,
bind=engine, autoflush=False)
for ref, uri in databases.items():
md_ref = '.'.join(filter(None, [models, ref]))
metadata = import_string(md_ref, 'metadata')
engine = create_engine(uri, **engine_params)
#TODO: find out why this is broken since sqlalchemy 0.7
#engine.logger.logger.name += '(%s)' % ref
for table in metadata.sorted_tables:
db_dict[table] = engine
return orm.sessionmaker(class_=session_class, query_cls=query_cls, binds=db_dict,
autoflush=False)
开发者ID:riffm,项目名称:mage,代码行数:25,代码来源:sqla.py
示例20: test_taxa_to_percent_and_push
def test_taxa_to_percent_and_push(
MergedataToUpload, engine, tbl_taxa, percent, tbl_percent, percent_cover_table, convert_types, find_types
):
percent.fillna("NA", inplace=True)
uploading = MergedataToUpload(sessionmaker(bind=engine, autoflush=False))
uploading.merge_for_datatype_table_upload(
raw_dataframe=percent,
formated_dataframe=tbl_percent,
formated_dataframe_name="percent_cover",
raw_data_taxa_columns=["site", "code"],
uploaded_taxa_columns=["study_site_table_fkey", "sppcode"],
)
Session = sessionmaker(bind=engine, autoflush=False)
session = Session()
tbl_percent_cover_query = select([percent_cover_table])
percent_cover_query_stm = session.execute(tbl_percent_cover_query)
percent_cover_query_df = pd.DataFrame(percent_cover_query_stm.fetchall())
percent_cover_query_df.columns = percent_cover_query_stm.keys()
session.close()
engine.dispose()
percent_cover_true_list = tbl_percent["percent_cover_observation"].values.tolist()
percent_cover_test_list = percent_cover_query_df["percent_cover_observation"].values.tolist()
percent_cover_true_list = [float(x) for x in percent_cover_true_list]
percent_cover_test_list = [float(x) for x in percent_cover_test_list]
assert (percent_cover_true_list == percent_cover_test_list) is True
assert (
[str(x) for x in tbl_percent["spatial_replication_level_2"].values.tolist()]
== [str(x) for x in percent_cover_query_df["spatial_replication_level_2"].values.tolist()]
) is True
开发者ID:bibsian,项目名称:database-development,代码行数:35,代码来源:test_upload_popler3.py
注:本文中的sqlalchemy.orm.sessionmaker函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论