本文整理汇总了 Python 中 sqlalchemy.func.date_trunc 函数的典型用法代码示例。如果您正苦于以下问题:Python date_trunc 函数的具体用法?Python date_trunc 怎么用?Python date_trunc 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了date_trunc函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get
def get(self, short):
    """Render the info page for the short URI whose ``short`` equals *short*.

    Looks the short URI up, builds a (lazy) query of hit counts grouped by
    day, capped at 100 rows, and renders ``info.html`` with both. When no
    matching short URI exists, a 404 status is set and ``404.html`` is
    rendered instead. The database session is always closed.
    """
    session = backend.Backend.instance().get_session()
    try:
        uri_query = session.query(models.ShortURI)
        short_uri = uri_query.filter(models.ShortURI.short == short).one()

        # One truncation expression shared by SELECT and GROUP BY.
        hit_day = func.date_trunc('day', models.Hit.created)
        hits = (
            session.query(hit_day, func.count())
            .filter(models.Hit.short_id == short_uri.id)
            .group_by(hit_day)
            .limit(100)
        )

        self.jinja_render("info.html", short_uri=short_uri, hits=hits)
        self.finish()
    except NoResultFound:
        # .one() found no row for this short code.
        self.set_status(404)
        self.jinja_render("404.html")
        self.finish()
    finally:
        session.close()
开发者ID:alyakimov,项目名称:tornado-shortener,代码行数:27,代码来源:main.py
示例2: day_report
def day_report(session, aid, date_from=None, date_to=None):
    """Return per-day balances for account *aid*.

    :param session: SQLAlchemy session used to run the query.
    :param aid: account identifier matched against ``Destination.account``.
    :param date_from: optional inclusive lower bound on ``Transaction.date``.
    :param date_to: optional exclusive upper bound on ``Transaction.date``.
    :returns: list of ``(day, Balance(debet, kredit))`` tuples in ascending
        day order; an empty list when no transaction matches.

    Canceled transactions are excluded.
    """
    day = func.date_trunc('day', Transaction.date)
    q = session.query(day, Destination.direction, func.sum(Transaction.amount))\
        .join(Transaction.accounts)\
        .filter(Destination.account == aid)\
        .filter(Transaction.canceled == False)  # noqa: E712 (SQL expression, not a Python test)

    if date_from:
        q = q.filter(Transaction.date >= date_from)

    if date_to:
        q = q.filter(Transaction.date < date_to)

    # The loop below folds consecutive rows of the same day into one entry,
    # so the rows MUST arrive sorted by day. GROUP BY alone gives no ordering
    # guarantee, hence the explicit ORDER BY.
    result = q.group_by(day, Destination.direction).order_by(day)

    data = []
    kredit = debet = 0
    last_day = None
    for row_day, direction, amount in result:
        if last_day is not None and last_day != row_day:
            # Day changed: flush the totals accumulated for the previous day.
            data.append((last_day, Balance(debet, kredit)))
            kredit = debet = 0

        last_day = row_day
        if direction:
            debet = amount
        else:
            kredit = amount

    # Flush the final day; skip it when the query returned nothing so that no
    # bogus (None, Balance(0, 0)) entry is emitted.
    if last_day is not None:
        data.append((last_day, Balance(debet, kredit)))

    return data
开发者ID:baverman,项目名称:taburet,代码行数:32,代码来源:model.py
示例3: get_interval
def get_interval(self, start_date=None, end_date=None, product=None, interval='month'):
    """Return usage of this account summed per product and per time interval.

    :param start_date: optional inclusive lower bound on ``Record.date``.
    :param end_date: optional inclusive upper bound on ``Record.date``.
    :param product: optional product; when given only its records are used.
    :param interval: unit handed to ``date_trunc`` (default ``'month'``).
    :returns: list of dicts with keys ``product_id``, ``quantity``, ``date``
        (always formatted as ``'%Y-%m'``), ``rates`` and ``price``; an empty
        list if a product lookup raises ``NoResultFound``.
    """
    period = func.date_trunc(interval, Record.date)
    query = db.session.query(func.sum(Record.quantity), Product.id, period)
    query = query.join(Product).join(Account).filter(Account.id == self.id)
    if start_date:
        query = query.filter(Record.date >= start_date)
    if end_date:
        query = query.filter(Record.date <= end_date)
    if product:
        query = query.filter(Product.id == product.id)
    query = query.group_by(Product.id).group_by(period)

    try:
        ret_val = []
        products = {}  # product id -> Product, so each product is loaded once
        for quantity, product_id, period_start in query.all():
            if product_id in products:
                prod = products[product_id]
            else:
                prod = products[product_id] = \
                    Product.query.filter(Product.id == product_id).one()
            rates = prod.rate_pricing(quantity)
            ret_val.append({
                'product_id': prod.id,
                'quantity': quantity,
                'date': period_start.strftime('%Y-%m'),
                'rates': rates,
                'price': sum(r['price_at_quantity'] for r in rates)
            })
        return ret_val
    except NoResultFound:
        # ``except X, e`` is Python-2-only syntax; this form runs on 2 and 3.
        return []
开发者ID:sasimpson,项目名称:publican,代码行数:32,代码来源:models.py
示例4: query
def query(self):
    """Build the visit-statistics SELECT for ``self.hebPk``.

    Rows are bucketed with ``date_trunc(self.group, log_date)`` and restricted
    to log dates between ``self.minDate`` and ``self.maxDate``. Each row has
    the bucket (``day``), the number of log rows (``visitCount``) and the
    number of distinct hosts (``uniqueVisitCount``). The statement is
    returned unexecuted, grouped and ordered by bucket.
    """
    bucket = func.date_trunc(self.group, LogItem.log_date)
    columns = [
        bucket.label('day'),
        func.count().label('visitCount'),
        func.count(LogItem.log_host.distinct()).label('uniqueVisitCount'),
    ]
    criteria = and_(
        LogItem.log_hebpk == self.hebPk,
        LogItem.log_date.between(self.minDate, self.maxDate),
    )
    statement = select(columns, criteria)
    return statement.group_by(bucket).order_by(bucket)
开发者ID:gitesdewallonie,项目名称:gites.stats,代码行数:10,代码来源:visits.py
示例5: timeseries
def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
    """Build a zero-filled time series SELECT over ``self.point_table``.

    Approach inspired by
    http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html

    :param agg_unit: unit given to ``date_trunc``; ``'quarter'`` is stepped
        as ``'3 months'``, any other unit as ``'1 <unit>'``.
    :param start: inclusive lower bound on ``point_date``.
    :param end: inclusive upper bound on ``point_date``.
    :param geom: optional value passed to ``ST_GeomFromGeoJSON``; when truthy
        only points within that geometry are counted.
    :param column_filters: optional single condition ANDed into the filter.
    :returns: an unexecuted select of ``dataset_name``, ``time_bucket`` and
        ``count``.
    """
    table = self.point_table

    # 'quarter' is not expressible as '1 quarter' in the series step.
    if agg_unit == 'quarter':
        step = '3 months'
    else:
        step = '1 ' + agg_unit

    # Subquery with every bucket in the range and a default count of 0.
    all_buckets = func.generate_series(func.date_trunc(agg_unit, start),
                                       func.date_trunc(agg_unit, end),
                                       step)
    zero = sa.literal_column("0").label('count')
    defaults = select([zero, all_buckets.label('time_bucket')]).alias('defaults')

    conditions = [table.c.point_date >= start, table.c.point_date <= end]
    if column_filters is not None:
        # Appended as ONE element: '+' between a list and a SQLAlchemy
        # condition would try to build a new SQL expression instead.
        conditions.append(column_filters)

    # Subquery with the observed count per bucket; buckets without records
    # produce no row here.
    observed_count = func.count(table.c.hash).label('count')
    observed_bucket = func.date_trunc(agg_unit, table.c.point_date).label('time_bucket')
    actuals = select([observed_count, observed_bucket])
    actuals = actuals.where(sa.and_(*conditions))
    actuals = actuals.group_by('time_bucket')

    # Also filter by geometry if requested.
    if geom:
        within = func.ST_Within(table.c.geom, func.ST_GeomFromGeoJSON(geom))
        actuals = actuals.where(within)

    # Aliased so it can take part in the join below.
    actuals = actuals.alias('actuals')

    # Outer join defaults to observed values; a bucket with no observed value
    # falls back to the default count.
    name = sa.literal_column("'{}'".format(self.dataset_name)).label('dataset_name')
    bucket = defaults.c.time_bucket.label('time_bucket')
    count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
    joined = defaults.outerjoin(
        actuals, actuals.c.time_bucket == defaults.c.time_bucket)
    return select([name, bucket, count]).select_from(joined)
开发者ID:gitter-badger,项目名称:plenario,代码行数:53,代码来源:models.py
示例6: timeseries
def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
    """Return a SELECT giving a record count for every time bucket.

    Approach inspired by
    http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html

    :param agg_unit: unit given to ``date_trunc``; ``'quarter'`` is stepped
        as ``'3 months'``, any other unit as ``'1 <unit>'``.
    :param start: inclusive lower bound on ``point_date``.
    :param end: inclusive upper bound on ``point_date``.
    :param geom: optional value passed to ``ST_GeomFromGeoJSON``; when truthy
        only points within that geometry are counted.
    :param column_filters: optional iterable of conditions, each ANDed into
        the filter.
    :returns: an unexecuted select of ``dataset_name``, ``time_bucket`` and
        ``count``.
    """
    points = self.point_table
    step = '3 months' if agg_unit == 'quarter' else '1 ' + agg_unit

    # Every bucket in [start, end], each with a default count of 0.
    series = func.generate_series(func.date_trunc(agg_unit, start),
                                  func.date_trunc(agg_unit, end),
                                  step)
    defaults = select([
        sa.literal_column("0").label('count'),
        series.label('time_bucket'),
    ]).alias('defaults')

    # Observed counts; only buckets that contain records produce a row.
    where_filters = [points.c.point_date >= start,
                     points.c.point_date <= end]
    if column_filters:
        where_filters.extend(column_filters)

    actuals = select([
        func.count(points.c.hash).label('count'),
        func.date_trunc(agg_unit, points.c.point_date).label('time_bucket'),
    ]).where(sa.and_(*where_filters)).group_by('time_bucket')

    # Also filter by geometry if requested.
    if geom:
        actuals = actuals.where(
            func.ST_Within(points.c.geom, func.ST_GeomFromGeoJSON(geom)))

    # Aliased so it can be used in the join.
    actuals = actuals.alias('actuals')

    # Outer join: buckets without an observed value keep the default count.
    on_bucket = actuals.c.time_bucket == defaults.c.time_bucket
    name = sa.literal_column("'{}'".format(self.dataset_name)).label('dataset_name')
    bucket = defaults.c.time_bucket.label('time_bucket')
    count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
    ts = select([name, bucket, count])
    ts = ts.select_from(defaults.outerjoin(actuals, on_bucket))
    return ts
开发者ID:carhart,项目名称:plenario,代码行数:52,代码来源:models.py
示例7: fetch_hourly
def fetch_hourly(self, page, rows, sidx, sord='asc', _search='false',
        searchOper=None, searchField=None, searchString=None, **kw):
    ''' Function called on AJAX request made by FlexGrid
    Fetch data from DB, return the list of rows + total + current page

    Produces one row per hour of the day (24 in all) holding the number of
    calls and the summed ``billsec``. Hours without calls keep a count of 0.
    When ``self.stats_type`` is set (a ``%m/%d/%Y`` date string), only calls
    from that date's month are counted.

    Returns ``dict(page=0, total=0, rows=[])`` for users outside the
    'admin' / 'STATS' groups.
    '''
    if not in_any_group('admin','STATS'):
        return dict(page=0, total=0, rows=[])

    try:
        page = int(page)
        rows = int(rows)
        offset = (page-1) * rows
    except (TypeError, ValueError):
        # Missing or non-numeric paging parameters: fall back to one page
        # holding the full day.
        page = 1
        rows = 24
        offset = 0

    log.info('fetch_hourly : page=%d, rows=%d, offset=%d, sidx=%s, sord=%s' % (
        page, rows, offset, sidx, sord))

    # Initialize data, in case no data is available for that time slice
    data = [{'id': x, 'cell': ['%d h 00 < %d h 00' % (x, x+1), 0, None]}
        for x in range(24)]

    # Count calls by hour
    if db_engine=='oracle':
        req = func.to_char(CDR.calldate, 'HH24')
    else: # PostgreSql
        req = func.date_trunc('hour', cast(CDR.calldate, TIME))
    cdrs = DBSession.query(req, func.count(req), func.sum(CDR.billsec))

    if self.stats_type:
        # Monthly stats
        d = datetime.datetime.strptime(self.stats_type, '%m/%d/%Y')
        if db_engine=='oracle':
            cdrs = cdrs.filter(func.trunc(CDR.calldate, 'month') == \
                func.trunc(d, 'month'))
        else: # PostgreSql
            cdrs = cdrs.filter(func.date_trunc('month', CDR.calldate) == \
                func.date_trunc('month', d))
    cdrs = cdrs.group_by(req)

    for c in cdrs:
        if db_engine=='oracle':
            j = int(c[0])
        else: # PostgreSql
            # Floor division: j is a list index and must stay an int on
            # Python 3 as well, where '/' yields a float.
            j = c[0].seconds // 3600
        data[j] = {'id': j, 'cell': ['%d h 00 < %d h 00' % (j,j+1), c[1], hms(c[2])]}

    # A page holds ``rows`` entries starting at ``offset``; the previous upper
    # bound (offset + page*rows) over-returned on middle pages.
    return dict(page=page, total=24, rows=data[offset:offset+rows])
开发者ID:sysnux,项目名称:astportal,代码行数:50,代码来源:stats.py
示例8: dataset
def dataset():
    """Return observation counts per dataset and time bucket.

    Query-string parameters handled here:

    * ``agg`` - unit given to ``date_trunc`` (default ``'day'``).
    * ``datatype`` - ``'json'`` (default) or ``'csv'``.

    Everything else is handed to ``make_query`` to be validated and turned
    into filter clauses. CSV output needs a ``dataset_name`` parameter; if it
    is missing, a JSON error body is returned with status 400. If there is
    nothing to put in a CSV (invalid query or no matching rows), the JSON
    envelope is returned instead.
    """
    raw_query_params = request.args.copy()

    agg = raw_query_params.get('agg')
    if agg:
        del raw_query_params['agg']
    else:
        agg = 'day'

    datatype = raw_query_params.get('datatype')
    if datatype:
        del raw_query_params['datatype']
    else:
        datatype = 'json'

    valid_query, query_clauses, resp, status_code = make_query(MasterTable, raw_query_params)
    if valid_query:
        time_agg = func.date_trunc(agg, MasterTable.c['obs_date'])
        base_query = session.query(time_agg,
                                   func.count(MasterTable.c['obs_date']),
                                   MasterTable.c['dataset_name'])
        base_query = base_query.filter(MasterTable.c['current_flag'] == True)  # noqa: E712
        for clause in query_clauses:
            base_query = base_query.filter(clause)
        base_query = base_query.group_by(MasterTable.c['dataset_name'])\
            .group_by(time_agg)\
            .order_by(time_agg)

        results = [
            {'dataset_name': name, 'group': bucket, 'count': count}
            for bucket, count, name in base_query.all()
        ]
        # groupby() only merges adjacent items, so sort by the same key first
        # (the sort is stable, so the time order inside a dataset is kept).
        results.sort(key=itemgetter('dataset_name'))
        for k, g in groupby(results, key=itemgetter('dataset_name')):
            resp['objects'].append({
                'dataset_name': ' '.join(k.split('_')).title(),
                'temporal_aggregate': agg,
                'items': list(g),
            })
        resp['meta']['status'] = 'ok'

    if datatype == 'json':
        resp = make_response(json.dumps(resp, default=dthandler), status_code)
        resp.headers['Content-Type'] = 'application/json'
    elif datatype == 'csv':
        if not raw_query_params.get('dataset_name'):
            error = {
                'meta': {
                    'status': 'error',
                    'message': 'If you want data in a CSV format, you also need to specify a dataset_name'
                },
                'objects': []
            }
            # Return a real response object rather than a bare dict.
            resp = make_response(json.dumps(error), 400)
            resp.headers['Content-Type'] = 'application/json'
        elif not valid_query or not resp['objects']:
            # Nothing to build a CSV from: report the JSON envelope instead of
            # failing on resp['objects'][0].
            resp = make_response(json.dumps(resp, default=dthandler), status_code)
            resp.headers['Content-Type'] = 'application/json'
        else:
            data = resp['objects'][0]
            fields = data['items'][0].keys()
            resp = make_response(make_csv(data['items'], fields), 200)
            resp.headers['Content-Type'] = 'text/csv'
            dname = raw_query_params['dataset_name']
            filedate = datetime.now().strftime('%Y-%m-%d')
            resp.headers['Content-Disposition'] = 'attachment; filename=%s_%s.csv' % (dname, filedate)
    return resp
开发者ID:bepetersn,项目名称:wopr-api,代码行数:60,代码来源:api.py
示例9: main
def main():
    """Load figure objects into database.

    Walks ``<TELEMETRY_ROOTDIRECTORY>/ShaneAO/*/figures/*/*/*.png``. A path
    already stored exactly once is skipped; a path stored more than once has
    all its rows deleted and is loaded again. The dataset is found from the
    date and sequence number encoded in the path components.
    """
    with app.app_context():
        base_dir = os.path.join(app.config['TELEMETRY_ROOTDIRECTORY'], "ShaneAO")
        pattern = os.path.join(base_dir, "*", "figures", "*", "*", "*.png")

        for filepath in glob.iglob(pattern):
            # First, is this already in the database?
            stored = app.session.query(Figure).filter(Figure.filepath==filepath)
            n_stored = stored.count()
            if n_stored == 1:
                continue
            if n_stored > 1:
                # Purge them all, if we find more than one.
                stored.delete()

            # Set the dataset parts
            pieces = filepath.split(os.path.sep)
            created = datetime.datetime.strptime(pieces[-5], "%Y-%m-%d").date()
            sequence = int(pieces[-3][1:])
            telpath = pieces[-2].replace(".","/")

            same_day = func.date_trunc("day",Dataset.created) == created
            lookup = app.session.query(Dataset).filter(same_day)
            lookup = lookup.filter(Dataset.sequence == sequence)
            dataset = lookup.one_or_none()
            if dataset is None:
                click.echo("Dataset missing for '{0}'".format(filepath))
                continue

            telemetry = dataset.telemetry[telpath]
            figure_type = pieces[-1].split(".")[0]
            app.session.add(Figure(filepath=filepath, telemetry=telemetry,
                                   figure_type=figure_type))
            click.echo("Added '{0}'".format(filepath))

        # NOTE(review): the source's indentation was lost; a single commit
        # after the scan is assumed here - confirm against the original file.
        app.session.commit()
开发者ID:alexrudy,项目名称:ShaneAOTelemetry,代码行数:29,代码来源:load_figures.py
示例10: gold_revenue_on
def gold_revenue_on(date):
    """Return the summed ``pennies`` of gold rows dated on *date*.

    Rows whose status is "declined", "chargeback" or "fudge" are left out.
    Returns 0 when the sum is NULL or otherwise falsy.
    """
    excluded_statuses = ("declined", "chargeback", "fudge")
    counts_as_revenue = ~ gold_table.c.status.in_(excluded_statuses)
    falls_on_date = func.date_trunc('day', gold_table.c.date) == date

    statement = select([sa_sum(gold_table.c.pennies)])
    statement = statement.where(counts_as_revenue).where(falls_on_date)

    total = ENGINE.execute(statement).fetchone()[0]
    return total or 0
开发者ID:Beowulfgang,项目名称:reddit,代码行数:7,代码来源:gold.py
示例11: history__facebook
def history__facebook():
    """Return a paginated history of Facebook likes.

    Snapshots are grouped by ``date_trunc(grain, timestamp)``, newest first,
    keeping the maximum ``likes`` in each group. The response produced by
    ``_prepare`` (from the group count) supplies ``offset`` and ``per_page``
    for paging, and is returned with ``grain`` and ``data`` added.
    ``data['likes']`` is the value from the most recent snapshot.
    """
    grain = _get_grain()
    S = SnapshotOfFacebook

    # Date bucket used for both grouping and ordering.
    date_group = func.date_trunc(grain, S.timestamp)

    grouped = Session.query()
    grouped = grouped.add_column(date_group)
    grouped = grouped.add_column(func.max(S.likes))
    grouped = grouped.group_by(date_group).order_by(date_group.desc())

    response = _prepare(grouped.count())
    page = grouped.offset(response['offset']).limit(response['per_page'])

    # Turn each (bucket, likes) row into the dict shape used in the response.
    history = [
        {'timestamp': bucket.isoformat(), 'likes': likes}
        for bucket, likes in page
    ]
    newest = Session.query(S).order_by(S.timestamp.desc()).first()

    response['grain'] = grain
    response['data'] = {
        'history': history,
        'likes': newest.likes,
    }
    return response
开发者ID:e6,项目名称:activityapi,代码行数:27,代码来源:api1.py
示例12: api_data_dates
def api_data_dates():
    """Return the current user's access counts per day as JSON.

    The JSON body has one key, ``dates``: a list of ``(iso_date, count)``
    pairs in ascending date order.
    """
    day_column = func.date_trunc('day', AccessLog.created_at).label('date')
    access_column = count(AccessLog.id).label('accesses')

    per_day = db.session.query(day_column, access_column)
    per_day = per_day.filter(AccessLog.user == current_user)
    rows = per_day.group_by('date').order_by('date').all()

    pairs = []
    for day, cnt in rows:
        pairs.append((day.isoformat(), cnt))
    return flask.jsonify(dates=pairs)
开发者ID:falquaddoomi,项目名称:slm_histviz,代码行数:8,代码来源:api.py
示例13: timeline
def timeline(self):
    """Render the balance-change timeline as XML.

    The ``resolution`` request parameter (``'days'`` by default) selects
    day, week or month buckets. ``c.sets`` gets one entry per date in the
    range from ``h.get_dates()``, each holding the absolute summed amount
    for that bucket (0 when the bucket has no rows). It stays empty when the
    query returns nothing. Returns ``"<graph></graph>"`` when no base
    conditions are available.
    """
    dbFacade = self.dbFacade()
    model = dbFacade.model

    conditions = self._get_base_conditions(use_resolution=True)
    if conditions is None:
        return "<graph></graph>"

    resolution = request.params.get('resolution', 'days')
    transaction_date = model.BalanceChange.transaction_date
    if resolution == 'weeks':
        time_expression = cast(func.date_trunc('week', transaction_date), DATE)
    elif resolution == 'months':
        time_expression = cast(func.date_trunc('month', transaction_date), DATE)
    else:
        time_expression = transaction_date

    amount_total = func.abs(func.coalesce(func.sum(model.BalanceChange.amount)))
    statement = select([time_expression.label('time'), amount_total.label('sum')],
                       and_(*conditions),
                       from_obj=[model.balance_changes_table],
                       group_by=['time'])
    rows = dbFacade.db.execute(statement).fetchall()
    time2sums = dict((row.time, row.sum) for row in rows)

    c.sets = []
    if len(time2sums) > 0:
        (start_date, end_date) = h.get_dates()
        if resolution == 'months':
            dates = months_range(start_date, end_date)
            per_day = False
        elif resolution == 'weeks':
            dates = weeks_range(start_date, end_date)
            per_day = False
        else:
            dates = days_range(start_date, end_date)
            per_day = True

        for day in dates:
            if per_day:
                # In daily mode only Mondays get a visible name.
                show = 1 if day.weekday() == 0 else 0
            else:
                show = 1
            value = time2sums.get(day, 0)
            c.sets.append({'name': self._timeline_name(day), 'value': value, 'showName': show})

    response.headers['Content-Type'] = 'text/xml; charset=utf-8'
    return render_jinja2('reports/timeline-xml.jinja')
开发者ID:pawelniewie,项目名称:5groszy.pl,代码行数:46,代码来源:balance_reports.py
示例14: build_query_to_report
def build_query_to_report(self, query, aggregate_table, params):
    """Add a ``time_slice`` column to *query* and group by it.

    *params* is the truncation unit and must be one of
    ``self._known_units`` (checked with ``assert``). ``time_slice`` is the
    epoch extracted from ``time_step`` truncated to that unit.
    """
    assert params in self._known_units
    bucket = func.date_trunc(params, aggregate_table.c.time_step)
    epoch = func.extract("epoch", bucket)
    with_slice = query.column(label("time_slice", epoch))
    return with_slice.group_by(bucket)
开发者ID:Supermighty,项目名称:speeduplouisville,代码行数:8,代码来源:aggregate.py
示例15: history__github
def history__github():
    """Return a paginated, per-repository history of GitHub snapshots.

    Pagination is done over the distinct date buckets
    (``date_trunc(grain, timestamp)``, newest first). The smallest and
    largest bucket on the requested page bound the grouped query. The
    optional ``repo`` request argument is a comma-separated list of
    repository names to restrict the result to.

    The response produced by ``_prepare`` is returned with ``grain``,
    ``data`` (a dict keyed by repository name), ``repos``, ``min_date`` and
    ``max_date`` added.
    """
    grain = _get_grain()
    # Filtered list of github IDs
    repo = request.args.get('repo', None)
    repoFilter = None
    if repo is not None:
        repo = repo.split(',')
        repoFilter = SnapshotOfGithub.repo_name.in_(repo)
    # Date filter
    date_group = func.date_trunc(grain, SnapshotOfGithub.timestamp)
    # Query: Range of dates
    q1 = Session.query()\
        .add_column( func.distinct(date_group).label('d') )\
        .order_by(date_group.desc())
    response = _prepare(q1.count())
    q1 = q1.offset( response['offset'] )\
        .limit( response['per_page'] )
    if q1.count():
        date_column = q1.subquery().columns.d
        (min_date,max_date) = Session.query(func.min(date_column), func.max(date_column)).first()
    else:
        # Impossible date range
        (min_date,max_date) = datetime.now()+timedelta(days=1),datetime.now()
    # Grouped query
    S = SnapshotOfGithub
    q = Session.query()\
        .add_column( func.sum(S.watchers) )\
        .add_column( func.max(S.forks) )\
        .add_column( func.max(S.open_issues) )\
        .add_column( func.max(S.size) )\
        .add_column( date_group )\
        .add_column( S.repo_name )\
        .group_by(date_group)\
        .group_by(S.repo_name)\
        .order_by(date_group.desc())\
        .filter( date_group>=min_date )\
        .filter( date_group<=max_date )
    # Only restrict by repository when a filter was actually requested;
    # handing None to filter() is not a valid criterion. The check must be
    # "is not None" because a SQL expression has no usable truth value.
    if repoFilter is not None:
        q = q.filter( repoFilter )
    results = {}
    # Inner function transforms SELECT tuple into recognizable format
    _dictize = lambda x: {
        'watchers':x[0],
        'forks':x[1],
        'issues':x[2],
        'size':x[3],
        'timestamp':x[4].date().isoformat(),
    }
    # Build output datastructure from rows
    for x in q:
        repo_name = x[5]
        results[repo_name] = results.get(repo_name, { 'repo':repo_name, 'data':[] })
        results[repo_name]['data'].append( _dictize(x) )
    # Write response
    response['grain'] = grain
    response['data'] = results
    response['repos'] = repo
    response['min_date'] = min_date.date().isoformat()
    response['max_date'] = max_date.date().isoformat()
    return response
开发者ID:e6,项目名称:activityapi,代码行数:57,代码来源:api1.py
示例16: resources
def resources(self):
    """Render the cumulative resource-count statistics page.

    When at least one resource exists, ``c.date_interval``,
    ``c.graph_options`` and ``c.graph_data`` are filled with a running total
    of resources per time bucket. Buckets are months when the oldest
    resource is more than 10 days old, days otherwise.
    """
    # Creation date of the oldest resource.
    first_created = model.Session.query(Resource.created)
    oldest_created_date = first_created.order_by(Resource.created).limit(1).scalar()

    # With no resources at all there is nothing to plot.
    if oldest_created_date:
        age = datetime.now() - oldest_created_date

        # Over 10 days of data: bucket by month; otherwise by day.
        if age.days > 10:
            c.date_interval, label_formatter = 'month', '%b %Y'
        else:
            c.date_interval, label_formatter = 'day', '%d/%m/%y'

        bucket = func.date_trunc(c.date_interval, Resource.created)
        per_bucket = model.Session.query(bucket.label('date'),
                                         func.count().label('count'))
        per_bucket = per_bucket.order_by(bucket).group_by(bucket)

        ticks = []
        c.graph_options = {
            'series': {
                'lines': {'show': True},
                'points': {'show': True}
            },
            'xaxis': {
                'mode': 'time',
                'ticks': ticks
            },
            'yaxis': {
                'tickDecimals': 0
            }
        }

        points = []
        c.graph_data = points
        running_total = 0
        for index, stat in enumerate(per_bucket.all()):
            running_total += stat.count
            points.append([index, running_total])
            ticks.append([index, stat.date.strftime(label_formatter)])

    # NOTE(review): the source's indentation was lost; rendering is assumed to
    # happen whether or not any resource exists - confirm.
    return p.toolkit.render('stats/resources.html', {'title': 'Resource statistics'})
开发者ID:NaturalHistoryMuseum,项目名称:ckanext-nhm,代码行数:56,代码来源:stats.py
示例17: hours_with_calls
def hours_with_calls(session, start, end):
    """Yield each distinct hour that has a queue log entry in the range.

    *start* and *end* are formatted with ``_STR_TIME_FMT`` and used as the
    bounds of a BETWEEN filter on ``QueueLog.time``. Each yielded value is
    that column, cast to TIMESTAMP and truncated to the hour.
    """
    lower = start.strftime(_STR_TIME_FMT)
    upper = end.strftime(_STR_TIME_FMT)

    hour_bucket = func.date_trunc('hour', cast(QueueLog.time, TIMESTAMP))
    query = session.query(distinct(hour_bucket).label('time'))
    query = query.filter(between(QueueLog.time, lower, upper))

    for row in query.all():
        yield row.time
开发者ID:jaunis,项目名称:xivo-dao,代码行数:10,代码来源:queue_log_dao.py
示例18: history__mailman
def history__mailman():
    """Return a paginated, per-list history of Mailman snapshots.

    Pagination is done over the distinct date buckets
    (``date_trunc(grain, timestamp)``, newest first). The smallest and
    largest bucket on the requested page bound the grouped query. The
    optional ``list`` request argument is a comma-separated list of mailing
    list names to restrict the result to.

    The response produced by ``_prepare`` is returned with ``grain``,
    ``data`` (a dict keyed by list name), ``list``, ``min_date`` and
    ``max_date`` added.
    """
    grain = _get_grain()
    # Filtered list of mailman IDs
    lists = request.args.get('list')
    listFilter = None
    if lists is not None:
        lists = lists.split(',')
        listFilter = SnapshotOfMailman.list_name.in_(lists)
    # Date filter
    date_group = func.date_trunc(grain, SnapshotOfMailman.timestamp)
    # Query: Range of dates
    q1 = Session.query()\
        .add_column( func.distinct(date_group).label('d') )\
        .order_by(date_group.desc())
    response = _prepare(q1.count())
    q1 = q1.offset( response['offset'] )\
        .limit( response['per_page'] )
    if q1.count():
        subquery = q1.subquery()
        (min_date,max_date) = Session.query(func.min(subquery.columns.d), func.max(subquery.columns.d)).first()
    else:
        # Impossible date range
        (min_date,max_date) = datetime.now()+timedelta(days=1),datetime.now()
    # Grouped query
    S = SnapshotOfMailman
    q = Session.query()\
        .add_column( func.sum(S.posts_today) )\
        .add_column( func.max(S.subscribers) )\
        .add_column( date_group )\
        .add_column( S.list_name )\
        .group_by(date_group)\
        .group_by(S.list_name)\
        .order_by(date_group.desc())\
        .filter( date_group>=min_date )\
        .filter( date_group<=max_date )
    # Only restrict by list name when a filter was actually requested;
    # handing None to filter() is not a valid criterion. The check must be
    # "is not None" because a SQL expression has no usable truth value.
    if listFilter is not None:
        q = q.filter( listFilter )
    results = {}
    # Inner function transforms SELECT tuple into recognizable format
    _dictize = lambda x: {
        'posts':x[0],
        'subscribers':x[1],
        'timestamp':x[2].isoformat(),
    }
    # Build output datastructure from rows
    for x in q:
        list_name = x[3]
        results[list_name] = results.get(list_name, { 'list_name':list_name, 'data':[] })
        results[list_name]['data'].append( _dictize(x) )
    # Write response
    response['grain'] = grain
    response['data'] = results
    response['list'] = lists
    response['min_date'] = min_date.isoformat()
    response['max_date'] = max_date.isoformat()
    return response
开发者ID:e6,项目名称:activityapi,代码行数:55,代码来源:api1.py
示例19: get_historical_metrics
def get_historical_metrics():
    """Return per-day counts of briefs, brief responses, buyers and suppliers.

    The JSON body maps each metric name to a list of
    ``{"value": count, "ts": iso8601_day}`` entries in ascending day order.
    """
    def daily_series(statement):
        # Run a (day, count) statement and shape its rows for the response.
        return [
            {"value": count, "ts": pendulum.instance(day).to_iso8601_string()}
            for (day, count) in db.session.execute(statement)
        ]

    metrics = {}

    # Briefs that have been published and not withdrawn.
    brief_day = func.date_trunc('day', Brief.published_at)
    briefs_by_day = select([brief_day, func.count(brief_day)])
    briefs_by_day = briefs_by_day.where(Brief.withdrawn_at.is_(None))
    briefs_by_day = briefs_by_day.where(Brief.published_at.isnot(None))
    briefs_by_day = briefs_by_day.order_by(brief_day).group_by(brief_day)
    metrics["briefs_total_count"] = daily_series(briefs_by_day)

    # All brief responses.
    brief_responses_day = func.date_trunc('day', BriefResponse.created_at)
    brief_responses_by_day = select([brief_responses_day, func.count(brief_responses_day)])
    brief_responses_by_day = brief_responses_by_day.order_by(brief_responses_day)
    brief_responses_by_day = brief_responses_by_day.group_by(brief_responses_day)
    metrics["brief_response_count"] = daily_series(brief_responses_by_day)

    # Active users with the buyer role.
    # NOTE(review): the email conditions are joined with OR, so a user is only
    # excluded when the address contains BOTH "+" and "digital.gov.au" -
    # confirm whether AND was intended.
    buyer_day = func.date_trunc('day', User.created_at)
    email_ok = (User.email_address.contains("+").is_(False)
                | User.email_address.contains("digital.gov.au").is_(False))
    buyers_by_day = select([buyer_day, func.count(buyer_day)])
    buyers_by_day = buyers_by_day.where(email_ok)
    buyers_by_day = buyers_by_day.where(User.active.is_(True))
    buyers_by_day = buyers_by_day.where(User.role == 'buyer')
    buyers_by_day = buyers_by_day.order_by(buyer_day).group_by(buyer_day)
    metrics["buyer_count"] = daily_series(buyers_by_day)

    # Suppliers whose ABN is not the dummy ABN.
    supplier_day = func.date_trunc('day', Supplier.creation_time)
    suppliers_by_day = select([supplier_day, func.count(supplier_day)])
    suppliers_by_day = suppliers_by_day.where(Supplier.abn != Supplier.DUMMY_ABN)
    suppliers_by_day = suppliers_by_day.order_by(supplier_day).group_by(supplier_day)
    metrics["supplier_count"] = daily_series(suppliers_by_day)

    return jsonify(metrics)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:42,代码来源:metrics.py
示例20: authored_month_counts_q
def authored_month_counts_q(session):
    """Return a query counting Voevents per month of ``author_datetime``.

    The author timestamp is converted with ``timezone('UTC', ...)`` before it
    is truncated to the month, so the bins follow UTC month boundaries. The
    query is returned unexecuted, with columns ``month_id`` and
    ``month_count``.
    """
    # Careful with the datetime-truncation here - ensure we're working in UTC
    # before we bin by month!
    author_time_utc = func.timezone('UTC', Voevent.author_datetime)
    month_id = func.date_trunc('month', author_time_utc).distinct().label('month_id')
    month_count = (func.count(Voevent.ivorn)).label('month_count')

    month_counts_qry = session.query(month_id, month_count)
    month_counts_qry = month_counts_qry.select_from(Voevent).group_by('month_id')
    return month_counts_qry
开发者ID:timstaley,项目名称:voeventdb,代码行数:11,代码来源:query.py
注:本文中的 sqlalchemy.func.date_trunc 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论