本文整理汇总了Python中sqlalchemy.union函数的典型用法代码示例。如果您正苦于以下问题:Python union函数的具体用法?Python union怎么用?Python union使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了union函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: eval
def eval(self, builder):
""" return a query object
"""
dbh = builder._get_dbh()
tokens = self.value[1:]
expr_1 = self.value[0].eval( builder )
while tokens:
op = tokens[0]
eval_2 = tokens[1].eval( builder )
tokens = tokens[2:]
if op == '|':
eval_1 = sqla.union(eval_1, eval_2)
elif op == '&':
eval_1 = sqla.intersect(eval_1, eval_2)
elif op == ':':
eval_1 = sqla.except_(
dbh.session().query(dbh.Sample.id).filter(
dbh.Sample.id.in_( sqla.union(eval_1, eval_2)) ),
dbh.session().query(dbh.Sample.id).filter(
dbh.Sample.iid.n_( sqla.intersect(eval_1, eval_2)) )
)
q = dbh.session().query(dbh.Sample.id).filter( dbh.Sample.id.in_( eval_1 ) )
return q
开发者ID:trmznt,项目名称:genaf,代码行数:28,代码来源:querytext.py
示例2: test_intersect_unions_2
def test_intersect_unions_2(self):
u = intersect(
union(select([t1.c.col3, t1.c.col4]), select([t3.c.col3, t3.c.col4])).alias().select(),
union(select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4])).alias().select(),
)
wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
found = self._fetchall_sorted(u.execute())
eq_(found, wanted)
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:9,代码来源:test_query.py
示例3: test_intersect_unions_2
def test_intersect_unions_2(self):
u = intersect(
union(
select([t1.c.col3, t1.c.col4]),
select([t3.c.col3, t3.c.col4]),
).alias().select(),
union(
select([t2.c.col3, t2.c.col4]),
select([t3.c.col3, t3.c.col4]),
).alias().select()
)
wanted = [('aaa', 'ccc'), ('bbb', 'aaa'), ('ccc', 'bbb')]
found = self._fetchall_sorted(u.execute())
eq_(found, wanted)
开发者ID:rlugojr,项目名称:sqlalchemy,代码行数:15,代码来源:test_query.py
示例4: timeseries_all
def timeseries_all(cls, table_names, agg_unit, start, end, geom=None):
# For each table in table_names, generate a query to be unioned
selects = []
for name in table_names:
table = cls.get_by_dataset_name(name)
ts_select = table.timeseries(agg_unit, start, end, geom)
selects.append(ts_select)
# Union the time series selects to get a panel
panel_query = sa.union(*selects)\
.order_by('dataset_name')\
.order_by('time_bucket')
panel_vals = session.execute(panel_query)
panel = []
for dataset_name, ts in groupby(panel_vals, lambda row: row.dataset_name):
# ts gets closed after it's been iterated over once,
# so we need to store the rows somewhere to iterate over them twice.
rows = [row for row in ts]
# If no records were found, don't include this dataset
if all([row.count == 0 for row in rows]):
continue
ts_dict = {'dataset_name': dataset_name,
'items': []}
for row in rows:
ts_dict['items'].append({
'datetime': row.time_bucket.date(), # UTC time
'count': row.count
})
panel.append(ts_dict)
return panel
开发者ID:mvksumanth,项目名称:plenario,代码行数:35,代码来源:models.py
示例5: main
def main():
db = define.connect()
all_media = sa.union(
sa.select([orm.UserMediaLink.mediaid]),
sa.select([orm.SubmissionMediaLink.mediaid]),
sa.select([orm.MediaMediaLink.describee_id]),
sa.select([orm.MediaMediaLink.described_with_id]),
).alias('all_media')
q = (
db.query(orm.MediaItem)
.with_polymorphic([orm.DiskMediaItem])
.outerjoin(all_media)
.filter(all_media.c.mediaid == None))
count = q.count()
for e, media_item in enumerate(q, start=1):
sys.stdout.write('\r%d/%d' % (e, count))
sys.stdout.flush()
db.delete(media_item)
try:
os.unlink(media_item.full_file_path)
except OSError as e:
if e.errno == errno.ENOENT:
continue
raise
db.flush()
print
开发者ID:0x15,项目名称:weasyl,代码行数:26,代码来源:media_gc.py
示例6: get_supplier_contacts_union
def get_supplier_contacts_union(self):
authorised_representative = select([Supplier.code, Supplier.data['email'].astext.label('email_address')])
business_contact = select([Supplier.code, Supplier.data['contact_email'].astext.label('email_address')])
user_email_addresses = (select([User.supplier_code.label('code'), User.email_address])
.where(User.active))
return union(authorised_representative, business_contact, user_email_addresses).alias('email_addresses')
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:7,代码来源:suppliers.py
示例7: _lookup
def _lookup(self, assignment_id, *args):
try:
assignment_id = int(assignment_id)
# TODO: Use SQLAlchemy magic on model to make queries on assignment easier
q1 = Assignment.query.filter(Assignment.id == assignment_id).join(Assignment._lti).order_by(None)
q2 = (
Assignment.query.filter(Assignment.id == assignment_id)
.join(Sheet)
.join(Event)
.join(Event.lti)
.order_by(None)
)
assignment = Assignment.query.select_from(union(q1, q2)).one()
except ValueError:
flash("Invalid LTI Assignment id: %s" % assignment_id, "error")
abort(400)
except NoResultFound:
flash("LTI Assignment %d not found" % assignment_id, "error")
abort(404)
except MultipleResultsFound: # pragma: no cover
log.error("Database inconsistency: LTI Assignment %d", assignment_id, exc_info=True)
flash("An error occurred while accessing LTI Assignment %d" % assignment_id, "error")
abort(500)
controller = LTIAssignmentController(assignment)
return controller, args
开发者ID:moschlar,项目名称:SAUCE,代码行数:26,代码来源:lti.py
示例8: test_union
def test_union(self):
t1 = table(
't1', column('col1'), column('col2'),
column('col3'), column('col4'))
t2 = table(
't2', column('col1'), column('col2'),
column('col3'), column('col4'))
s1, s2 = select(
[t1.c.col3.label('col3'), t1.c.col4.label('col4')],
t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
t2.c.col2.in_(['t2col2r2', 't2col2r3']))
u = union(s1, s2, order_by=['col3', 'col4'])
self.assert_compile(u,
'SELECT t1.col3 AS col3, t1.col4 AS col4 '
'FROM t1 WHERE t1.col2 IN (:col2_1, '
':col2_2) UNION SELECT t2.col3 AS col3, '
't2.col4 AS col4 FROM t2 WHERE t2.col2 IN '
'(:col2_3, :col2_4) ORDER BY col3, col4')
self.assert_compile(u.alias('bar').select(),
'SELECT bar.col3, bar.col4 FROM (SELECT '
't1.col3 AS col3, t1.col4 AS col4 FROM t1 '
'WHERE t1.col2 IN (:col2_1, :col2_2) UNION '
'SELECT t2.col3 AS col3, t2.col4 AS col4 '
'FROM t2 WHERE t2.col2 IN (:col2_3, '
':col2_4)) AS bar')
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_compiler.py
示例9: fetch_submissions
def fetch_submissions(hdlr_name=None):
"""
:param hdlr_name: Name of the handler that requests subsmissions.
:return:
Return submissions with 'pid' greater than the milestones added by the
handler with 'handler_name' and sorted by 'pid', which is equivalent to
sorted by 'submit_time'.
If hdlr_name is not specified or there are not any milestones under
the name of a handler, all submissions are returned.
An empty list is returned if there are no available submissions.
:rtype: [Submission]
:caller: Handler
"""
mlst = select([t_milestone.c.submission_pid, t_milestone.c.handler_name]) \
.where(t_milestone.c.handler_name == hdlr_name) \
.order_by(t_milestone.c.submission_pid.desc()).limit(1)
mlst = union(select([mlst]), select([None, None]).where(~exists(mlst)))
s = select([t.c.oj,
t.c.problem_id,
t.c.problem_title,
t.c.problem_url,
t.c.submit_time,
t.c.timezone,
t.c.pid]) \
.where((mlst.c.submission_pid == None) |
(t.c.pid > mlst.c.submission_pid) & (mlst.c.handler_name == hdlr_name)) \
.order_by(t.c.pid)
with engine.connect() as conn:
return [Submission(*d) for d in conn.execute(s)]
开发者ID:yehzhang,项目名称:Show-My-Solutions,代码行数:29,代码来源:dbmanager.py
示例10: _do_get_provider_count_and_objs
def _do_get_provider_count_and_objs(self, **kw):
'''Custom getter function respecting lesson
Returns the result count from the database and a query object
'''
# TODO: Code duplication with CRC?!
qry = Submission.query
# Process lesson filter
if self.lesson:
q1 = (qry.join(Submission.user).join(lesson_members).join(Lesson)
.filter(Lesson.id == self.lesson.id).order_by(None))
q2 = (qry.join(Submission.user).join(team_members).join(Team)
.filter(Team.lesson_id == self.lesson.id).order_by(None))
qry = qry.select_from(union(q1, q2)).order_by(Submission.id)
filters = kw.pop('filters', dict())
for filter in filters:
if isinstance(filters[filter], (list, tuple, set)):
qry = qry.filter(getattr(Submission, filter).in_(filters[filter]))
else:
qry = qry.filter(getattr(Submission, filter) == filters[filter])
# Process filters from url
kwfilters = kw
exc = False
try:
kwfilters = self.__provider__._modify_params_for_dates(self.__model__, kwfilters)
except ValueError as e:
log.info('Could not parse date filters', exc_info=True)
flash('Could not parse date filters: "%s".' % e.message, 'error')
exc = True
try:
kwfilters = self.__provider__._modify_params_for_relationships(self.__model__, kwfilters)
except (ValueError, AttributeError) as e:
log.info('Could not parse relationship filters', exc_info=True)
flash('Could not parse relationship filters: "%s". '
'You can only filter by the IDs of relationships, not by their names.' % e.message, 'error')
exc = True
if exc:
# Since non-parsed kwfilters are bad, we just have to ignore them now
kwfilters = {}
for field_name, value in kwfilters.iteritems():
field = getattr(self.__model__, field_name)
try:
if self.__provider__.is_relation(self.__model__, field_name) and isinstance(value, list): # pragma: no cover
value = value[0]
qry = qry.filter(field.contains(value))
else:
qry = qry.filter(field == value)
except:
log.warn('Could not create filter on query', exc_info=True)
# Get total count
count = qry.count()
return count, qry
开发者ID:Ayutac,项目名称:SAUCE,代码行数:60,代码来源:submission_table.py
示例11: query
def query(cls):
def select_1():
h = sql.t.EvPytisHelp.alias("h")
u = sqlalchemy.select(["*"], from_obj=["pytis_view_user_menu()"]).alias("u")
return sqlalchemy.select(
sql.reorder_columns(
cls._exclude(h),
[
"help_id",
"menuid",
"fullname",
"title",
"description",
"menu_help",
"spec_name",
"spec_description",
"spec_help",
"page_id",
"parent",
"ord",
"content",
"position",
"position_nsub",
"changed",
"removed",
],
),
from_obj=[h.join(u, sql.gR("h.menuid = u.menuid"))],
)
def select_2():
h = sql.t.EvPytisHelp.alias("h")
return sqlalchemy.select(
sql.reorder_columns(
cls._exclude(h),
[
"help_id",
"menuid",
"fullname",
"title",
"description",
"menu_help",
"spec_name",
"spec_description",
"spec_help",
"page_id",
"parent",
"ord",
"content",
"position",
"position_nsub",
"changed",
"removed",
],
),
from_obj=[h],
whereclause="h.menuid is null",
)
return sqlalchemy.union(select_1(), select_2())
开发者ID:nicLucian,项目名称:pytis,代码行数:60,代码来源:db_pytis_help.py
示例12: determine_fetches
def determine_fetches(db_session, cred):
for thread in db_session.query(Thread).filter_by(closed=False):
update_thread_status(thread, cred)
db_session.flush()
incomplete_page_ids = (
sa.select([ThreadPost.page_id])
.group_by(ThreadPost.page_id)
.having(sa.func.count(ThreadPost.id) < 40)
.as_scalar()
)
incomplete_pages = sa.select(
[ThreadPage.thread_id, ThreadPage.page_num], from_obj=sa.join(ThreadPage, Thread)
).where(sa.and_(ThreadPage.id.in_(incomplete_page_ids), Thread.closed == sa.false()))
fetch_status = (
sa.select(
[ThreadPage.thread_id.label("thread_id"), sa.func.max(ThreadPage.page_num).label("last_fetched_page")]
)
.group_by(ThreadPage.thread_id)
.alias("fetch_status")
)
unfetched_pages = sa.select(
[
Thread.id.label("thread_id"),
sa.func.generate_series(fetch_status.c.last_fetched_page + 1, Thread.page_count).label("page_num"),
],
from_obj=sa.join(Thread, fetch_status, Thread.id == fetch_status.c.thread_id),
)
fetched_first_pages = sa.select([ThreadPage.thread_id]).where(ThreadPage.page_num == 1).as_scalar()
unfetched_first_pages = sa.select(
[Thread.id.label("thread_id"), sa.literal(1, sa.Integer).label("page_num")], from_obj=Thread
).where(Thread.id.notin_(fetched_first_pages))
q = sa.union(incomplete_pages, unfetched_pages, unfetched_first_pages)
q = q.order_by(q.c.thread_id.asc(), q.c.page_num.asc())
return db_session.execute(q).fetchall()
开发者ID:inklesspen,项目名称:mimir,代码行数:34,代码来源:fetch.py
示例13: test_compare_col_identity
def test_compare_col_identity(self):
stmt1 = (
select([table_a.c.a, table_b.c.b])
.where(table_a.c.a == table_b.c.b)
.alias()
)
stmt1_c = (
select([table_a.c.a, table_b.c.b])
.where(table_a.c.a == table_b.c.b)
.alias()
)
stmt2 = union(select([table_a]), select([table_b]))
stmt3 = select([table_b])
equivalents = {table_a.c.a: [table_b.c.a]}
is_false(
stmt1.compare(stmt2, use_proxies=True, equivalents=equivalents)
)
is_true(
stmt1.compare(stmt1_c, use_proxies=True, equivalents=equivalents)
)
is_true(
(table_a.c.a == table_b.c.b).compare(
stmt1.c.a == stmt1.c.b,
use_proxies=True,
equivalents=equivalents,
)
)
开发者ID:monetate,项目名称:sqlalchemy,代码行数:32,代码来源:test_compare.py
示例14: list_worksheets
def list_worksheets(self, owner_id=None):
'''
Return a list of row dicts, one per worksheet. These dicts do NOT contain
ALL worksheet items; this method is meant to make it easy for a user to see
the currently existing worksheets. Included worksheet items are those that
define metadata that one will likely want to see in a list view (e.g. title).
'''
cols_to_select = [cl_worksheet.c.id,
cl_worksheet.c.uuid,
cl_worksheet.c.name,
cl_worksheet.c.owner_id,
cl_group_object_permission.c.permission]
if owner_id is None:
# query for public worksheets
stmt = select(cols_to_select).\
where(cl_worksheet.c.uuid == cl_group_object_permission.c.object_uuid).\
where(cl_group_object_permission.c.group_uuid == self.public_group_uuid)
else:
# query for worksheets owned by owner_id
cols1 = cols_to_select[:4]
cols1.extend([literal(GROUP_OBJECT_PERMISSION_ALL).label('permission')])
stmt1 = select(cols1).where(cl_worksheet.c.owner_id == owner_id)
# query for worksheets visible to owner_id or co-owned by owner_id
stmt2_groups = select([cl_user_group.c.group_uuid]).\
where(cl_user_group.c.user_id == owner_id)
stmt2 = select(cols_to_select).\
where(cl_worksheet.c.uuid == cl_group_object_permission.c.object_uuid).\
where(or_(
cl_group_object_permission.c.group_uuid.in_(stmt2_groups),
cl_group_object_permission.c.group_uuid == self.public_group_uuid)).\
where(cl_worksheet.c.owner_id != owner_id)
stmt = union(stmt1, stmt2)
with self.engine.begin() as connection:
rows = connection.execute(stmt).fetchall()
if not rows:
return []
uuids = set(row.uuid for row in rows)
item_rows = connection.execute(
cl_worksheet_item.select().\
where(cl_worksheet_item.c.worksheet_uuid.in_(uuids)).\
where(or_(
cl_worksheet_item.c.type == 'title',
cl_worksheet_item.c.type == 'description'))
).fetchall()
row_dicts = [dict(row) for row in sorted(rows, key=lambda item: item['id'])]
uuid_index_map = {}
for i in range(0, len(row_dicts)):
row_dict = row_dicts[i]
row_dict.update({'items': []})
uuid_index_map[row_dict['uuid']] = i
for item_row in item_rows:
idx = uuid_index_map.get(item_row.worksheet_uuid, -1)
if idx < 0:
raise IntegrityError('Got item %s without worksheet' % (item_row,))
row_dicts[idx]['items'].append(dict(item_row))
return row_dicts
开发者ID:avinava07,项目名称:codalab-cli,代码行数:59,代码来源:bundle_model.py
示例15: test_union_label
def test_union_label(self):
s1 = select([func.foo("hoho").label("x")])
s2 = select([func.foo("Bar").label("y")])
stmt = union(s1, s2).order_by("x")
self.assert_compile(
stmt,
"SELECT foo(:foo_1) AS x UNION SELECT foo(:foo_2) AS y ORDER BY x",
)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:8,代码来源:test_text.py
示例16: members_query
def members_query(self, qry=None):
if not qry:
qry = User.query
qry = qry.select_from(union(
qry.join(lesson_members).filter_by(lesson_id=self.id).order_by(None),
qry.join(team_members).join(Team).filter_by(lesson_id=self.id).order_by(None),
)).order_by(User.id)
return qry
开发者ID:Ayutac,项目名称:SAUCE,代码行数:8,代码来源:event.py
示例17: test_compound
def test_compound(self):
t1 = table('t1', column('c1'), column('c2'), column('c3'))
t2 = table('t2', column('c1'), column('c2'), column('c3'))
self.assert_compile(union(t1.select(), t2.select()),
'SELECT t1.c1, t1.c2, t1.c3 FROM t1 UNION '
'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
self.assert_compile(except_(t1.select(), t2.select()),
'SELECT t1.c1, t1.c2, t1.c3 FROM t1 MINUS '
'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:9,代码来源:test_compiler.py
示例18: test_union_ordered_alias
def test_union_ordered_alias(self):
(s1, s2) = (
select([t1.c.col3.label("col3"), t1.c.col4.label("col4")], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label("col3"), t2.c.col4.label("col4")], t2.c.col2.in_(["t2col2r2", "t2col2r3"])),
)
u = union(s1, s2, order_by=["col3", "col4"])
wanted = [("aaa", "aaa"), ("bbb", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
eq_(u.alias("bar").select().execute().fetchall(), wanted)
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:9,代码来源:test_query.py
示例19: test_union_all
def test_union_all(self):
e = union_all(select([t1.c.col3]), union(select([t1.c.col3]), select([t1.c.col3])))
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
found1 = self._fetchall_sorted(e.execute())
eq_(found1, wanted)
found2 = self._fetchall_sorted(e.alias("foo").select().execute())
eq_(found2, wanted)
开发者ID:t3573393,项目名称:sqlalchemy,代码行数:9,代码来源:test_query.py
示例20: _list_by_category
def _list_by_category(self, balance_uid, expenses = True, incomes = True):
model = request.environ['sqlalchemy.model']
db = request.environ['sqlalchemy.session']
try:
user_uid = h.authenticated_user().uid
except:
return { "failure": Messages.pemissionDenied() }
if not Operations(db, model).can_see_balance(user_uid, balance_uid):
return { "failure": Messages.permissionDenied() }
now = date.today()
select_expenses = select(
[model.ExpenseCategory.c.uid.label('uid'), model.ExpenseCategory.c.name.label('name'), func.sum(model.BalanceChange.c.amount).label('summary')],
and_(model.BalanceChange.is_income==False,
model.ExpenseCategory.uid==model.BalanceChange.expense_category_uid,
model.BalanceChange.balance_uid==balance_uid,
model.BalanceChange.occurred_on >= date(now.year, now.month, 1)),
from_obj=[model.expense_categories_table, model.balance_changes_table],
group_by=[model.ExpenseCategory.c.uid, model.ExpenseCategory.c.name])
select_incomes = select(
[model.IncomeCategory.c.uid.label('uid'), model.IncomeCategory.c.name.label('name'), func.sum(model.BalanceChange.c.amount).label('summary')],
and_(model.BalanceChange.is_income==True,
model.IncomeCategory.uid==model.BalanceChange.income_category_uid,
model.BalanceChange.balance_uid==balance_uid,
model.BalanceChange.occurred_on >= date(now.year, now.month, 1)),
from_obj=[model.income_categories_table, model.balance_changes_table],
group_by=[model.IncomeCategory.c.uid, model.IncomeCategory.c.name])
if expenses and incomes:
query = union(select_expenses, select_incomes)
else:
query = expenses and select_expenses or select_incomes
balance_changes = db.execute(query.order_by('name')).fetchall()
total = len(balance_changes)
try:
page_nr = request.params['page_nr']
except:
page_nr = 1
try:
items_per_page = int(request.params['items_per_page'])
except:
items_per_page = 15
subset = Page(balance_changes, item_count=total, current_page=page_nr, items_per_page=items_per_page)
return {
"totalItems" : total,
"itemsFound" : len(subset),
"items" : [{
"uid" : item.uid,
"name" : item.name,
"summary" : h.format_decimal(Decimal(item.summary))} for item in subset ]
}
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:57,代码来源:balance_changes.py
注:本文中的sqlalchemy.union函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论