本文整理汇总了Python中sqlalchemy.case函数的典型用法代码示例。如果您正苦于以下问题：Python case函数的具体用法？Python case怎么用？Python case使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了case函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _unparent_and_close_gap
def _unparent_and_close_gap(self, node, connection, table):
    """Clear ``node``'s parent and renumber ``lft``/``rgt`` in ``table``.

    A single UPDATE rewrites ``lft``, ``rgt`` and ``parent_id`` for every
    row via CASE expressions; afterwards the in-memory ``node.lft`` and
    ``node.rgt`` are set from the maximum ``rgt`` read *before* the UPDATE.
    """
    first, last = node.lft, node.rgt
    width = last - first + 1
    # One value is used both as the shift applied to rows inside the node's
    # own [lft, rgt] interval and as the threshold above which the remaining
    # rows are moved down by ``width``.
    offset = first - 1
    highest_rgt = connection.scalar(func.max(table.c.rgt))

    lft_col = table.c.lft
    rgt_col = table.c.rgt

    new_lft = case(
        [
            (and_(lft_col >= first, lft_col <= last), lft_col - offset),
            (lft_col > offset, lft_col - width),
        ],
        else_=lft_col,
    )
    new_rgt = case(
        [
            (and_(rgt_col >= first, rgt_col <= last), rgt_col - offset),
            (rgt_col > offset, rgt_col - width),
        ],
        else_=rgt_col,
    )
    # Only the row of ``node`` itself loses its parent.
    new_parent = case(
        [(table.c.id == node.id, None)],
        else_=table.c.parent_id,
    )

    statement = table.update().values(
        lft=new_lft,
        rgt=new_rgt,
        parent_id=new_parent,
    )
    connection.execute(statement)

    node.lft = highest_rgt - 1
    node.rgt = highest_rgt
开发者ID:mikesname,项目名称:sqlalchemy-qubit,代码行数:30,代码来源:nested_set.py
示例2: before_insert
def before_insert(self, mapper, connection, instance):
    """Assign nested-set ``lft``/``rgt`` bounds to ``instance`` before INSERT.

    Nothing is done when both bounds are already set (truthy). A row without
    ``parent_id`` is appended after the current maximum ``rgt``. Otherwise
    the parent's ``rgt`` is looked up, every bound at or beyond it is shifted
    by 2, and the instance takes the freed slot.

    Args:
        mapper: The mapper for the instance (unused here).
        connection: Connection used to read and update the table.
        instance: Object being inserted; must provide
            ``nested_object_table()``, ``lft``, ``rgt`` and ``parent_id``.
    """
    if instance.lft and instance.rgt:
        return

    table = instance.nested_object_table()

    if not instance.parent_id:
        # MAX() over an empty table yields NULL (None); fall back to 0 so
        # the very first row gets lft=1, rgt=2 instead of raising TypeError.
        max_rgt = connection.scalar(func.max(table.c.rgt)) or 0
        instance.lft = max_rgt + 1
        instance.rgt = max_rgt + 2
    else:
        # ``rgt`` of the parent row marks where the new leaf is inserted.
        right_most_sibling = connection.scalar(
            select([table.c.rgt]).where(table.c.id == instance.parent_id)
        )
        connection.execute(
            table.update(table.c.rgt >= right_most_sibling).values(
                lft=case(
                    [(table.c.lft > right_most_sibling, table.c.lft + 2)],
                    else_=table.c.lft,
                ),
                rgt=case(
                    [(table.c.rgt >= right_most_sibling, table.c.rgt + 2)],
                    else_=table.c.rgt,
                ),
            )
        )
        instance.lft = right_most_sibling
        instance.rgt = right_most_sibling + 1
开发者ID:mikesname,项目名称:sqlalchemy-qubit,代码行数:27,代码来源:nested_set.py
示例3: user_vote_change_comments
def user_vote_change_comments(period=None, user=None):
    """Sum up- and down-vote amounts on comments, grouped by comment author.

    Args:
        period: Optional number of seconds; when given, only votes and
            comments dated within the last ``period`` seconds are counted.
        user: Optional object with an ``_id`` attribute; when given, only
            rows whose author value equals ``str(user._id)`` are counted.

    Returns:
        A list of ``(author_id, (ups, downs))`` tuples, where ``author_id``
        is the author value converted to ``int``, ``ups`` is the sum of the
        positive amounts and ``downs`` the sum of the negated negative ones.
    """
    rel = Vote.rel(Account, Comment)
    # Renamed from ``type`` so the builtin is not shadowed.
    rel_type = tdb.rel_types_id[rel._type_id]
    # rt = rel table
    # dt = data table
    rt, account_tt, comment_tt, dt = rel_type.rel_table

    aliases = tdb.alias_generator()
    # next() works on both Python 2 and 3; ``aliases.next()`` is Python 2 only.
    author_dt = dt.alias(next(aliases))

    amount = sa.cast(rt.c.name, sa.Integer)
    cols = [
        author_dt.c.value,
        sa.func.sum(sa.case([(amount > 0, amount)], else_=0)),
        sa.func.sum(sa.case([(amount < 0, amount * -1)], else_=0)),
    ]

    query = sa.and_(
        author_dt.c.thing_id == rt.c.rel_id,
        author_dt.c.key == "author_id",
        comment_tt.c.thing_id == rt.c.thing2_id,
    )
    if period is not None:
        # timedelta(0, period): zero days plus ``period`` seconds.
        earliest = datetime.now(g.tz) - timedelta(0, period)
        query.clauses.extend((rt.c.date >= earliest, comment_tt.c.date >= earliest))
    if user is not None:
        query.clauses.append(author_dt.c.value == str(user._id))

    s = sa.select(cols, query, group_by=author_dt.c.value)
    rows = s.execute().fetchall()
    return [(int(r[0]), (r[1], r[2])) for r in rows]
开发者ID:brendanlong,项目名称:lesswrong,代码行数:30,代码来源:user_stats.py
示例4: _manage_position_gap
def _manage_position_gap(self, connection, session_objs, tree_id, target, size):
    """Shift left/right values past ``target`` by ``size`` in one tree.

    The rows of the tree identified by ``tree_id`` are updated in the
    database first; then the same adjustment is applied to the objects in
    ``session_objs`` that belong to that tree.
    """
    opts = self._tree_options
    left_col = opts.left_field
    right_col = opts.right_field

    def shifted(column):
        # CASE WHEN column > target THEN column + size ELSE column END
        return sqlalchemy.case(
            [(column > target, column + size)],
            else_=column)

    beyond_target = (left_col > target) | (right_col > target)
    statement = (
        opts.table.update()
        .values({
            left_col: shifted(left_col),
            right_col: shifted(right_col),
        })
        .where((opts.tree_id_field == tree_id) & beyond_target)
    )
    connection.execute(statement)

    tree_attr = opts.tree_id_field.name
    left_attr = left_col.name
    right_attr = right_col.name
    for item in session_objs:
        if getattr(item, tree_attr) != tree_id:
            continue
        # Read both values before writing either one back.
        current_left = getattr(item, left_attr)
        current_right = getattr(item, right_attr)
        if current_left > target:
            setattr(item, left_attr, current_left + size)
        if current_right > target:
            setattr(item, right_attr, current_right + size)
开发者ID:evidens,项目名称:sqlalchemy-orm-tree,代码行数:29,代码来源:orm.py
示例5: before_insert
def before_insert(mapper, connection, instance):
    """Set ``left``/``right`` on ``instance`` before it is inserted.

    An instance without a parent receives the bounds (1, 2). Otherwise the
    parent's ``rgt`` is read, every ``lft``/``rgt`` at or past that point is
    moved up by 2, and the instance is given the two freed values.
    """
    if not instance.parent:
        instance.left = 1
        instance.right = 2
        return

    personnel = mapper.mapped_table
    lft_col = personnel.c.lft
    rgt_col = personnel.c.rgt

    parent_rgt_query = select([rgt_col]).where(
        personnel.c.emp == instance.parent.emp)
    right_most_sibling = connection.scalar(parent_rgt_query)

    bumped_lft = case(
        [(lft_col > right_most_sibling, lft_col + 2)],
        else_=lft_col)
    bumped_rgt = case(
        [(rgt_col >= right_most_sibling, rgt_col + 2)],
        else_=rgt_col)

    connection.execute(
        personnel.update(rgt_col >= right_most_sibling).values(
            lft=bumped_lft,
            rgt=bumped_rgt))

    instance.left = right_most_sibling
    instance.right = right_most_sibling + 1
开发者ID:MVReddy,项目名称:sqlalchemy,代码行数:28,代码来源:nested_sets.py
示例6: context_relationship_query
def context_relationship_query(contexts):
    """Load a list of objects related to the given contexts.

    Two queries are combined with UNION: one matching relationships whose
    destination is the context's related object, one matching relationships
    whose source is. No query is issued for an empty or ``None`` argument.

    Args:
        contexts (list(int)): A list of context ids.

    Returns:
        objects (list((id, type, None))): Related objects.
    """
    # Truthiness instead of len(): also accepts None without a TypeError.
    if not contexts:
        return []

    _context = aliased(all_models.Context, name="c")
    _relationship = aliased(all_models.Relationship, name="rl")

    # When the destination type equals the context's related object type the
    # source side is selected, otherwise the destination side.
    headers = (
        case([
            (_relationship.destination_type == _context.related_object_type,
             _relationship.source_id.label('id'))
        ], else_=_relationship.destination_id.label('id')),
        case([
            (_relationship.destination_type == _context.related_object_type,
             _relationship.source_type.label('type'))
        ], else_=_relationship.destination_type.label('type')),
        literal(None),
    )

    by_destination = db.session.query(*headers).join(_context, and_(
        _context.id.in_(contexts),
        _relationship.destination_id == _context.related_object_id,
        _relationship.destination_type == _context.related_object_type,
    ))
    by_source = db.session.query(*headers).join(_context, and_(
        _context.id.in_(contexts),
        _relationship.source_id == _context.related_object_id,
        _relationship.source_type == _context.related_object_type,
    ))
    return by_destination.union(by_source).all()
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:33,代码来源:__init__.py
示例7: _ops_for_date_range
def _ops_for_date_range(self, balance_uids, start_date, end_date, tags=None, change_categories=None):
    """Return total expenses and incomes for the given balances.

    Args:
        balance_uids: Iterable of balance uids; a change matches when its
            ``balance_uid`` equals any of them.
        start_date: Optional inclusive lower bound on ``occurred_on``.
        end_date: Optional inclusive upper bound on ``occurred_on``.
        tags: Optional list of tag names; each non-blank entry (stripped and
            lower-cased) must be present on the change. Defaults to no tag
            filtering.
        change_categories: Optional list of category uids; each non-blank
            entry (stripped) is added as an equality condition. Defaults to
            no category filtering.

    Returns:
        ``{"expenses": ..., "incomes": ...}`` with the sums of negative and
        positive amounts (0 when nothing matches), or ``0`` if the query
        fails; the failure is logged with its traceback.
    """
    model = self.get_sa_model()
    db = self.get_sa_session()
    change = model.BalanceChange

    conditions = [or_(*[change.balance_uid == balance_uid for balance_uid in balance_uids])]
    if start_date:
        conditions.append(change.occurred_on >= start_date)
    if end_date:
        conditions.append(change.occurred_on <= end_date)
    if isinstance(tags, list) and tags:
        conditions.extend(
            change.tags.any(tag=tag.strip().lower())
            for tag in tags
            if tag is not None and tag.strip() != ''
        )
    if isinstance(change_categories, list) and change_categories:
        conditions.extend(
            change.change_category_uid == value.strip()
            for value in change_categories
            if value is not None and value.strip() != ''
        )

    try:
        # COALESCE needs the fallback value as its second argument;
        # without it SUM() over zero rows comes back as NULL.
        expenses = func.coalesce(
            func.sum(case([(change.amount < 0, change.amount)], else_=0)), 0
        ).label("expenses")
        incomes = func.coalesce(
            func.sum(case([(change.amount > 0, change.amount)], else_=0)), 0
        ).label("incomes")
        summary = db.execute(select(
            [expenses, incomes],
            and_(*conditions),
            from_obj=[model.balance_changes_table])).fetchone()
        return {
            "expenses": summary[0],
            "incomes": summary[1],
        }
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # still propagate.
        log.error(_("Can't get summary"), exc_info=1)
        return 0
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:34,代码来源:balance_changes.py
示例8: query_person_contacts
def query_person_contacts():
    """Build a query with one row per person and joined contact strings.

    The columns are ``person_id`` plus ``phone``, ``email`` and ``skype``;
    each of the latter aggregates the person's contacts of that type and
    joins them with ``', '``.
    """
    def joined_contacts(kind):
        # CASE without ELSE: contacts of any other type become NULL.
        only_this_kind = case([(Contact.contact_type == kind, Contact.contact)])
        aggregated = func.array_agg(only_this_kind)
        return func.array_to_string(aggregated, ', ').label(kind)

    columns = (
        Person.id.label('person_id'),
        joined_contacts('phone'),
        joined_contacts('email'),
        joined_contacts('skype'),
    )
    query = DBSession.query(*columns)
    query = query.join(Contact, Person.contacts)
    return query.group_by(Person.id)
开发者ID:alishir,项目名称:tcr,代码行数:26,代码来源:persons.py
示例9: testcase_with_dict
def testcase_with_dict(self):
    """Check ``case()`` given a dict, in searched and in simple form."""
    pk = info_table.c.pk

    # Searched CASE: the dict keys are boolean expressions.
    searched = case(
        {
            pk < 3: 'lessthan3',
            pk >= 3: 'gt3',
        },
        else_='other')
    query = select([searched, pk, info_table.c.info], from_obj=[info_table])
    expected_searched = [
        ('lessthan3', 1, 'pk_1_data'),
        ('lessthan3', 2, 'pk_2_data'),
        ('gt3', 3, 'pk_3_data'),
        ('gt3', 4, 'pk_4_data'),
        ('gt3', 5, 'pk_5_data'),
        ('gt3', 6, 'pk_6_data'),
    ]
    assert query.execute().fetchall() == expected_searched

    # Simple CASE: the dict keys are compared against ``value``.
    simple = case({1: 'one', 2: 'two'}, value=pk, else_='other')
    simple_query = select(
        [simple, pk],
        whereclause=pk < 4,
        from_obj=[info_table])
    expected_simple = [
        ('one', 1),
        ('two', 2),
        ('other', 3),
    ]
    assert simple_query.execute().fetchall() == expected_simple
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:35,代码来源:test_case_statement.py
示例10: user_karma_adjustments
def user_karma_adjustments(period=None, user=None):
    """Sum positive and negative karma adjustments, grouped by account.

    Args:
        period: Optional number of seconds; when given, only adjustments
            dated within the last ``period`` seconds are counted.
        user: Optional object with an ``_id`` attribute; when given, only
            rows whose account value equals ``str(user._id)`` are counted.

    Returns:
        A list of ``(account_id, (ups, downs))`` tuples, where
        ``account_id`` is the account value converted to ``int``, ``ups`` is
        the sum of the positive amounts and ``downs`` the sum of the negated
        negative ones.
    """
    # The Account thing/data tables were looked up here but never used;
    # those dead locals have been dropped.
    adj_info = tdb.types_id[KarmaAdjustment._type_id]
    adj_thing, adj_data = adj_info.thing_table, adj_info.data_table[0]

    aliases = tdb.alias_generator()
    # next() works on both Python 2 and 3; ``aliases.next()`` is Python 2 only.
    adj_data_2 = adj_data.alias(next(aliases))

    amount = sa.cast(adj_data_2.c.value, sa.Integer)
    cols = [
        adj_data.c.value,
        sa.func.sum(sa.case([(amount > 0, amount)], else_=0)),
        sa.func.sum(sa.case([(amount < 0, amount * -1)], else_=0)),
    ]

    # ``adj_data`` supplies the "account_id" row and the aliased copy the
    # "amount" row of the same thing.
    query = sa.and_(
        adj_data.c.thing_id == adj_thing.c.thing_id,
        adj_data.c.key == "account_id",
        adj_data.c.thing_id == adj_data_2.c.thing_id,
        adj_data_2.c.key == "amount",
    )
    if period is not None:
        # timedelta(0, period): zero days plus ``period`` seconds.
        earliest = datetime.now(g.tz) - timedelta(0, period)
        query.clauses.append(adj_thing.c.date >= earliest)
    if user is not None:
        query.clauses.append(adj_data.c.value == str(user._id))

    s = sa.select(cols, query, group_by=adj_data.c.value)
    rows = s.execute().fetchall()
    return [(int(r[0]), (r[1], r[2])) for r in rows]
开发者ID:brendanlong,项目名称:lesswrong,代码行数:32,代码来源:user_stats.py
示例11: reports
def reports(self):
# Controller action for the "Sales Dashboard" page: stores order and
# impression aggregates on the template context ``c`` and renders
# '/account/accountSummary.mak'.
# NOTE(review): indentation is missing in this copy of the file, so the exact
# extent of the ``if len(c.items) > 0`` body cannot be confirmed from here.
c.title = "Sales Dashboard"
c.items = c.user.programs
if len(c.items) > 0:
nullToken = c.nullToken
"""Each row is (program_id, date, numPurchases, numAffiliatePurchases, affiliateTotal, purchaseTotal """
#JOIN was unnecessary
#c.orders = Session_.query(Order.product_id, func.datediff(Order.date, c.lowerDate), func.count('*').label('total'), func.sum(Order.isReturned, type_=Integer).label('numReturns'), func.count(Order.affiliate_user_id).label('numAffiliatePurchases')).join((Product, and_(Order.seller_user_id==c.user.id, Order.product_id==Product.id))).filter(Order.date >= c.lowerDate).filter(Order.date < c.upperDate).group_by(Order.product_id).group_by(Order.date).all()
# Orders whose merchant is the current user, dated between c.lowerDate and
# c.upperDate (both inclusive), grouped by program and order date.
# ``affiliateTotal`` adds up Order.amount only where affiliate_user_id is not
# NULL; ``purchaseTotal`` adds up every amount in the group.
c.orders = Session_.query(Order.program_id,
func.datediff(Order.date, c.lowerDate),
func.count('*').label('numPurchases'),
func.count(Order.affiliate_user_id).label('numAffiliatePurchases'),
func.sum(case([(Order.affiliate_user_id != None, Order.amount)], else_=0)).label('affiliateTotal'),
func.sum(Order.amount).label('purchaseTotal'),
).filter(Order.merchant_user_id==c.user.id).filter(Order.date >= c.lowerDate).filter(Order.date <= c.upperDate).group_by(Order.program_id).group_by(Order.date).all()
#c.days = Session_.query(func.datediff(Transaction.date, c.lowerDate), func.sum(Transaction.amount)).filter(Transaction.user_id==c.user.id).filter(Transaction.date >= c.lowerDate).filter(Transaction.date < c.upperDate).group_by(Transaction.date).all()
"""SINGLE PRODUCT FOR OWNER"""
#c.orders = Session_.query(Order.product_id, func.datediff(Order.date, c.lowerDate), func.count('*').label('total'), func.sum(Order.isReturned, type_=Integer).label('numReturns'), func.count(Order.affiliate_user_id).label('numAffiliatePurchases')).filter(Order.product_id==c.product.id).filter(Order.date >= c.lowerDate).filter(Order.date < c.upperDate).group_by(Order.product_id).group_by(Order.date).all()
#c.impressions = Session_.query(Impression.product_id, func.datediff(Impression.date, c.lowerDate), func.count('*').label('total'), func.sum(case([(Impression.affiliate_ts != nullToken, 1)],else_=0)), func.sum(case([(Impression.order_ts != nullToken, 1)],else_=0)), func.sum(case([(and_(Impression.affiliate_ts != nullToken, Impression.order_ts != nullToken), 1)],else_=0)).label('buyConversions'), func.sum(case([(and_(Impression.purchase_ts != nullToken, Impression.order_ts != nullToken), 1)],else_=0)).label('purchaseConversions')).filter(Impression.product_id==c.product.id).filter(Impression.date >= c.lowerDate).filter(Impression.date < c.upperDate).group_by(Impression.product_id).group_by(Impression.date).all()
"""Each row is (product_id, date, affiliateViews, conversionTime)"""
# Impressions joined to the current user's programs, in the same date range,
# grouped by program and impression date. The SUM counts impressions whose
# affiliate_ts differs from nullToken. The AVG (labelled 'purchaseConversions')
# averages time_to_sec(timediff(purchase_ts, affiliate_ts)) where both
# timestamps differ from nullToken; every other row contributes 0 to it.
c.impressions = Session_.query(Impression.program_id,
func.datediff(Impression.date, c.lowerDate),
func.sum(case([(Impression.affiliate_ts != nullToken, 1)],else_=0)),
func.avg(case([(and_(Impression.purchase_ts != nullToken, Impression.affiliate_ts != nullToken), func.time_to_sec(func.timediff(Impression.purchase_ts,Impression.affiliate_ts)))],else_=0),
).label('purchaseConversions')).join((Program, and_(c.user.id==Program.merchant_user_id, Impression.program_id==Program.id))).filter(Impression.date >= c.lowerDate).filter(Impression.date <= c.upperDate).group_by(Impression.program_id).group_by(Impression.date).all()
logging.info(c.impressions)
self.__Temp(c)
return render('/account/accountSummary.mak')
开发者ID:emrosenf,项目名称:Subprime,代码行数:34,代码来源:query.py
示例12: before_insert
def before_insert(mapper, connection, instance):
    """Place ``instance`` in the nested-set tree before it is inserted.

    An instance without a parent is attached to a ``Category`` built from the
    row named "TREE_ROOT". The parent's ``rgt`` is then read, every
    ``lft``/``rgt`` at or past it is moved up by 2, and the instance receives
    the freed ``left``/``right`` pair plus ``level`` = parent level + 1.

    Raises:
        AttributeError: If the instance has no parent and no "TREE_ROOT" row
            exists (``first()`` returns ``None``).
    """
    # print() with a single argument behaves the same on Python 2 and 3;
    # the original print statement is a SyntaxError on Python 3.
    print("making adjustments before insertion")
    category = mapper.mapped_table

    # If the new term has no parent, connect to root
    if instance.parent is None:
        values = connection.execute(
            select([category]).where(category.c.name == "TREE_ROOT")
        ).first().values()
        # Positional access into the root row; indexes 0, 2, 3 and 4 are
        # taken as name, level, left and right respectively.
        parent = Category()
        parent.name = values[0]
        parent.level = values[2]
        parent.left = values[3]
        parent.right = values[4]
        instance.parent = parent

    # Find right most sibling's right value
    right_most_sibling = connection.scalar(
        select([category.c.rgt]).where(category.c.name == instance.parent.name)
    )

    # Update all values greater than rightmost sibling
    connection.execute(
        category.update(category.c.rgt >= right_most_sibling).values(
            # Update if left bound is greater than rightmost sibling
            lft=case([(category.c.lft > right_most_sibling, category.c.lft + 2)], else_=category.c.lft),
            # Update if right bound is greater than or equal to rightmost sibling
            rgt=case([(category.c.rgt >= right_most_sibling, category.c.rgt + 2)], else_=category.c.rgt),
        )
    )

    instance.left = right_most_sibling
    instance.right = right_most_sibling + 1
    instance.level = instance.parent.level + 1
开发者ID:johncadigan,项目名称:CategoryGenerator,代码行数:30,代码来源:model.py
示例13: generate
def generate():
    """Yield a comma-separated export of users, header line first.

    ``enabled`` and ``db_session`` are read from the enclosing scope. When
    ``enabled`` is '1' or '0' the rows are restricted to enabled or disabled
    users respectively; any other value applies no filter.
    """
    def blank_if_null(column):
        # CASE WHEN column IS NOT NULL THEN column ELSE '' END
        return case([(column != None, column)], else_='')  # noqa: E711

    complete_flag = case([(User.enabled == '1', 'yes')], else_='no')
    created = func.date_format(User.created_at, '%Y-%m-%d %H:%i:%s')

    users = db_session.query(
        User.first_name,
        User.last_name,
        blank_if_null(User.email),
        blank_if_null(User.dni),
        blank_if_null(User.phone),
        complete_flag,
        blank_if_null(Department.name),
        created,
    ).outerjoin(Department)

    row_columns = ('Name', 'Lastname', 'Email', 'Dni',
                   'Phone', 'Complete', 'Department', 'Created_at',)

    if enabled == '1':
        users = users.filter(User.enabled == True)  # noqa: E712
    elif enabled == '0':
        users = users.filter(User.enabled == False)  # noqa: E712

    yield ','.join(row_columns) + '\n'
    for record in users.all():
        yield ','.join(record) + '\n'
开发者ID:renaco,项目名称:dolce-winefest,代码行数:29,代码来源:admin.py
示例14: test_text_doesnt_explode
def test_text_doesnt_explode(self):
    """``case()`` accepts ``text()`` and ``literal_column()`` as results."""
    def yes_no_select(make_literal):
        # 'yes' only for the row whose info is 'pk_4_data', 'no' otherwise.
        flag = case(
            [(info_table.c.info == 'pk_4_data', make_literal("'yes'"))],
            else_=make_literal("'no'"))
        return select([flag]).order_by(info_table.c.info)

    statements = [yes_no_select(text), yes_no_select(literal_column)]

    if testing.against("firebird"):
        # The expected values carry a trailing space on firebird.
        expected = [
            ('no ', ), ('no ', ), ('no ', ), ('yes', ),
            ('no ', ), ('no ', ),
        ]
    else:
        expected = [
            ('no', ), ('no', ), ('no', ), ('yes', ),
            ('no', ), ('no', ),
        ]

    for statement in statements:
        eq_(statement.execute().fetchall(), expected)
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:35,代码来源:test_case_statement.py
示例15: _get_dep_statuses
def _get_dep_statuses(self, ti, session, dep_context):
TI = airflow.models.TaskInstance
TR = airflow.models.TriggerRule
# Checking that all upstream dependencies have succeeded
if not ti.task.upstream_list:
yield self._passing_status(
reason="The task instance did not have any upstream tasks.")
raise StopIteration
if ti.task.trigger_rule == TR.DUMMY:
yield self._passing_status(reason="The task had a dummy trigger rule set.")
raise StopIteration
# TODO(unknown): this query becomes quite expensive with dags that have many
# tasks. It should be refactored to let the task report to the dag run and get the
# aggregates from there.
qry = (
session
.query(
func.coalesce(func.sum(
case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
func.coalesce(func.sum(
case([(TI.state == State.EXCLUDED, 1)], else_=0)), 0),
func.coalesce(func.sum(
case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
func.coalesce(func.sum(
case([(TI.state == State.FAILED, 1)], else_=0)), 0),
func.coalesce(func.sum(
case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
func.count(TI.task_id),
)
.filter(
TI.dag_id == ti.dag_id,
TI.task_id.in_(ti.task.upstream_task_ids),
TI.execution_date == ti.execution_date,
TI.state.in_([
State.SUCCESS, State.FAILED, State.EXCLUDED,
State.UPSTREAM_FAILED, State.SKIPPED]),
)
)
successes, excluded, skipped, failed, upstream_failed, done = qry.first()
# Add excluded tasks into successful tasks as they are equivalent for
# dependency purposes. This is done in this way, not using the
# state_for_dependents function, due to the constraints of SQLAlchemy
# queries.
successes = successes + excluded
for dep_status in self._evaluate_trigger_rule(
ti=ti,
successes=successes,
skipped=skipped,
failed=failed,
upstream_failed=upstream_failed,
done=done,
flag_upstream_failed=dep_context.flag_upstream_failed,
session=session):
yield dep_status
开发者ID:owlabs,项目名称:incubator-airflow,代码行数:60,代码来源:trigger_rule_dep.py
示例16: strategy_outline
def strategy_outline(name, goal):
    """Show, add and delete the strategies of one goal.

    On a POST coming from the strategy form, the form is validated and,
    if valid, a new strategy is stored for the goal. On a POST coming from
    the delete form, the strategy with the submitted row id is deleted.
    Every POST path redirects back to this view; otherwise the strategy
    page is rendered.

    Args:
        name: Project id, used to load the project and build redirects.
        goal: Goal id, used to load the goal and filter the task summary.
    """
    P = models.Projects.query.all()
    project = models.Projects.query.filter_by(id=name).first()
    pgoal = models.Goals.query.filter_by(id=goal).first()
    S = pgoal.strategies.all()
    sform = strategy_form(request.values)
    delete_form = DeleteRow_form()

    # Per strategy of this goal: x = number of tasks marked complete,
    # y = number of tasks whose deadline is later than their completion
    # date, total = number of tasks.
    q_sum = (db.session.query(
        Projects.id.label("project_id"),
        func.sum(case([(Tasks.complete == True, 1)], else_=0)).label("x"),  # noqa: E712
        func.sum(case([(and_(Tasks.deadline != None, Tasks.completeDate != None, Tasks.deadline > Tasks.completeDate), 1)], else_=0)).label("y"),  # noqa: E711
        func.count(Tasks.id).label("total"),
        Strategies.id.label("strategy_id"),
        Goals.id.label("goal_id"),
    ).join(Goals, Projects.goals)
     .outerjoin(Strategies, Goals.strategies)
     .outerjoin(Tasks, Strategies.tasks)
     .group_by(Projects.id, Goals.id, Strategies.id)
     .filter(Goals.id == goal))

    if request.method == 'POST' and sform.submit.data:
        # Validate once and reuse the result. print() with one argument
        # behaves the same on Python 2 and 3; the original print statement
        # is a SyntaxError on Python 3.
        is_valid = sform.validate()
        print(is_valid)
        if not is_valid:
            flash('Failed Field validation.')
            flash_errors(sform)
            return redirect(url_for('strategy_outline', name=name, goal=goal))
        p = models.Strategies(strategy=sform.strategy.data, goa=pgoal)
        db.session.add(p)
        db.session.commit()
        return redirect(url_for('strategy_outline', name=name, goal=goal))

    if request.method == 'POST' and delete_form.submitd.data:
        pstratrow = delete_form.row_id.data
        pstrat = models.Strategies.query.filter_by(id=pstratrow).first()
        db.session.delete(pstrat)
        db.session.commit()
        return redirect(url_for('strategy_outline', name=name, goal=goal))

    return render_template("index_for_strategy.html", project=project, S=S, sform=sform, pgoal=pgoal, P=P, zipit=zip(S, q_sum), delete_form=delete_form)
开发者ID:chetstar,项目名称:projectmgmtWindows,代码行数:33,代码来源:views.py
示例17: project_outline
def project_outline(name):
    """Show, add and delete the goals of the project with id ``name``.

    A POST from the goal form validates it and, if valid, stores a new goal
    for the project. A POST from the delete form removes the goal with the
    submitted row id. Every POST path redirects back to this view; otherwise
    the goal page is rendered.
    """
    def back_to_outline():
        return redirect(url_for('project_outline', name=name))

    P = models.Projects.query.all()
    project = models.Projects.query.filter_by(id=name).first()
    G = project.goals.all()
    gform = goal_form(request.values)
    delete_form = DeleteRow_form()

    # Per goal of this project: x = number of tasks marked complete,
    # y = number of tasks whose deadline is later than their completion
    # date, total = number of tasks.
    completed = func.sum(
        case([(Tasks.complete == True, 1)], else_=0)).label("x")  # noqa: E712
    before_deadline = func.sum(case(
        [(and_(Tasks.deadline != None,  # noqa: E711
               Tasks.completeDate != None,  # noqa: E711
               Tasks.deadline > Tasks.completeDate), 1)],
        else_=0)).label("y")
    q_sum = (
        db.session.query(
            Projects.id.label("project_id"),
            Goals.id.label("goal_id"),
            completed,
            before_deadline,
            func.count(Tasks.id).label("total"),
        )
        .join(Goals, Projects.goals)
        .outerjoin(Strategies, Goals.strategies)
        .outerjoin(Tasks, Strategies.tasks)
        .group_by(Projects.id, Goals.id)
        .filter(Projects.id == name)
    )

    if request.method == 'POST' and gform.submit.data:
        if gform.validate() == False:  # noqa: E712
            flash('Failed Field validation.')
            flash_errors(gform)
            return back_to_outline()
        new_goal = models.Goals(goal=gform.goal.data, proj=project)
        db.session.add(new_goal)
        db.session.commit()
        return back_to_outline()

    if request.method == 'POST' and delete_form.submitd.data:
        row_id = delete_form.row_id.data
        doomed = models.Goals.query.filter_by(id=row_id).first()
        db.session.delete(doomed)
        db.session.commit()
        return back_to_outline()

    return render_template(
        "index_for_goal.html",
        project=project,
        G=G,
        gform=gform,
        P=P,
        zipit=zip(G, q_sum),
        delete_form=delete_form,
    )
开发者ID:chetstar,项目名称:projectmgmtWindows,代码行数:33,代码来源:views.py
示例18: upgrade
def upgrade():
    """Add check constraints to the ``client`` and ``permission`` tables.

    Each constraint requires that exactly one of ``user_id`` and ``org_id``
    is non-NULL.
    """
    def exactly_one_owner():
        has_user = sa.case([(column('user_id') != None, 1)], else_=0)  # NOQA
        has_org = sa.case([(column('org_id') != None, 1)], else_=0)  # NOQA
        return has_user + has_org == 1

    targets = (
        ('client_user_id_or_org_id', 'client'),
        ('permission_user_id_or_org_id', 'permission'),
    )
    for constraint_name, table_name in targets:
        op.create_check_constraint(constraint_name, table_name, exactly_one_owner())
开发者ID:bugrevelio,项目名称:lastuser,代码行数:8,代码来源:10c4a18dea0_check_constraint.py
示例19: _get_permissions_query
def _get_permissions_query(self, session, identifier):
    """Build the query that yields one JSON permission list per domain.

    Permissions reachable from the user with the given ``identifier``
    through role membership are collected; NULL domain, action and resource
    names are replaced by ``"*"``. The statement is built in four nested
    steps and is shaped roughly like this SQL:

        select domain, json_agg(parts) as permissions from
            (select domain, row_to_json(r) as parts from
                (select domain, action, array_agg(distinct target) as target from
                    (select (case when domain is null then '*' else domain end) as domain,
                            (case when target is null then '*' else target end) as target,
                            array_agg(distinct (case when action is null then '*' else action end)) as action
                       from permission
                      group by domain, target
                    ) x
                 group by domain, action)
             r) parts
        group by domain;

    Returns:
        The final, unexecuted query with columns (domain, JSON text).
    """
    def star_if_null(column):
        return case([(column == None, "*")], else_=column)  # noqa: E711

    domain_name = star_if_null(Domain.name)
    action_name = star_if_null(Action.name)
    resource_name = star_if_null(Resource.name)
    actions = func.array_agg(action_name.distinct())

    # Step 1: distinct actions per (domain, resource) for this user.
    per_resource = session.query(
        Permission.domain_id,
        domain_name.label("domain"),
        Permission.resource_id,
        resource_name.label("resource"),
        actions.label("action"),
    )
    per_resource = per_resource.select_from(User)
    per_resource = per_resource.join(
        role_membership_table, User.pk_id == role_membership_table.c.user_id)
    per_resource = per_resource.join(
        role_permission_table,
        role_membership_table.c.role_id == role_permission_table.c.role_id)
    per_resource = per_resource.join(
        Permission, role_permission_table.c.permission_id == Permission.pk_id)
    per_resource = per_resource.outerjoin(Domain, Permission.domain_id == Domain.pk_id)
    per_resource = per_resource.outerjoin(Action, Permission.action_id == Action.pk_id)
    per_resource = per_resource.outerjoin(Resource, Permission.resource_id == Resource.pk_id)
    per_resource = per_resource.filter(User.identifier == identifier)
    per_resource = per_resource.group_by(
        Permission.domain_id, Domain.name, Permission.resource_id, Resource.name)
    stmt1 = per_resource.subquery()

    # Step 2: distinct resources per (domain, action set).
    resources = func.array_agg(stmt1.c.resource.distinct()).label("resource")
    stmt2 = (
        session.query(stmt1.c.domain, stmt1.c.action, resources)
        .select_from(stmt1)
        .group_by(stmt1.c.domain, stmt1.c.action)
    ).subquery()

    # Step 3: turn every row into a JSON object.
    parts = func.row_to_json(as_row(stmt2)).label("parts")
    stmt3 = session.query(stmt2.c.domain, parts).select_from(stmt2).subquery()

    # Step 4: aggregate the JSON objects per domain, cast to text.
    return (
        session.query(stmt3.c.domain, cast(func.json_agg(stmt3.c.parts), Text))
        .select_from(stmt3)
        .group_by(stmt3.c.domain)
    )
开发者ID:YosaiProject,项目名称:yosai_alchemystore,代码行数:57,代码来源:accountstore.py
示例20: graphs_stats
def graphs_stats():
    """Render per-project task statistics for the graph page.

    For every project the summary query returns ``x`` (number of tasks marked
    complete), ``y`` (number of tasks whose deadline is later than their
    completion date) and ``total`` (number of tasks).
    """
    P = models.Projects.query.all()

    completed = func.sum(
        case([(Tasks.complete == True, 1)], else_=0)).label("x")  # noqa: E712
    before_deadline = func.sum(case(
        [(and_(Tasks.deadline != None,  # noqa: E711
               Tasks.completeDate != None,  # noqa: E711
               Tasks.deadline > Tasks.completeDate), 1)],
        else_=0)).label("y")

    q_sum = (
        db.session.query(
            Projects.id.label("project_id"),
            completed,
            before_deadline,
            func.count(Tasks.id).label("total"),
        )
        .outerjoin(Goals, Projects.goals)
        .outerjoin(Strategies, Goals.strategies)
        .outerjoin(Tasks, Strategies.tasks)
        .group_by(Projects.id)
    )

    return render_template(
        "graph_stats.html", P=P, q_sum=q_sum, zipit=zip(P, q_sum))
开发者ID:chetstar,项目名称:projectmgmtWindows,代码行数:9,代码来源:views.py
注：本文中的sqlalchemy.case函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论