本文整理汇总了 Python 中 sqlalchemy.sql.expression.select 函数的典型用法代码示例。如果您正苦于以下问题：Python select 函数的具体用法？Python select 怎么用？Python select 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 select 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: upgrade
def upgrade(migrate_engine):
    """Create the foreign keys described by ``TABLES``.

    Each ``TABLES`` entry is ``(table_name, ref, child)`` where ``ref`` is
    ``(column_name, ref_table_name, ref_column_name)`` and ``child`` is
    either falsy or ``(child_table_name, child_column_name,
    child_ref_column_name)``.  Before a constraint is created, rows whose
    column value is not among the non-NULL referenced values are handed to
    ``dump_cleanup_rows`` (rows of the child table first, when one is given).

    Nothing is done when the engine is ``sqlite``.
    """
    if migrate_engine.name == 'sqlite':
        return

    meta = MetaData(bind=migrate_engine)

    def reflect(name):
        # Reflect an existing table definition from the database.
        return Table(name, meta, autoload=True)

    for table_name, ref, child in TABLES:
        table = reflect(table_name)
        column_name, ref_table_name, ref_column_name = ref
        column = table.c[column_name]
        ref_column = reflect(ref_table_name).c[ref_column_name]

        # ``!= None`` is deliberate: it builds a SQL expression rather than
        # performing a Python comparison.
        subq = select([ref_column]).where(ref_column != None)  # noqa: E711
        orphaned = ~ column.in_(subq)

        if child:
            # Dump and cleanup rows in child table first
            child_table_name, child_column_name, child_ref_column_name = child
            child_table = reflect(child_table_name)
            child_column = child_table.c[child_column_name]
            child_ref_column = table.c[child_ref_column_name]
            child_subq = select([child_ref_column]).where(~ column.in_(subq))
            dump_cleanup_rows(migrate_engine, meta, child_table,
                              child_column.in_(child_subq))

        dump_cleanup_rows(migrate_engine, meta, table, orphaned)

        params = {'columns': [column], 'refcolumns': [ref_column]}
        if migrate_engine.name == 'mysql':
            # Only the mysql engine gets an explicit constraint name.
            params['name'] = "_".join(('fk', table_name, column_name))
        ForeignKeyConstraint(**params).create()
开发者ID:BATYD-Turksat,项目名称:nova_servo,代码行数:35,代码来源:209_add_missing_foreign_keys.py
示例2: test_in_25
def test_in_25(self):
    """``IN`` against a scalar subquery, used as a selected column."""
    scalar_subquery = select([self.table2.c.otherid]).as_scalar()
    statement = select([self.table1.c.myid.in_(scalar_subquery)])
    expected_sql = (
        "SELECT mytable.myid IN (SELECT myothertable.otherid "
        "FROM myothertable) AS anon_1 FROM mytable"
    )
    self.assert_compile(statement, expected_sql)
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:7,代码来源:test_operators.py
示例3: test_nested_in
def test_nested_in(self):
    """A tagged select nested in an ``IN`` clause of another tagged select.

    The expected SQL carries only the outer ``test`` comment.
    """
    from sqlalchemy.sql.expression import select
    from sqlalchemy.schema import Table, MetaData, Column
    from sqlalchemy.types import Integer

    foo = Table(u'Foo', MetaData(), Column(u'a', Integer()))

    inner = self._makeOne(select([u'a'], from_obj=foo), comment=u'inner')
    outer = select([u'a'], from_obj=foo, whereclause=foo.c.a.in_(inner))
    target = self._makeOne(outer, comment=u'test')

    compiled_sql = unicode(target.compile()).replace(u'\n', '')
    self.assertEqual(compiled_sql, u'SELECT a FROM "Foo" WHERE "Foo".a IN (SELECT a FROM "Foo") /* test */')
开发者ID:moriyoshi,项目名称:sqlalchemy_querytagger,代码行数:28,代码来源:tests.py
示例4: test_migrate_batch_stureg
def test_migrate_batch_stureg(self):
    """Migrate a student-registration snapshot batch and check row counts.

    After ``migrate_batch`` the destination table must hold
    ``before - rows matching academic_year 2015 + source rows`` rows.
    """
    def fetch_count(conn, stmt):
        # First column of the first row of the executed statement.
        return conn.execute(stmt).fetchall()[0][0]

    batch_guid = '2bb942b9-75cf-4055-a67a-8b9ab53a9dfc'
    batch = {
        UdlStatsConstants.REC_ID: '6',
        UdlStatsConstants.BATCH_GUID: batch_guid,
        UdlStatsConstants.TENANT: self.__tenant,
        UdlStatsConstants.SCHEMA_NAME: None,
        Constants.DEACTIVATE: False,
        UdlStatsConstants.LOAD_TYPE: LoadType.STUDENT_REGISTRATION,
        UdlStatsConstants.BATCH_OPERATION: 's',
        UdlStatsConstants.SNAPSHOT_CRITERIA: '{"reg_system_id": "015247bd-058c-48cd-bb4d-f6cffe5b40c1", "academic_year": 2015}',
    }
    self.insert_into_udl_stats(batch[UdlStatsConstants.REC_ID], batch_guid, self.__tenant, batch[UdlStatsConstants.LOAD_TYPE])

    # Rows waiting in the source (preprod) table.
    preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
    count_to_source_query = select([func.count()]).select_from(preprod_conn.get_table(Constants.STUDENT_REG))
    count_to_be_inserted = fetch_count(preprod_conn, count_to_source_query)
    self.assertEqual(10, count_to_be_inserted)

    # NOTE(review): the destination connection is opened with the *preprod*
    # tenant name -- confirm this is intentional.
    prod_conn = EdMigrateDestConnection(tenant=get_unittest_preprod_tenant_name())
    student_reg_table = prod_conn.get_table(Constants.STUDENT_REG)
    count_query = select([func.count()]).select_from(student_reg_table)
    count_before = fetch_count(prod_conn, count_query)
    self.assertEqual(2581, count_before)

    # Rows matching the snapshot's academic year.
    count_snapshot_query = select([func.count()], student_reg_table.c.academic_year == 2015).select_from(student_reg_table)
    count_to_be_deleted = fetch_count(prod_conn, count_snapshot_query)
    self.assertEqual(1217, count_to_be_deleted)

    self.assertTrue(migrate_batch(batch))

    expected_count_after = count_before - count_to_be_deleted + count_to_be_inserted
    self.assertEqual(expected_count_after, fetch_count(prod_conn, count_query))
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py
示例5: test_union
def test_union(self):
from sqlalchemy.sql.expression import select
from sqlalchemy.schema import Table, MetaData, Column
from sqlalchemy.types import Integer
meta = MetaData()
table = Table(
u'Foo',
meta,
Column(u'a', Integer())
)
target = self._makeOne(
select(
[u'a'],
from_obj=table,
whereclause=(table.c.a == 1)
),
comment=u'test'
)
target = target.union(
select(
[u'a'],
from_obj=table,
whereclause=(table.c.a == 2)
)
)
result = target.compile()
self.assertRegexpMatches(unicode(result).replace(u'\n', ''), ur'SELECT a FROM "Foo" WHERE "Foo".a = [^ ]+ UNION SELECT a FROM "Foo" WHERE "Foo".a = [^ ]+ /\* test \*/')
开发者ID:moriyoshi,项目名称:sqlalchemy_querytagger,代码行数:27,代码来源:tests.py
示例6: test_migrate_student_reg
def test_migrate_student_reg(self):
    """Migrate ``student_reg`` from preprod to prod and verify the rows.

    The ten record ids 15541..15550 must be in the source, absent from the
    destination before ``migrate_table`` and present afterwards.
    """
    Unittest_with_edcore_sqlite.setUpClass(
        EdMigrateDestConnection.get_datasource_name(TestMigrate.test_tenant),
        use_metadata_from_db=False)
    preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
    prod_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
    batch_guid = "0aa942b9-75cf-4055-a67a-8b9ab53a9dfc"

    student_reg_table = preprod_conn.get_table(Constants.STUDENT_REG)
    rec_id = student_reg_table.c.student_reg_rec_id
    get_query = select([rec_id]).order_by(rec_id)
    count_query = select([func.count().label('student_reg_rec_ids')],
                         rec_id.in_(range(15541, 15551)))

    # Source side: exactly the ten expected ids, in order.
    source_result = preprod_conn.execute(get_query)
    source_rows = source_result.fetchall()
    self.assertEqual(10, len(source_rows))
    self.assertListEqual([(15541,), (15542,), (15543,), (15544,), (15545,), (15546,), (15547,), (15548,), (15549,), (15550,)],
                         source_rows)
    source_result.close()

    # Destination side before the migration: none of them.
    before_result = prod_conn.execute(count_query)
    before_row = before_result.fetchone()
    self.assertEqual(0, before_row['student_reg_rec_ids'])
    before_result.close()

    delete_count, insert_count = migrate_table(batch_guid, None, preprod_conn, prod_conn, 'student_reg', False)
    self.assertEqual(0, delete_count)
    self.assertEqual(10, insert_count)

    # Destination side after the migration: all ten.
    after_result = prod_conn.execute(count_query)
    after_row = after_result.fetchone()
    self.assertEqual(10, after_row['student_reg_rec_ids'])
    after_result.close()
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py
示例7: create_expression
def create_expression(self):
    """Build the SELECT used to read the next batch of rows.

    All table columns are selected, ordered by ``safe_collate`` applied to
    each ``sort_key`` column.  The WHERE clause is:

    * ``self.filter`` (AND-ed with ``self.starter`` when one is set) if
      there is no ``last_result`` yet;
    * otherwise ``self.filter`` AND-ed with ``vector_greater_than`` of the
      sort-key columns against the corresponding ``last_result`` values.

    ``limit_per`` (when not None) adds a LIMIT, and ``stream`` turns on
    the ``stream_results`` execution option.

    Returns:
        The constructed select expression.
    """
    columns = self.table.columns

    if self.last_result is None:
        if self.starter is not None:
            condition = and_(self.starter, self.filter)
        else:
            condition = self.filter
    else:
        resume_after = vector_greater_than(
            [columns[nm] for nm in self.sort_key],
            [self.last_result[n] for n in self.sort_index])
        condition = and_(resume_after, self.filter)

    # The previous ``try: ... except: raise`` wrappers were no-ops and have
    # been dropped; exceptions propagate exactly as before.
    ordering = [safe_collate(columns[nm], None) for nm in self.sort_key]
    expr = select(columns).order_by(*ordering).where(condition)

    if self.limit_per is not None:
        expr = expr.limit(self.limit_per)
    if self.stream:
        expr = expr.execution_options(stream_results=True)
    return expr
开发者ID:jcrudy,项目名称:oreader,代码行数:26,代码来源:reader_configs.py
示例8: get_maps
def get_maps(document, lang):
    """Load and return maps that intersect with the document geometry.

    Returns an empty list when the document has no geometry.  When ``lang``
    is not None, ``set_best_locale`` is applied to the loaded maps.
    """
    if document.geometry is None:
        return []

    def geometry_subquery(column):
        # Select ``column`` from the geometry row of this document.
        return select([column]).where(
            DocumentGeometry.document_id == document.document_id)

    document_geom = geometry_subquery(DocumentGeometry.geom)
    document_geom_detail = geometry_subquery(DocumentGeometry.geom_detail)

    # A map matches if its detailed geometry intersects either geometry
    # column of the document.
    intersects_document = or_(
        DocumentGeometry.geom_detail.ST_Intersects(
            document_geom.label('t1')),
        DocumentGeometry.geom_detail.ST_Intersects(
            document_geom_detail.label('t2'))
    )

    map_query = (
        DBSession
        .query(TopoMap)
        .filter(TopoMap.redirects_to.is_(None))
        .join(
            DocumentGeometry,
            TopoMap.document_id == DocumentGeometry.document_id)
        .options(load_only(
            TopoMap.document_id, TopoMap.editor, TopoMap.code,
            TopoMap.version, TopoMap.protected))
        .options(joinedload(TopoMap.locales).load_only(
            DocumentLocale.lang, DocumentLocale.title,
            DocumentLocale.version))
        .filter(intersects_document)
    )
    topo_maps = map_query.all()

    if lang is not None:
        set_best_locale(topo_maps, lang)
    return topo_maps
开发者ID:arnaud-morvan,项目名称:v6_api,代码行数:35,代码来源:topo_map.py
示例9: _connect_ping_listener
def _connect_ping_listener(connection, branch):
"""Ping the server at connection startup.
Ping the server at transaction begin and transparently reconnect
if a disconnect exception occurs.
"""
if branch:
return
# turn off "close with result". This can also be accomplished
# by branching the connection, however just setting the flag is
# more performant and also doesn't get involved with some
# connection-invalidation awkardness that occurs (see
# https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False
try:
# run a SELECT 1. use a core select() so that
# any details like that needed by Oracle, DB2 etc. are handled.
connection.scalar(select([1]))
except exception.DBConnectionError:
# catch DBConnectionError, which is raised by the filter
# system.
# disconnect detected. The connection is now
# "invalid", but the pool should be ready to return
# new connections assuming they are good now.
# run the select again to re-validate the Connection.
connection.scalar(select([1]))
finally:
connection.should_close_with_result = save_should_close_with_result
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:30,代码来源:session.py
示例10: query
def query(self, parent_ids=None):
    """ Construct and run the SQL query for this level of the request.

    Every fetched row is passed to ``self.collect``; the ``self.pk_id``
    values of those rows are then handed to each child's ``query``.
    """
    is_root = self.parent is None
    if is_root:
        q = select(from_obj=self.join(self.alias))
        q = q.offset(self.get_child_node_value('offset', 0))
        # A non-list node yields a single row; a list node is limited by
        # its 'limit' child value (10 when absent).
        if self.node.as_list:
            q = q.limit(self.get_child_node_value('limit', 10))
        else:
            q = q.limit(1)
    else:
        q = select(from_obj=self.join(self.parent.alias))

    q = self.filter(q)
    if parent_ids is not None:
        q = q.where(self.parent.alias.c.id.in_(parent_ids))
    q = self.project(q).distinct()

    ids = []
    rp = db.session.execute(q)
    fetched = rp.fetchone()
    while fetched is not None:
        record = dict(fetched.items())
        ids.append(record.get(self.pk_id))
        self.collect(record)
        fetched = rp.fetchone()

    for child in self.children.values():
        child.query(parent_ids=ids)
开发者ID:01-,项目名称:grano,代码行数:33,代码来源:__init__.py
示例11: get_asmt_rec_id
def get_asmt_rec_id(guid_batch, tenant_name, asmt_rec_id_info):
    '''
    Returns asmt_rec_id from the target (dim_asmt) table, or None when
    either lookup finds no row.

    Steps:
    1. Get the assessment guid for this batch from the integration table
    2. Select the rec id from the target table by that guid and batch.
       It should have 1 value

    @param guid_batch: batch guid used to filter both tables
    @param tenant_name: tenant whose target connection is used
    @param asmt_rec_id_info: dict with the keys 'source_table',
        'guid_column_in_source', 'target_table', 'guid_column_name'
        and 'rec_id'
    '''
    source_table_name = asmt_rec_id_info['source_table']
    guid_column_name_in_source = asmt_rec_id_info['guid_column_in_source']
    target_table_name = asmt_rec_id_info['target_table']
    guid_column_name_in_target = asmt_rec_id_info['guid_column_name']
    rec_id_column_name = asmt_rec_id_info['rec_id']

    # connect to integration table, to get the value of guid_asmt
    with get_udl_connection() as udl_conn:
        int_table = udl_conn.get_table(source_table_name)
        query = select([int_table.c[guid_column_name_in_source]], from_obj=int_table, limit=1)
        query = query.where(int_table.c['guid_batch'] == guid_batch)
        results = udl_conn.get_result(query)
    if not results:
        # Previously this path read an unassigned local and failed with
        # UnboundLocalError; report "not found" explicitly instead.
        return None
    guid_column_value = results[0][guid_column_name_in_source]

    # connect to target table, to get the value of asmt_rec_id
    with get_target_connection(tenant_name, guid_batch) as target_conn:
        dim_asmt = target_conn.get_table(target_table_name)
        query = select([dim_asmt.c[rec_id_column_name]], from_obj=dim_asmt, limit=1)
        query = query.where(dim_asmt.c[guid_column_name_in_target] == guid_column_value)
        # Successive where() calls are AND-ed, so no and_() wrapper is needed.
        query = query.where(dim_asmt.c['batch_guid'] == guid_batch)
        results = target_conn.get_result(query)
    if not results:
        return None
    return results[0][rec_id_column_name]
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:33,代码来源:move_to_target.py
示例12: generate_create_inheritance_view_statement
def generate_create_inheritance_view_statement(class_):
    """Build the ``CreateView`` statement for ``class_``.

    The view is named after ``cls2tbl(class_)`` with its first character
    removed.  Columns are collected from the class table and then from each
    parent table; the first column seen under a given name wins.  For
    single-table inheritance the view selects from the class table only,
    restricted to the type names of the class and all its subclasses;
    otherwise it selects from the join of the class table with its parents.
    """
    config = class_.__score_db__
    own_table = class_.__table__
    viewname = cls2tbl(class_)[1:]
    cols = {}

    def add_cols(table):
        for col in table.c:
            # Keep the first column registered under each name.
            cols.setdefault(col.name, col)

    add_cols(own_table)

    tables = own_table
    if config['inheritance'] is not None:
        ancestor = config['parent']
        while ancestor:
            ancestor_table = ancestor.__table__
            tables = tables.join(
                ancestor_table,
                onclause=ancestor_table.c.id == own_table.c.id)
            add_cols(ancestor_table)
            ancestor = ancestor.__score_db__['parent']

    if config['inheritance'] == 'single-table':
        typecol = getattr(class_, config['type_column'])
        typenames = []

        def add_typenames(cls):
            # Depth-first walk over the class and its subclasses.
            typenames.append(cls.__score_db__['type_name'])
            for subclass in cls.__subclasses__():
                add_typenames(subclass)

        add_typenames(class_)
        viewselect = select(cols.values(),
                            from_obj=own_table,
                            whereclause=typecol.in_(typenames))
    else:
        viewselect = select(cols.values(), from_obj=tables)

    return CreateView(viewname, viewselect)
开发者ID:score-framework,项目名称:py.db,代码行数:32,代码来源:_sa_stmt.py
示例13: set_leg_waypoints
def set_leg_waypoints():
    """Fill ``leg_waypoints`` for legs that have no entries there yet.

    Device data points are joined to legs by device id and time range.
    Only legs absent from ``leg_waypoints`` and whose points all have a
    non-NULL ``snapping_time`` are considered; for those, one row per
    (leg, waypoint) with the minimum point time is inserted.  Prints the
    inserted row count and elapsed time.
    """
    started = time.time()

    tables = db.metadata.tables
    dd = tables["device_data"]
    legs = tables["legs"]
    glue = tables["leg_waypoints"]

    point_in_leg = and_(
        dd.c.device_id == legs.c.device_id,
        dd.c.time.between(legs.c.time_start, legs.c.time_end))
    legpoints = select(
        [legs.c.id, dd.c.waypoint_id, dd.c.time, dd.c.snapping_time],
        from_obj=dd.join(legs, point_in_leg)).alias("legpoints")
    leg_id = legpoints.c.id

    # Legs already present in the glue table.
    done = select([glue.c.leg], distinct=True)

    # Unprocessed legs in which every point has been snapped.
    nounsnapped = select(
        [leg_id],
        leg_id.notin_(done),
        group_by=leg_id,
        having=func.bool_and(legpoints.c.snapping_time.isnot(None)))

    newitems = select(
        [leg_id, legpoints.c.waypoint_id, func.min(legpoints.c.time)],
        leg_id.in_(nounsnapped),
        group_by=[leg_id, legpoints.c.waypoint_id]).alias("newitems")

    ins = glue.insert().from_select(["leg", "waypoint", "first"], newitems)
    rowcount = db.engine.execute(ins).rowcount
    elapsed = time.time() - started
    print("set_leg_waypoints on %d rows in %.2g seconds" % (
        rowcount, elapsed))
开发者ID:aalto-trafficsense,项目名称:regular-routes-server,代码行数:28,代码来源:scheduler.py
示例14: constructQuery
def constructQuery(self, context):
    """Return the user query backing this vocabulary.

    If the context carries a value in ``self.value_field``, the query
    selects that single user.  Otherwise the context's parent is used as
    the sitting, and the query selects active members of the sitting's
    group that have no attendance record for the sitting.  Results are
    ordered by last, first and middle name.
    """
    session = Session()
    trusted = removeSecurityProxy(context)
    name_order = (domain.User.last_name,
                  domain.User.first_name,
                  domain.User.middle_name)

    user_id = getattr(trusted, self.value_field, None)
    if user_id:
        return session.query(domain.User).filter(
            domain.User.user_id == user_id).order_by(*name_order)

    sitting = trusted.__parent__
    group_id = sitting.group_id
    group_sitting_id = sitting.group_sitting_id

    memberships = schema.user_group_memberships.c
    attendance = schema.group_sitting_attendance.c
    all_member_ids = sql.select(
        [memberships.user_id],
        sql.and_(memberships.group_id == group_id,
                 memberships.active_p == True))  # noqa: E712 (SQL expression)
    attended_ids = sql.select(
        [attendance.member_id],
        attendance.group_sitting_id == group_sitting_id)

    not_yet_attended = sql.and_(
        domain.User.user_id.in_(all_member_ids),
        ~ domain.User.user_id.in_(attended_ids))
    return session.query(domain.User).filter(
        not_yet_attended).order_by(*name_order)
开发者ID:BenoitTalbot,项目名称:bungeni-portal,代码行数:28,代码来源:vocabulary.py
示例15: p_is_term
def p_is_term(p):
    '''is_term : OP_IS string'''
    # NOTE(review): the docstring above looks like a yacc-style grammar
    # production consumed by the parser generator -- keep it byte-identical.
    #
    # Translates an ``is:<word>`` search term (the word is p[2]) into an
    # expression over the change table and stores it in p[0].  An
    # unsupported word raises SearchSyntaxError.
    #TODO: implement starred, watched, draft
    username = p.parser.username
    change = gertty.db.change_table.c
    approval = gertty.db.approval_table.c
    account = gertty.db.account_table.c

    def changes_with_approval(*extra_filters):
        # Changes that have at least one approval row satisfying the
        # extra filters.
        filters = [approval.change_key == change.key]
        filters.extend(extra_filters)
        s = select([change.key], correlate=False).where(and_(*filters))
        return change.key.in_(s)

    term = p[2]
    if term == 'reviewed':
        p[0] = changes_with_approval(approval.value != 0)
    elif term == 'open':
        p[0] = change.status.notin_(['MERGED', 'ABANDONED'])
    elif term == 'closed':
        p[0] = change.status.in_(['MERGED', 'ABANDONED'])
    elif term == 'submitted':
        p[0] = change.status == 'SUBMITTED'
    elif term == 'merged':
        p[0] = change.status == 'MERGED'
    elif term == 'abandoned':
        p[0] = change.status == 'ABANDONED'
    elif term == 'owner':
        p[0] = and_(change.account_key == account.key,
                    account.username == username)
    elif term == 'reviewer':
        p[0] = changes_with_approval(
            approval.account_key == account.key,
            account.username == username)
    else:
        # This rule handles "is:" terms; the message used to say "has:".
        raise gertty.search.SearchSyntaxError('Syntax error: is:%s is not supported' % term)
开发者ID:bradleyjones,项目名称:gertty,代码行数:32,代码来源:parser.py
示例16: test_within_distance
def test_within_distance(self):
    """
    Because SDO_WITHIN_DISTANCE requires a spatial index for the geometry used
    as first parameter, we have to insert our test geometries into tables,
    unlike the other databases.
    Note that Oracle uses meter as unit for the tolerance value for geodetic coordinate
    systems (like 4326)!
    """
    small_square = 'Polygon((-5 -5, 5 -5, 5 5, -5 5, -5 -5))'
    large_square = 'Polygon((-10 -10, 10 -10, 10 10, -10 10, -10 -10))'

    # test if SDO_functions._within_distance is called correctly
    # (geometry, distance, expected number of spots)
    spot_cases = [
        ('POINT(0 0)', 0, 1),
        ('POINT(0 0)', 0.1, 1),
        ('POINT(9 9)', 100000, 0),
        (small_square, 0, 3),
        (large_square, 0, 4),
        (large_square, 200000, 5),
    ]
    for geometry, distance, expected in spot_cases:
        matching = session.query(Spot).filter(
            functions._within_distance(Spot.spot_location, geometry, distance))
        eq_(matching.count(), expected)

    # test if SDO_GEOM.functions._within_distance is called correctly
    def probe_from_dual(options):
        criterion = functions._within_distance(
            'POINT(0 0)', 'POINT(0 0)', 0, options)
        return session.scalar(
            select([text('1')], from_obj=['dual']).where(criterion))

    eq_(probe_from_dual({'tol' : 0.00000005}), 1)
    eq_(probe_from_dual({'dim1' : text(diminfo),
                         'dim2' : text(diminfo)}), 1)
开发者ID:ComedianClown,项目名称:geoalchemy,代码行数:31,代码来源:test_oracle.py
示例17: test_six_pt_five
def test_six_pt_five(self):
    """``x == 7 OR true`` compiles to a constant-true WHERE clause.

    Checked with the default compile and with a dialect that has no
    native boolean support.
    """
    x = column("x")

    def build():
        # A fresh statement for each compile check.
        return select([x]).where(or_(x == 7, true()))

    self.assert_compile(build(), "SELECT x WHERE true")

    no_native_bool = default.DefaultDialect(supports_native_boolean=False)
    self.assert_compile(build(),
                        "SELECT x WHERE 1 = 1",
                        dialect=no_native_bool)
开发者ID:chundi,项目名称:sqlalchemy,代码行数:8,代码来源:test_operators.py
示例18: load_to_table
def load_to_table(data_dict, guid_batch, int_table, tenant_name, udl_schema):
'''
Insert one record built from data_dict into the integration table.

Column values are taken from data_dict; where ref_column_mapping names a
stored procedure / format string for a column (source table 'lz_json',
target table int_table), the value is wrapped with it. The row is inserted
with INSERT ... FROM SELECT and a record_sid drawn from the tenant sequence.

@param data_dict: the dictionary containing the data to be loaded
@param guid_batch: the id for the batch
@param int_table: the name of the integration table
@param tenant_name: name of the tenant
@param udl_schema: udl schema name
@return: first element of the result of db_util.execute_udl_queries
(presumably the affected row count -- confirm against db_util)
'''
# NOTE(review): the indentation of this snippet was lost, so the exact nesting
# of the `continue` and of the plain-value assignment below cannot be told
# from here -- check the upstream file before restructuring.
# Create sqlalchemy connection and get table information from sqlalchemy
ref_column_mapping_columns = {}
with get_udl_connection() as conn:
data_dict[mk.GUID_BATCH] = guid_batch
data_dict = fix_empty_strings(data_dict)
ref_table = conn.get_table('ref_column_mapping')
s_int_table = conn.get_table(int_table)
# Look up the per-column transformation (if any) for this integration table.
column_mapping_query = select([ref_table.c.target_column,
ref_table.c.stored_proc_name],
from_obj=ref_table).where(and_(ref_table.c.source_table == 'lz_json',
ref_table.c.target_table == int_table))
results = conn.get_result(column_mapping_query)
for result in results:
target_column = result['target_column']
stored_proc_name = result['stored_proc_name']
value = data_dict.get(target_column)
if value:
if stored_proc_name:
# Names starting with 'sp_' are rendered as a call: sp_name(<quoted value>).
if stored_proc_name.startswith('sp_'):
ref_column_mapping_columns[target_column] = stored_proc_name + '(' + QuotedString(value if type(value) is str else str(value)).getquoted().decode('utf-8') + ')'
else:
# Anything else is used as a str.format template fed with 'value' and,
# for string-typed columns, 'length'.
format_value = dict()
format_value['value'] = QuotedString(value if type(value) is str
else str(value)).getquoted().decode('utf-8')
if s_int_table.c[target_column].type.python_type is str:
format_value['length'] = s_int_table.c[target_column].type.length
ref_column_mapping_columns[target_column] = stored_proc_name.format(**format_value)
continue
ref_column_mapping_columns[target_column] = value
# record_sid is always supplied from the tenant's sequence.
record_sid = 'nextval(\'{schema_name}.{tenant_sequence_name}\')'.\
format(schema_name=udl_schema, tenant_sequence_name=Constants.TENANT_SEQUENCE_NAME(tenant_name))
from_select_column_names = ['record_sid']
from_select_select_values = [record_sid]
# Only columns with a non-None value in data_dict take part in the insert;
# a mapped expression wins over the plain quoted value.
for column in s_int_table.c:
value = data_dict.get(column.name)
if value is not None:
from_select_column_names.append(column.name)
from_select_select_values.append(
ref_column_mapping_columns.get(column.name,
QuotedString(value if type(value) is str else str(value)).getquoted().decode('utf-8')))
# NOTE(review): the select list is made of pre-quoted SQL text fragments, so
# correctness relies on QuotedString escaping every value -- confirm.
insert_into_int_table = s_int_table.insert().from_select(from_select_column_names,
select(from_select_select_values))
# create insert statement and execute
affected_row = db_util.execute_udl_queries(conn, [insert_into_int_table],
'Exception in loading json data -- ',
'json_loader', 'load_to_table')
return affected_row[0]
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:58,代码来源:json_loader.py
示例19: get_asmt_and_outcome_result
def get_asmt_and_outcome_result(self, conf):
    """Fetch the assessment guids recorded for the configured batch.

    Returns a pair: the ``guid_asmt`` rows of the assessment table and the
    distinct ``assessmentguid`` rows of the assessment outcome table, both
    filtered by the batch guid found in ``conf``.
    """
    with get_udl_connection() as conn:
        asmt_table = conn.get_table(conf.get(mk.ASMT_TABLE))
        asmt_outcome_table = conn.get_table(conf.get(mk.ASMT_OUTCOME_TABLE))

        asmt_query = select([asmt_table.c.guid_asmt]).where(
            asmt_table.c.guid_batch == conf.get(mk.GUID_BATCH))
        asmt_result = conn.get_result(asmt_query)

        outcome_query = select(
            [asmt_outcome_table.c.assessmentguid], distinct=True).where(
            asmt_outcome_table.c.guid_batch == conf.get(mk.GUID_BATCH))
        asmt_outcome_result = conn.get_result(outcome_query)
    return asmt_result, asmt_outcome_result
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:9,代码来源:content_validator.py
示例20: initialize_table_source
def initialize_table_source(db, tblname, schema=None, cols=None):
    """Return a select over a table, optionally limited to some columns.

    The table comes from ``initialize_table(db, tblname, schema)``.  When
    ``cols`` is empty or None the whole table is selected; otherwise only
    the named columns are.
    """
    tbl = initialize_table(db, tblname, schema)
    selected = [tbl.c[col] for col in cols] if cols else [tbl]
    return select(selected)
开发者ID:daterrell2,项目名称:datatools,代码行数:9,代码来源:database.py
注：本文中的 sqlalchemy.sql.expression.select 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论