本文整理汇总了Python中sqlalchemy.func.row_number函数的典型用法代码示例。如果您正苦于以下问题：Python row_number函数的具体用法？Python row_number怎么用？Python row_number使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了row_number函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_over
def test_over(self):
    """Check that ``row_number()`` ordered by ``strcol`` numbers the rows."""
    numbered = func.row_number().over(order_by=flds.c.strcol)
    stmt = select([flds.c.intcol, numbered])
    rows = stmt.execute().fetchall()
    # Each row is (intcol, row number within the strcol ordering).
    eq_(rows, [(13, 1), (5, 2)])
开发者ID:rlugojr,项目名称:sqlalchemy,代码行数:7,代码来源:test_query.py
示例2: column_windows
def column_windows(session, column, windowsize):
    """Yield WHERE clauses that split ``column`` into consecutive windows.

    A ``row_number()`` window function ordered by ``column`` is used to pick
    every ``windowsize``-th value as a window boundary.  Each yielded clause
    selects the half-open range ``start <= column < end``; the clause for the
    last window has no upper bound.

    :param session: session used to run the boundary query.
    :param column: column whose values are split into windows.
    :param windowsize: number of rows per window.
    :returns: a generator of SQL expressions, one per window.
    """
    def int_for_range(start_id, end_id):
        """Build the range condition for a single window."""
        # Compare against None explicitly: a falsy boundary value such as 0
        # is still a valid upper bound and must not open the range.
        if end_id is not None:
            return and_(column >= start_id, column < end_id)
        return column >= start_id

    qry = session.query(
        column,
        func.row_number().over(order_by=column).label('rownum'),
    ).from_self(column)
    if windowsize > 1:
        # Keep only the first row of every window as a boundary.
        qry = qry.filter("rownum %% %d=1" % windowsize)

    intervals = [qid for qid, in qry]
    # Pair each boundary with its successor; the last window is open-ended.
    # (Avoids the quadratic cost of repeatedly popping the list head.)
    for start, end in zip(intervals, intervals[1:] + [None]):
        yield int_for_range(start, end)
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:33,代码来源:settings.py
示例3: top_players
def top_players(self):
    """Return this server's players ranked by summed alive time.

    Each row holds ``(rank, player_id, nick, alivetime)``.  Only stats
    created within the last ``self.lifetime`` days are counted, and
    ``self.last`` / ``self.limit`` are applied as offset / limit when set.

    :raises HTTPNotFound: if building or running the query fails for any
        reason (the underlying error is logged at debug level).
    """
    try:
        rank_col = fg.row_number().over(
            order_by=expr.desc(func.sum(PlayerGameStat.alivetime))
        ).label("rank")
        alivetime_col = func.sum(PlayerGameStat.alivetime).label("alivetime")

        query = DBSession.query(
            rank_col, Player.player_id, Player.nick, alivetime_col)
        query = query.filter(Player.player_id == PlayerGameStat.player_id)
        query = query.filter(Game.game_id == PlayerGameStat.game_id)
        query = query.filter(Game.server_id == self.server_id)
        # NOTE(review): ids 1 and 2 are excluded; presumably reserved
        # accounts -- confirm.
        query = query.filter(Player.player_id > 2)
        query = query.filter(
            PlayerGameStat.create_dt > (self.now - timedelta(days=self.lifetime)))
        query = query.order_by(expr.desc(func.sum(PlayerGameStat.alivetime)))
        query = query.group_by(Player.nick)
        query = query.group_by(Player.player_id)

        if self.last:
            query = query.offset(self.last)
        if self.limit:
            query = query.limit(self.limit)

        top_players = query.all()
    except Exception as e:
        log.debug(e)
        raise HTTPNotFound

    return top_players
开发者ID:antzucaro,项目名称:XonStat,代码行数:30,代码来源:server.py
示例4: get_top_scorers
def get_top_scorers(self):
    """Return players ranked by total score on this map.

    Shared by all renderers.  Each row holds
    ``(rank, player_id, nick, total_score)``.  Only stats created within the
    last ``self.lifetime`` days are counted, and ``self.last`` /
    ``self.limit`` are applied as offset / limit when set.
    """
    # Honor the configured lifetime.  The value used to be overwritten
    # straight away by a hard-coded 120 days, which silently ignored
    # ``self.lifetime``.
    cutoff = self.now - timedelta(days=self.lifetime)

    top_scorers_q = (
        DBSession.query(
            fg.row_number().over(
                order_by=expr.desc(func.sum(PlayerGameStat.score))
            ).label("rank"),
            Player.player_id,
            Player.nick,
            func.sum(PlayerGameStat.score).label("total_score"),
        )
        .filter(Player.player_id == PlayerGameStat.player_id)
        .filter(Game.game_id == PlayerGameStat.game_id)
        .filter(Game.map_id == self.map_id)
        # NOTE(review): ids 1 and 2 are excluded; presumably reserved
        # accounts -- confirm.
        .filter(Player.player_id > 2)
        .filter(PlayerGameStat.create_dt > cutoff)
        .order_by(expr.desc(func.sum(PlayerGameStat.score)))
        .group_by(Player.nick)
        .group_by(Player.player_id)
    )

    if self.last:
        top_scorers_q = top_scorers_q.offset(self.last)
    if self.limit:
        top_scorers_q = top_scorers_q.limit(self.limit)

    return top_scorers_q.all()
开发者ID:antzucaro,项目名称:XonStat,代码行数:26,代码来源:map.py
示例5: top_maps
def top_maps(self):
    """Return this server's maps ranked by how often they were played.

    This is the raw data shared by all renderers.  Each row holds
    ``(rank, map_id, name, times_played)``.  Only games created within the
    last ``self.lifetime`` days are counted, and ``self.last`` /
    ``self.limit`` are applied as offset / limit when set.

    :raises HTTPNotFound: if building or running the query fails for any
        reason (the underlying error is logged at debug level).
    """
    try:
        rank_col = fg.row_number().over(
            order_by=expr.desc(func.count())
        ).label("rank")
        played_col = func.count().label("times_played")

        query = DBSession.query(rank_col, Game.map_id, Map.name, played_col)
        query = query.filter(Map.map_id == Game.map_id)
        query = query.filter(Game.server_id == self.server_id)
        query = query.filter(
            Game.create_dt > (self.now - timedelta(days=self.lifetime)))
        query = query.group_by(Game.map_id)
        query = query.group_by(Map.name)
        query = query.order_by(expr.desc(func.count()))

        if self.last:
            query = query.offset(self.last)
        if self.limit:
            query = query.limit(self.limit)

        top_maps = query.all()
    except Exception as e:
        log.debug(e)
        raise HTTPNotFound

    return top_maps
开发者ID:antzucaro,项目名称:XonStat,代码行数:25,代码来源:server.py
示例6: test_over
def test_over(self):
    """Compile ``row_number()`` with textual ORDER BY / PARTITION BY names."""
    inner = select([column("foo"), column("bar")])
    numbered = func.row_number().over(order_by="foo", partition_by="bar")
    outer = select([numbered]).select_from(inner)

    expected_sql = (
        "SELECT row_number() OVER (PARTITION BY bar ORDER BY foo) "
        "AS anon_1 FROM (SELECT foo, bar)"
    )
    self.assert_compile(outer, expected_sql)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:11,代码来源:test_text.py
示例7: test_no_paren_fns
def test_no_paren_fns(self):
    """Check which functions compile with and without parentheses."""
    # (function expression, expected compiled SQL)
    cases = [
        (func.uid(), "uid"),
        (func.UID(), "UID"),
        (func.sysdate(), "sysdate"),
        (func.row_number(), "row_number()"),
        (func.rank(), "rank()"),
        (func.now(), "CURRENT_TIMESTAMP"),
        (func.current_timestamp(), "CURRENT_TIMESTAMP"),
        (func.user(), "USER"),
    ]
    for fn, expected in cases:
        self.assert_compile(fn, expected)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:12,代码来源:test_compiler.py
示例8: get_child_query_by_priority
def get_child_query_by_priority(self, child_name):
    """Build a query for one kind of child object, numbered by priority.

    :param child_name: key naming the child collection (for example
        ``'skills'`` or ``'items'``).
    :returns: a query yielding ``(model instance, _priority)`` rows limited
        to ``rules_id == self.id``, where ``_priority`` is the row number in
        ``priority`` order.
    :raises KeyError: if ``child_name`` is not a known collection name.
    """
    models_by_child_name = {
        'skills_categories': SkillCategory,
        'skills': Skill,
        'perks': Perk,
        'items': Item,
        'item_groups': ItemGroup,
        'races': Race,
        'character_class': CharacterClass,
        'dices': Dice,
    }
    model = models_by_child_name[child_name]

    # Open question from the original author: partition_by = rules_id ?
    priority_rank = func.row_number().over(
        order_by=model.priority
    ).label('_priority')
    return DBSession.query(model, priority_rank).filter_by(rules_id=self.id)
开发者ID:the-tosh,项目名称:boardless-open,代码行数:16,代码来源:models.py
示例9: iter_bounds
def iter_bounds(db_session, column, batch_size, importlimit):
    """
    Return a list of (lower bound, upper bound) tuples which contain row ids to
    iterate through a table in batches of ``batch_size``. If ``importlimit`` is
    greater than zero, return only enough tuples to contain ``importlimit``
    rows. The second element of the last tuple in the returned list may be
    ``None``. This happens if the last batch will contain less than
    ``batch_size`` rows.

    :param sqlalchemy.orm.session.Session db_session:
    :param sqlalchemy.Column column:
    :param int batch_size:
    :param int importlimit:
    :rtype: [(int, int)]
    """
    q = db_session.query(
        column,
        func.row_number().over(order_by=column).label('rownum'),
    ).from_self(column)

    if batch_size > 1:
        # Keep only the first row of every batch as a boundary.
        q = q.filter("rownum %% %d=1" % batch_size)
    if importlimit:
        q = q.filter("rownum <= %d" % (importlimit))

    # Materialize once and walk by index; popping the list head on every
    # iteration made the loop quadratic in the number of batches.
    starts = [row[0] for row in q]
    last_index = len(starts) - 1

    bounds = []
    for index, start in enumerate(starts):
        if index < last_index:
            end = starts[index + 1]
        elif importlimit:
            # If there's an importlimit, just add a noop bound. This way,
            # :func:`sir.indexing.index_entity` doesn't require any
            # information about the limit
            end = start
        else:
            end = None
        bounds.append((start, end))
    return bounds
开发者ID:baykelper,项目名称:mb-sir,代码行数:46,代码来源:querying.py
示例10: test_clause_expansion
def test_clause_expansion(self):
    """Check that a Bundle expands to its columns inside ORDER BY clauses."""
    Data = self.classes.Data
    sess = Session()
    b1 = Bundle('b1', Data.id, Data.d1, Data.d2)

    # Bundle used directly in Query.order_by().
    ordered_query = sess.query(Data).order_by(b1)
    self.assert_compile(
        ordered_query,
        "SELECT data.id AS data_id, data.d1 AS data_d1, "
        "data.d2 AS data_d2, data.d3 AS data_d3 FROM data "
        "ORDER BY data.id, data.d1, data.d2"
    )

    # Bundle used as the ORDER BY of a window function.
    windowed_query = sess.query(func.row_number().over(order_by=b1))
    self.assert_compile(
        windowed_query,
        "SELECT row_number() OVER (ORDER BY data.id, data.d1, data.d2) "
        "AS anon_1 FROM data"
    )
开发者ID:23andMe,项目名称:sqlalchemy,代码行数:18,代码来源:test_bundle.py
示例11: column_windows
def column_windows(session, column, windowsize):
    """Yield WHERE clauses against a given column that break it into windows.

    Every ``windowsize``-th value of ``column`` (in ``column`` order) is used
    as a window boundary.  Each yielded clause selects the half-open range
    ``start <= column < end``; the clause for the last window has no upper
    bound.

    Requires a database that supports window functions,
    i.e. Postgresql, SQL Server, Oracle.

    Enhance this yourself !  Add a "where" argument
    so that windows of just a subset of rows can
    be computed.

    :param session: session used to run the boundary query.
    :param column: column whose values are split into windows.
    :param windowsize: number of rows per window.
    :returns: a generator of SQL expressions, one per window.
    """
    def int_for_range(start_id, end_id):
        """Build the range condition for a single window."""
        # ``is not None`` rather than truthiness: a boundary value of 0 is
        # a legitimate upper bound.
        if end_id is not None:
            return and_(
                column >= start_id,
                column < end_id
            )
        return column >= start_id

    q = session.query(
        column,
        func.row_number().over(order_by=column).label('rownum')
    ).from_self(column)
    if windowsize > 1:
        # Keep only the first row of every window as a boundary.
        q = q.filter("rownum %% %d=1" % windowsize)

    boundaries = [value for value, in q]
    last_index = len(boundaries) - 1
    # Walk by index instead of popping the list head (which is O(n) per pop).
    for index, start in enumerate(boundaries):
        end = boundaries[index + 1] if index < last_index else None
        yield int_for_range(start, end)
开发者ID:groceryheist,项目名称:UWBotThings,代码行数:43,代码来源:populateHashTag.py
示例12: collections
def collections(self):
    """Render the ``user_collections.html`` page listing products.

    Each product row carries ``product_id``, ``title``, ``created_date``,
    ``number`` (row number in ``created_date`` order) and ``total`` (count
    of the product's items).
    """
    # Parenthesized form prints the same text on Python 2 and also parses
    # on Python 3.
    print('The userid is %d' % g.user.id)
    session = DBSession()

    # NOTE(review): the query is not restricted to ``g.user.id``; confirm
    # that listing the products of every user is intended.
    products = (
        session.query(User)
        .join(User.products)
        .join(Product.productitems)
        .group_by(Product.id, Product.created_date, Product.title)
        .order_by(Product.created_date)
        .values(
            Product.id.label('product_id'),
            Product.title.label('title'),
            Product.created_date.label('created_date'),
            func.row_number().over(
                order_by='products.created_date').label('number'),
            func.count(ProductItem.id).label('total'),
        )
    )

    # ``Query.values()`` returns an iterator, never None, and the GET and
    # non-GET branches rendered the same template, so one return suffices.
    return render_template('user_collections.html', products=products)
开发者ID:jeeka321,项目名称:Production,代码行数:19,代码来源:AccountController.py
示例13: upgrade
def upgrade():
    """Populate ``condor_proc`` for batches that still hold the value -1.

    Every such batch receives its zero-based position among the batches of
    the same job, ordered by ``id``.
    """
    # Lightweight table definition listing only the columns used here.
    batches = Table(
        'test_batches', MetaData(schema='jsil'),
        Column('id', Integer),
        Column('job_id', Integer),
        Column('condor_proc', SmallInteger),
    )

    position_in_job = func.row_number().over(
        partition_by=batches.c.job_id, order_by=batches.c.id)
    indexed_batches = select([
        batches.c.id,
        # row_number() starts at 1; subtract 1 for a zero-based index.
        (position_in_job - op.inline_literal(1)).label('idx'),
    ]).alias('indexed_batches')

    update_stmt = batches.update()
    update_stmt = update_stmt.where(
        batches.c.condor_proc == op.inline_literal(-1))
    update_stmt = update_stmt.where(batches.c.id == indexed_batches.c.id)
    update_stmt = update_stmt.values(condor_proc=indexed_batches.c.idx)
    op.execute(update_stmt)
开发者ID:resource-reasoning,项目名称:testing-website,代码行数:20,代码来源:574e747f1d4_populate_batch_condor_proc.py
示例14: get_latest_fixes
def get_latest_fixes(self, max_age=timedelta(hours=6), **kw):
    """Build a query for the most recent tracking fix of every pilot.

    Fixes are numbered per pilot from newest to oldest with
    ``row_number()`` and only the fix numbered 1 is kept.  Fixes older than
    ``max_age``, fixes newer than the pilot's ``tracking_delay`` (taken as
    minutes) allows, and fixes without a location are left out.

    :param max_age: maximum age of a fix to be considered.
    :param kw: accepted but not used by this method.
    :returns: an unevaluated query of ``TrackingFix`` rows, newest first,
        with the pilot eagerly loaded.
    """
    newest_first_rank = over(
        func.row_number(),
        partition_by=TrackingFix.pilot_id,
        order_by=desc(TrackingFix.time),
    )

    # Turn the stored delay into an SQL interval such as '5 minutes'.
    delay_text = cast(User.tracking_delay, String) + ' minutes'
    tracking_delay = cast(delay_text, Interval)

    ranked = DBSession.query(
        TrackingFix.id, newest_first_rank.label('row_number'))
    ranked = ranked.outerjoin(TrackingFix.pilot)
    ranked = ranked.filter(TrackingFix.time >= datetime.utcnow() - max_age)
    ranked = ranked.filter(
        TrackingFix.time <= datetime.utcnow() - tracking_delay)
    # ``!= None`` is intentional: SQLAlchemy renders it as IS NOT NULL.
    ranked = ranked.filter(TrackingFix.location_wkt != None)  # noqa: E711
    subq = ranked.subquery()

    query = DBSession.query(TrackingFix)
    query = query.options(joinedload(TrackingFix.pilot))
    query = query.filter(TrackingFix.id == subq.c.id)
    query = query.filter(subq.c.row_number == 1)
    query = query.order_by(desc(TrackingFix.time))
    return query
开发者ID:citterio,项目名称:Skylines,代码行数:22,代码来源:tracking.py
示例15: limit_groups
def limit_groups(query, model, partition_by, order_by, limit=None, offset=0):
    """Apply a per-group limit/offset to the rows of a query.

    The rows of `query` are numbered inside each partition and then
    filtered on that number.  The resulting query only contains the data
    from `model`; i.e. additional entities cannot be added.

    :param query: The original query, including filters, joins, etc.
    :param model: The model class for `query`
    :param partition_by: The column to group by
    :param order_by: The column to order the partitions by
    :param limit: The maximum number of rows for each partition; a falsy
                  value means no upper bound
    :param offset: The number of rows to skip in each partition
    """
    rownum = over(
        func.row_number(), partition_by=partition_by, order_by=order_by
    ).label('rownum')
    inner = query.add_columns(rownum).subquery()
    query = model.query.select_entity_from(inner)

    # Row numbers start at 1, so "offset < rownum" skips `offset` rows.
    criteria = [offset < inner.c.rownum]
    if limit:
        criteria.append(inner.c.rownum <= (limit + offset))
    return query.filter(*criteria)
开发者ID:DirkHoffmann,项目名称:indico,代码行数:23,代码来源:queries.py
示例16: setup_classes
def setup_classes(cls):
    """Declare the A/B/C mappings used by the tests.

    Besides the plain tables, ``A.partitioned_bs`` is set up as a
    relationship to an alias of ``B`` whose rows are numbered per ``a_id``,
    restricted to rows whose number is below 10.
    """
    Base = cls.DeclarativeBasic

    class A(Base):
        __tablename__ = "a"

        id = Column(Integer, primary_key=True)

    class B(Base):
        __tablename__ = "b"

        id = Column(Integer, primary_key=True)
        a_id = Column(ForeignKey("a.id"))
        cs = relationship("C")

    class C(Base):
        __tablename__ = "c"

        id = Column(Integer, primary_key=True)
        b_id = Column(ForeignKey("b.id"))

    # Number the B rows inside each a_id group, ordered by B.id.
    row_index = func.row_number().over(
        order_by=B.id, partition_by=B.a_id
    ).label("index")
    partition = select([B, row_index]).alias()
    partitioned_b = aliased(B, alias=partition)

    join_condition = and_(
        partitioned_b.a_id == A.id, partition.c.index < 10
    )
    A.partitioned_bs = relationship(
        partitioned_b, primaryjoin=join_condition
    )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:36,代码来源:test_ac_relationships.py
示例17: fulltextsearch
def fulltextsearch(self):
try:
lang = self.request.registry.settings['default_locale_name']
except KeyError:
return HTTPInternalServerError(
detail='default_locale_name not defined in settings')
try:
lang = self.languages[lang]
except KeyError:
return HTTPInternalServerError(
detail='%s not defined in languages' % lang)
if 'query' not in self.request.params:
return HTTPBadRequest(detail='no query')
query = self.request.params.get('query')
maxlimit = self.settings.get('maxlimit', 200)
try:
limit = int(self.request.params.get(
'limit',
self.settings.get('defaultlimit', 30)))
except ValueError:
return HTTPBadRequest(detail='limit value is incorrect')
if limit > maxlimit:
limit = maxlimit
try:
partitionlimit = int(self.request.params.get('partitionlimit', 0))
except ValueError:
return HTTPBadRequest(detail='partitionlimit value is incorrect')
if partitionlimit > maxlimit:
partitionlimit = maxlimit
terms = '&'.join(re.sub("'", "''", w) + ':*' for w in query.split(' ') if w != '')
_filter = "%(tsvector)s @@ to_tsquery('%(lang)s', '%(terms)s')" % \
{'tsvector': 'ts', 'lang': lang, 'terms': terms}
if self.request.user is None or self.request.user.role is None:
_filter = and_(_filter, FullTextSearch.public.is_(True))
else:
_filter = and_(
_filter,
or_(
FullTextSearch.public.is_(True),
FullTextSearch.role_id.is_(None),
FullTextSearch.role_id == self.request.user.role.id
)
)
# The numbers used in ts_rank_cd() below indicate a normalization method.
# Several normalization methods can be combined using |.
# 2 divides the rank by the document length
# 8 divides the rank by the number of unique words in document
# By combining them, shorter results seem to be preferred over longer ones
# with the same ratio of matching words. But this relies only on testing it
# and on some assumptions about how it might be calculated
# (the normalization is applied two times with the combination of 2 and 8,
# so the effect on at least the one-word-results is therefore stronger).
rank = "ts_rank_cd(%(tsvector)s, " \
"to_tsquery('%(lang)s', '%(terms)s'), 2|8)" % {
'tsvector': 'ts',
'lang': lang,
'terms': terms
}
if partitionlimit:
# Here we want to partition the search results based on
# layer_name and limit each partition.
row_number = func.row_number() \
.over(
partition_by=FullTextSearch.layer_name,
order_by=(desc(rank), FullTextSearch.label)) \
.label('row_number')
subq = DBSession.query(FullTextSearch) \
.add_columns(row_number).filter(_filter).subquery()
query = DBSession.query(subq.c.id, subq.c.label, subq.c.params,
subq.c.layer_name, subq.c.the_geom)
query = query.filter(subq.c.row_number <= partitionlimit)
else:
query = DBSession.query(FullTextSearch).filter(_filter)
query = query.order_by(desc(rank))
query = query.order_by(FullTextSearch.label)
query = query.limit(limit)
objs = query.all()
features = []
for o in objs:
if o.the_geom is not None:
properties = {
"label": o.label,
"layer_name": o.layer_name,
"params": o.params,
}
geom = to_shape(o.the_geom)
feature = Feature(id=o.id, geometry=geom,
properties=properties, bbox=geom.bounds)
features.append(feature)
#.........这里部分代码省略.........
开发者ID:kailIII,项目名称:c2cgeoportal,代码行数:101,代码来源:fulltextsearch.py
示例18: paginate
def paginate(query, page_num, page_size, order_by_column,
             partition_by_column=None, order_by=None, session=None):
    """
    Wrap `query` so that it only returns the rows of one page.

    A ``_row_number`` column based on ``row_number()`` is added to the query,
    and the window of row numbers to keep is derived from `page_num` and
    `page_size`.

    Args:
        query(object): SQLAlchemy query object or Subquery object.
        page_num(int): Page number
        page_size(int): Number of record of per page
        order_by_column(object or list): SQLAlchemy column(s) object(s).
        partition_by_column(object or list): SQLAlchemy column(s) object(s)
                There is a major assumption that the value in this
                column should be unique per record (not repeating)
                in the initial input query.
        order_by(str): Order by clause, 'asc' for ascending or 'desc' for
                descending. Default is 'asc'.
        session(object): database session connection object. Only needed
                when `query` has no ``session`` attribute of its own.

    Returns:
        An output query object wrapped with paginated where clause based
        on row_number (_row_number), sorted by and partitioned by the
        respective column(s).

    Raises:
        AttributeError: if `query` has no ``session`` attribute and no
                `session` was passed.
    """
    if hasattr(query, 'session'):
        # A query object was passed; its own session takes precedence.
        session = query.session
    elif not session:
        # A subquery object was passed without an explicit session.
        raise AttributeError("query object has no attribute 'session'")

    if partition_by_column is not None:
        # NOTE(review): the direction is applied to the partition columns
        # here, not to ``order_by_column`` as in the branch below -- confirm
        # that this is intended.
        if order_by:
            partition_by_column = _get_order_by_columns(partition_by_column,
                                                        order_by)
        window = func.row_number().over(
            partition_by=partition_by_column,
            order_by=order_by_column)
    else:
        if order_by:
            order_by_column = _get_order_by_columns(order_by_column,
                                                    order_by)
        window = func.row_number().over(order_by=order_by_column)
    paginate_column = window.label('_row_number')

    pagination_subquery = _get_paginated_subquery(
        session, query, paginate_column)

    start_page = _get_window_top(page_num, page_size)
    end_page = _get_window_bottom(page_num, page_size)
    return _paged_query_object(
        session, pagination_subquery, start_page, end_page)
开发者ID:AmeetSM,项目名称:Napoleon-Sphinx-Documentation,代码行数:62,代码来源:pagination.py
示例19: fulltextsearch
def fulltextsearch(self):
    """Run a full-text search and return the matches as a feature collection.

    Request parameters read:

    * ``query`` (required): space-separated search words; every word is
      matched as a prefix and all words must match.
    * ``limit``: maximum number of results; defaults to the ``defaultlimit``
      setting (30) and is capped by the ``maxlimit`` setting (200).
    * ``partitionlimit``: when non-zero, maximum number of results per
      ``layer_name``; also capped by ``maxlimit``.

    Anonymous users and users without a role only see public entries; other
    users additionally see entries with no role or with their own role.

    :returns: a ``FeatureCollection`` of the matches that have a geometry,
        or an HTTP error response object when the settings or the request
        parameters are invalid.
    """
    try:
        lang = self.request.registry.settings["default_locale_name"]
    except KeyError:
        return HTTPInternalServerError(detail="default_locale_name not defined in settings")
    try:
        lang = self.languages[lang]
    except KeyError:
        return HTTPInternalServerError(detail="%s not defined in languages" % lang)

    if "query" not in self.request.params:
        return HTTPBadRequest(detail="no query")
    query = self.request.params.get("query")

    maxlimit = self.settings.get("maxlimit", 200)

    try:
        limit = int(self.request.params.get("limit", self.settings.get("defaultlimit", 30)))
    except ValueError:
        return HTTPBadRequest(detail="limit value is incorrect")
    if limit > maxlimit:
        limit = maxlimit

    try:
        partitionlimit = int(self.request.params.get("partitionlimit", 0))
    except ValueError:
        return HTTPBadRequest(detail="partitionlimit value is incorrect")
    if partitionlimit > maxlimit:
        partitionlimit = maxlimit

    # SECURITY: the terms are interpolated into an SQL string literal below,
    # so single quotes coming from the request are doubled to keep a search
    # word from terminating that literal.
    # TODO(review): passing the terms as bound parameters would be safer
    # than escaping by hand.
    terms = "&".join(
        w.replace("'", "''") + ":*" for w in query.split(" ") if w != ""
    )
    _filter = "%(tsvector)s @@ to_tsquery('%(lang)s', '%(terms)s')" % {
        "tsvector": "ts",
        "lang": lang,
        "terms": terms,
    }

    # flake8 does not like `== True`
    user = self.request.user
    if user is None or user.role is None:
        # A user without a role has no ``role.id`` to compare against and
        # is treated like an anonymous visitor.
        _filter = and_(_filter, FullTextSearch.public == True)  # NOQA
    else:
        _filter = and_(
            _filter,
            or_(
                FullTextSearch.public == True,  # NOQA
                FullTextSearch.role_id == None,  # NOQA
                FullTextSearch.role_id == user.role.id,
            ),
        )

    # The numbers used in ts_rank_cd() below indicate a normalization method.
    # Several normalization methods can be combined using |.
    # 2 divides the rank by the document length
    # 8 divides the rank by the number of unique words in document
    # By combining them, shorter results seem to be preferred over longer ones
    # with the same ratio of matching words. But this relies only on testing it
    # and on some assumptions about how it might be calculated
    # (the normalization is applied two times with the combination of 2 and 8,
    # so the effect on at least the one-word-results is therefore stronger).
    rank = "ts_rank_cd(%(tsvector)s, " "to_tsquery('%(lang)s', '%(terms)s'), 2|8)" % {
        "tsvector": "ts",
        "lang": lang,
        "terms": terms,
    }

    if partitionlimit:
        # Here we want to partition the search results based on
        # layer_name and limit each partition.
        row_number = (
            func.row_number()
            .over(partition_by=FullTextSearch.layer_name, order_by=(desc(rank), FullTextSearch.label))
            .label("row_number")
        )
        subq = DBSession.query(FullTextSearch).add_columns(row_number).filter(_filter).subquery()
        query = DBSession.query(subq.c.id, subq.c.label, subq.c.params, subq.c.layer_name, subq.c.the_geom)
        query = query.filter(subq.c.row_number <= partitionlimit)
    else:
        query = DBSession.query(FullTextSearch).filter(_filter)

    query = query.order_by(desc(rank))
    query = query.order_by(FullTextSearch.label)
    query = query.limit(limit)
    objs = query.all()

    features = []
    for o in objs:
        # Entries without a geometry are skipped.
        if o.the_geom is not None:
            properties = {"label": o.label, "layer_name": o.layer_name, "params": o.params}
            geom = wkb_loads(str(o.the_geom.geom_wkb))
            feature = Feature(id=o.id, geometry=geom, properties=properties, bbox=geom.bounds)
            features.append(feature)

    # TODO: add callback function if provided in self.request, else return geojson
    return FeatureCollection(features)
开发者ID:pgiraud,项目名称:c2cgeoportal,代码行数:95,代码来源:fulltextsearch.py
示例20: traffic_history_query
def traffic_history_query():
timestamptz = TIMESTAMP(timezone=True)
events = union_all(
select([TrafficCredit.amount,
TrafficCredit.timestamp,
literal("Credit").label('type')]
).where(TrafficCredit.user_id == literal_column('arg_user_id')),
select([(-TrafficVolume.amount).label('amount'),
TrafficVolume.timestamp,
cast(TrafficVolume.type, TEXT).label('type')]
).where(TrafficVolume.user_id == literal_column('arg_user_id'))
).cte('traffic_events')
def round_time(time_expr, ceil=False):
round_func = func.ceil if ceil else func.trunc
step_epoch = func.extract('epoch', literal_column('arg_step'))
return cast(func.to_timestamp(round_func(func.extract('epoch', time_expr) / step_epoch) * step_epoch), timestamptz)
balance = select([TrafficBalance.amount, TrafficBalance.timestamp])\
.select_from(User.__table__.outerjoin(TrafficBalance))\
.where(User.id == literal_column('arg_user_id'))\
.cte('balance')
balance_amount = select([balance.c.amount]).as_scalar()
balance_timestamp = select([balance.c.timestamp]).as_scalar()
# Bucket layout
# n = interval / step
# 0: Aggregates all prior traffic_events so that the balance value can be calculated
# 1 - n: Traffic history entry
# n+1: Aggregates all data after the last point in time, will be discarded
buckets = select([literal_column('bucket'),
(func.row_number().over(order_by=literal_column('bucket')) - 1).label('index')]
).select_from(
func.generate_series(
round_time(cast(literal_column('arg_start'), timestamptz)) - literal_column('arg_step'),
round_time(cast(literal_column('arg_start'), timestamptz) + literal_column('arg_interval')),
literal_column('arg_step')
).alias('bucket')
).order_by(
literal_column('bucket')
).cte('buckets')
def cond_sum(condition, label, invert=False):
return func.sum(case(
[(condition, events.c.amount if not invert else -events.c.amount)],
else_=None)).label(label)
hist = select([buckets.c.bucket,
cond_sum(events.c.type == 'Credit', 'credit'),
cond_sum(events.c.type == 'Ingress', 'ingress', invert=True),
cond_sum(events.c.type == 'Egress', 'egress', invert=True),
func.sum(events.c.amount).label('amount'),
cond_sum(and_(balance_timestamp != None, events.c.timestamp < balance_timestamp), 'before_balance'),
cond_sum(or_(balance_timestamp == None, events.c.timestamp >= balance_timestamp), 'after_balance')]
).select_from(buckets.outerjoin(
events, func.width_bucket(
events.c.timestamp, select([func.array(select([buckets.c.bucket]).select_from(buckets).where(buckets.c.index != 0).label('dummy'))])
) == buckets.c.index
)).where(
# Discard bucket n+1
buckets.c.index < select([func.max(buckets.c.index)])
).group_by(
buckets.c.bucket
).order_by(
buckets.c.bucket
).cte('traffic_hist')
# Bucket is located before the balance and no traffic_events exist before it
first_event_timestamp = select([func.min(events.c.timestamp)]).as_scalar()
case_before_balance_no_data = (
and_(balance_timestamp != None, hist.c.bucket < balance_timestamp,
or_(first_event_timestamp == None,
hist.c.bucket < first_event_timestamp
)),
None
)
# Bucket is located after the balance
case_after_balance = (
or_(balance_timestamp == None, hist.c.bucket >= balance_timestamp),
func.coalesce(balance_amount, 0) + func.coalesce(
func.sum(hist.c.after_balance).over(
order_by=hist.c.bucket.asc(), rows=(None, 0)),
0)
)
# Bucket is located before the balance, but there still exist traffic_events before it
else_before_balance = (
func.coalesce(balance_amount, 0) +
func.coalesce(hist.c.after_balance, 0) -
func.coalesce(
func.sum(hist.c.before_balance).over(
order_by=hist.c.bucket.desc(), rows=(None, -1)
), 0)
)
#.........这里部分代码省略.........
开发者ID:agdsn,项目名称:pycroft,代码行数:101,代码来源:traffic.py
注：本文中的sqlalchemy.func.row_number函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论