本文整理汇总了Python中sqlalchemy.sql.case函数的典型用法代码示例。如果您正苦于以下问题:Python case函数的具体用法?Python case怎么用?Python case使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了case函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_cash_flows
def get_cash_flows():
date_range_filter_schema = DateRangeFilterSchema().load(request.args)
if date_range_filter_schema.errors:
return {'errors': date_range_filter_schema.errors}, 400
cash_flow_schema = CashFlowSchema()
amounts = db.session.query(
func.sum(Record.amount).label("cash_flow"),
func.sum(
case([(Record.record_type == Record.RECORD_TYPE_INCOME, Record.amount)], else_=0)
).label('income'),
func.sum(
case([(Record.record_type == Record.RECORD_TYPE_EXPENSE, Record.amount)], else_=0)
).label('expense'),
func.date_trunc('month', Record.date).label("date"),
).group_by(
func.date_trunc('month', Record.date)
).order_by(
func.date_trunc('month', Record.date)
)
if 'date_from' in date_range_filter_schema.data:
amounts = amounts.filter(Record.date >= date_range_filter_schema.data['date_from'])
if 'date_to' in date_range_filter_schema.data:
amounts = amounts.filter(Record.date < date_range_filter_schema.data['date_to'])
return {'objects': cash_flow_schema.dump(amounts, many=True).data}
开发者ID:monetario,项目名称:core,代码行数:29,代码来源:balance.py
示例2: milestone_list
def milestone_list():
fmt = get_format(request)
if fmt == 'html':
return send_file(root_path('static', 'index.html'),
mimetype='text/html')
user = get_user(request)
session = db.session
q = session.query(
Ticket.milestone,
func.SUM(1).label("total"),
func.SUM(case([(u'closed', 1)], Ticket.status, 0)).label("closed"),
func.SUM(case([(u'closed', 0)], Ticket.status, 1)).label("open"),
).group_by(Ticket.milestone).subquery()
rows = session.query(Milestone, q.c.total, q.c.closed, q.c.open).\
filter(Milestone.completed == 0).\
join((q, Milestone.name == q.c.milestone)).\
order_by(Milestone.due == 0,
Milestone.due,
func.UPPER(Milestone.name)).\
all()
if fmt == 'json':
return jsonify({
'template': 'milestone_list',
'milestones': [orm_dict(r.Milestone,
total=r.total,
closed=r.closed,
open=r.open) for r in rows],
'user': user,
'title': 'Roadmap',
})
abort(404)
开发者ID:etrepum,项目名称:offtrac,代码行数:31,代码来源:wsgi.py
示例3: execute
def execute(self, request, user, name):
alliance = Alliance.load(name)
if alliance is None:
return HttpResponseRedirect(reverse("alliance_ranks"))
ph = aliased(PlanetHistory)
members = count().label("members")
size = sum(ph.size).label("size")
value = sum(ph.value).label("value")
score = sum(ph.score).label("score")
avg_size = size.op("/")(members).label("avg_size")
avg_value = value.op("/")(members).label("avg_value")
t10v = count(case(whens=((ph.value_rank <= 10 ,1),), else_=None)).label("t10v")
t100v = count(case(whens=((ph.value_rank <= 100 ,1),), else_=None)).label("t100v")
pho = aliased(PlanetHistory)
sizeo = sum(pho.size).label("sizeo")
valueo = sum(pho.value).label("valueo")
scoreo = sum(pho.score).label("scoreo")
Q = session.query(PlanetHistory.tick.label("tick"),
Alliance.id.label("id"),
literal_column("rank() OVER (PARTITION BY planet_history.tick ORDER BY sum(planet_history.size) DESC)").label("size_rank"),
literal_column("rank() OVER (PARTITION BY planet_history.tick ORDER BY sum(planet_history.value) DESC)").label("value_rank"),
)
Q = Q.filter(PlanetHistory.active == True)
Q = Q.join(PlanetHistory.current)
Q = Q.join(Planet.intel)
Q = Q.join(Intel.alliance)
Q = Q.group_by(PlanetHistory.tick, Alliance.id)
ranks = Q.subquery()
Q = session.query(ph.tick, members,
size, value,
avg_size, avg_value,
size-sizeo, value-valueo, score-scoreo,
t10v, t100v,
)
Q = Q.filter(ph.active == True)
Q = Q.join(ph.current)
Q = Q.join(Planet.intel)
Q = Q.join(Intel.alliance)
Q = Q.outerjoin((pho, and_(ph.id==pho.id, ph.tick-1==pho.tick),))
Q = Q.filter(Intel.alliance == alliance)
Q = Q.group_by(ph.tick)
Q = Q.from_self().add_columns(ranks.c.size_rank, ranks.c.value_rank)
Q = Q.outerjoin((ranks, and_(ph.tick == ranks.c.tick, alliance.id == ranks.c.id),))
Q = Q.order_by(desc(ph.tick))
history = Q.all()
return render("ialliancehistory.tpl", request, alliance=alliance, members=alliance.intel_members, history=history)
开发者ID:Hardware-Hacks,项目名称:merlin,代码行数:53,代码来源:ialliancehistory.py
示例4: get_balance
def get_balance(year, month):
(_, day) = calendar.monthrange(year, month)
start_date = datetime.date(year, month, 1)
end_date = datetime.date(year, month, day)
balance_schema = BalanceSchema()
amounts = db.session.query(
func.sum(Record.amount).label("cash_flow"),
func.sum(
case([(Record.record_type == Record.RECORD_TYPE_INCOME, Record.amount)], else_=0)
).label('income'),
func.sum(
case([(Record.record_type == Record.RECORD_TYPE_EXPENSE, Record.amount)], else_=0)
).label('expense'),
func.date_trunc('month', Record.date).label("date"),
).filter(
func.extract('year', Record.date) == year,
func.extract('month', Record.date) == month,
).group_by(
func.date_trunc('month', Record.date)
).first()
current_balance = db.session.query(
func.sum(
case([(Record.date < start_date, Record.amount)], else_=0)
).label('start_balance'),
func.sum(Record.amount).label("end_balance")
).filter(
Record.date <= end_date
).first()
if amounts:
balance = balance_schema.dump({
'cash_flow': amounts.cash_flow,
'income': amounts.income,
'expense': amounts.expense,
'date': amounts.date,
'start_balance': current_balance.start_balance,
'end_balance': current_balance.end_balance,
}).data
else:
balance = balance_schema.dump({
'cash_flow': 0,
'income': 0,
'expense': 0,
'date': end_date,
'start_balance': current_balance.start_balance,
'end_balance': current_balance.end_balance,
}).data
return balance
开发者ID:monetario,项目名称:core,代码行数:51,代码来源:balance.py
示例5: upgrade
def upgrade():
op.add_column('request',
sa.Column('payout', sa.Numeric(precision=15, scale=2), index=True,
nullable=True))
bind = op.get_bind()
absolute = select([abs_table.c.value.label('value'),
mod_table.c.request_id.label('request_id')])\
.select_from(join(abs_table, mod_table,
mod_table.c.id == abs_table.c.id))\
.where(mod_table.c.voided_user_id == None)\
.alias()
relative = select([rel_table.c.value.label('value'),
mod_table.c.request_id.label('request_id')])\
.select_from(join(rel_table, mod_table,
mod_table.c.id == rel_table.c.id))\
.where(mod_table.c.voided_user_id == None)\
.alias()
abs_sum = select([request.c.id.label('request_id'),
request.c.base_payout.label('base_payout'),
func.sum(absolute.c.value).label('sum')])\
.select_from(outerjoin(request, absolute,
request.c.id == absolute.c.request_id))\
.group_by(request.c.id)\
.alias()
rel_sum = select([request.c.id.label('request_id'),
func.sum(relative.c.value).label('sum')])\
.select_from(outerjoin(request, relative,
request.c.id == relative.c.request_id))\
.group_by(request.c.id)\
.alias()
total_sum = select([abs_sum.c.request_id.label('request_id'),
((
abs_sum.c.base_payout +
case([(abs_sum.c.sum == None, Decimal(0))],
else_=abs_sum.c.sum)) *
(
1 +
case([(rel_sum.c.sum == None, Decimal(0))],
else_=rel_sum.c.sum))).label('payout')])\
.select_from(join(abs_sum, rel_sum,
abs_sum.c.request_id == rel_sum.c.request_id))
payouts = bind.execute(total_sum)
for request_id, payout in payouts:
up = update(request).where(request.c.id == request_id).values(
payout=payout)
bind.execute(up)
op.alter_column('request', 'payout', nullable=False,
existing_type=sa.Numeric(precision=15, scale=2))
开发者ID:Acidity,项目名称:evesrp,代码行数:49,代码来源:3e5e1d3a02c_denormalize_request_payout.py
示例6: message_totals
def message_totals(dbsession, user):
"Message totals query"
query = dbsession.query(Message.date,
func.count(Message.date).label('mail_total'),
func.sum(case([(Message.virusinfected > 0, 1)],
else_=0)).label('virus_total'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.spam > 0), 1)],
else_=0)).label('spam_total'),
func.sum(Message.size).label('total_size')
).group_by(Message.date).order_by(
desc(Message.date))
uquery = UserFilter(dbsession, user, query)
query = uquery.filter()
return query
开发者ID:TetraAsh,项目名称:baruwa2,代码行数:15,代码来源:query.py
示例7: query
def query(cls, db, name=None, lang=None, order_by=True):
name = _listify(name)
lang = _listify(lang)
query = db.query(cls)
if len(name) == 1:
query = query.filter(cls.name == name[0])
elif len(name) > 1:
query = query.filter(cls.name.in_(name))
if len(lang) == 1:
query = query.filter(cls.lang == lang[0])
elif len(lang) > 1:
query = query.filter(cls.lang.in_(lang))
# Handle ordering
if order_by:
if order_by is True:
if not lang:
query = query.order_by(cls.lang)
elif len(lang) > 1:
query = query.order_by(
sql.case(
value=cls.lang,
whens=list((item, ix) for ix, item in enumerate(lang))
)
)
if len(name) != 1:
query = query.order_by(cls.name)
else:
# noinspection PyArgumentList
query = query.order_by(*order_by)
return query
开发者ID:FuelRats,项目名称:pipsqueak,代码行数:32,代码来源:db.py
示例8: createView
def createView(self):
# filter indexes
catalog = self.env.catalog.index_catalog
xmlindex_list = catalog.getIndexes(package_id='seismology',
resourcetype_id='event')
filter = ['datetime', 'latitude', 'longitude', 'depth',
'magnitude', 'magnitude_type', 'event_type', 'np1_strike',
'np1_dip', 'np1_rake', 'mt_mrr', 'mt_mtt', 'mt_mpp',
'mt_mrt', 'mt_mrp', 'mt_mtp', 'localisation_method']
xmlindex_list = [x for x in xmlindex_list if x.label in filter]
if not xmlindex_list:
return
# build up query
query, joins = catalog._createIndexView(xmlindex_list, compact=True)
options = [
sql.literal_column("datetime.keyval").label("end_datetime"),
sql.literal_column("datetime.keyval").label("start_datetime"),
sql.case(
value=sql.literal_column("localisation_method.keyval"),
whens={'manual': 'circle'},
else_='square').label('gis_localisation_method'),
sql.func.GeomFromText(
sql.text("'POINT(' || longitude.keyval || ' ' || " + \
"latitude.keyval || ')', 4326")).label('geom')
]
for option in options:
query.append_column(option)
query = query.select_from(joins)
return util.compileStatement(query)
开发者ID:barsch,项目名称:seishub.plugins.exupery,代码行数:29,代码来源:seismic.py
示例9: create_mapper
def create_mapper(rack_specs_tbl):
"Mapper factory."
rs = rack_specs_tbl
polymorphic_select = select([
rs,
(case([(rs.c.has_movable_subitems,
literal(RACK_SPECS_TYPES.TUBE_RACK_SPECS))],
else_=literal(RACK_SPECS_TYPES.PLATE_SPECS))).label(
'rackspecs_type')
],
).alias('rackspecs')
m = mapper(RackSpecs, polymorphic_select,
id_attribute='rack_specs_id',
slug_expression=lambda cls: as_slug_expression(cls.name),
properties=dict(
manufacturer=relationship(Organization),
shape=relationship(RackShape, uselist=False,
back_populates='specs'),
rack_specs_type=
column_property(polymorphic_select.c.rackspecs_type),
),
polymorphic_on=polymorphic_select.c.rackspecs_type,
polymorphic_identity=RACK_SPECS_TYPES.RACK_SPECS,
)
RackSpecs.has_tubes = synonym('has_movable_subitems')
return m
开发者ID:helixyte,项目名称:TheLMA,代码行数:26,代码来源:rackspecs.py
示例10: tree_stats
def tree_stats(request, treedef, tree, parentid):
tree_table = datamodel.get_table(tree)
parentid = None if parentid == 'null' else int(parentid)
node = getattr(models, tree_table.name)
descendant = aliased(node)
node_id = getattr(node, node._id)
descendant_id = getattr(descendant, node._id)
treedef_col = tree_table.name + "TreeDefID"
same_tree_p = getattr(descendant, treedef_col) == int(treedef)
is_descendant_p = sql.and_(
sql.between(descendant.nodeNumber, node.nodeNumber, node.highestChildNodeNumber),
same_tree_p)
target, make_joins = getattr(StatsQuerySpecialization, tree)()
target_id = getattr(target, target._id)
direct_count = sql.cast(
sql.func.sum(sql.case([(sql.and_(target_id != None, descendant_id == node_id), 1)], else_=0)),
types.Integer)
all_count = sql.func.count(target_id)
with models.session_context() as session:
query = session.query(node_id, direct_count, all_count) \
.join(descendant, is_descendant_p) \
.filter(node.ParentID == parentid) \
.group_by(node_id)
query = make_joins(request.specify_collection, query, descendant_id)
results = list(query)
return HttpResponse(toJson(results), content_type='application/json')
开发者ID:Colombia1819,项目名称:specify7,代码行数:34,代码来源:tree_views.py
示例11: get_measures
def get_measures(self):
"""Find all data that should be included in the report.
The data is returned as a list of tuples containing a
:py:class:`Module <euphorie.client.model.Module>`,
:py:class:`Risk <euphorie.client.model.Risk>` and
:py:class:`ActionPlan <euphorie.client.model.ActionPlan>`. Each
entry in the list will correspond to a row in the generated Excel
file.
This implementation differs from Euphorie in its ordering:
it sorts on risk priority instead of start date.
"""
query = (
Session.query(model.Module, model.Risk, model.ActionPlan)
.filter(sql.and_(model.Module.session == self.session, model.Module.profile_index > -1))
.filter(sql.not_(model.SKIPPED_PARENTS))
.filter(sql.or_(model.MODULE_WITH_RISK_OR_TOP5_FILTER, model.RISK_PRESENT_OR_TOP5_FILTER))
.join(
(
model.Risk,
sql.and_(
model.Risk.path.startswith(model.Module.path),
model.Risk.depth == model.Module.depth + 1,
model.Risk.session == self.session,
),
)
)
.join((model.ActionPlan, model.ActionPlan.risk_id == model.Risk.id))
.order_by(sql.case(value=model.Risk.priority, whens={"high": 0, "medium": 1}, else_=2), model.Risk.path)
)
return query.all()
开发者ID:pombredanne,项目名称:osha.oira,代码行数:32,代码来源:report.py
示例12: filter_by_watches
def filter_by_watches(self, user):
"""Filter the gallery down to only things `user` is watching."""
# XXX make this work for multiple users
self.query = self.query.filter(or_(
# Check for by/for/of watching
# XXX need an index on relationship_type, badly!
model.Artwork.id.in_(
self.session.query(model.UserArtwork.artwork_id)
.join((model.UserWatch, model.UserArtwork.user_id == model.UserWatch.other_user_id))
.filter(model.UserWatch.user_id == user.id)
.filter(case(
value=model.UserArtwork.relationship_type,
whens={
u'by': model.UserWatch.watch_by,
u'for': model.UserWatch.watch_for,
u'of': model.UserWatch.watch_of,
},
))
),
# Check for upload watching
model.Artwork.uploader_user_id.in_(
self.session.query(model.UserWatch.other_user_id)
.filter(model.UserWatch.user_id == user.id)
.filter(model.UserWatch.watch_upload == True) # gross
),
))
开发者ID:krinndnz,项目名称:floof,代码行数:26,代码来源:gallery.py
示例13: status
def status(cls):
return case(
[
(and_(cls.shipped_on.is_(None), cls.ship_method.like("%/%/%")), "Cancelled"),
(cls.shipped_on.isnot(None), "Shipped"),
],
else_="Open",
)
开发者ID:tipt,项目名称:part_analysis,代码行数:8,代码来源:models.py
示例14: __call__
def __call__(self, column_clause, cuboid=None):
if cuboid and cuboid.fact_count_column is not None:
count = func.sum(cuboid.fact_count_column)
return case([(count == 0, 0)], else_=(
func.sum(column_clause * cuboid.fact_count_column) /
cast(count,
types.Numeric)))
return func.avg(column_clause)
开发者ID:Kozea,项目名称:pypet,代码行数:8,代码来源:aggregates.py
示例15: prevp
def prevp(cls):
sess1 = Session()
cls1 = aliased(cls)
cls2 = aliased(cls)
res = case([
(cls.pi != 0, sess1.query(cls2).join(cls, cls.ab_id==cls2.ab_id).filter(cls2.pi == cls.pi - 1).first()),
], else_ = None)
if res is not None:
sess1.expunge(res)
开发者ID:zirmite,项目名称:flb,代码行数:9,代码来源:fx.py
示例16: license_approved_names
def license_approved_names():
lic = db.licenses.alias()
return (select([
lic.c.license_id,
case(
[[lic.c.is_spdx_official == True, lic.c.short_name]],
else_='LicenseRef-' + lic.c.short_name
).label('short_name')
])
)
开发者ID:sschuberth,项目名称:dosocs2,代码行数:10,代码来源:queries.py
示例17: has_permission
def has_permission(cls, principal, permission):
# This will need to emit some SQL that iterates through a
# sequence of ACEs, looking for allows or denies. Fake some
# simple case for now.
from sqlalchemy.sql import case
return case(
[
(cls.id == 2,
True),
], else_=False)
开发者ID:pauleveritt,项目名称:pyramid_sqltraversal,代码行数:10,代码来源:node.py
示例18: test_string_literals_not_oldstyle_quoted_in_func
def test_string_literals_not_oldstyle_quoted_in_func():
s = str(
sql.case(
[(sql.column("WHOA") == "Jason's", 1)],
else_=0
).compile(
compile_kwargs={"literal_binds": True},
dialect=bq.BQDialect()
)
)
assert "Jason's" not in s
开发者ID:gcbirzan,项目名称:sqlalchemy-bigquery,代码行数:11,代码来源:test_suite.py
示例19: last_observation
def last_observation(self):
session = object_session(self)
q = session.query(ObservedMac)
q = q.filter_by(mac_address=self.mac)
# Group the results into 'any port number but zero' and 'port 0'.
# This prioritizes any port over the uplink port.
# Saying that port 0 is an uplink port isn't very elegant, also
# with real port names it's not even true.
q = q.order_by(desc(case([(ObservedMac.port == "0", 0)], else_=1)))
q = q.order_by(desc(ObservedMac.last_seen))
return q.first()
开发者ID:jrha,项目名称:aquilon,代码行数:11,代码来源:interface.py
示例20: accumulator
def accumulator(self, column_name, new_row, agg_row, old_row=None):
new_count = new_row.count
new_total = new_row.c[column_name] * new_row.count
if old_row is not None:
new_count = new_count - old_row.count
new_total = (new_total -
(old_row.c[column_name] * old_row.count))
agg_count = func.coalesce(agg_row.count, 0)
agg_value = func.coalesce(agg_row.c[column_name]) * agg_count
total_count = new_count + agg_count
return case([(total_count == 0, 0)],
else_=(agg_value + new_total) / total_count)
开发者ID:Kozea,项目名称:pypet,代码行数:12,代码来源:aggregates.py
注:本文中的sqlalchemy.sql.case函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论