本文整理汇总了Python中sqlalchemy.func.min函数的典型用法代码示例。如果您正苦于以下问题:Python min函数的具体用法?Python min怎么用?Python min使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了min函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_extent
def get_extent():
return DBSession.query(
func.min(func.ST_XMin(AdminZone.geometry)),
func.min(func.ST_YMin(AdminZone.geometry)),
func.max(func.ST_XMax(AdminZone.geometry)),
func.max(func.ST_YMax(AdminZone.geometry)),
).first()
开发者ID:kamalhg,项目名称:nosfinanceslocales,代码行数:7,代码来源:maps.py
示例2: check_status
def check_status(date_report, session):
"""
Finds lead in db and check its status
"""
query = session.query(func.min(db.Calltouch.datetime), func.min(db.Telephony.datetime), db.Telephony.duration,
db.Calltouch.telephone, db.Calltouch.status, db.Calltouch.deadline) \
.join(db.Telephony, db.Telephony.telephone_to == db.Calltouch.telephone) \
.filter(func.DATE(db.Calltouch.deadline) == date_report, func.DATE(db.Telephony.datetime) == date_report,
db.Telephony.datetime > db.Calltouch.datetime, db.Calltouch.status != 'Doubled').group_by(db.Calltouch.telephone)
for i in query.all():
lead_date, call_date, duration, telephone, status, deadline = i
if duration <= 40 and call_date <= deadline:
status = 'Short call'
elif duration >= 40 and call_date > deadline:
status = 'Late call'
elif duration <= 40 and call_date > deadline:
status = 'Short and Late call'
elif duration >= 40 and call_date <= deadline:
status = 'Good call'
else:
status = 'Bad call'
session.query(db.Calltouch).filter(db.Calltouch.datetime == lead_date,
db.Calltouch.telephone == telephone).update({'status': status},
synchronize_session=False)
开发者ID:korolchukegor,项目名称:calls,代码行数:27,代码来源:callbacks.py
示例3: populate_months
def populate_months(session):
if session.query(Month).count() > 0:
raise Exception("Months table is already populated.")
demimonth = datetime.timedelta(days=14)
first_chat = session.query(func.min(Chat.date)).scalar()
first_bugevent = session.query(func.min(BugEvent.date)).scalar()
start_date = max(first_chat, first_bugevent)
print "First chat is " + str(first_chat)
print "First bug event is " + str(first_bugevent)
print "Starting months on " + str(start_date)
last_chat = session.query(func.max(Chat.date)).scalar()
last_bugevent = session.query(func.max(BugEvent.date)).scalar()
end_date = min(last_chat, last_bugevent)
print "Last chat is " + str(last_chat)
print "Last bug event is " + str(last_bugevent)
print "End months on or around " + str(end_date)
start = start_date
end = start_date + datetime.timedelta(days=27) # start + 27 days = 28 day span
while end < end_date:
month = Month(first=start, last=end)
session.add(month)
start += demimonth
end += demimonth
session.commit()
开发者ID:colinmorris,项目名称:moz-graphs,代码行数:31,代码来源:months.py
示例4: get
def get(self):
date = datetime.datetime.strptime(self.request.GET.get('date'), '%d.%m.%Y')
start_date = datetime.datetime.combine(date, day_start)
end_date = datetime.datetime.combine(date, day_end)
entries_p = self.session.query(User.id, User.name, func.min(PresenceEntry.ts), func.max(PresenceEntry.ts))\
.filter(User.id==PresenceEntry.user_id)\
.filter((User.location=="poznan") | (User.location==None))\
.filter(PresenceEntry.ts>=start_date)\
.filter(PresenceEntry.ts<=end_date)\
.group_by(User.id, User.name)\
.order_by(User.name)
entries_w = self.session.query(User.id, User.name, func.min(PresenceEntry.ts), func.max(PresenceEntry.ts))\
.filter(User.id==PresenceEntry.user_id)\
.filter(User.location=="wroclaw")\
.filter(PresenceEntry.ts>=start_date)\
.filter(PresenceEntry.ts<=end_date)\
.group_by(User.id, User.name)\
.order_by(User.name)
return dict(
entries_p=((user_id, user_name, start, stop, start.time() > hour_9) for (user_id, user_name, start, stop) in entries_p),
entries_w=((user_id, user_name, start, stop, start.time() > hour_9) for (user_id, user_name, start, stop) in entries_w),
date=date,
prev_date=previous_day(date),
next_date=next_day(date),
excuses=excuses.presence(),
justification=excuses.presence_status(date, self.request.user.id),
)
开发者ID:pytlakp,项目名称:intranet-1,代码行数:29,代码来源:presence.py
示例5: get_stats
def get_stats():
result = db.session.query(
func.min(Stop.stop_lat), func.min(Stop.stop_lon), func.max(Stop.stop_lat), func.max(Stop.stop_lon), func.count()
).first()
data = {"minLat": result[0], "minLon": result[1], "maxLat": result[2], "maxLon": result[3], "numbers": result[4]}
return jsonify({"stops": data})
开发者ID:fabid,项目名称:gtfseditor,代码行数:7,代码来源:stats.py
示例6: get_new_datasets
def get_new_datasets(pkg_ids=None):
'''
Return a list of new pkgs and date when they were created,
in format: [(id, datetime), ...]
If pkg_ids list is passed, limit query to just those packages.
'''
# Can't filter by time in select because 'min' function has to
# be 'for all time' else you get first revision in the time period.
package_revision = table('package_revision')
revision = table('revision')
s = select([package_revision.c.id, func.min(revision.c.timestamp)],
from_obj=[package_revision.join(revision)])
if pkg_ids:
s = s.where(and_(package_revision.c.id.in_(pkg_ids),
package_revision.c.type == 'dataset'))
else:
s = s.where(package_revision.c.type == 'dataset')
s = s.group_by(package_revision.c.id).\
order_by(func.min(revision.c.timestamp))
res = model.Session.execute(s).fetchall() # [(id, datetime), ...]
res_pickleable = []
for pkg_id, created_datetime in res:
res_pickleable.append((pkg_id, created_datetime))
return res_pickleable
开发者ID:keitaroinc,项目名称:ckanext-sweden,代码行数:25,代码来源:helpers.py
示例7: get_todays_electricity
def get_todays_electricity():
return Electricity.query.with_entities(
(func.max(Electricity.meter_280) - func.min(Electricity.meter_280)).label(
'todays_export'), (func.max(Electricity.meter_180) - func.min(Electricity.meter_180)).label(
'todays_import')).filter(
func.strftime('%Y-%m-%d', Electricity.created_at) == datetime.now().strftime('%Y-%m-%d')).group_by(
func.strftime('%Y-%m-%d', Electricity.created_at)).first()
开发者ID:diegoronal,项目名称:solarpi,代码行数:7,代码来源:helper.py
示例8: RecalculatePoints
def RecalculatePoints():
if marketscraper.cookies is None:
loginScraper(m_user, m_password)
#aggregate item drop rates with market
drop_items = RefreshMarketWithMobDrops()
#recalculate the points for the guild including run credit factors
d = datetime.now()
latest_item = MappedMarketResult.query.order_by(MappedMarketResult.date.desc()).all()
if len(latest_item) > 0:
d = latest_item[0].date
market_results = db.session.query(MappedMarketResult.itemid, func.min(MappedMarketResult.price)).filter(MappedMarketResult.itemid.in_(drop_items)).filter(MappedMarketResult.date >= d).group_by(MappedMarketResult.itemid).all()
guild_treasure = db.session.query(MappedGuildTreasure.itemid, func.min(MappedGuildTreasure.minMarketPrice)).filter(MappedGuildTreasure.itemid.in_(drop_items)).group_by(MappedGuildTreasure.itemid).all()
#guild treasure results take precedence over market results
#convert to a dictionary
market_results_d = {}
for mr in market_results:
if market_results_d.has_key(mr[0]):
market_results_d[mr[0]].append(mr[1])
else:
market_results_d[mr[0]] = [mr[1]]
for k,v in market_results:
market_results_d[k].append(v)
for k, v in guild_treasure:
market_results_d[k] = []
for k,v in guild_treasure:
market_results_d[k].append(v)
market_results = min_values(market_results_d)
relevant_runs_query = MappedRun.query.filter(MappedRun.success == True).all()
rcs = [rrq.chars for rrq in relevant_runs_query]
rcs = [item for sublist in rcs for item in sublist]
players_not_mapped_characters = [pc for pc in rcs if pc.mappedplayer_id is None]
player_names = [pc.PlayerName for pc in players_not_mapped_characters]
player_names = list(set(player_names))
for pn in player_names:
#players who have points and unclaimed emails will have their points calculated but they won't be able to use them.
#players will need to register. Perhaps this players can get an invite?
mp_exists = MappedPlayer.query.filter(MappedPlayer.Name==pn)
mp = None
if mp_exists.count() == 0:
continue
#removing placeholder characters for now.
#mp = MappedPlayer(pn, 'NEED_EMAIL')
else:
mp = mp_exists.all()[0]
db.session.add(mp)
chars_to_map = [pc for pc in players_not_mapped_characters if pc.PlayerName == pn]
mp.Chars = chars_to_map
db.session.commit()
for run in relevant_runs_query:
players = [c.mappedplayer_id for c in run.chars]
players = list(set(players))
CalculatePoints(run, run.mobs_killed, players, market_results, d)
开发者ID:plooploops,项目名称:rosterrun,代码行数:59,代码来源:guildpoints.py
示例9: index
def index(self):
max = DBSession.query(func.max(Sensor.lat)).one()[0]
min = DBSession.query(func.min(Sensor.lat)).one()[0]
lat = (max + min) / 2
max = DBSession.query(func.max(Sensor.lng)).one()[0]
min = DBSession.query(func.min(Sensor.lng)).one()[0]
lng = (max + min) / 2
return dict(page='map', lat=lat, lng=lng)
开发者ID:tylerwhall,项目名称:Flimsy,代码行数:8,代码来源:map.py
示例10: nextjob
def nextjob(self, queue):
"""
Make the next PENDING job active, where pending jobs are sorted
by priority. Priority is assigned on the basis of usage and the
order of submissions.
"""
session = db.Session()
# Define a query which returns the lowest job id of the pending jobs
# with the minimum priority
_priority = select([func.min(Job.priority)],
Job.status=='PENDING')
min_id = select([func.min(Job.id)],
and_(Job.priority == _priority,
Job.status == 'PENDING'))
for _ in range(10): # Repeat if conflict over next job
# Get the next job, if there is one
try:
job = session.query(Job).filter(Job.id==min_id).one()
#print job.id, job.name, job.status, job.date, job.start, job.priority
except NoResultFound:
return {'request': None}
# Mark the job as active and record it in the active queue
(session.query(Job)
.filter(Job.id == job.id)
.update({'status': 'ACTIVE',
'start': datetime.utcnow(),
}))
activejob = db.ActiveJob(jobid=job.id, queue=queue)
session.add(activejob)
# If the job was already taken, roll back and try again. The
# first process to record the job in the active list wins, and
# will change the job status from PENDING to ACTIVE. Since the
# job is no longer pending, the so this
# should not be an infinite loop. Hopefully if the process
# that is doing the transaction gets killed in the middle then
# the database will be clever enough to roll back, otherwise
# we will never get out of this loop.
try:
session.commit()
except IntegrityError:
session.rollback()
continue
break
else:
logging.critical('dispatch could not assign job %s'%job.id)
raise IOError('dispatch could not assign job %s'%job.id)
request = store.get(job.id,'request')
# No reason to include time; email or twitter does that better than
# we can without client locale information.
notify.notify(user=job.notify,
msg=job.name+" started",
level=1)
return { 'id': job.id, 'request': request }
开发者ID:HMP1,项目名称:bumps,代码行数:58,代码来源:dispatcher.py
示例11: index
def index(page = 1):
form = DataForm()
user_data = Data.query.filter_by(user_id = g.user.id)
#ms = user_data.order_by(Data.systolic_pressure.desc()).first()
four_weeks_ago = datetime.datetime.now() - datetime.timedelta(weeks=4)
maxs = db.session.query(func.max(Data.systolic_pressure).label('max_systolic')).filter_by(user_id = g.user.id).one()
max_systolic = maxs.max_systolic
mins = db.session.query(func.min(Data.systolic_pressure).label('min_systolic')).filter_by(user_id = g.user.id).one()
min_systolic = mins.min_systolic
avgs = db.session.query(func.avg(Data.systolic_pressure).label('avg_systolic')).filter_by(user_id = g.user.id).\
filter(Data.timestamp > four_weeks_ago).one()
avg_systolic = avgs.avg_systolic
maxd = db.session.query(func.max(Data.diastolic_pressure).label('max_diastolic')).filter_by(user_id = g.user.id).one()
max_diastolic = maxd.max_diastolic
mind = db.session.query(func.min(Data.diastolic_pressure).label('min_diastolic')).filter_by(user_id = g.user.id).one()
min_diastolic = mind.min_diastolic
avgd = db.session.query(func.avg(Data.diastolic_pressure).label('avg_diastolic')).filter_by(user_id = g.user.id).\
filter(Data.timestamp > four_weeks_ago).one()
avg_diastolic = avgd.avg_diastolic
maxc = db.session.query(func.max(Data.cardiac_rate).label('max_rate')).filter_by(user_id = g.user.id).one()
max_rate = maxc.max_rate
minc = db.session.query(func.min(Data.cardiac_rate).label('min_rate')).filter_by(user_id = g.user.id).one()
min_rate = minc.min_rate
avgc = db.session.query(func.avg(Data.cardiac_rate).label('avg_rate')).filter_by(user_id = g.user.id).\
filter(Data.timestamp > four_weeks_ago).one()
avg_rate = avgc.avg_rate
if form.validate_on_submit():
data = Data(systolic_pressure = form.systolic_pressure.data,
diastolic_pressure = form.diastolic_pressure.data,
cardiac_rate = form.cardiac_rate.data,
timestamp = datetime.datetime.now(),
body = form.note.data,
user = g.user)
db.session.add(data)
db.session.commit()
db.session.close()
flash('Added successfully')
return redirect(url_for('index'))
datas = user_data.order_by(Data.timestamp.desc()).paginate(page, DATAS_PER_PAGE, False)
return render_template('index.html',
title = 'Home',
form = form,
max_systolic = max_systolic,
min_systolic = min_systolic,
avg_systolic = avg_systolic,
max_diastolic = max_diastolic,
min_diastolic = min_diastolic,
avg_diastolic = avg_diastolic,
max_rate = max_rate,
min_rate = min_rate,
avg_rate = avg_rate,
datas = datas)
开发者ID:lawrencesun,项目名称:meddata,代码行数:58,代码来源:views.py
示例12: post
def post(self):
"""
-- POR HOTEL
select *
from availability
where hotel_id = 2 and
date between '2015-05-04' and '2015-05-06'
group by hotel_id
having min(available) = 1
-- POR CIDADE <city_id=2> retorna hoteis <119, 383>
select *
from availability as aa
join hotel as hh
on aa.hotel_id = hh.id
where hh.city_id=2 and
date between '2015-05-04' and '2015-05-06'
GROUP BY aa.hotel_id
HAVING min(aa.available) = 1
"""
params = self.parser.parse_args()
kind = params.get('kind')
_id = params['id']
start_date = params.get('start_date')
end_date = params.get('end_date')
limit = params.get('limit')
offset = params.get('offset')
filters = {"available": True}
date_filter = ''
if start_date and end_date:
filters.pop('available')
date_filter = Availability.date.between(start_date, end_date)
if kind == 'city':
query = Availability.query.filter_by(**filters). \
from_self(). \
join(Availability.hotel).filter_by(city_id=_id). \
filter(date_filter). \
group_by(Availability.hotel_id). \
having(func.min(Availability.available) == 1). \
order_by(Hotel.name)
else:
filters.update({"hotel_id": _id})
query = Availability.query.filter_by(**filters). \
filter(date_filter). \
group_by(Availability.hotel_id). \
having(func.min(Availability.available) == 1)
total = query.count()
query = query.limit(limit).offset(offset).all()
serializer = AvailabilitySerializer(query, many=True)
return {'total': total, 'results': serializer.data}
开发者ID:leidsondias,项目名称:desafiohu1,代码行数:56,代码来源:resources.py
示例13: mesicni_vypis_alluser
def mesicni_vypis_alluser(mesic):
# form=Card.find_by_number(current_user.card_number)
# form = db.session.query(Card.time).filter_by(card_number=current_user.card_number)
form = db.session.query(func.strftime('%Y-%m-%d', Card.time).label("date"),
func.max(func.strftime('%H:%M', Card.time)).label("Max"), \
func.min(func.strftime('%H:%M', Card.time)).label("Min"),
(func.max(Card.time) - func.min(Card.time)).label("Rozdil")) \
.filter(func.strftime('%Y-%-m', Card.time) == mesic).group_by(func.strftime('%Y-%m-%d', Card.time))
# .group_by([func.day(Card.time)])
return render_template("auth/mesicni_vypisy.tmpl", form=form, user=current_user)
开发者ID:TeZzo1,项目名称:MQTT,代码行数:10,代码来源:views.py
示例14: blacklist_moving_wifis
def blacklist_moving_wifis(self, ago=1, offset=0, batch=1000):
# TODO: this doesn't take into account wifi AP's which have
# permanently moved after a certain date
# maximum difference of two decimal places, ~5km at equator
# or ~2km at 67 degrees north
max_difference = 500000
day, max_day = daily_task_days(ago)
# only look at the past 30 days for movement
max_past_days = day - timedelta(days=30)
try:
with self.db_session() as session:
query = session.query(distinct(WifiMeasure.key)).filter(
WifiMeasure.created < max_day).filter(
WifiMeasure.created >= day).order_by(
WifiMeasure.id).limit(batch).offset(offset)
new_wifis = [w[0] for w in query.all()]
if not new_wifis: # pragma: no cover
# nothing to be done
return []
# check min/max lat/lon
query = session.query(
WifiMeasure.key, func.max(WifiMeasure.lat),
func.min(WifiMeasure.lat), func.max(WifiMeasure.lon),
func.min(WifiMeasure.lon)).filter(
WifiMeasure.key.in_(new_wifis)).filter(
WifiMeasure.created > max_past_days).group_by(WifiMeasure.key)
results = query.all()
moving_keys = set()
for result in results:
wifi_key, max_lat, min_lat, max_lon, min_lon = result
diff_lat = abs(max_lat - min_lat)
diff_lon = abs(max_lon - min_lon)
if diff_lat >= max_difference or diff_lon >= max_difference:
moving_keys.add(wifi_key)
if moving_keys:
utcnow = datetime.utcnow()
query = session.query(WifiBlacklist.key).filter(
WifiBlacklist.key.in_(moving_keys))
already_blocked = set([a[0] for a in query.all()])
moving_keys = moving_keys - already_blocked
if not moving_keys:
return []
for key in moving_keys:
# TODO: on duplicate key ignore
session.add(WifiBlacklist(key=key, created=utcnow))
remove_wifi.delay(list(moving_keys))
session.commit()
return moving_keys
except IntegrityError as exc: # pragma: no cover
logger.exception('error')
return []
except Exception as exc: # pragma: no cover
raise self.retry(exc=exc)
开发者ID:MikeNicholls,项目名称:ichnaea,代码行数:54,代码来源:tasks.py
示例15: _make_stats_query
def _make_stats_query(self, event_filter):
query = self.session.query(
func.min(Meter.timestamp).label('tsmin'),
func.max(Meter.timestamp).label('tsmax'),
func.avg(Meter.counter_volume).label('avg'),
func.sum(Meter.counter_volume).label('sum'),
func.min(Meter.counter_volume).label('min'),
func.max(Meter.counter_volume).label('max'),
func.count(Meter.counter_volume).label('count'))
return make_query_from_filter(query, event_filter)
开发者ID:markmc,项目名称:ceilometer,代码行数:11,代码来源:impl_sqlalchemy.py
示例16: parameter_stat
def parameter_stat(self):
r = self.session.query(
func.min(Result.turnout_p).label('turnout_min'),
func.max(Result.turnout_p).label('turnout_max'),
func.min(Result.absentee_p).label('absentee_min'),
func.max(Result.absentee_p).label('absentee_max'),
).one()
return dict(
turnout=dict(min=r.turnout_min, max=r.turnout_max),
absentee=dict(min=r.absentee_min, max=r.absentee_max),
)
开发者ID:nextgis,项目名称:votemapper,代码行数:12,代码来源:__init__.py
示例17: _make_stats_query
def _make_stats_query(self, event_filter):
query = self.session.query(
func.min(Meter.timestamp).label("tsmin"),
func.max(Meter.timestamp).label("tsmax"),
func.avg(Meter.counter_volume).label("avg"),
func.sum(Meter.counter_volume).label("sum"),
func.min(Meter.counter_volume).label("min"),
func.max(Meter.counter_volume).label("max"),
func.count(Meter.counter_volume).label("count"),
)
return make_query_from_filter(query, event_filter)
开发者ID:100PercentIT,项目名称:ceilometer,代码行数:12,代码来源:impl_sqlalchemy.py
示例18: _make_stats_query
def _make_stats_query(sample_filter):
session = sqlalchemy_session.get_session()
query = session.query(
func.min(Meter.timestamp).label('tsmin'),
func.max(Meter.timestamp).label('tsmax'),
func.avg(Meter.counter_volume).label('avg'),
func.sum(Meter.counter_volume).label('sum'),
func.min(Meter.counter_volume).label('min'),
func.max(Meter.counter_volume).label('max'),
func.count(Meter.counter_volume).label('count'))
return make_query_from_filter(query, sample_filter)
开发者ID:hummermania,项目名称:ceilometer,代码行数:12,代码来源:impl_sqlalchemy.py
示例19: _make_stats_query
def _make_stats_query(sample_filter):
session = sqlalchemy_session.get_session()
query = session.query(
func.min(Meter.timestamp).label("tsmin"),
func.max(Meter.timestamp).label("tsmax"),
func.avg(Meter.counter_volume).label("avg"),
func.sum(Meter.counter_volume).label("sum"),
func.min(Meter.counter_volume).label("min"),
func.max(Meter.counter_volume).label("max"),
func.count(Meter.counter_volume).label("count"),
)
return make_query_from_filter(query, sample_filter)
开发者ID:pabelanger,项目名称:ceilometer,代码行数:13,代码来源:impl_sqlalchemy.py
示例20: __iter__
def __iter__(self):
"""Iterate over the schedule for the day."""
if not self.rooms:
raise StopIteration
def rowspan(start, end):
"""Find the rowspan for an entry in the schedule table.
This uses a binary search for the given end time from a
sorted list of start times in order to find the index of the
first start time that occurs after the given end time. This
method is used to prevent issues that can occur with
overlapping start and end times being included in the same
list.
"""
return bisect_left(times, end) - times.index(start)
times = sorted({slot.start for slot in self.slots})
# While we typically only care about the start times here, the
# list is iterated over two items at a time. Without adding a
# final element, the last time slot would be omitted. Any value
# could be used here as bisect_left only assumes the list is
# sorted, but using a meaningful value feels better.
times.append(self.slots[-1].end)
slots = db.session.query(
Slot.id,
Slot.content_override,
Slot.kind,
Slot.start,
Slot.end,
func.count(rooms_slots.c.slot_id).label('room_count'),
func.min(Room.order).label('order'),
).join(rooms_slots, Room).filter(Slot.day == self).order_by(
func.count(rooms_slots.c.slot_id), func.min(Room.order)
).group_by(
Slot.id, Slot.content_override, Slot.kind, Slot.start, Slot.end
).all()
for time, next_time in pairwise(times):
row = {'time': time, 'slots': []}
for slot in slots:
if slot.start == time:
slot.rowspan = rowspan(slot.start, slot.end)
slot.colspan = slot.room_count
if not slot.content_override:
slot.presentation = Presentation.query.filter(
Presentation.slot_id == slot.id).first()
row['slots'].append(slot)
if row['slots'] or next_time is None:
yield row
开发者ID:jmeekr,项目名称:pygotham,代码行数:51,代码来源:models.py
注:本文中的sqlalchemy.func.min函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论