本文整理汇总了Python中sqlalchemy.func.date函数的典型用法代码示例。如果您正苦于以下问题:Python date函数的具体用法?Python date怎么用?Python date使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了date函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: count_last_week_data
def count_last_week_data(brand_arr):
"""统计上周七天内的数据
[0, 0, 0, 0, 0, 0, 0]
周一-周日:0-6
过期日期:优惠券创建时间+发放时间+1天
1.首先获取当天日期
2.然后获取上周一日期
3.for循环得出上周7天日期
"""
import datetime
expire_data = [0, 0, 0, 0, 0, 0, 0]
get_ticket_data = [0, 0, 0, 0, 0, 0, 0]
callback_data = [0, 0, 0, 0, 0, 0, 0]
today = datetime.date.today()
last_monday = today - timedelta(days=today.weekday() + 7)
for i in range(0, 6):
index_day = last_monday + timedelta(days=i)
expire_data[i] += int(GetTicketRecord.query.filter(
GetTicketRecord.get_expire_date == index_day + timedelta(days=1)).count())
get_ticket_data[i] += int(
GetTicketRecord.query.filter(func.date(GetTicketRecord.create_at) == index_day).filter(
GetTicketRecord.discount.has(Discount.brand_id.in_(brand_arr))).count())
callback_data[i] += int(GetTicketRecord.query.filter(func.date(GetTicketRecord.create_at) == index_day,
GetTicketRecord.status == 'usedit').filter(
GetTicketRecord.discount.has(Discount.brand_id.in_(brand_arr))).count())
return expire_data, get_ticket_data, callback_data
开发者ID:yyt030,项目名称:quanduoduo,代码行数:27,代码来源:site.py
示例2: filter_date
def filter_date(self, query, appstruct):
"""
filter the query and restrict it to the given year
"""
year = appstruct.get('year')
date_range_start = appstruct.get('date_range_start')
date_range_end = appstruct.get('date_range_end')
if date_range_start not in (None, colander.null):
query = query.filter(
func.date(Activity.datetime) >= date_range_start
)
if date_range_end not in (None, colander.null):
query = query.filter(
func.date(Activity.datetime) <= date_range_end
)
if (
year not in (None, colander.null, -1) and
date_range_start in (None, colander.null) and
date_range_end in (None, colander.null)
):
query = query.filter(
func.extract('YEAR', Activity.datetime) == year
)
return query
开发者ID:CroissanceCommune,项目名称:autonomie,代码行数:27,代码来源:activity.py
示例3: get_summary
def get_summary(value_class):
q = db_session.query(
func.date(SR_Values.datetime).label("date")
, func.sum(SR_Values.value).label("daily_value")
).filter(SR_Classes.id == SR_Values.value_class_id
).filter(SR_Classes.accum_flag == true()
).filter(SR_Classes.value_class == value_class
).filter(SR_Values.datetime > datetime.datetime(datetime.datetime.now().year, 1, 1)
).group_by(SR_Classes.value_class, func.month(SR_Values.datetime)
).order_by(SR_Classes.value_class, func.date(SR_Values.datetime))
print q
rows = [{ "name": x.date
, "value": x.daily_value
} for x in q.all()]
q = db_session.query(
func.date(SR_Values.datetime).label("date")
, func.avg(SR_Values.value).label("daily_value")
).filter(SR_Classes.id == SR_Values.value_class_id
).filter(SR_Classes.accum_flag == false()
).filter(SR_Classes.value_class == value_class
).filter(SR_Values.datetime > datetime.datetime(datetime.datetime.now().year, 1, 1)
).group_by(SR_Classes.value_class, func.month(SR_Values.datetime)
).order_by(SR_Classes.value_class, func.date(SR_Values.datetime))
rows.extend([{ "name": x.date
, "value": x.daily_value
} for x in q.all()])
print rows
return rows
开发者ID:DrSkippy,项目名称:simple_record,代码行数:28,代码来源:simple_record.py
示例4: admin_weekly_checkins
def admin_weekly_checkins(date, grade=None):
week = week_magic(date)
if grade is None:
checkins = Checkin.query.filter(and_(func.date(Checkin.checkin_timestamp)>=week[0], func.date(Checkin.checkin_timestamp)<=week[4])).order_by('Checkin.checkin_timestamp')
else:
checkins = Checkin.query.filter(and_(Checkin.user.has(grade=grade), func.date(Checkin.checkin_timestamp)>=week[0], func.date(Checkin.checkin_timestamp)<=week[4])).order_by('Checkin.checkin_timestamp')
return checkins
开发者ID:ECGHelloWorld,项目名称:PhoenixNow,代码行数:7,代码来源:user.py
示例5: do_job
def do_job(self):
"""
SELECT *, coalesce(end_time,now)-start_time
FROM task t, subject s
WHERE t.subject_id=s.id AND start_time > GetDate() - @days
ORDER BY start_time
:return:
"""
session = DBSession()
now = datetime.now()
time_worked = (func.julianday(func.coalesce(Task.end_time,now)) - func.julianday(Task.start_time)) * 86400
query = session.query(Task.start_time,
func.coalesce(Task.end_time, now),
time_worked,
Subject.title,
Task.title) \
.filter(Subject.id==Task.subject_id) \
.filter(func.date(Task.start_time) > func.date('now', '-%s day' % self.args.days)) \
.order_by(Task.start_time)
print '\n'
table = PrettyTable(['Start', 'End', 'Time', 'Subject', 'Title'])
table.align["Title"] = "l"
total_time = 0
day_total = 0
last_date = None
for row in query:
if last_date == None:
last_date = row[0].date()
if row[0].date() != last_date:
table.add_row([
'', '', timedelta(seconds=round(day_total)), '', ''
])
last_date = row[0].date()
day_total = 0
day_total += row[2]
total_time += row[2]
table.add_row([
row[0],
row[1],
timedelta(seconds=round(row[2])),
row[3],
row[4],
])
if day_total > 0:
table.add_row([
'', '', timedelta(seconds=round(day_total)), '', ''
])
print table
print 'Total Work time: %s' % timedelta(seconds=total_time)
print
开发者ID:sonologic,项目名称:timesheet,代码行数:60,代码来源:daily_detail.py
示例6: complete
def complete():
form = MeaningForm()
if not form.validate_on_submit():
return render_template('survey.html', form=form)
# check if session exists
today = strftime("%Y-%m-%d")
sesh = Sessions.query.filter(
func.date(Sessions.created)==func.date(today)
).order_by(desc(Sessions.created)).first()
# create a new session if one does not exist
# for today
if sesh is None:
sesh = Sessions()
db_session.add(sesh)
db_session.commit()
submission = Submissions(session_id=sesh.id, ip=request.remote_addr)
db_session.add(submission)
db_session.commit()
for field in form:
if field.name == "csrf_token":
continue
response = Responses(
submission_id=submission.id,
phrase=field.label.text,
value=field.data
)
db_session.add(response)
db_session.commit()
return render_template('survey.html', success=True)
开发者ID:michigan-com,项目名称:meaning,代码行数:33,代码来源:meaning.py
示例7: get_query
def get_query(self, date_from, date_to):
from rockpack.mainsite.services.video.models import Video
dates = readonly_session.query(
func.date(Video.date_added).distinct().label('date_added')
).filter(
Video.date_added.between(date_from, date_to)
).subquery()
counts = readonly_session.query(
func.date(Video.date_added).label('date_added'),
func.count().label('count')
).filter(
Video.link_title.isnot(None)
).group_by(
func.date(Video.date_added)
).subquery()
query = readonly_session.query(
dates.c.date_added,
func.sum(counts.c.count)
).filter(
counts.c.date_added < dates.c.date_added
).group_by(
dates.c.date_added
)
return query
开发者ID:wonderpl,项目名称:dolly-web,代码行数:28,代码来源:stats_views.py
示例8: getTickets
def getTickets(u, _from, _to, show):
session = pos.database.session()
query = session.query(Ticket).filter((Ticket.user == u) & Ticket.payment_method.in_(show))
if _to is None:
query = query.filter(func.date(Ticket.date_close) == func.date(_from))
else:
query = query.filter(func.date(Ticket.date_close) >= func.date(_from) & func.date(Ticket.date_close) <= func.date(_to))
query = query.order_by(Ticket.date_close.asc(), Ticket.date_open.asc(), Ticket.date_paid.desc())
return query.all()
开发者ID:wahhid,项目名称:wxpos,代码行数:9,代码来源:users.py
示例9: getDiaryEntries
def getDiaryEntries(_from, _to):
session = pos.database.session()
query = session.query(DiaryEntry)
if _to is None:
query = query.filter(func.date(DiaryEntry.date) == func.date(_from))
else:
query = query.filter(func.date(DiaryEntry.date) >= func.date(_from) & func.date(DiaryEntry.date) <= func.date(_to))
query = query.order_by(DiaryEntry.date.asc())
return query.all()
开发者ID:notepadnine,项目名称:wxpos,代码行数:9,代码来源:stockdiary.py
示例10: get_total_usage
def get_total_usage():
entries = DBSession.query(
func.count('*'),
func.date(DropLog.time)
).filter(
DropLog.username != 'openhouse'
).group_by(
func.date(DropLog.time)
).all()
return process_usage(entries)
开发者ID:JDrit,项目名称:drink_stats_v2,代码行数:10,代码来源:drink_log.py
示例11: dau
def dau():
session = get_session()
active_users = []
active_users.append(['Day', 'Active Users'])
for value in session.query(
func.date(Activity.moment),
func.count(Activity.user_id.distinct())).\
group_by(func.date(Activity.moment)).all():
active_users.append(list(value))
return json.dumps(active_users)
开发者ID:aydogankaragoz,项目名称:stats,代码行数:11,代码来源:server_stat.py
示例12: getTickets
def getTickets(_from, _to=None):
session = pos.database.session()
query = session.query(Ticket)
if _to is None:
query = query.filter(func.date(Ticket.date_close) == func.date(_from))
else:
query = query.filter(
func.date(Ticket.date_close) >= func.date(_from) & func.date(Ticket.date_close) <= func.date(_to)
)
query = query.order_by(Ticket.date_close.asc(), Ticket.date_open.asc(), Ticket.date_paid.desc())
return query.all()
开发者ID:wahhid,项目名称:wxpos,代码行数:11,代码来源:sales.py
示例13: calendar
def calendar(year=SimpleDate.getNumYearToday(), month=SimpleDate.getNumMonthToday()):
# TODO: refactor duplicated code!!!
days = range(1, SimpleDate.getNumDaysInMonth(month))
categories = SpendingCategory.query.order_by(SpendingCategory.order.asc()).all()
sums = {}
descriptions = {}
totals = {}
total = 0.0
for cat in categories:
totals[cat.id] = 0.0
for day in days:
sums[day] = {}
descriptions[day] = ""
for cat in categories:
objs = db.session.query(Spending)\
.filter(func.date(Spending.date) >= datetime.date(year,month,1))\
.filter(func.date(Spending.date) <= datetime.date(year,month,SimpleDate.getNumDaysInMonth(month)))\
.filter(Spending.category==cat)\
.filter( func.date(Spending.date) == datetime.date(year,month,day)).all()
sums[day][cat.id] = 0.0
for obj in objs:
totals[cat.id] += obj.amount
total += obj.amount
sums[day][cat.id] += obj.amount
if(obj.name <> ""):
descriptions[day] += "("+obj.name+")"
form = SpendingForm()
return render_template('calendar.html',
form=form,
categories=categories,
prefix = url_for('calendar'),
month = month,
year = year,
today_day = SimpleDate.getNumDayToday(),
today_month = SimpleDate.getNumMonthToday(),
today_year = SimpleDate.getNumYearToday(),
days=days,
sums=sums,
totals=totals,
total=total,
desc=descriptions)
开发者ID:vikin91,项目名称:euroInAFlask,代码行数:53,代码来源:euroInAFlask.py
示例14: client_sale_report
def client_sale_report(dbapi, start, end):
sales_by_date = list(dbapi.db_session.query(
NSale.seller_ruc,
func.date(NSale.timestamp), func.sum(NSale.pretax_amount_usd),
func.sum(NSale.tax_usd), func.count(NSale.uid)).filter(
NSale.timestamp >= start, NSale.timestamp <= end,
NSale.status == Status.NEW, NSale.payment_format != 'None').group_by(
func.date(NSale.timestamp), NSale.seller_ruc))
for ruc, date, sale, tax, count in sales_by_date:
yield SaleReportByDate(
timestamp=date, ruc=ruc, sale_pretax_usd=sale,
tax_usd=tax, count=count
)
开发者ID:qihqi,项目名称:HenryFACTService,代码行数:13,代码来源:dao.py
示例15: index_old
def index_old(self):
from gviz_data_table import Table
from rockpack.mainsite.services.user.models import User, UserActivity
user_count = readonly_session.query(func.count(User.id)).\
filter(User.refresh_token != '').scalar()
header = ('user count', 'max lifetime', 'avg lifetime', 'stddev lifetime',
'max active days', 'avg active days', 'stddev active days')
lifetime = func.date_part('days', func.max(UserActivity.date_actioned) -
func.min(UserActivity.date_actioned)).label('lifetime')
active_days = func.count(func.distinct(func.date(
UserActivity.date_actioned))).label('active_days')
activity = readonly_session.query(UserActivity.user, lifetime, active_days).\
group_by(UserActivity.user)
ctx = {}
for key, having_expr in ('all', None), ('1day', lifetime > 1), ('7day', lifetime > 7):
data = activity.having(having_expr).from_self(
func.count('*'),
func.max(lifetime),
func.avg(lifetime),
func.stddev_samp(lifetime),
func.max(active_days),
func.avg(active_days),
func.stddev_samp(active_days)
).one()
table = Table([
dict(id='metric', type=str),
dict(id='value', type=float),
dict(id='%', type=str),
])
pdata = ('%d%%' % (data[0] * 100 / user_count),) + ('',) * 6
table.extend(zip(*(header, map(float, data), pdata)))
ctx['ret_%s_data' % key] = table.encode()
return self.render('admin/retention_stats_old.html', **ctx)
开发者ID:wonderpl,项目名称:dolly-web,代码行数:34,代码来源:stats_views.py
示例16: run
def run(self):
q = self.session.query(EveType.name,func.sum(Drop.dropped).label('dropped'),\
func.sum(Drop.destroyed).label('destroyed'),func.date(KillMail.timestamp).label('date')).\
join(Drop,Drop.evetype_id == EveType.id).\
join(KillMail, KillMail.id == Drop.killmail_id).\
join(SolarSystem, SolarSystem.id == KillMail.solarsystem_id).\
join(Region, Region.id == SolarSystem.region_id).\
join(EveGroup,EveGroup.id == EveType.group_id).\
join(EveCategory,EveCategory.id == EveGroup.category_id)
if(self.region_ids):
q = q.filter(Region.id.in_(self.region_ids))
if(self.type_ids):
q = q.filter(EveType.id.in_(self.type_ids))
if(self.category_ids):
q = q.filter(EveCategory.id.in_(self.category_ids))
if (self.after):
q = q.filter(KillMail.timestamp >= self.after)
if (self.before):
q = q.filter(KillMail.timestamp <= self.before)
q = q.group_by(EveType.id,desc('date'))
q = q.order_by(EveType.id,desc('date'))
self.query = q
self.columns = q.column_descriptions
self.results = q.all()
开发者ID:csvance,项目名称:inteltool,代码行数:31,代码来源:pvpreport.py
示例17: session_data
def session_data(session_date=None):
if not session_date:
session_date = datetime.today().date().replace(year=2017, month=5, day=1)
# Start application
session_factory = sessionmaker()
# Set up our sqlite connection
db = create_engine('sqlite:///:memory:', echo=False)
Base.metadata.bind = db
Base.metadata.create_all() # This creates the table information. Needs to happen before session inst
# create a configured "Session" class
# session = scoped_session(Session(bind=db))
session = session_factory(bind=db)
# add records from data_src to the local db
for call_id, call_data_dict in test(query_date=session_date.strftime('%Y-%m-%d')).items(): # Get data from PG connection
# call_data = SlaStorage(
# id=call_id,
# start=call_data_dict['Start Time'],
# end=call_data_dict['End Time'],
# data=call_data_dict
# )
call_data = SlaStorage(
id=call_id,
start=call_data_dict.pop('Start Time'),
end=call_data_dict.pop('End Time'),
unique_id1=call_data_dict.pop('Unique Id1'),
unique_id2=call_data_dict.pop('Unique Id2'),
data=call_data_dict
)
session.add(call_data)
session.commit()
# from json import dumps
# from automated_sla_tool.test.flexible_storage import MyEncoder
# for row in session.query(SlaStorage).filter(func.date(session_date)).all():
# print(row.id)
# print(dumps(row.data['Event Summary'], cls=MyEncoder, indent=4))
# print(
# row.id,
# row.data['Talking Duration'],
# (row.end - row.start) - row.data['Talking Duration'], # Wait = Call Duration - Talk Duration - Hold
# row.data['Hold Events'] # Hold = 'Hold', 'Transfer Hold', 'Park'
# )
# for row in session.query(SlaStorage).filter(func.date(session_date)).all():
# if row.data["Call Group"] == '7521':
# print(row.id)
# print(dumps(row.data, cls=MyEncoder, indent=4))
#
# for row in session.query(SlaStorage).filter(func.date(session_date)).all():
# yield row
return session.query(SlaStorage).filter(func.date(session_date)).all()
开发者ID:michaelscales88,项目名称:automated_sla_tool,代码行数:60,代码来源:PG_json_test.py
示例18: api_outlimit_stations
def api_outlimit_stations(request, propid, startdate, enddate, limit):
session = Session()
#response = HttpResponse(mimetype='text/csv')
#response['Content-Disposition'] = 'attachment; filename="obs_' + stid + '_' + propid + '_' + date + '.csv"'
startdate = datetime(int(startdate.split('-')[0]), int(startdate.split('-')[1]), int(startdate.split('-')[2]))
enddate = datetime(int(enddate.split('-')[0]), int(enddate.split('-')[1]), int(enddate.split('-')[2]))
#max = aliased(func.max(Observation.value))
#currentdate = aliased(func.date(Observation.date))
observations = session.query(func.max(Observation.value), Station.municipality, Station.lat, Station.lng, func.date(Observation.date)).\
join(Station).join(Property).\
filter(Observation.date.between(startdate, enddate), Property.name == propid).\
group_by(func.date(Observation.date), Station.code).having(func.max(Observation.value) >= float(limit)).all()
#Observation.value >= limit
#writer = csv.writer(response)
#for obs in observations:
# writer.writerow([obs.station.lat, obs.station.lng, obs.date, obs.value])
resp = []
for obs in observations:
o = {}
o['lat'] = obs[2]
o['lng'] = obs[3]
o['date'] = obs[4].isoformat()
o['municipality'] = obs[1]
o['value'] = obs[0]
resp.append(o)
session.close()
#return response
return resp
开发者ID:johnfelipe,项目名称:bizkaisense,代码行数:35,代码来源:views.py
示例19: notebook
def notebook(context, request):
tasks = Task.query().filter_by(notebook=context).order_by('name').all()
urgent_tasks = [t for t in tasks if t.emergency >= Task.EMERGENCY_THRESHOLD]
urgent_tasks.sort(key=attrgetter('emergency'), reverse=True)
time_spend_today = (DBSession.query(func.sum(Execution.length))
.join('task').filter(Task.notebook==context)
.filter(or_(Execution.executor==request.user,
Execution.executor==None))
.filter(func.date(Execution.time) == datetime.date.today())
.scalar()) or 0
# TODO this could be configurable
time_left = 15 - time_spend_today
suggested_something = False
for t in urgent_tasks:
if time_left <= 0:
break
t.suggested = True
time_left -= t.mean_execution
suggested_something = True
last_executions = (Execution.query()
.join('task').filter(Task.notebook==context)
.order_by(Execution.time.desc()).limit(5).all())
return {
'notebook': context,
'tasks': tasks,
'urgent_tasks': urgent_tasks,
'urgent_tasks_time': int(sum(t.mean_execution for t in urgent_tasks)),
'last_executions': last_executions,
'suggested_something': suggested_something,
}
开发者ID:madjar,项目名称:augias-old,代码行数:32,代码来源:views.py
示例20: index
def index(self):
from gviz_data_table import Table
from rockpack.mainsite.services.user.models import User, UserActivity, UserAccountEvent
if request.args.get('activity') == 'activity':
activity_model, activity_date = UserActivity, UserActivity.date_actioned
else:
activity_model, activity_date = UserAccountEvent, UserAccountEvent.event_date
try:
interval_count = int(request.args['interval_count'])
except Exception:
interval_count = 10
interval = request.args.get('interval')
if interval not in ('week', 'month'):
interval = 'week'
cohort = func.date_part(interval, User.date_joined)
cohort_label = func.min(func.date(User.date_joined))
active_interval = (func.date_part(interval, activity_date) - cohort).label('active_interval')
q = readonly_session.query(User).filter(
User.date_joined > LAUNCHDATE, User.refresh_token != '')
if request.args.get('gender') in ('m', 'f'):
q = q.filter(User.gender == request.args['gender'])
if request.args.get('locale') in app.config['ENABLED_LOCALES']:
q = q.filter(User.locale == request.args['locale'])
if request.args.get('age') in ('13-18', '18-25', '25-35', '35-45', '45-55'):
age1, age2 = map(int, request.args['age'].split('-'))
q = q.filter(between(
func.age(User.date_of_birth),
text("interval '%d years'" % age1),
text("interval '%d years'" % age2)
))
active_users = dict(
((c, int(w)), u) for c, w, u in
q.join(
activity_model,
(activity_model.user == User.id) &
(activity_date >= User.date_joined)
).group_by(cohort, active_interval).values(
cohort, active_interval, func.count(func.distinct(activity_model.user))
)
)
table = Table(
[dict(id='cohort', type=date)] +
[dict(id='%s%d' % (interval, i), type=str) for i in range(interval_count)]
)
totals = q.group_by(cohort).order_by(cohort)
for c, l, t in totals.values(cohort, cohort_label, func.count()):
data = []
for i in range(interval_count):
a = active_users.get((c, i), '')
data.append(a and '%d%% (%d)' % (ceil(a * 100.0 / t), a))
table.append([l] + data)
return self.render('admin/retention_stats.html', data=table.encode())
开发者ID:wonderpl,项目名称:dolly-web,代码行数:60,代码来源:stats_views.py
注:本文中的sqlalchemy.func.date函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论