本文整理汇总了Python中sqlalchemy.cast函数的典型用法代码示例。如果您正苦于以下问题:Python cast函数的具体用法?Python cast怎么用?Python cast使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了cast函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: weekList
def weekList():
if not redis.llen('rank:week'):
rows = db_session.query(
User.id,
User.username,
label('number', func.count(Investment.amount)),
label('total_amount', func.sum(Investment.amount))
).filter(
Investment.user_id == User.id,
cast(Investment.added_at, Date) <= datetime.datetime.today(),
cast(Investment.added_at, Date) >= datetime.datetime.today() -
datetime.timedelta(weeks=1)
).group_by(User.id).order_by(
func.sum(Investment.amount).desc()
).limit(15).all()
rank_list = []
for i in rows:
i = dict(zip(i.keys(), i))
data = {
'id': i['id'],
'username': i['username'],
'total_amount': float(i['total_amount']),
'number': i['number']
}
rank_list.append(data)
redis.rpush('rank:week', json.dumps(data))
redis.expire('rank:week', 3600)
else:
rank_list = [json.loads(i.decode()) for i in redis.lrange('rank:week', 0, redis.llen('rank:week'))]
return rank_list
开发者ID:xxguo,项目名称:leopard,代码行数:33,代码来源:rank.py
示例2: apply
def apply(self, query):
if self.op == self.default_op and self.value1 is None:
return query
if self.op in (ops.between, ops.not_between):
left = min(self.value1, self.value2)
right = max(self.value1, self.value2)
cond = self.sa_col.between(sa.cast(left, sa.Time), sa.cast(right, sa.Time))
if self.op == ops.not_between:
cond = ~cond
return query.filter(cond)
# Casting this because some SQLAlchemy dialects (MSSQL) convert the value to datetime
# before binding.
val = sa.cast(self.value1, sa.Time)
if self.op == ops.eq:
query = query.filter(self.sa_col == val)
elif self.op == ops.not_eq:
query = query.filter(self.sa_col != val)
elif self.op == ops.less_than_equal:
query = query.filter(self.sa_col <= val)
elif self.op == ops.greater_than_equal:
query = query.filter(self.sa_col >= val)
else:
query = super(TimeFilter, self).apply(query)
return query
开发者ID:level12,项目名称:webgrid,代码行数:27,代码来源:filters.py
示例3: add_bucket_field_to_query
def add_bucket_field_to_query(self, q, q_entities, field, field_entity):
# Get min, max if not provided.
field_min = 0
field_max = 0
if not field.has_key("min") or not field.has_key("max"):
field_min, field_max = self.get_field_min_max(field)
# Override calculated min/max if values were provided.
if field.has_key("min"):
field_min = field.get("min")
if field.has_key("max"):
field_max = field.get("max")
num_buckets = field.get("num_buckets", 10)
# Get bucket width.
bucket_width = (field_max - field_min) / num_buckets
# Get bucket field entities.
# Bit of a trick here: we use field_max - bucket_width because normally last bucket gets all values >= field_max.
# Here we use one less bucket, and then filter. This essentially makes the last bucket include values <= field_max.
bucket_entity = func.width_bucket(field_entity, field_min, field_max - bucket_width, num_buckets - 1)
q = q.filter(field_entity <= field_max)
bucket_label_entity = (
cast(field_min + (bucket_entity - 1) * bucket_width, String)
+ " to "
+ cast(field_min + bucket_entity * bucket_width, String)
)
bucket_label_entity = bucket_label_entity.label(field["label"])
q_entities.add(bucket_label_entity)
return q, bucket_label_entity
开发者ID:adorsk-noaa,项目名称:SASI-Model.pre20120621,代码行数:33,代码来源:sa_dao.py
示例4: retrieve_simulation_latest_job
def retrieve_simulation_latest_job(uid, job_type=JOB_TYPE_COMPUTING):
"""Returns set of latest jobs for active simulations.
:param str uid: Simulation UID.
:param str job_type: Type of job.
:returns: Job details.
:rtype: list
"""
j = types.Job
s = types.Simulation
qry = session.raw_query(
s.id, #0
j.typeof, #1
j.execution_state, #2
cast(j.is_compute_end, Integer), #3
cast(j.is_error, Integer), #4
as_datetime_string(j.execution_start_date), #5
as_datetime_string(j.execution_end_date) #6
)
qry = qry.join(j, s.uid == j.simulation_uid)
qry = qry.order_by(j.execution_start_date.desc())
qry = qry.filter(j.execution_start_date != None)
qry = qry.filter(j.execution_state != None)
qry = qry.filter(j.typeof == job_type)
qry = qry.filter(s.uid == uid)
return qry.first()
开发者ID:Prodiguer,项目名称:hermes-server,代码行数:31,代码来源:dao_monitoring_simulation.py
示例5: _repayments
def _repayments(self):
today_repayments = Plan.query.filter(
cast(Plan.plan_time, Date) == date.today(),
Plan.status == get_enum('PLAN_PENDING')
).order_by('plan_time desc').limit(10)
today_repay_amount = db_session.query(
func.sum(Plan.amount)
).filter(
cast(Plan.plan_time, Date) == date.today(),
Plan.status == get_enum('PLAN_PENDING')
).scalar()
if not today_repay_amount:
today_repay_amount = 0
total_repay_amount = db_session.query(
func.sum(Plan.amount)
).filter(Plan.status == get_enum('PLAN_PENDING')).scalar()
if not total_repay_amount:
total_repay_amount = 0
app.jinja_env.globals['today_repay_amount'] = today_repay_amount
app.jinja_env.globals['total_repay_amount'] = total_repay_amount
app.jinja_env.globals['today_repayments'] = today_repayments
开发者ID:xxguo,项目名称:leopard,代码行数:26,代码来源:__init__.py
示例6: get_period_schedule
def get_period_schedule(meetings, period, limit_min=None, limit_max=None):
if period:
if limit_max:
meetings = meetings.filter(
cast(schedule.Agenda.endtime, Time) < limit_max)
else:
meetings = meetings.filter(
cast(schedule.Agenda.endtime, Time) < period.end)
if limit_min:
meetings = meetings.filter(
cast(schedule.Agenda.starttime, Time) > limit_min)
else:
meetings = meetings.filter(
cast(schedule.Agenda.starttime, Time) > period.begin)
meetings = meetings.order_by(schedule.Agenda.starttime).all()
try:
first_patient_schedule_time = _get_first_meeting_time(meetings[0])
last_patient_schedule_time = _get_last_meeting_time(meetings[-1])
except IndexError:
if not period:
return ( (None, None), None, (None, None) )
else:
return ( (None, period.begin), None, (None, period.end) )
if not period:
return ( (first_patient_schedule_time, None), meetings,
(last_patient_schedule_time, None)
)
return ( _get_first_scheduled_time( period,
first_patient_schedule_time),
meetings,
_get_last_scheduled_time(period,
last_patient_schedule_time)
)
开发者ID:fluxspir,项目名称:odontux,代码行数:34,代码来源:schedule.py
示例7: _get_event_with_enterqueue
def _get_event_with_enterqueue(session, start, end, match, event):
start = start.strftime(_STR_TIME_FMT)
end = end.strftime(_STR_TIME_FMT)
enter_queues = (session
.query(QueueLog.callid,
cast(QueueLog.time, TIMESTAMP).label('time'))
.filter(and_(QueueLog.event == 'ENTERQUEUE',
between(QueueLog.time, start, end))))
enter_map = {}
for enter_queue in enter_queues.all():
enter_map[enter_queue.callid] = enter_queue.time
if enter_map:
res = (session
.query(QueueLog.event,
QueueLog.queuename,
cast(QueueLog.time, TIMESTAMP).label('time'),
QueueLog.callid,
QueueLog.data3)
.filter(and_(QueueLog.event == match,
QueueLog.callid.in_(enter_map))))
for r in res.all():
yield {
'callid': r.callid,
'queue_name': r.queuename,
'time': enter_map[r.callid],
'event': event,
'talktime': 0,
'waittime': int(r.data3) if r.data3 else 0
}
开发者ID:jaunis,项目名称:xivo-dao,代码行数:33,代码来源:queue_log_dao.py
示例8: update_scores
def update_scores(self, model, chunksize=10000):
# update node scores
waypoint_nodes = (
self.session.query(
Node,
ST_X(cast(Node.loc, Geometry)),
ST_Y(cast(Node.loc, Geometry)))
.filter(Node.num_ways != 0)
.order_by(func.random())) # random order
# process nodes in chunks for memory efficiency.
# note: normalization of scores is done per chunk, which should be a
# reasonable approximation to global normalization when the chunks are
# large since the query specifies random ordering
for chunk in _grouper(chunksize, waypoint_nodes):
nodes, x, y = zip(*chunk)
X = np.vstack((x, y)).T
scores = model.score_samples(X)
for node, score in zip(nodes, scores):
node.score = score
# update cumulative scores
sq = (
self.session.query(
Waypoint.id.label('id'),
func.sum(Node.score).over(
partition_by=Waypoint.way_id,
order_by=Waypoint.idx).label('cscore'))
.join(Node)
.subquery())
(self.session.query(Waypoint)
.filter(Waypoint.id == sq.c.id)
.update({Waypoint.cscore: sq.c.cscore}))
开发者ID:mcwitt,项目名称:scenicstroll,代码行数:35,代码来源:route_db.py
示例9: core_individuals_stations
def core_individuals_stations(request):
""" Get the stations of an identified individual. Parameter is : id (int)"""
try:
id = int(request.params["id"])
# Query
query = (
select(
[
cast(V_Individuals_LatLonDate.c.lat, Float),
cast(V_Individuals_LatLonDate.c.lon, Float),
V_Individuals_LatLonDate.c.date,
]
)
.where(V_Individuals_LatLonDate.c.ind_id == id)
.order_by(desc(V_Individuals_LatLonDate.c.date))
)
# Create list of features from query result
epoch = datetime.utcfromtimestamp(0)
features = [
{
"type": "Feature",
"properties": {"date": (date - epoch).total_seconds()},
"geometry": {"type": "Point", "coordinates": [lon, lat]},
}
for lat, lon, date in reversed(DBSession.execute(query).fetchall())
]
result = {"type": "FeatureCollection", "features": features}
return result
except:
return []
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:31,代码来源:individuals.py
示例10: query_recursive_tree
def query_recursive_tree():
structure_tree = (
DBSession.query(
Structure.id,
Structure.name,
Structure.parent_id,
cast(1, Integer()).label('depth'),
array([cast(Structure.name, Text)]).label('name_path'),
array([Structure.id]).label('path'),
)
.filter(Structure.condition_root_level())
.cte(name='structure_tree', recursive=True)
)
st = aliased(structure_tree, name='st')
s = aliased(Structure, name='s')
structure_tree = structure_tree.union_all(
DBSession.query(
s.id, s.name, s.parent_id,
(st.c.depth + 1).label('depth'),
func.array_append(
st.c.name_path, cast(s.name, Text)
).label('name_path'),
func.array_append(st.c.path, s.id).label('path'),
)
.filter(s.parent_id == st.c.id)
)
return DBSession.query(structure_tree)
开发者ID:alishir,项目名称:tcr,代码行数:27,代码来源:structures.py
示例11: getactivitystatistic
def getactivitystatistic():
"""get activity statistic information"""
try:
token = request.json['token']
activityid = request.json['activityid']
activity = getactivitybyid(activityid)
u = getuserinformation(token)
if u != None and activity != None:
state = 'successful'
reason = ''
registeredTotal = activity.users.count()
registeredToday = activity.users.filter(cast(models.attentactivity.timestamp, Date) == date.today()).count()
likedTotal = activity.likeusers.count()
likedToday = activity.likeusers.filter(cast(models.likeactivity.timestamp, Date) == date.today()).count()
result = {
'activity':activity.title,
'registeredTotal':registeredTotal,
'registeredToday':registeredToday,
'likedTotal':likedTotal,
'likedToday':likedToday,
}
else:
state = 'fail'
reason = 'invalid access'
result = ''
except Exception,e:
print e
state = 'fail'
reason = 'exception'
result = ''
开发者ID:iThinkSeu,项目名称:LoveinSEU,代码行数:34,代码来源:activityroute.py
示例12: applySearchParam
def applySearchParam(self, query, searchParam):
if searchParam.CustomerId:
query = query.filter(Order.CustomerId == searchParam.CustomerId)
if searchParam.CustomerName:
query = query.filter(CustomerContactDetails.FirstName.like("%%%s" % searchParam.CustomerName))
if searchParam.IpAddress:
query = query.filter(Order.IpAddress == searchParam.IpAddress)
if searchParam.FromDate and not searchParam.ToDate:
query = query.filter(cast(Order.OrderDate, Date) >= searchParam.FromDate)
if not searchParam.FromDate and searchParam.ToDate:
query = query.filter(cast(Order.OrderDate, Date) <= searchParam.ToDate)
if searchParam.FromDate and searchParam.ToDate:
query = query.filter(
cast(Order.OrderDate, Date) >= searchParam.FromDate, cast(Order.OrderDate, Date) <= searchParam.ToDate
)
if searchParam.MinAmount and not searchParam.MaxAmount:
query = query.filter(Order.OrderAmount >= searchParam.MinAmount)
if not searchParam.MinAmount and searchParam.MaxAmount:
query = query.filter(Order.OrderAmount <= searchParam.MaxAmount)
if searchParam.MinAmount and searchParam.MaxAmount:
query = query.filter(Order.OrderAmount >= searchParam.MinAmount, Order.OrderAmount <= searchParam.MaxAmount)
if searchParam.InvoiceStatus == "opened":
query = query.filter(or_((Order.OrderAmount - Order.PaidAmount) > 0.5, Order.OrderAmount == 0))
elif searchParam.InvoiceStatus == "closed":
query = query.filter(Order.OrderAmount != 0, (Order.PaidAmount - Order.OrderAmount) > 0.001)
elif searchParam.InvoiceStatus == "overdue":
query = query.filter((Order.OrderAmount - Order.PaidAmount) > 0.5, Order.DueDate < func.now())
return query
开发者ID:cackharot,项目名称:viper-pos,代码行数:31,代码来源:ReportService.py
示例13: get_stats
def get_stats(cls, date_from=None, date_to=None, limit=100):
"""
Return overall stats/summary or stats for a given date range if given
- date_to is inclusive
- if just date_from is given, only records for that date are returned
"""
query = db.query(cast(Visit.timestamp, Date),
func.count(Visit.id))
if date_from:
date_from, date_to = process_date_range(date_from, date_to)
query = query.filter(Visit.timestamp >= date_from,
Visit.timestamp < date_to)
query = query.group_by(cast(Visit.timestamp, Date))
query = query.order_by(cast(Visit.timestamp, Date))
if limit:
query = query.limit(limit)
stats = query.all()
return stats
开发者ID:kashifpk,项目名称:blogs_compulife,代码行数:25,代码来源:models.py
示例14: get_repeating
def get_repeating(self, resource, except_applications, start_date, end_date, statuses):
query = current_app.db_session.query(RepeatingSlot)
objects = query.filter(Application.resource == resource,
RepeatingSlot.application_id == Application.id,
Application.status.in_(statuses))
if except_applications:
objects = objects.filter(
~RepeatingSlot.application_id.in_(except_applications)
)
objects = objects.filter(
or_(
tuple_(
RepeatingSlot.start_date, RepeatingSlot.end_date
).op('overlaps')(
tuple_(
cast(start_date, Date), cast(end_date, Date)
)
),
or_(
# First range ends on the start date of the second
RepeatingSlot.end_date == cast(start_date, Date),
# Second range ends on the start date of the first
cast(end_date, Date) == RepeatingSlot.start_date
)
)
)
return objects.all()
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:30,代码来源:WeeklyRepeatingSlotsForResource.py
示例15: _add_ordering
def _add_ordering(sql_query, table, column_type, column_name, order):
# Special case for this column, which sorts contigs correctly:
if column_name == 'contig':
get_contig_num = cast(
text("SUBSTRING({} FROM '\d+')".format(table.c.contig)),
type_=Integer)
starts_with_chr = (text("SUBSTRING({} FROM '^chr(\d+)')"
.format(table.c.contig)) != literal(''))
starts_with_number = (text("SUBSTRING({} FROM '^\d+')"
.format(table.c.contig)) != literal(''))
# 10000 used here to mean "should be at the end of all the numbers",
# assuming we never hit a chromosome number >= 10000.
contig_num_col = case(
[(starts_with_chr, get_contig_num),
(starts_with_number, get_contig_num)],
else_=literal(10000)
)
contig_len_col = func.length(table.c.contig)
contig_col = table.c.contig
if order == 'desc':
contig_len_col = desc(contig_len_col)
contig_col = desc(contig_col)
return sql_query.order_by(contig_num_col, contig_len_col, contig_col)
sqla_type = vcf_type_to_sqla_type(column_type)
column = cast(table.c[column_name], type_=sqla_type)
column = {'asc': asc(column), 'desc': desc(column)}.get(order)
return sql_query.order_by(column)
开发者ID:hammerlab,项目名称:cycledash,代码行数:27,代码来源:genotypes.py
示例16: get_requested_slots
def get_requested_slots(self, resource, except_applications, start_date, end_date):
query = current_app.db_session.query(RepeatingSlotRequest)
objects = query.filter(
RepeatingSlotRequest.application.has(resource_id=resource.id)
)
objects = objects.filter(
RepeatingSlotRequest.application.has(status="Pending")
)
if except_applications:
objects = objects.filter(
~RepeatingSlotRequest.application_id.in_(except_applications)
)
objects = objects.filter(
or_(
tuple_(
RepeatingSlotRequest.start_date, RepeatingSlotRequest.end_date
).op('overlaps')(
tuple_(
cast(start_date, Date), cast(end_date, Date)
)
),
or_(
# First range ends on the start date of the second
RepeatingSlotRequest.end_date == cast(start_date, Date),
# Second range ends on the start date of the first
cast(end_date, Date) == RepeatingSlotRequest.start_date
)
)
)
return objects.all()
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:35,代码来源:WeeklyRepeatingSlotsForResource.py
示例17: incr
def incr(self, category, ids, time=0, delta=1):
self.delete_if_expired(category, ids)
expiration = expiration_from_time(time)
rp = self.table.update(sa.and_(self.table.c.category==category,
self.table.c.ids==ids,
self.table.c.kind=='num'),
values = {
self.table.c.value:
sa.cast(
sa.cast(self.table.c.value,
sa.Integer) + delta,
sa.String),
self.table.c.expiration: expiration
}
).execute()
if rp.rowcount == 1:
return self.get(category, ids)
elif rp.rowcount == 0:
existing_value = self.get(category, ids)
if existing_value is None:
raise ValueError("[%s][%s] can't be incr()ed -- it's not set" %
(category, ids))
else:
raise ValueError("[%s][%s] has non-integer value %r" %
(category, ids, existing_value))
else:
raise ValueError("Somehow %d rows got updated" % rp.rowcount)
开发者ID:JediWatchman,项目名称:reddit,代码行数:29,代码来源:hardcachebackend.py
示例18: _log_deposit_action
def _log_deposit_action(deposit, user, cashbox: str, final_amount: float, action: str):
# Without these casts, the strings will end up as type 'unknown' in postgres, where the function lookup will fail due to incorrect type signature
username = cast(user.username, String)
action_string = cast(action, String)
deposit_name = cast(deposit.name, String)
proc = procs.exam_deposit_action(cash_box_ids[cashbox], action_string, final_amount, username, deposit_name)
sqla.session.execute(proc)
开发者ID:Kha,项目名称:odie-server,代码行数:7,代码来源:accounting.py
示例19: search_route
def search_route():
query = request.args.get('q')
per_page = request.args.get('per_page', 10, type=int)
page = request.args.get('page', 0, type=int)
if query is None:
return json.dumps({'results':[]})
result = {}
with session_scope() as session:
results = session.query(
City.name,
City.country,
func.ST_Y(cast(City.location, Geometry())),
func.ST_X(cast(City.location, Geometry())),
City.id
) \
.filter(unaccent(City.name).ilike(unaccent('%' + query + '%'))) \
.limit(per_page + 1) \
.offset(page * per_page) \
.all()
more = len(results) == per_page + 1
results = results[:per_page]
result = json.dumps({
'results':
[{'id': c[4], 'text': '{}, {}'.format(c[0], c[1]), 'coords': (c[2], c[3])} for i,c in enumerate(results)],
'more': more})
return result
开发者ID:simlmx,项目名称:meteomap,代码行数:27,代码来源:site.py
示例20: filter_single_by_time
def filter_single_by_time(cls, type, objects, year=None, week_number=None, day=None):
assert (week_number and year) or day
start_date, end_date = None, None
if year and week_number:
start_date, end_date = get_start_and_end_date_from_week_and_year(
year,
week_number
)
if day:
start_date = day
end_date = day
objects = objects.filter(
or_(
tuple_(
cast(type.start_time, Date), cast(type.end_time, Date)
).op('overlaps')(
tuple_(
start_date, end_date
)
),
or_(
# First range ends on the start date of the second
cast(type.end_time, Date) == start_date,
# Second range ends on the start date of the first
end_date == cast(type.start_time, Date)
)
)
)
return objects
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:33,代码来源:SlotsForWeekResource.py
注:本文中的sqlalchemy.cast函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论