本文整理汇总了Python中sqlalchemy.func.max函数的典型用法代码示例。如果您正苦于以下问题:Python max函数的具体用法?Python max怎么用?Python max使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了max函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: newer_version
def newer_version(lowersuite_name, highersuite_name, session, include_equal=False):
'''
Finds newer versions in lowersuite_name than in highersuite_name. Returns a
list of tuples (source, higherversion, lowerversion) where higherversion is
the newest version from highersuite_name and lowerversion is the newest
version from lowersuite_name.
'''
lowersuite = get_suite(lowersuite_name, session)
highersuite = get_suite(highersuite_name, session)
query = session.query(DBSource.source, func.max(DBSource.version)). \
with_parent(highersuite).group_by(DBSource.source)
list = []
for (source, higherversion) in query:
q = session.query(func.max(DBSource.version)). \
filter_by(source=source)
if include_equal:
q = q.filter(DBSource.version >= higherversion)
else:
q = q.filter(DBSource.version > higherversion)
lowerversion = q.with_parent(lowersuite).group_by(DBSource.source).scalar()
if lowerversion is not None:
list.append((source, higherversion, lowerversion))
list.sort()
return list
开发者ID:Debian,项目名称:dak,代码行数:28,代码来源:cruft.py
示例2: populate_months
def populate_months(session):
if session.query(Month).count() > 0:
raise Exception("Months table is already populated.")
demimonth = datetime.timedelta(days=14)
first_chat = session.query(func.min(Chat.date)).scalar()
first_bugevent = session.query(func.min(BugEvent.date)).scalar()
start_date = max(first_chat, first_bugevent)
print "First chat is " + str(first_chat)
print "First bug event is " + str(first_bugevent)
print "Starting months on " + str(start_date)
last_chat = session.query(func.max(Chat.date)).scalar()
last_bugevent = session.query(func.max(BugEvent.date)).scalar()
end_date = min(last_chat, last_bugevent)
print "Last chat is " + str(last_chat)
print "Last bug event is " + str(last_bugevent)
print "End months on or around " + str(end_date)
start = start_date
end = start_date + datetime.timedelta(days=27) # start + 27 days = 28 day span
while end < end_date:
month = Month(first=start, last=end)
session.add(month)
start += demimonth
end += demimonth
session.commit()
开发者ID:colinmorris,项目名称:moz-graphs,代码行数:31,代码来源:months.py
示例3: _analyze
def _analyze(self, session):
newest_timestamp = session.execute(func.max(NodeSample.sampled)).scalar()
self.nodes = session.query(NodeSample).filter(NodeSample.sampled == newest_timestamp).all()
newest_timestamp = session.execute(func.max(LinkSample.sampled)).scalar()
self.links = session.query(LinkSample).filter(LinkSample.sampled == newest_timestamp).all()
self.samples.add(newest_timestamp)
开发者ID:UdS-TelecommunicationsLab,项目名称:SDNalytics,代码行数:7,代码来源:topology.py
示例4: book
def book(bookID):
form = BidForm()
bookS = db.session.query(Book).filter(Book.book_id == bookID).scalar()
print(str(db.session.query(Book).filter(Book.book_id == bookID).as_scalar()))
aucID = db.session.query(Auction.auc_id).filter(Auction.book_id == bookID).scalar()
print(str(db.session.query(Auction.auc_id).filter(Auction.book_id == bookID).as_scalar()))
curPrice = db.session.query(func.max(Bid.bid_price)).filter(Bid.auc_id == aucID).scalar()
print(str(db.session.query(func.max(Bid.bid_price)).filter(Bid.auc_id == aucID).as_scalar()))
if request.method == 'POST':
if not form.validate():
flash('Bid Unsuccessful')
return render_template('auction/book.html', form=form, book=bookS, curPrice=curPrice)
else:
if 'email' not in session:
return redirect(url_for('auth.signin'))
else:
usrID = db.session.query(User.user_id).filter(User.email == session['email']).scalar()
highBid = db.session.query(Bid.bid_price).filter(Bid.auc_id == aucID).\
filter(Bid.bid_price >= form.bid_price.data).first()
if highBid:
flash('Your bid needs to be higher then the current price')
return render_template('auction/book.html', form=form, book=bookS, curPrice=curPrice)
else:
newBid = Bid(auc_id=aucID, user_id=usrID, bid_price=form.bid_price.data)
db.session.add(newBid)
db.session.commit()
flash('Bid Successful')
return render_template('auction/book.html', form=form, book=bookS, curPrice=form.bid_price.data)
elif request.method == 'GET':
return render_template('auction/book.html', form=form, book=bookS, curPrice=curPrice)
开发者ID:ncdesouza,项目名称:bookworm,代码行数:31,代码来源:controllers.py
示例5: compress_smallest_box
def compress_smallest_box():
last_box = session.query(func.max(sqlalchemy.cast(InvCard.box, sqlalchemy.Integer))).first()[0]
box_capacity = list(metadata.bind.execute("select box,60 - count(*) as c from inv_cards where box not null group by box having c>0 order by c desc;"))
remove_box = box_capacity[0][0]
box_capacity = box_capacity[1:]
cards_in_remove_box = InvCard.query.filter_by(box=str(remove_box)).order_by(InvCard.box_index.desc()).all()
move_orders = fit_boxes(box_capacity, len(cards_in_remove_box))
i=0
print "********** move %d cards from box %s **********" % (60-box_capacity[0][1], remove_box)
print "\tall boxes: %s" % sorted([int(x) for x in [remove_box] + [b for b,o in move_orders]])
for box, count in move_orders:
max_index = session.query(func.max(InvCard.box_index)).filter_by(box=box).one()[0]
print "======= moving %d cards to box %s ======" % (count, box)
for card in cards_in_remove_box[i:count+i]:
print u"move %s to %s/%d" % (card, box, max_index)
max_index += 1
card.box = box
card.box_index = max_index
i+=count
if remove_box != last_box:
cards_in_last_box = InvCard.query.filter_by(box=str(last_box)).order_by(InvCard.box_index).all()
print "********** finally, move all %d cards from %s to %s **********" % (len(cards_in_last_box),last_box, remove_box)
for card in cards_in_last_box:
card.box = remove_box
raw_input()
session.commit()
开发者ID:chrismeyersfsu,项目名称:card_scan,代码行数:30,代码来源:compress_boxes.py
示例6: get_todays_electricity
def get_todays_electricity():
return Electricity.query.with_entities(
(func.max(Electricity.meter_280) - func.min(Electricity.meter_280)).label(
'todays_export'), (func.max(Electricity.meter_180) - func.min(Electricity.meter_180)).label(
'todays_import')).filter(
func.strftime('%Y-%m-%d', Electricity.created_at) == datetime.now().strftime('%Y-%m-%d')).group_by(
func.strftime('%Y-%m-%d', Electricity.created_at)).first()
开发者ID:diegoronal,项目名称:solarpi,代码行数:7,代码来源:helper.py
示例7: _compute_ranks
def _compute_ranks():
tc_session.clear()
min_r = select([func.min(BwHistory.rank)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
avg_r = select([func.avg(BwHistory.rank)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
max_r = select([func.max(BwHistory.rank)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
avg_bw = select([func.avg(BwHistory.bw)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
avg_desc_bw = select([func.avg(BwHistory.desc_bw)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
RouterStats.table.update(values=
{RouterStats.table.c.min_rank:min_r,
RouterStats.table.c.avg_rank:avg_r,
RouterStats.table.c.max_rank:max_r,
RouterStats.table.c.avg_bw:avg_bw,
RouterStats.table.c.avg_desc_bw:avg_desc_bw}).execute()
#min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
RouterStats.table.update(values=
{RouterStats.table.c.percentile:
(100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
tc_session.commit()
开发者ID:0Chuzz,项目名称:pytorctl,代码行数:32,代码来源:SQLSupport.py
示例8: index_old
def index_old(self):
from gviz_data_table import Table
from rockpack.mainsite.services.user.models import User, UserActivity
user_count = readonly_session.query(func.count(User.id)).\
filter(User.refresh_token != '').scalar()
header = ('user count', 'max lifetime', 'avg lifetime', 'stddev lifetime',
'max active days', 'avg active days', 'stddev active days')
lifetime = func.date_part('days', func.max(UserActivity.date_actioned) -
func.min(UserActivity.date_actioned)).label('lifetime')
active_days = func.count(func.distinct(func.date(
UserActivity.date_actioned))).label('active_days')
activity = readonly_session.query(UserActivity.user, lifetime, active_days).\
group_by(UserActivity.user)
ctx = {}
for key, having_expr in ('all', None), ('1day', lifetime > 1), ('7day', lifetime > 7):
data = activity.having(having_expr).from_self(
func.count('*'),
func.max(lifetime),
func.avg(lifetime),
func.stddev_samp(lifetime),
func.max(active_days),
func.avg(active_days),
func.stddev_samp(active_days)
).one()
table = Table([
dict(id='metric', type=str),
dict(id='value', type=float),
dict(id='%', type=str),
])
pdata = ('%d%%' % (data[0] * 100 / user_count),) + ('',) * 6
table.extend(zip(*(header, map(float, data), pdata)))
ctx['ret_%s_data' % key] = table.encode()
return self.render('admin/retention_stats_old.html', **ctx)
开发者ID:wonderpl,项目名称:dolly-web,代码行数:34,代码来源:stats_views.py
示例9: __init__
def __init__(self, cnae_id, bra_id):
Industry.__init__(self, cnae_id)
self.max_year = db.session.query(func.max(Ybi.year)).filter_by(cnae_id=cnae_id)
self.rais_query = Ybi.query.join(Bra).filter(
Bra.id == Ybi.bra_id,
Ybi.cnae_id == self.cnae_id,
Ybi.bra_id_len == 9,
Ybi.year == self.max_year,
)
self.state_query = Ybi.query.join(Bra).filter(
Ybi.cnae_id == self.cnae_id,
Ybi.year == self.max_year,
Ybi.bra_id_len == 3
).order_by(desc(Ybi.num_jobs)).first()
if bra_id:
self.bra_id = bra_id
self.max_year = db.session.query(func.max(Ybi.year)).filter_by(cnae_id=cnae_id)
self.rais_query = Ybi.query.join(Bra).filter(
Bra.id == Ybi.bra_id,
Ybi.cnae_id == self.cnae_id,
Ybi.bra_id_len == 9,
Ybi.bra_id.like(self.bra_id+'%'),
Ybi.year == self.max_year
)
开发者ID:DataViva,项目名称:dataviva-site,代码行数:25,代码来源:services.py
示例10: new_lease
def new_lease(self, add_ctrl, add_node, options):
"""Get a new lease
return add_ctrl, add_node, options
:param add_ctrl: the controller part of the address
:type add_ctrl: Integer
:param add_node: the node part of the address. 0 for controller
:type add_node: Integer
:returns: A dict with all informations
:rtype: dict()
"""
#Check for malformated request
self._new_lease_lock.acquire()
try:
if add_ctrl == -1:
#A new controller wants an hadd.
#Find and return max(add_ctrl), 0
max_ctrl = self.dbsession.query(func.max(jntmodel.Lease.add_ctrl)).scalar()
if max_ctrl < 10:
add_ctrl = 10
else:
add_ctrl = max_ctrl + 1
add_node = 0
else:
#A new node wants an hadd
#check if add_ctrl,0 exists
#Find and return add_ctrl, max(add_node)
max_node = self.dbsession.query(func.max(jntmodel.Lease.add_node)).filter(jntmodel.Lease.add_ctrl==add_ctrl).scalar()
if max_node is None:
return None
add_node = max_node + 1
return self.repair_lease(add_ctrl, add_node, options)
finally:
self._new_lease_lock.release()
开发者ID:bibi21000,项目名称:janitoo_dhcp,代码行数:34,代码来源:lease.py
示例11: createGraph
def createGraph(self):
# Get the highest node id
query = self.session.query(func.max(self.edge_data.start_node), func.max(self.edge_data.end_node))
for q in query:
counter = max(q) + 1
self.consoleAppend("Start creating the graph")
self.virtual_edges = dict()
for edge in self.query:
key1 = edge.start_node, edge.end_node
key2 = edge.end_node, edge.start_node
# Edge has not been added to the graph yet has not been added
if self.G.get_edge_data(*key1) is None or self.G.get_edge_data(*key2) is None:
if self.weighted:
self.G.add_edge(edge.start_node, edge.end_node, weight=edge.length)
else:
self.G.add_edge(edge.start_node, edge.end_node)
else:
# Add a vitual node
if self.weighted:
self.G.add_edge(edge.start_node, counter, weight=edge.length / 2)
self.G.add_edge(counter, edge.end_node, weight=edge.length / 2)
else:
self.G.add_edge(edge.start_node, counter)
self.G.add_edge(counter, edge.end_node)
vedge = edge.start_node, counter
# vedge1 bc value is equal to vegde2 bc value by definition -> only one edge vedge
self.virtual_edges[key1] = vedge
self.virtual_edges[key2] = vedge
counter = counter + 1
self.consoleAppend("Graph created")
开发者ID:jcaillet,项目名称:mca,代码行数:32,代码来源:betweenness.py
示例12: api_outlimit_stations
def api_outlimit_stations(request, propid, startdate, enddate, limit):
session = Session()
#response = HttpResponse(mimetype='text/csv')
#response['Content-Disposition'] = 'attachment; filename="obs_' + stid + '_' + propid + '_' + date + '.csv"'
startdate = datetime(int(startdate.split('-')[0]), int(startdate.split('-')[1]), int(startdate.split('-')[2]))
enddate = datetime(int(enddate.split('-')[0]), int(enddate.split('-')[1]), int(enddate.split('-')[2]))
#max = aliased(func.max(Observation.value))
#currentdate = aliased(func.date(Observation.date))
observations = session.query(func.max(Observation.value), Station.municipality, Station.lat, Station.lng, func.date(Observation.date)).\
join(Station).join(Property).\
filter(Observation.date.between(startdate, enddate), Property.name == propid).\
group_by(func.date(Observation.date), Station.code).having(func.max(Observation.value) >= float(limit)).all()
#Observation.value >= limit
#writer = csv.writer(response)
#for obs in observations:
# writer.writerow([obs.station.lat, obs.station.lng, obs.date, obs.value])
resp = []
for obs in observations:
o = {}
o['lat'] = obs[2]
o['lng'] = obs[3]
o['date'] = obs[4].isoformat()
o['municipality'] = obs[1]
o['value'] = obs[0]
resp.append(o)
session.close()
#return response
return resp
开发者ID:johnfelipe,项目名称:bizkaisense,代码行数:35,代码来源:views.py
示例13: last_trial
def last_trial(cls, script=None, parent_required=False,
session=None):
"""Return last trial according to start time
Keyword arguments:
script -- specify the desired script (default=None)
parent_required -- valid only if script exists (default=False)
"""
model = cls.m
session = session or relational.session
trial = (
session.query(model)
.filter(model.start.in_(
select([func.max(model.start)])
.where(model.script == script)
))
).first()
if trial or parent_required:
return trial
return (
session.query(model)
.filter(model.start.in_(
select([func.max(model.start)])
))
).first()
开发者ID:gems-uff,项目名称:noworkflow,代码行数:25,代码来源:trial.py
示例14: get_extent
def get_extent():
return DBSession.query(
func.min(func.ST_XMin(AdminZone.geometry)),
func.min(func.ST_YMin(AdminZone.geometry)),
func.max(func.ST_XMax(AdminZone.geometry)),
func.max(func.ST_YMax(AdminZone.geometry)),
).first()
开发者ID:kamalhg,项目名称:nosfinanceslocales,代码行数:7,代码来源:maps.py
示例15: get_stats
def get_stats():
result = db.session.query(
func.min(Stop.stop_lat), func.min(Stop.stop_lon), func.max(Stop.stop_lat), func.max(Stop.stop_lon), func.count()
).first()
data = {"minLat": result[0], "minLon": result[1], "maxLat": result[2], "maxLon": result[3], "numbers": result[4]}
return jsonify({"stops": data})
开发者ID:fabid,项目名称:gtfseditor,代码行数:7,代码来源:stats.py
示例16: main
def main():
logging.config.fileConfig(path_to_cfg)
start_utc = datetime.utcnow()
start_time = time()
global r
try:
r = reddit.Reddit(user_agent=cfg_file.get('reddit', 'user_agent'))
logging.info('Logging in as %s', cfg_file.get('reddit', 'username'))
r.login(cfg_file.get('reddit', 'username'),
cfg_file.get('reddit', 'password'))
subreddits = Subreddit.query.filter(Subreddit.enabled == True).all()
sr_dict = dict()
for subreddit in subreddits:
sr_dict[subreddit.name.lower()] = subreddit
mod_subreddit = r.get_subreddit('mod')
except Exception as e:
logging.error(' ERROR: %s', e)
# check reports
items = mod_subreddit.get_reports(limit=1000)
stop_time = datetime.utcnow() - REPORT_BACKLOG_LIMIT
check_items('report', items, sr_dict, stop_time)
# check spam
items = mod_subreddit.get_spam(limit=1000)
stop_time = (db.session.query(func.max(Subreddit.last_spam))
.filter(Subreddit.enabled == True).one()[0])
check_items('spam', items, sr_dict, stop_time)
# check new submissions
items = mod_subreddit.get_new_by_date(limit=1000)
stop_time = (db.session.query(func.max(Subreddit.last_submission))
.filter(Subreddit.enabled == True).one()[0])
check_items('submission', items, sr_dict, stop_time)
# check new comments
comment_multi = '+'.join([s.name for s in subreddits
if not s.reported_comments_only])
if comment_multi:
comment_multi_sr = r.get_subreddit(comment_multi)
items = comment_multi_sr.get_comments(limit=1000)
stop_time = (db.session.query(func.max(Subreddit.last_comment))
.filter(Subreddit.enabled == True).one()[0])
check_items('comment', items, sr_dict, stop_time)
# respond to modmail
try:
respond_to_modmail(r.user.get_modmail(), start_utc)
except Exception as e:
logging.error(' ERROR: %s', e)
# check reports html
try:
check_reports_html(sr_dict)
except Exception as e:
logging.error(' ERROR: %s', e)
logging.info('Completed full run in %s', elapsed_since(start_time))
开发者ID:LateNitePie,项目名称:AutoModerator,代码行数:60,代码来源:modbot.py
示例17: _get_cache_key
def _get_cache_key(self, session):
updated_cron = session.query(
func.max(paasmaker.model.ApplicationInstanceTypeCron.updated)
).scalar()
deleted_cron = session.query(
func.max(paasmaker.model.ApplicationInstanceTypeCron.deleted)
).scalar()
updated_version = session.query(
func.max(paasmaker.model.ApplicationVersion.updated)
).scalar()
deleted_version = session.query(
func.max(paasmaker.model.ApplicationVersion.deleted)
).scalar()
max_date = self._max_dates(
updated_cron,
deleted_cron,
updated_version,
deleted_version
)
summer = hashlib.md5()
summer.update(max_date)
key = summer.hexdigest()
return key
开发者ID:kaze,项目名称:paasmaker,代码行数:26,代码来源:cronrunner.py
示例18: get_current_data
def get_current_data():
"""Returns JSON describing the last thing in the system log."""
global ol0
global wd0
#perform query
opLog = db.session.query(ol0).filter(ol0.id==db.session.query(ol0).with_entities(func.max(ol0.id)).one()[0])[0]
wData = db.session.query(wd0).filter(wd0.id==db.session.query(wd0).with_entities(func.max(wd0.id)).one()[0])[0]
mTime = unix_time(opLog.time)
inTemp = opLog.indoorTemp
setPtTemp = opLog.setpointTemp
state = unicode(thermoStateStr[opLog.state])
extTemp = wData.extTemp
extTempTime = unix_time(wData.time)
return jsonify({
u'inTemp' : inTemp,
u'inTempTime' : mTime,
u'outTemp' : extTemp,
u'outTempTime' : extTempTime,
u'setPtTemp' : setPtTemp,
u'opMode' : state
})
开发者ID:evanfletcher42,项目名称:thermostat-rpi,代码行数:26,代码来源:views.py
示例19: api_organisation_activities
def api_organisation_activities(organisation_code, test_id, hierarchy_id=None):
if (("offset" in request.args) and (int(request.args['offset'])>=0)):
offset = int(request.args['offset'])
else:
offset = 0
organisation = Organisation.query.filter(Organisation.organisation_code==organisation_code
).first()
if (hierarchy_id):
if (hierarchy_id=="None"): hierarchy_id=None
"""test_count = db.session.query(func.count(Result.result_identifier)
).filter(Organisation.organisation_code == organisation_code,
Result.test_id==test_id,
Result.result_hierarchy==hierarchy_id
).join(Package
).join(OrganisationPackage
).join(Organisation
).all()"""
test_results = db.session.query(Result.result_identifier,
Result.result_data,
func.max(Result.runtime_id)
).filter(Organisation.organisation_code == organisation_code,
Result.test_id==test_id,
Result.result_hierarchy==hierarchy_id
).group_by(Result.result_identifier
).group_by(Result.result_data
).join(Package
).join(OrganisationPackage
).join(Organisation
).limit(50
).offset(offset
).all()
else:
"""test_count = db.session.query(func.count(Result.result_identifier)
).filter(Organisation.organisation_code == organisation_code,
Result.test_id==test_id
).join(Package
).join(OrganisationPackage
).join(Organisation
).all()"""
test_results = db.session.query(Result.result_identifier,
Result.result_data,
func.max(Result.runtime_id)
).filter(Organisation.organisation_code == organisation_code,
Result.test_id==test_id
).group_by(Result.result_identifier
).join(Package
).join(OrganisationPackage
).join(Organisation
).limit(50
).offset(offset
).all()
test_results = dict(map(lambda x: (x[0],x[1]), test_results))
if ((organisation_code == None) or (test_results==None)):
abort(404)
else:
return jsonify({"results": test_results})
开发者ID:andylolz,项目名称:IATI-Data-Quality,代码行数:59,代码来源:api.py
示例20: index
def index(self):
max = DBSession.query(func.max(Sensor.lat)).one()[0]
min = DBSession.query(func.min(Sensor.lat)).one()[0]
lat = (max + min) / 2
max = DBSession.query(func.max(Sensor.lng)).one()[0]
min = DBSession.query(func.min(Sensor.lng)).one()[0]
lng = (max + min) / 2
return dict(page='map', lat=lat, lng=lng)
开发者ID:tylerwhall,项目名称:Flimsy,代码行数:8,代码来源:map.py
注:本文中的sqlalchemy.func.max函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论