本文整理汇总了Python中sqlalchemy.select函数的典型用法代码示例。如果您正苦于以下问题:Python select函数的具体用法?Python select怎么用?Python select使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了select函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_insert_from_select
def test_insert_from_select(self):
    """INSERT ... FROM SELECT on the ``autoinc_pk`` table duplicates rows.

    Seeds three rows, re-inserts the ``data2``/``data3`` rows through
    ``insert().from_select()`` and checks the ordered ``data`` column.
    """
    table = self.tables.autoinc_pk

    seed_rows = [{"data": "data1"}, {"data": "data2"}, {"data": "data3"}]
    config.db.execute(table.insert(), seed_rows)

    # rows to be copied back into the same table
    rows_to_copy = select([table.c.data]).where(
        table.c.data.in_(["data2", "data3"])
    )
    config.db.execute(table.insert().from_select(("data",), rows_to_copy))

    ordered_data = select([table.c.data]).order_by(table.c.data)
    eq_(
        config.db.execute(ordered_data).fetchall(),
        [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
    )
开发者ID:mjallday,项目名称:sqlalchemy,代码行数:27,代码来源:test_insert.py
示例2: test_reconnect
def test_reconnect(self):
    """A connection survives two shutdown / invalidate / reconnect cycles.

    After each ``test_shutdown()`` the next execute is checked with
    ``_assert_invalidated``; the connection must then be flagged as
    invalidated (but not closed), and the following execute must succeed
    and clear the flag.
    """
    conn = self.engine.connect()

    eq_(conn.execute(select([1])).scalar(), 1)
    assert not conn.closed

    self.engine.test_shutdown()

    _assert_invalidated(conn.execute, select([1]))

    # the connection object itself stays open, it is only marked invalid
    assert not conn.closed
    assert conn.invalidated

    # executing again re-establishes the connection
    eq_(conn.execute(select([1])).scalar(), 1)
    assert not conn.invalidated

    # one more time
    self.engine.test_shutdown()
    _assert_invalidated(conn.execute, select([1]))

    assert conn.invalidated
    eq_(conn.execute(select([1])).scalar(), 1)
    assert not conn.invalidated

    conn.close()
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:26,代码来源:test_reconnect.py
示例3: test_basic
def test_basic(self):
    """A connection can be obtained again after shutdown plus pool recycle.

    Runs once with ``pool_threadlocal`` disabled and once with it enabled.
    """
    def check_select_one(engine):
        # open a connection, verify SELECT 1 round-trips, then close it
        conn = engine.contextual_connect()
        eq_(conn.execute(select([1])).scalar(), 1)
        conn.close()

    for threadlocal in (False, True):
        engine = engines.reconnecting_engine(
            options={'pool_threadlocal': threadlocal})

        check_select_one(engine)

        # set the pool recycle down to 1.
        # we aren't doing this inline with the
        # engine create since cx_oracle takes way
        # too long to create the 1st connection and don't
        # want to build a huge delay into this test.
        engine.pool._recycle = 1

        # kill the DB connection
        engine.test_shutdown()

        # wait until past the recycle period
        time.sleep(2)

        # can connect, no exception
        check_select_one(engine)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:27,代码来源:test_reconnect.py
示例4: test_limit_offset_with_correlated_order_by
def test_limit_offset_with_correlated_order_by(self):
    """LIMIT/OFFSET with a correlated scalar-subquery ORDER BY.

    Checks the compiled ROW_NUMBER() form and its bind parameters, then
    verifies that the MSSQL-compiled statement exposes exactly two result
    columns that map back to ``t1.x`` and ``t1.y``.
    """
    t1 = table('t1', column('x', Integer), column('y', Integer))
    t2 = table('t2', column('x', Integer), column('y', Integer))

    order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar()
    s = (
        select([t1])
        .where(t1.c.x == 5)
        .order_by(order_by)
        .limit(10)
        .offset(20)
    )

    expected_sql = (
        "SELECT anon_1.x, anon_1.y "
        "FROM (SELECT t1.x AS x, t1.y AS y, "
        "ROW_NUMBER() OVER (ORDER BY "
        "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)"
        ") AS mssql_rn "
        "FROM t1 "
        "WHERE t1.x = :x_1) AS anon_1 "
        "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1"
    )
    self.assert_compile(
        s,
        expected_sql,
        checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5}
    )

    c = s.compile(dialect=mssql.MSDialect())
    eq_(len(c._result_columns), 2)
    # each original column must be reachable through the result map
    for key, col in (('x', t1.c.x), ('y', t1.c.y)):
        assert col in set(c._create_result_map()[key][1])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:25,代码来源:test_compiler.py
示例5: verify_thd
def verify_thd(conn):
    """Verify the database contents after the migration.

    Expects exactly one row in ``builders`` and no rows in ``tags`` or
    ``builders_tags``.  Uses ``self`` from the enclosing scope for the
    assertions.
    """
    metadata = sa.MetaData()
    metadata.bind = conn

    # builders: a single, fully known row
    builders = sautils.Table('builders', metadata, autoload=True)
    q = sa.select([builders])
    num_rows = 0
    for num_rows, row in enumerate(conn.execute(q), 1):
        self.assertEqual(
            row, (1, u'bname', u'description', u'dontcare'))
    self.assertEqual(num_rows, 1)

    tags = sautils.Table('tags', metadata, autoload=True)
    builders_tags = sautils.Table('builders_tags', metadata,
                                  autoload=True)

    # both tag tables must be selectable by column, and empty
    tag_columns = [tags.c.id, tags.c.name, tags.c.name_hash]
    self.assertEqual(conn.execute(sa.select(tag_columns)).fetchall(), [])

    link_columns = [builders_tags.c.id,
                    builders_tags.c.builderid,
                    builders_tags.c.tagid]
    self.assertEqual(conn.execute(sa.select(link_columns)).fetchall(), [])
开发者ID:nand0p,项目名称:buildbot,代码行数:25,代码来源:test_db_migrate_versions_041_add_N_N_tagsbuilders.py
示例6: verify_thd
def verify_thd(self, conn):
    """Partially verify the contents of the db - run in a thread.

    Spot-checks changes 70 and 77 and, for each of them, the filename
    returned by ``scalar()`` from ``change_files``.

    Uses ``assertEqual`` throughout: the ``failUnlessEqual`` alias is
    deprecated and was removed from the standard ``unittest.TestCase``
    in Python 3.12.
    """
    model = self.db.model

    # this is a big db, so we only spot-check things -- hopefully any errors
    # will occur on the import

    r = conn.execute(
        sa.select([model.changes],
                  whereclause=model.changes.c.changeid == 70))
    ch = r.fetchone()
    self.assertEqual(ch.changeid, 70)
    # NOTE(review): '[email protected]' looks like a scraper-obfuscated e-mail
    # address; confirm the expected author against the upstream fixture.
    self.assertEqual(ch.author, u'Jakub Vysoky <[email protected]>')
    self.assertEqual(ch.comments, u'some failing tests in check_downgrade and metapackage_version')
    self.assertEqual(ch.revision, u'2ce0c33b7e10cce98e8d9c5b734b8c133ee4d320')
    self.assertEqual(ch.branch, u'master')

    r = conn.execute(
        sa.select([model.change_files.c.filename],
                  whereclause=model.change_files.c.changeid == 70))
    self.assertEqual(r.scalar(), 'tests/test_debian.py')

    r = conn.execute(
        sa.select([model.changes],
                  whereclause=model.changes.c.changeid == 77))
    ch = r.fetchone()
    self.assertEqual(ch.changeid, 77)
    self.assertEqual(ch.author, u'BuildBot')
    self.assertEqual(ch.comments, u'Dependency changed, sending dummy commit')
    self.assertEqual(ch.revision, u'HEAD')
    self.assertEqual(ch.branch, u'master')

    r = conn.execute(
        sa.select([model.change_files.c.filename],
                  whereclause=model.change_files.c.changeid == 77))
    self.assertEqual(r.scalar(), 'CHANGELOG')
开发者ID:jamesob,项目名称:buildbot,代码行数:35,代码来源:test_upgrade.py
示例7: test_strict_binds
def test_strict_binds(self):
    """Test the 'strict' compiler binds.

    With ``MSSQLStrictCompiler`` installed on the mxodbc dialect, each
    expression below must compile to the SQL string paired with it.
    """
    from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler

    mxodbc_dialect = mxodbc.dialect()
    mxodbc_dialect.statement_compiler = MSSQLStrictCompiler

    t = table('sometable', column('foo'))

    cases = [
        (
            select([literal("x"), literal("y")]),
            "SELECT 'x' AS anon_1, 'y' AS anon_2",
        ),
        (
            select([t]).where(t.c.foo.in_(['x', 'y', 'z'])),
            "SELECT sometable.foo FROM sometable WHERE sometable.foo "
            "IN ('x', 'y', 'z')",
        ),
        (
            t.c.foo.in_([None]),
            "sometable.foo IN (NULL)"
        ),
    ]
    for expr, expected_sql in cases:
        self.assert_compile(expr, expected_sql, dialect=mxodbc_dialect)
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:25,代码来源:test_compiler.py
示例8: test_insert_from_select
def test_insert_from_select(self):
    """INSERT ... FROM SELECT on the ``manual_pk`` table duplicates rows.

    Seeds ids 1-3, then re-inserts the ``data2``/``data3`` rows with
    ``id + 5`` through an inline ``insert().from_select()`` and checks
    the ordered ``data`` column.
    """
    table = self.tables.manual_pk

    seed_rows = [
        {"id": 1, "data": "data1"},
        {"id": 2, "data": "data2"},
        {"id": 3, "data": "data3"},
    ]
    config.db.execute(table.insert(), seed_rows)

    # shift the copied ids by 5 so they do not collide with the seeds
    rows_to_copy = select([table.c.id + 5, table.c.data]).where(
        table.c.data.in_(["data2", "data3"])
    )
    config.db.execute(
        table.insert(inline=True).from_select(("id", "data"), rows_to_copy)
    )

    ordered_data = select([table.c.data]).order_by(table.c.data)
    eq_(
        config.db.execute(ordered_data).fetchall(),
        [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
    )
开发者ID:13111,项目名称:SickRage,代码行数:27,代码来源:test_insert.py
示例9: thddeleteOldLogs
def thddeleteOldLogs(conn):
    """Delete the chunks of logs belonging to steps older than the cutoff.

    ``self`` and ``older_than_timestamp`` come from the enclosing scope.
    Returns the difference between the ``count(logchunks.content)``
    value before and after the deletion.
    """
    model = self.db.model

    def count_chunks():
        # count(content) over the whole logchunks table
        res = conn.execute(
            sa.select([sa.func.count(model.logchunks.c.content)]))
        total = res.fetchone()[0]
        res.close()
        return total

    chunks_before = count_chunks()

    # update log types older than timestamps
    # we do it first to avoid having UI discrepancy
    old_step_ids = sa.select([model.steps.c.id]).where(
        model.steps.c.started_at < older_than_timestamp)
    mark_deleted = (
        model.logs.update()
        .where(model.logs.c.stepid.in_(old_step_ids))
        .values(type='d')
    )
    conn.execute(mark_deleted).close()

    # query all logs with type 'd' and delete their chunks.
    deleted_log_ids = (
        sa.select([model.logs.c.id])
        .select_from(model.logs)
        .where(model.logs.c.type == 'd')
    )

    # delete their logchunks
    drop_chunks = model.logchunks.delete().where(
        model.logchunks.c.logid.in_(deleted_log_ids))
    conn.execute(drop_chunks).close()

    chunks_after = count_chunks()
    return chunks_before - chunks_after
开发者ID:opalmer,项目名称:buildbot,代码行数:32,代码来源:logs.py
示例10: whereInEquipement
def whereInEquipement(self, fullQueryJoin, criteria):
    """Restrict ``fullQueryJoin`` according to the ``FK_Sensor`` criterion.

    The first entry of ``criteria`` whose ``'Column'`` is ``'FK_Sensor'``
    is used (an ``IndexError`` is raised when there is none).

    * Operator ``is`` / ``is not`` with value ``null``: require that a
      ``MonitoredSiteEquipment`` row for the site with an ``EndDate`` that
      is NULL or not before ``now()`` does not exist / exists.
    * Any other operator: require that such a row exists whose sensor's
      ``UnicIdentifier`` satisfies the operator against the given value.

    Returns the query with the extra ``WHERE`` clause applied.
    """
    sensorObj = [c for c in criteria if 'FK_Sensor' == c['Column']][0]
    sensor = sensorObj['Value']

    table = Base.metadata.tables['MonitoredSiteEquipment']
    joinTable = outerjoin(table, Sensor, table.c['FK_Sensor'] == Sensor.ID)

    def equipment_subselect(condition):
        # site ids taken from the equipment/sensor join, filtered by condition
        return select(
            [table.c['FK_MonitoredSite']]
        ).select_from(joinTable).where(condition)

    operator_name = sensorObj['Operator'].lower()
    null_check = (
        operator_name in ['is', 'is not']
        and sensorObj['Value'].lower() == 'null'
    )

    site_matches = MonitoredSite.ID == table.c['FK_MonitoredSite']
    # `== None` is deliberate: the comparison is built through SQLAlchemy's
    # operator overloading, so `is None` would not be equivalent here.
    not_ended = or_(
        table.c['EndDate'] >= func.now(),
        table.c['EndDate'] == None,  # noqa: E711
    )

    if null_check:
        subSelect = equipment_subselect(and_(site_matches, not_ended))
        if operator_name == 'is':
            return fullQueryJoin.where(~exists(subSelect))
        return fullQueryJoin.where(exists(subSelect))

    sensor_matches = eval_.eval_binary_expr(
        Sensor.UnicIdentifier, sensorObj['Operator'], sensor)
    subSelect = equipment_subselect(
        and_(site_matches, and_(sensor_matches, not_ended)))
    return fullQueryJoin.where(exists(subSelect))
开发者ID:FredericBerton,项目名称:ecoReleve-Data,代码行数:26,代码来源:List.py
示例11: verify_thd
def verify_thd(conn):
    """Verify the database contents after the migration.

    Expects a single ``buildworkers`` row, a ``name`` column of length 50,
    and empty ``configured_buildworkers`` / ``connected_buildworkers``
    tables.  Uses ``self`` from the enclosing scope for the assertions.
    """
    metadata = sa.MetaData()
    metadata.bind = conn

    buildworkers = sa.Table('buildworkers',
                            metadata, autoload=True)
    configured_buildworkers = sa.Table('configured_buildworkers',
                                       metadata, autoload=True)
    connected_buildworkers = sa.Table('connected_buildworkers',
                                      metadata, autoload=True)

    q = sa.select([buildworkers])
    # Build a real list: on Python 3 ``map()`` returns an iterator, which
    # never compares equal to a list, so the assertion could not pass.
    rows = [dict(row) for row in conn.execute(q).fetchall()]
    self.assertEqual(rows, [
        # (the info does not get de-JSON'd due to use of autoload)
        {'id': 29, 'name': u'windows', 'info': '{}'}])

    # check that the name column was resized
    self.assertEqual(buildworkers.c.name.type.length, 50)

    q = sa.select([configured_buildworkers.c.buildermasterid,
                   configured_buildworkers.c.buildworkerid])
    self.assertEqual(conn.execute(q).fetchall(), [])

    q = sa.select([connected_buildworkers.c.masterid,
                   connected_buildworkers.c.buildworkerid])
    self.assertEqual(conn.execute(q).fetchall(), [])
开发者ID:iskradelta,项目名称:buildbot,代码行数:26,代码来源:test_db_migrate_versions_032_worker_connections.py
示例12: test_mol_from_ctab
def test_mol_from_ctab(self):
    """``is_valid_ctab`` / ``mol_from_ctab`` accept a V2000 molfile.

    An arbitrary string must be rejected, the molfile below must be
    accepted, and converting it must yield a ``Chem.Mol``.
    """
    rs = engine.execute(select([func.is_valid_ctab('xyz')]))
    self.assertFalse(rs.fetchall()[0][0])

    # NOTE(review): the molfile is whitespace-sensitive (fixed-width
    # columns, three header lines, "M  END" with two spaces).  The layout
    # below was restored to the V2000 column layout because the text this
    # was taken from had its runs of spaces and its blank header line
    # collapsed -- confirm against the upstream test file.
    ctab = '''chiral1.mol
  ChemDraw04200416412D

  5  4  0  0  0  0  0  0  0  0999 V2000
   -0.0141    0.0553    0.0000 C   0  0  0  0  0  0  0  0  0  0  0  0
    0.8109    0.0553    0.0000 F   0  0  0  0  0  0  0  0  0  0  0  0
   -0.4266    0.7697    0.0000 Br  0  0  0  0  0  0  0  0  0  0  0  0
   -0.0141   -0.7697    0.0000 Cl  0  0  0  0  0  0  0  0  0  0  0  0
   -0.8109   -0.1583    0.0000 C   0  0  0  0  0  0  0  0  0  0  0  0
  1  2  1  0
  1  3  1  0
  1  4  1  1
  1  5  1  0
M  END'''

    rs = engine.execute(select([func.is_valid_ctab(ctab)]))
    self.assertTrue(rs.fetchall()[0][0])

    rs = engine.execute(select([func.mol_from_ctab(ctab)]))
    self.assertIsInstance(rs.fetchall()[0][0], Chem.Mol)
开发者ID:rvianello,项目名称:razi,代码行数:25,代码来源:test_mol.py
示例13: store_initial_soil_water
def store_initial_soil_water(engine, fname_HDFstore, args):
    """Retrieve the INITIAL_SOIL_WATER table from a CGMS12 database.

    Records are read one year at a time and written to the
    ``/initial_soil_water`` dataset of the HDF store.  If the --crop_no
    option is used only the records for the given crop_no will be
    retrieved.

    :param engine: database engine used for reflection and for reading
        the records
    :param fname_HDFstore: file name handed to the pandas ``HDFStore``
    :param args: parsed options; only ``args.crop_no`` is read here
    """
    meta = MetaData(engine)
    tbl_isw = Table("initial_soil_water", meta, autoload=True)

    # Decide once whether to filter on crop_no.  ``is not None`` is used
    # for both the validation and the per-year query so that a crop_no of
    # 0 cannot pass validation and then silently skip the filter.
    filter_on_crop = args.crop_no is not None

    # retrieve distinct crop types from DB table
    if filter_on_crop:
        s = sa.select([tbl_isw.c.crop_no]).distinct()
        crops = [row[0] for row in s.execute()]
        if args.crop_no not in crops:
            print("Crop ID specified with --cropno (%s) not found in INITIAL_SOIL_WATER table! Returning..." % args.crop_no)
            sys.exit()

    # Select distinct years to iterate over
    s = sa.select([tbl_isw.c.year]).distinct()
    years = s.execute()
    dataset_name = "/initial_soil_water"
    data_columns = ["grid_no", "stu_no", "crop_no", "year"]
    with pd.io.pytables.HDFStore(fname_HDFstore) as store:
        for yr, in sorted(years):
            if filter_on_crop:
                s = tbl_isw.select().where(sa.and_(tbl_isw.c.year == yr,
                                                   tbl_isw.c.crop_no == args.crop_no))
                print("Storing initial_soil_water for crop %i and year %i" % (args.crop_no, yr))
            else:
                s = tbl_isw.select().where(tbl_isw.c.year == yr)
                print("Storing initial_soil_water for year %i" % yr)
            df_isw = pd.read_sql(s, engine)
            # the first year creates the table, later years append to it
            if dataset_name in store:
                store.append(dataset_name, df_isw, data_columns=data_columns)
            else:
                store.put(dataset_name, df_isw, format="table", data_columns=data_columns)
开发者ID:ritviksahajpal,项目名称:pcse,代码行数:35,代码来源:cgms2hdfstore.py
示例14: ping_connection
def ping_connection(self, connection, branch):
    """Ping ``connection`` with a ``SELECT 1``, retrying once on disconnect.

    Branched connections are skipped.  A ``DBAPIError`` that is not
    flagged as ``connection_invalidated`` is re-raised.
    """
    # "branch" refers to a sub-connection of a connection,
    # we don't want to bother pinging on these.
    if branch:
        return

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if not err.connection_invalidated:
            raise
        # run the same SELECT again - the connection will re-validate
        # itself and establish a new connection.  The disconnect detection
        # here also causes the whole connection pool to be invalidated
        # so that all stale connections are discarded.
        connection.scalar(select([1]))
开发者ID:moreonion,项目名称:moflask,代码行数:25,代码来源:flask_sqlalchemy.py
示例15: execute
def execute(self, metadata, connection, filter_values):
    """Run the checklist query and return all of its rows.

    Builds a ``max_date`` subquery (latest ``date`` per ``case_id`` of the
    asha table, restricted by ``self.filters``) and selects
    ``self.columns`` from the asha table where ``case_id`` and ``date``
    match that subquery.

    Raises ``TableNotFoundException`` when ``self.table_name`` is not in
    ``metadata.tables``.
    """
    try:
        table = metadata.tables[self.table_name]
    except KeyError:
        raise TableNotFoundException("Unable to query table, table not found: %s" % self.table_name)

    asha_table = self.get_asha_table(metadata)

    latest_date = sqlalchemy.func.max(asha_table.c.date).label('date')
    case_id = asha_table.c.case_id.label('case_id')
    max_date_query = sqlalchemy.select([latest_date, case_id])

    # the filter expressions are built against ``table``, not the asha table
    for query_filter in self.filters or ():
        max_date_query.append_whereclause(query_filter.build_expression(table))

    max_date_query.append_group_by(asha_table.c.case_id)
    max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

    checklist_query = sqlalchemy.select()
    for column in self.columns:
        checklist_query.append_column(column.build_column(asha_table))

    same_case = asha_table.c.case_id == max_date_subquery.c.case_id
    checklist_query = checklist_query.where(same_case)
    same_date = asha_table.c.date == max_date_subquery.c.date
    checklist_query = checklist_query.where(same_date)

    result = connection.execute(checklist_query, **filter_values)
    return result.fetchall()
开发者ID:tlwakwella,项目名称:commcare-hq,代码行数:34,代码来源:sql_data.py
示例16: test_conn_reusable
def test_conn_reusable(self):
    """The same connection object is usable again after a DBAPI shutdown.

    The failing execute must raise ``DBAPIError`` and leave the connection
    open but invalidated; the next execute must reconnect.
    """
    conn = db.connect()

    conn.execute(select([1]))
    assert len(dbapi.connections) == 1

    dbapi.shutdown()

    # raises error
    try:
        conn.execute(select([1]))
    except tsa.exc.DBAPIError:
        pass
    else:
        assert False

    assert not conn.closed
    assert conn.invalidated

    # ensure all connections closed (pool was recycled)
    gc_collect()
    assert len(dbapi.connections) == 0

    # test reconnects
    conn.execute(select([1]))
    assert not conn.invalidated
    assert len(dbapi.connections) == 1
开发者ID:clones,项目名称:sqlalchemy,代码行数:27,代码来源:test_reconnect.py
示例17: test_joins
def test_joins(self):
    """Inner, left and outer joins translate to the matching SQLAlchemy joins.

    Each join is checked fully materialized (``SELECT *``) and projected
    onto the nation table.
    """
    region = self.con.table("tpch_region")
    nation = self.con.table("tpch_nation")

    rt = self._to_sqla(region).alias("t0")
    nt = self._to_sqla(nation).alias("t1")

    ipred = region.r_regionkey == nation.n_regionkey
    spred = rt.c.r_regionkey == nt.c.n_regionkey

    # (ibis expression, equivalent SQLAlchemy join) pairs
    fully_mat_joins = [
        (region.inner_join(nation, ipred), rt.join(nt, spred)),
        (region.left_join(nation, ipred), rt.join(nt, spred, isouter=True)),
        (region.outer_join(nation, ipred), rt.outerjoin(nt, spred)),
    ]
    for ibis_expr, sqla_join in fully_mat_joins:
        self._compare_sqla(
            ibis_expr, sa.select(["*"]).select_from(sqla_join))

    # same joins, but projecting only the nation columns
    subselect_joins = [
        (region.inner_join(nation, ipred).projection(nation), rt.join(nt, spred)),
        (region.left_join(nation, ipred).projection(nation), rt.join(nt, spred, isouter=True)),
        (region.outer_join(nation, ipred).projection(nation), rt.outerjoin(nt, spred)),
    ]
    for ibis_expr, sqla_join in subselect_joins:
        self._compare_sqla(
            ibis_expr, sa.select([nt]).select_from(sqla_join))
开发者ID:thekingofhero,项目名称:ibis,代码行数:27,代码来源:test_sqlalchemy.py
示例18: _propagate_new_wf_comments
def _propagate_new_wf_comments(cycle_task_ids):
"""Special handler for comment propagation."""
if not cycle_task_ids:
return
acl_table = all_models.AccessControlList.__table__.alias("original_acl")
acr_table = all_models.AccessControlRole.__table__.alias("original_acr")
ct_propagated_roles = {
"Admin Mapped",
"Workflow Member Mapped",
}
propagation_acr_ids = sa.select([acr_table.c.id]).where(
acr_table.c.name.in_(ct_propagated_roles)
)
cycle_task_type = all_models.CycleTaskGroupObjectTask.__name__
cycle_task_acl_ids = sa.select([acl_table.c.id]).where(
sa.and_(
acl_table.c.object_id.in_(cycle_task_ids),
acl_table.c.object_type == cycle_task_type,
acl_table.c.ac_role_id.in_(propagation_acr_ids),
)
)
_propagate_to_cte(cycle_task_acl_ids)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:27,代码来源:workflow.py
示例19: _index_query
def _index_query(self, obj):
    """
    Returns the query needed for fetching the index of this record relative
    to version history.

    The index is a correlated ``count`` labeled ``position`` over the rows
    of an alias whose transaction column is lower than that of ``obj``.
    """
    alias = sa.orm.aliased(obj)

    older_versions = sa.select(
        [sa.func.count('1')], from_obj=[alias.__table__]
    )
    older_versions = older_versions.where(
        getattr(alias, tx_column_name(obj))
        <
        getattr(obj, tx_column_name(obj))
    )
    position = older_versions.correlate(alias.__table__).label('position')

    query = sa.select([position], from_obj=[obj.__table__])
    query = query.where(
        sa.and_(*self._pk_correlation_condition(obj, False))
    )
    return query.order_by(getattr(obj.__class__, tx_column_name(obj)))
开发者ID:kandarz,项目名称:sqlalchemy-continuum,代码行数:25,代码来源:fetcher.py
示例20: test_union
def test_union(self):
    """A UNION of two labeled selects compiles both directly and aliased.

    The direct form carries the ``ORDER BY``; the aliased form is
    expected without it.
    """
    column_names = ('col1', 'col2', 'col3', 'col4')
    t1 = table('t1', *[column(name) for name in column_names])
    t2 = table('t2', *[column(name) for name in column_names])

    s1 = select(
        [t1.c.col3.label('col3'), t1.c.col4.label('col4')],
        t1.c.col2.in_(['t1col2r1', 't1col2r2']))
    s2 = select(
        [t2.c.col3.label('col3'), t2.c.col4.label('col4')],
        t2.c.col2.in_(['t2col2r2', 't2col2r3']))

    u = union(s1, s2, order_by=['col3', 'col4'])

    plain_sql = (
        'SELECT t1.col3 AS col3, t1.col4 AS col4 '
        'FROM t1 WHERE t1.col2 IN (:col2_1, '
        ':col2_2) UNION SELECT t2.col3 AS col3, '
        't2.col4 AS col4 FROM t2 WHERE t2.col2 IN '
        '(:col2_3, :col2_4) ORDER BY col3, col4'
    )
    self.assert_compile(u, plain_sql)

    aliased_sql = (
        'SELECT bar.col3, bar.col4 FROM (SELECT '
        't1.col3 AS col3, t1.col4 AS col4 FROM t1 '
        'WHERE t1.col2 IN (:col2_1, :col2_2) UNION '
        'SELECT t2.col3 AS col3, t2.col4 AS col4 '
        'FROM t2 WHERE t2.col2 IN (:col2_3, '
        ':col2_4)) AS bar'
    )
    self.assert_compile(u.alias('bar').select(), aliased_sql)
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_compiler.py
注:本文中的sqlalchemy.select函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论