本文整理汇总了Python中sqlalchemy.between函数的典型用法代码示例。如果您正苦于以下问题:Python between函数的具体用法?Python between怎么用?Python between使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了between函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: SelectByDateRange
def SelectByDateRange(self, query, date_type, from_date_string, to_date_string):
    """Restrict ``query`` to rows whose chosen timestamp lies in a date range.

    Args:
        query: Query object exposing ``filter()``.
        date_type: ``'CreatedDate'``, ``'ModifiedDate'`` or ``'AddedDate'``;
            any other value leaves ``query`` unfiltered.
        from_date_string: Lower bound. The month is read from characters
            0-1, the day from 3-4 and the year from 6-9, so the separator
            characters at positions 2 and 5 are ignored.
        to_date_string: Upper bound, same layout as ``from_date_string``.

    Returns:
        The filtered query, or ``query`` itself for an unknown ``date_type``.

    Raises:
        ValueError: If either string does not contain a valid date at the
            expected character positions.
    """
    def parse_date(text):
        # Fixed-position slicing: month [0:2], day [3:5], year [6:10].
        return datetime.date(int(text[6:10]), int(text[0:2]), int(text[3:5]))

    # Both bounds are always parsed, even when date_type ends up unknown,
    # so malformed input is still reported.
    from_date = parse_date(from_date_string)
    to_date = parse_date(to_date_string)

    if date_type == 'CreatedDate':
        column = FileIndex.ctime
    elif date_type == 'ModifiedDate':
        column = FileIndex.mtime
    elif date_type == 'AddedDate':
        column = FileIndex.added_time
    else:
        return query

    # BETWEEN is inclusive on both ends.
    return query.filter(between(column, from_date, to_date))
开发者ID:BwRy,项目名称:DarunGrim,代码行数:12,代码来源:FileStoreDatabase.py
示例2: list_all
def list_all(country_code=None):
    """Print the names of airports whose ``style`` is between 2 and 5.

    Args:
        country_code: When given, only airports with this country code are
            printed; when None, airports of every country are printed.
    """
    style_filter = between(Airport.style, 2, 5)
    if country_code is None:
        criteria = [style_filter]
    else:
        criteria = [and_(style_filter, Airport.country_code == country_code)]

    airports = (
        session.query(Airport)
        .order_by(Airport.name)
        .filter(*criteria)
    )

    print('--- Airports ---')
    for airport in airports.all():
        print(airport.name)
开发者ID:glidernet,项目名称:ogn-python,代码行数:15,代码来源:showairport.py
示例3: listIdentifiers
def listIdentifiers(self, metadataPrefix, set=None, cursor=None,
                    from_=None, until=None, batch_size=None):
    """List a header for every package, optionally narrowed by group and time.

    Args:
        metadataPrefix: Accepted for interface compatibility; not read here.
        set: Group identifier passed to ``Group.get``. When falsy, all
            packages are considered. An unknown group yields no packages.
        cursor: When truthy, only the first ``cursor`` packages are kept.
        from_: Lower bound on ``PackageRevision.revision_timestamp``
            (exclusive when used alone).
        until: Upper bound on ``PackageRevision.revision_timestamp``
            (exclusive when used alone).
        batch_size: Accepted for interface compatibility; not read here.

    Returns:
        A list of ``common.Header`` objects, one per selected package.
    """
    packages = []

    if not set:
        query = Session.query(Package)
    else:
        group = Group.get(set)
        query = group.active_packages() if group else None

    if query is not None:
        timestamp = PackageRevision.revision_timestamp
        # Build one query and execute it once; the bounds are mutually
        # exclusive cases, so at most one filter is applied.
        if from_ and until:
            # BETWEEN includes both end points.
            query = query.filter(between(timestamp, from_, until))
        elif from_:
            query = query.filter(timestamp > from_)
        elif until:
            query = query.filter(timestamp < until)
        packages = query.all()

    if cursor:
        packages = packages[:cursor]

    return [
        common.Header(package.id,
                      package.metadata_created,
                      [package.name],
                      False)
        for package in packages
    ]
开发者ID:ilrt,项目名称:ckanext-oaipmh,代码行数:48,代码来源:oaipmh_server.py
示例4: prune_keep_record
def prune_keep_record(job_id, keep, storage):
    """Delete the older events of a job, based on its ``keep`` newest events.

    Args:
        job_id: Job whose events are pruned.
        keep: Number of newest events used to find the pruning boundary.
        storage: Object exposing ``session`` and ``_engine``.
    """
    job_events = storage.session.query(Event).filter_by(job_id=job_id)

    oldest = job_events.order_by(asc(Event.id)).limit(1).first()
    min_id = oldest.id if oldest is not None else None

    newest = job_events.order_by(desc(Event.id)).limit(keep)
    event_ids = [row.id for row in newest]
    if not event_ids:
        logger.info('No events for {0}'.format(job_id))
        return

    # The smallest id among the newest events is the upper end of the
    # range that gets deleted.
    max_id = min(event_ids)

    if min_id == max_id:
        logger.info('Min and max event ids for {0} are the same: {1} - {2}'.format(  # noqa
            job_id, min_id, max_id))
        return

    if min_id > max_id:
        logger.info('Min event id for {0} is larger than max event id: {1} - {2}'.format(  # noqa
            job_id, min_id, max_id))
        return

    logger.info('Job ID {0}, Pruning events {1} - {2}'.format(
        job_id, min_id, max_id))

    # NOTE(review): BETWEEN is inclusive, so the event with id == max_id
    # (the oldest of the `keep` newest) is deleted too -- confirm intended.
    in_range = between(Event.id, min_id, max_id)
    stmt = Event.__table__.delete().where(in_range).where(Event.job_id == job_id)
    storage._engine.execute(stmt)
    storage.session.commit()
开发者ID:seatgeek,项目名称:cronq,代码行数:34,代码来源:pruner.py
示例5: active_on
def active_on(cls, date, transient=None):
    """Build a check for rows of a transient that are active on ``date``.

    A row is active when ``date`` lies between its ``effective_from`` and
    its ``effective_to``; a NULL ``effective_to`` is replaced by ``date``
    itself, so such rows stay active from ``effective_from`` onwards.

    Args:
        date: The datetime on which the transient must be active.
        transient: Table, column set or model carrying ``effective_from``
            and ``effective_to``; when None, ``cls`` is used.

    Returns:
        A SQLAlchemy expression implementing the activity check.
    """
    target = cls if transient is None else transient

    # Bound to a name so static checkers do not complain about `== None`.
    no_value = None

    upper_bound = sqlalchemy.case(
        [
            # Open-ended rows: use `date` as the upper bound.
            (target.effective_to == no_value, date),
            (target.effective_to != no_value, target.effective_to),
        ]
    )
    return sqlalchemy.between(date, target.effective_from, upper_bound)
开发者ID:UniversityRadioYork,项目名称:lass-pyramid,代码行数:30,代码来源:mixins.py
示例6: index
def index(self):
    """Render the retention statistics page.

    Users are grouped into cohorts by the week or month they joined. For
    each cohort, the number of distinct users with activity in each later
    interval is shown as a percentage of the cohort size. The request
    arguments ``activity``, ``interval_count``, ``interval``, ``gender``,
    ``locale`` and ``age`` select the activity source, the number of
    columns and optional user filters.
    """
    from gviz_data_table import Table
    from rockpack.mainsite.services.user.models import User, UserActivity, UserAccountEvent

    args = request.args

    if args.get('activity') == 'activity':
        activity_model = UserActivity
        activity_date = UserActivity.date_actioned
    else:
        activity_model = UserAccountEvent
        activity_date = UserAccountEvent.event_date

    # A missing or unparsable column count falls back to 10.
    try:
        interval_count = int(args['interval_count'])
    except Exception:
        interval_count = 10

    interval = args.get('interval')
    if interval not in ('week', 'month'):
        interval = 'week'

    cohort = func.date_part(interval, User.date_joined)
    cohort_label = func.min(func.date(User.date_joined))
    active_interval = (
        func.date_part(interval, activity_date) - cohort
    ).label('active_interval')

    q = readonly_session.query(User).filter(
        User.date_joined > LAUNCHDATE, User.refresh_token != '')

    if args.get('gender') in ('m', 'f'):
        q = q.filter(User.gender == args['gender'])
    if args.get('locale') in app.config['ENABLED_LOCALES']:
        q = q.filter(User.locale == args['locale'])
    if args.get('age') in ('13-18', '18-25', '25-35', '35-45', '45-55'):
        # The value was checked against the whitelist above, so both parts
        # are plain integers.
        youngest, oldest = [int(part) for part in args['age'].split('-')]
        q = q.filter(between(
            func.age(User.date_of_birth),
            text("interval '%d years'" % youngest),
            text("interval '%d years'" % oldest)
        ))

    activity_rows = q.join(
        activity_model,
        (activity_model.user == User.id) &
        (activity_date >= User.date_joined)
    ).group_by(cohort, active_interval).values(
        cohort, active_interval, func.count(func.distinct(activity_model.user))
    )
    # (cohort, interval offset) -> number of distinct active users
    active_users = {
        (cohort_value, int(offset)): user_count
        for cohort_value, offset, user_count in activity_rows
    }

    columns = [dict(id='cohort', type=date)]
    columns += [
        dict(id='%s%d' % (interval, index), type=str)
        for index in range(interval_count)
    ]
    table = Table(columns)

    totals = q.group_by(cohort).order_by(cohort)
    for cohort_value, label, total in totals.values(cohort, cohort_label, func.count()):
        cells = []
        for index in range(interval_count):
            active = active_users.get((cohort_value, index), '')
            # Falsy values ('' or 0) are passed through unformatted.
            if active:
                cells.append('%d%% (%d)' % (ceil(active * 100.0 / total), active))
            else:
                cells.append(active)
        table.append([label] + cells)

    return self.render('admin/retention_stats.html', data=table.encode())
开发者ID:wonderpl,项目名称:dolly-web,代码行数:60,代码来源:stats_views.py
示例7: query_criteria
def query_criteria(query, table, criteria):
    """Apply textual filter criteria to a select statement.

    Each item of ``criteria`` maps a name to a criterion string:

    * For a column of ``table``: ``"<operator>,<operand>"`` where operator
      is ``IN`` (operand values separated by ``|``), ``LIKE`` (substring
      match), ``<``, ``>``, ``<>`` or ``=``; or the bare strings
      ``"IS NULL"`` / ``"IS NOT NULL"``. Anything else is ignored.
    * For the name ``bbox`` (when it is not a column): four
      comma-separated numbers. Parts 0 and 2 bound ``LON``; parts 1 and 3
      bound ``LAT``.

    Only the first comma separates operator from operand, so operands may
    themselves contain commas.

    Args:
        query: Statement exposing ``where()``.
        table: Table whose ``c`` collection holds the columns.
        criteria: Mapping of column name (or ``'bbox'``) to criterion string.

    Returns:
        The statement with every recognised criterion applied.

    Raises:
        ValueError: If a ``bbox`` part is not a number.
        IndexError: If ``bbox`` has fewer than four parts.
    """
    for column, value in criteria.items():
        if column in table.c:
            col = table.c[column]
            if ',' in value:
                operator, _, operand = value.partition(',')
                if operator == 'IN':
                    # split() yields a one-element list when no '|' is present.
                    query = query.where(col.in_(operand.split('|')))
                elif operator == 'LIKE':
                    query = query.where(col.like('%' + operand + '%'))
                elif operator == '<':
                    query = query.where(col < operand)
                elif operator == '>':
                    query = query.where(col > operand)
                elif operator == '<>':
                    query = query.where(col != operand)
                elif operator == '=':
                    query = query.where(col == operand)
            elif value == 'IS NULL':
                query = query.where(col.is_(None))
            elif value == 'IS NOT NULL':
                query = query.where(col.isnot(None))
        elif column == 'bbox':
            parts = value.split(',')
            query = query.where(and_(
                between(table.c['LAT'], float(parts[1]), float(parts[3])),
                between(table.c['LON'], float(parts[0]), float(parts[2]))))
    return query
开发者ID:NaturalSolutions,项目名称:ecoReleve-Server,代码行数:31,代码来源:views.py
示例8: _get_event_with_enterqueue
def _get_event_with_enterqueue(session, start, end, match, event):
    """Yield a dict for each ``match`` event of a call that entered a queue.

    Calls are selected by their ENTERQUEUE log entry falling between
    ``start`` and ``end``; the yielded ``time`` is that ENTERQUEUE time.

    Args:
        session: Database session used for both queries.
        start: Start of the window; formatted with ``_STR_TIME_FMT``.
        end: End of the window; formatted with ``_STR_TIME_FMT``.
        match: Queue-log event name to look up for the selected calls.
        event: Value stored under ``'event'`` in every yielded dict.

    Yields:
        Dicts with the keys ``callid``, ``queue_name``, ``time``, ``event``,
        ``talktime`` (always 0) and ``waittime`` (``data3`` as int, or 0).
    """
    window_start = start.strftime(_STR_TIME_FMT)
    window_end = end.strftime(_STR_TIME_FMT)

    entered = session.query(
        QueueLog.callid,
        cast(QueueLog.time, TIMESTAMP).label('time'),
    ).filter(and_(QueueLog.event == 'ENTERQUEUE',
                  between(QueueLog.time, window_start, window_end)))

    # callid -> time the call entered the queue
    enter_map = {row.callid: row.time for row in entered.all()}
    if not enter_map:
        return

    matching = session.query(
        QueueLog.event,
        QueueLog.queuename,
        cast(QueueLog.time, TIMESTAMP).label('time'),
        QueueLog.callid,
        QueueLog.data3,
    ).filter(and_(QueueLog.event == match,
                  QueueLog.callid.in_(enter_map)))

    for row in matching.all():
        waittime = int(row.data3) if row.data3 else 0
        yield {
            'callid': row.callid,
            'queue_name': row.queuename,
            'time': enter_map[row.callid],
            'event': event,
            'talktime': 0,
            'waittime': waittime
        }
开发者ID:jaunis,项目名称:xivo-dao,代码行数:33,代码来源:queue_log_dao.py
示例9: post_list
def post_list(self, fromDate, toDate):
    """Return the posts published between ``fromDate`` and ``toDate``.

    Both bounds are inclusive. Results are ordered by publish date, newest
    first, then by ``publish_id``.
    """
    in_range = between(PostId.publish_date, fromDate, toDate)
    ordered = PostId.query.filter(in_range).order_by(
        desc(PostId.publish_date),
        PostId.publish_id,
    )
    return ordered.all()
开发者ID:apoleksicki,项目名称:proverbaro,代码行数:7,代码来源:model.py
示例10: run_test
def run_test(args):
    """Back-test ``compute_spread`` against stored games and print a tally.

    For every game played between ``args.start_date`` and ``args.end_date``
    (inclusive) the computed spread is compared with the listed spread.
    Games where the two differ by 3 or less are skipped. A prediction is a
    success when the computed and the actual spread fall on the same side
    of the listed spread.

    Args:
        args: Object with ``start_date`` and ``end_date`` strings that
            ``parser.parse`` understands.
    """
    start_date = parser.parse(args.start_date)
    end_date = parser.parse(args.end_date)
    all_games = (
        db.session.query(Game)
        .filter(between(Game.game_day, start_date, end_date))
        .all()
    )

    numb_success = 0
    numb_failed = 0
    for game in all_games:
        computed_spread = compute_spread(
            host=game.home_team,
            visitor=game.visitor,
            date=str(game.game_day),
        )
        # Too close to the listed spread to count as a prediction.
        if abs(computed_spread - game.spread) <= 3:
            continue

        actual_spread = game.actual_spread
        success = False
        if computed_spread < game.spread and actual_spread < game.spread:
            success = True
        elif computed_spread > game.spread and actual_spread > game.spread:
            success = True

        if success:
            numb_success += 1
        else:
            numb_failed += 1

        # %-formatting keeps the output identical on Python 2 and Python 3.
        print(game)
        print('%s %s %s %s' % (computed_spread, game.spread, actual_spread, success))

    # Two spaces on purpose: the original label ended in a space and the
    # print statement added its own separator.
    print('total games  %s' % len(all_games))
    print('%s %s' % (numb_success, numb_failed))
开发者ID:gvenkataraman,项目名称:experiments,代码行数:32,代码来源:nearest_neighbor.py
示例11: filter_ancestors
def filter_ancestors(self, and_self=False):
    """The same as :meth:`filter_descendants` but filters ancestor nodes.

    Args:
        and_self: When False (the default), the node itself is excluded
            from the result.

    Returns:
        A SQLAlchemy boolean expression combining the tree-id restriction,
        a BETWEEN on an aliased table's left value and the primary-key
        conditions.
    """
    options = self._tree_options
    obj = self._get_obj()
    pk_name = self.pk_field.name
    obj_pk = getattr(obj, pk_name)

    # Restrict ourselves to just those nodes within the same tree.
    tree_id = getattr(obj, self.tree_id_field.name)
    filter_ = self.tree_id_field == tree_id

    # The aliased table stands for the current node: its left value must
    # lie within the candidate row's [left, right] span.
    alias = sqlalchemy.alias(options.table)
    filter_ &= sqlalchemy.between(
        getattr(alias.c, self.left_field.name),
        self.left_field, self.right_field)
    filter_ &= getattr(alias.c, pk_name) == obj_pk

    if not and_self:
        filter_ &= self.pk_field != obj_pk
    return filter_
开发者ID:adurieux,项目名称:sqlalchemy-orm-tree,代码行数:32,代码来源:instance.py
示例12: get_log_entries
def get_log_entries():
    """Return log entries as JSON, filtered by the request's query arguments.

    Recognised arguments:

    * ``limit``: page size, capped at 100 (default 10).
    * ``offset``: number of entries to skip (default 0).
    * ``organization_id``: only entries of this organization.
    * ``search``: whitespace-separated words matched case-insensitively
      against entry titles, entry descriptions and non-retired comments.
      Entries whose author's first or last name equals one of the words
      are preferred; if there are none, the author restriction is dropped.
      Searching reads ``organization_id`` unconditionally.
    * ``daterange``: ``<first day>_<last day>``; only entries whose
      ``entry_time`` falls within those days (whole days, inclusive).

    Retired entries are never returned. Results are ordered newest first.

    Returns:
        A JSON response with key ``log_entries``, or an empty JSON body
        with status 204 when nothing matches.
    """
    args = request.args

    # Limits and offsets
    limit = min(int(args['limit']), 100) if 'limit' in args else 10
    offset = int(args['offset']) if 'offset' in args else 0

    query = LogEntry.query
    if 'organization_id' in args:
        query = query.filter(LogEntry.organization_id == args['organization_id'])
    query = query.filter(sa.not_(LogEntry.retired))

    search = args.get('search')
    if search:
        entry_terms = []
        comment_terms = []
        for word in search.split():
            pattern = "%" + word.upper() + "%"
            entry_terms.append(sa.func.upper(LogEntry.entry_title).like(pattern))
            entry_terms.append(sa.func.upper(LogEntry.entry_description).like(pattern))
            comment_terms.append(sa.func.upper(LogEntryComment.comment).like(pattern))

        matching_comments = LogEntryComment.query.with_entities("log_entry_id")\
            .filter(sa.or_(*comment_terms))\
            .filter(LogEntryComment.retired == False)  # noqa: E712
        text_match = sa.or_(*entry_terms) | LogEntry.id.in_(matching_comments)

        upper_words = search.upper().split()
        matching_users = User.query.with_entities("id").filter(
            sa.func.upper(User.first_name).in_(upper_words) |
            sa.func.upper(User.last_name).in_(upper_words))

        # The search replaces the base query and requires organization_id.
        found = LogEntry.query\
            .filter(text_match)\
            .filter(LogEntry.organization_id == args['organization_id'])\
            .filter(sa.not_(LogEntry.retired))

        # Prefer entries written by a user named in the search; fall back
        # to the plain text match when there are none.
        query = found.filter(LogEntry.user_id.in_(matching_users))
        if query.count() == 0:
            query = found

    daterange = args.get('daterange')
    if daterange:
        rdates = daterange.split('_')
        # Use Query.filter rather than mutating query.whereclause, and end
        # the range at the end of the last day (the old "11:59:59" bound
        # stopped at noon).
        query = query.filter(sa.between(
            LogEntry.entry_time,
            rdates[0] + " 00:00:00.000",
            rdates[1] + " 23:59:59.999"))

    log_entries = query.order_by(sa.desc(LogEntry.entry_time)).limit(limit).offset(offset).all()
    if not log_entries:
        return jsonify({}), 204
    log_entries = [entry.to_json() for entry in log_entries]
    return jsonify(log_entries=log_entries)
开发者ID:RyanMaciel,项目名称:ooi-ui-services,代码行数:59,代码来源:operator_event.py
示例13: get_last_24_hour_post_count
def get_last_24_hour_post_count(self):
    """Count the posts made in this forum's threads during the last 24 hours.

    Returns:
        The number of posts whose thread has ``fk_forum == self.id`` and
        whose timestamp lies between 24 hours ago and now (UTC, inclusive).
    """
    # Take the clock reading once so both ends of the window are derived
    # from the same instant.
    now = datetime.utcnow()
    forum_threads = Threads.query.with_entities(Threads.id).filter_by(fk_forum=self.id)
    recent_posts = Posts.query.filter(
        and_(
            Posts.thread.in_(forum_threads),
            between(Posts.timestamp, now - timedelta(days=1), now)
        ),
    )
    return recent_posts.count()
开发者ID:agmenut,项目名称:sabotage2_revamp,代码行数:8,代码来源:models.py
示例14: traffic_report_filter
def traffic_report_filter(cls, contributor_id):
    """Return the published rows whose disruption is currently in publication.

    Args:
        contributor_id: Only disruptions of this contributor are considered.

    Returns:
        A list of ``cls`` rows with status "published", joined to a
        disruption of the contributor whose publication period contains
        the current time.
    """
    return (
        cls.query
        .filter(cls.status == "published")
        .join(Disruption)
        .filter(Disruption.contributor_id == contributor_id)
        .filter(between(
            get_current_time(),
            Disruption.start_publication_date,
            Disruption.end_publication_date,
        ))
        .all()
    )
开发者ID:vincentlepot,项目名称:Chaos,代码行数:8,代码来源:models.py
示例15: processXML
# Build the export XML: create the root and clientRecords elements, select
# Person rows by their `reported` flag and (optionally) collection date, then
# emit one client element with its assessment data and entry/exit records per
# person and update that person's reported flag.
# NOTE(review): the print statements below use Python 2 syntax.
def processXML(self): # records represents whatever element you're tacking more onto, like entry_exits or clients
if settings.DEBUG:
print "processXML: Appending XML to Base Record"
self.root_element = self.createDoc() #makes root element with XML header attributes
#print '==== root created'
clients = self.createClients(self.root_element) # JCS - tag is <clientRecords> Only node under clients is <Client>
print '==== clientRecords created'
# Choose the base query from the reported/unreported options.
# NOTE(review): if none of the three branches matches (e.g. reported is False
# and unreported is not True), Persons is never assigned and its first use
# below raises -- confirm that option parsing rules this combination out.
if self.options.reported == True:
Persons = self.session.query(dbobjects.Person).filter(dbobjects.Person.reported == True)
elif self.options.unreported == True:
Persons = self.session.query(dbobjects.Person).filter(or_(dbobjects.Person.reported == False, dbobjects.Person.reported == None))
elif self.options.reported == None:
Persons = self.session.query(dbobjects.Person)
# Now apply the dates to the result set.
# The date filter is skipped only when options.alldates is set; BETWEEN
# includes both startDate and endDate.
if self.options.alldates == None:
Persons = Persons.filter(between(dbobjects.Person.person_id_date_collected, self.options.startDate, self.options.endDate))
pulledConfigID = 0 # JCS Only pull it if it has changed
# The loop variable is stored on self, so helper methods can read the
# current person from the instance.
for self.person in Persons:
#print "person is: ", self.person
export = self.person.fk_person_to_export # this is a single record because:
# person has: export_index_id = Column(Integer, ForeignKey('export.id'))
# export has: fk_export_to_person = relationship('Person', backref='fk_person_to_export')
# Therefore there are multiple persons to one export - but only one export to a person
#print "==== export before pullconfig:", export.id, export # JCS
# Re-pull the configuration only when this person's export differs from
# the previous person's.
if pulledConfigID != export.id:
self.pullConfiguration(export.export_id)
pulledConfigID = export.id
self.ph = self.person.fk_person_to_person_historical # JCS This is a list of records
self.race = self.person.fk_person_to_races
self.site_service_part = self.person.site_service_participations # JCS
#information_releases = self.person.fk_person_to_release_of_information # JCS a set
#self.service_event = self.person.fk_person_to_service_event
# Instead of generating a number (above), use the client number that is already provided in the legacy system
# or
# self.iDG.initializeSystemID(self.person.id)
self.sysID = self.person.id # JCS beware set self.sysID
#if settings.DEBUG:
#print "self.person is:", self.person
if self.person: # and not self.person.person_legal_first_name_unhashed+self.person.person_legal_last_name_unhashed == None:
self.client = self.createClient(clients) # JCS - no clients in svc5? yes as clientRecords
# Sub can be: active, anonymous, firstName, suffix, unnamedClient, alias, middleName, childEntryExit,
# childReleaseOfInfo, childGoal
self.customizeClient(self.client)
self.customizeClientPersonalIdentifiers(self.client, self.person)
self.assessment_data = self.createAssessmentData(self.client) # JCS New - self?
self.customizeAssessmentData(self.assessment_data)
# One entry/exit element is created per site-service participation, all
# under a single childEntryExit element.
if self.site_service_part: # JCS 21 Dec 2012
self.child_entry_exit = self.createChildEntryExit(self.client)
for ssp in self.site_service_part:
self.createEntryExit(self.child_entry_exit, ssp)
# update the reported flag for person (This needs to be applied to all objects that we are getting data from)
self.updateReported(self.person)
开发者ID:211tbc,项目名称:synthesis,代码行数:58,代码来源:svcpointxml5writer.py
示例16: run
def run(long,lat):
    """Find the users positioned within ``SEARCH_R`` of a coordinate.

    Args:
        long: Longitude of the search centre.
        lat: Latitude of the search centre.

    Returns:
        ``Res`` wrapping ``{"users": [...]}``, where each element is the
        ``toJson()`` form of a matching ``UserGeoPosition``.
    """
    lat_lo, lat_hi = lat - SEARCH_R, lat + SEARCH_R
    long_lo, long_hi = long - SEARCH_R, long + SEARCH_R
    geokey_lo = CombineGeo(long_lo, lat_lo)
    geokey_hi = CombineGeo(long_hi, lat_hi)

    with dbconfig.Session() as session:
        # The geokey range is applied together with explicit latitude and
        # longitude ranges.
        inside_box = and_(
            between(UserGeoPosition.geokey, geokey_lo, geokey_hi),
            between(UserGeoPosition.lat, lat_lo, lat_hi),
            between(UserGeoPosition.long, long_lo, long_hi)
        )
        nearby = session.query(UserGeoPosition).filter(inside_box).all()
        ulist = [user.toJson() for user in nearby]
        return Res({"users": ulist})
开发者ID:MeTrina,项目名称:websockframework,代码行数:19,代码来源:search.py
示例17: hours_with_calls
def hours_with_calls(session, start, end):
    """Yield each distinct hour that has a queue-log entry in the window.

    Args:
        session: Database session used for the query.
        start: Start of the window; formatted with ``_STR_TIME_FMT``.
        end: End of the window; formatted with ``_STR_TIME_FMT``.

    Yields:
        The queue-log time truncated to the hour, once per distinct hour.
    """
    lower = start.strftime(_STR_TIME_FMT)
    upper = end.strftime(_STR_TIME_FMT)

    hour_column = distinct(
        func.date_trunc('hour', cast(QueueLog.time, TIMESTAMP))
    ).label('time')
    rows = session.query(hour_column).filter(between(QueueLog.time, lower, upper))

    for row in rows.all():
        yield row.time
开发者ID:jaunis,项目名称:xivo-dao,代码行数:10,代码来源:queue_log_dao.py
示例18: prune_record
def prune_record(event_id, max_event_id, event_range, storage):
    """Delete one batch of events by id range and return the last id removed.

    Args:
        event_id: First id of the batch.
        max_event_id: Highest id that may be deleted.
        event_range: Width of the batch; the batch ends at
            ``event_id + event_range`` unless that exceeds ``max_event_id``.
        storage: Object exposing ``_engine`` and ``session``.

    Returns:
        The upper end of the deleted id range (inclusive).
    """
    # Clamp the end of the batch to the highest permitted id.
    last = min(event_id + event_range, max_event_id)

    delete_batch = Event.__table__.delete().where(
        between(Event.id, event_id, last))
    storage._engine.execute(delete_batch)
    logger.info('Pruning {0} - {1}'.format(event_id, last))
    storage.session.commit()
    return last
开发者ID:seatgeek,项目名称:cronq,代码行数:10,代码来源:pruner.py
示例19: listRecords
def listRecords(self, metadataPrefix, set=None, cursor=None, from_=None,
                until=None, batch_size=None):
    """Return a record for every dataset, optionally narrowed by group/time.

    Args:
        metadataPrefix: Accepted for interface compatibility; not read here.
        set: Group identifier passed to ``Group.get``. When falsy, all
            packages are considered. An unknown group yields no records.
        cursor: When truthy, only the first ``cursor`` packages are kept.
        from_: Lower bound on ``PackageRevision.revision_timestamp``
            (exclusive when used alone).
        until: Upper bound on ``PackageRevision.revision_timestamp``
            (exclusive when used alone).
        batch_size: Accepted for interface compatibility; not read here.

    Returns:
        A list with the result of ``self._record_for_dataset`` for each
        selected package.
    """
    packages = []

    if not set:
        query = Session.query(Package)
    else:
        group = Group.get(set)
        query = group.active_packages() if group else None

    if query is not None:
        timestamp = PackageRevision.revision_timestamp
        # Build one query and execute it once; the bounds are mutually
        # exclusive cases, so at most one filter is applied.
        if from_ and until:
            # BETWEEN includes both end points.
            query = query.filter(between(timestamp, from_, until))
        elif from_:
            query = query.filter(timestamp > from_)
        elif until:
            query = query.filter(timestamp < until)
        packages = query.all()

    if cursor:
        packages = packages[:cursor]

    return [self._record_for_dataset(package) for package in packages]
开发者ID:ilrt,项目名称:ckanext-oaipmh,代码行数:42,代码来源:oaipmh_server.py
示例20: _build
def _build(self):
    """Assemble the timesheet of ``self.client`` for ``self.date_range``.

    Returns:
        A dict with the keys ``client``, ``date_range`` (``beg`` / ``end``),
        ``projects`` (one entry per project with its per-day task titles
        and durations and its overall total) and ``totals`` (per-day and
        overall durations across all projects).
    """
    rows = (
        db.session
        .query(Project, Task, TimeEntry)
        .join(Project.tasks)
        .join(Task.time_entries)
        .filter(Project.client_id == self.client.id)
        .filter(between(TimeEntry.added_at, *self.date_range))
        .order_by(Project.name.asc())
    )

    dates = [day.date() for day in
             arrow.Arrow.range('day', *self.date_range)]
    daily_totals = OrderedDict((day, datetime.timedelta()) for day in dates)

    projects = []
    # groupby only merges consecutive rows, so it relies on the ordering of
    # the query above.
    # NOTE(review): rows are ordered by project name only -- confirm names
    # are unique per client, otherwise one project may appear twice.
    for project, entries in itertools.groupby(rows, lambda row: row[0]):
        project_total = datetime.timedelta()
        daily_data = OrderedDict(
            (day, {'total': datetime.timedelta(), 'tasks': set()})
            for day in dates
        )
        for entry_row in entries:
            task = entry_row[1]
            time_entry = entry_row[2]
            entry_day = localize(time_entry.added_at).date()
            spent = time_entry.duration

            bucket = daily_data[entry_day]
            bucket['tasks'].add(task.title)
            bucket['total'] += spent
            project_total += spent
            daily_totals[entry_day] += spent

        projects.append({
            'project': project,
            'time': list(daily_data.values()),
            'total': project_total,
        })

    grand_total = sum(daily_totals.values(), datetime.timedelta())
    return {
        'client': self.client,
        'date_range': {
            'beg': self.date_range[0],
            'end': self.date_range[1],
        },
        'projects': projects,
        'totals': {
            'time': list(daily_totals.values()),
            'total': grand_total,
        },
    }
开发者ID:xuhcc,项目名称:airy,代码行数:54,代码来源:report.py
注:本文中的sqlalchemy.between函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论