本文整理汇总了Python中sqlalchemy.sql.or_函数的典型用法代码示例。如果您正苦于以下问题:Python or_函数的具体用法?Python or_怎么用?Python or_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了or_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_table_from_database
def get_table_from_database(cls, db, name_or_id, session=None, d_vid=None):
from databundles.orm import Table
import sqlalchemy.orm.exc
from sqlalchemy.sql import or_, and_
if not name_or_id:
raise ValueError("Got an invalid argument: {}".format(name_or_id))
try:
if d_vid:
return (
session.query(Table)
.filter(
and_(
Table.d_vid == d_vid,
or_(Table.vid == name_or_id, Table.id_ == name_or_id, Table.name == name_or_id),
)
)
.one()
)
else:
return (
session.query(Table)
.filter(or_(Table.vid == name_or_id, Table.id_ == name_or_id, Table.name == name_or_id))
.one()
)
except sqlalchemy.orm.exc.NoResultFound as e:
raise sqlalchemy.orm.exc.NoResultFound("No table for name_or_id: {}".format(name_or_id))
开发者ID:kball,项目名称:databundles,代码行数:32,代码来源:schema.py
示例2: get_committee
def get_committee(self, args, page_num, per_page, committee_id, candidate_id):
if committee_id is not None:
committees = CommitteeDetail.query
committees = committees.filter_by(**{'committee_id': committee_id})
if candidate_id is not None:
committees = CommitteeDetail.query.join(CandidateCommitteeLink).filter(CandidateCommitteeLink.candidate_id==candidate_id)
for argname in ['designation', 'organization_type', 'committee_type']:
if args.get(argname):
if ',' in args[argname]:
committees = committees.filter(getattr(Committee, argname).in_(args[argname].split(',')))
else:
committees = committees.filter(getattr(Committee, argname)==args[argname])
# default year filtering
if args.get('year') is None:
earliest_year = int(sorted(default_year().split(','))[0])
# still going or expired after the earliest year we are looking for
committees = committees.filter(or_(extract('year', CommitteeDetail.expire_date) >= earliest_year, CommitteeDetail.expire_date == None))
# Should this handle a list of years to make it consistent with /candidate ?
elif args.get('year') and args['year'] != '*':
# before expiration
committees = committees.filter(or_(extract('year', CommitteeDetail.expire_date) >= int(args['year']), CommitteeDetail.expire_date == None))
# after origination
committees = committees.filter(extract('year', CommitteeDetail.original_registration_date) <= int(args['year']))
count = committees.count()
return count, committees.order_by(CommitteeDetail.name).paginate(page_num, per_page, False).items
开发者ID:LindsayYoung,项目名称:openFEC,代码行数:32,代码来源:committees.py
示例3: by_title
def by_title(cls, keyword, locale=None):
"""Search a music with its title and localized title.
Whitespaces are ignored.
"""
"""# Search a music its title exactly matched to keyword"""
music = session.query(cls).filter(cls.title == keyword).first()
if music: return music
"""# Search a music its title includes keyword"""
reduced_keyword = keyword.replace(' ', '')
reduced_title = func.replace(cls.title, ' ', '')
music = session.query(cls).\
filter(or_(
cls.title.contains(keyword),
reduced_title.contains(reduced_keyword))).\
limit(1).first()
if music: return music
"""# Search a music its localized title includes keyword"""
if locale is None: return None
reduced_text = func.replace(cls.Localization.text, ' ', '')
music_data = session.query(cls.Localization).\
filter(cls.Localization.locale == locale).\
filter(or_(
cls.Localization.text.contains(keyword),
reduced_text.contains(reduced_keyword))).\
limit(1).first()
if music_data is not None:
return cls.by(music_id=music_data.music_id)
开发者ID:JubeatInfo,项目名称:JubeatInfo2,代码行数:31,代码来源:__init__.py
示例4: system_utilisation
def system_utilisation(system, start, end):
retval = dict((k, datetime.timedelta(0)) for k in
['recipe', 'manual', 'idle_automated', 'idle_manual',
'idle_broken', 'idle_removed'])
if end <= system.date_added:
return retval
if start <= system.date_added:
start = system.date_added
status_durations = system.dyn_status_durations\
.filter(and_(SystemStatusDuration.start_time < end,
or_(SystemStatusDuration.finish_time >= start,
SystemStatusDuration.finish_time == None)))\
.order_by(SystemStatusDuration.start_time).all()
reservations = system.dyn_reservations\
.filter(and_(Reservation.start_time < end,
or_(Reservation.finish_time >= start,
Reservation.finish_time == None)))\
.order_by(Reservation.start_time).all()
prev_finish = start
for reservation in reservations:
# clamp reservation start and finish to be within the period
clamped_res_start = max(reservation.start_time, start)
clamped_res_finish = min(reservation.finish_time or end, end)
# first, do the gap from the end of the previous reservation to the
# start of this one
update_status_durations_in_period(retval, status_durations,
prev_finish, clamped_res_start)
# now do this actual reservation
retval[reservation.type] += clamped_res_finish - clamped_res_start
prev_finish = clamped_res_finish
# lastly, do the gap from the end of the last reservation to the end of the
# reporting period
update_status_durations_in_period(retval, status_durations,
prev_finish, end)
return retval
开发者ID:beaker-project,项目名称:beaker,代码行数:35,代码来源:utilisation.py
示例5: test_single_query
def test_single_query(self):
search = Search(self.Donkey, "people", self.session)
session = self.Donkey.Session()
people_class = self.Donkey.get_class("people")
email_class = self.Donkey.get_class("email")
assert set(QueryFromStringParam(search, 'name < ?', pos_args = ["popp02"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).filter(people_class.name < u"popp02").all())) == set()
assert set(QueryFromStringParam(search, 'name < ? and email.email like ?', pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).join(["email"]).filter(and_(people_class.name < u"popp02", email_class.email.like(u"popi%"))).all())) == set()
assert set(QueryFromStringParam(search, "name < ? and not email.email like ?", pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(and_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%"))))).all())) == set()
assert set(QueryFromStringParam(search, "name < ? or not email.email like ?", pos_args = ["popp02", "popi%"]).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(or_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%"))))).all())) == set()
assert set(QueryFromStringParam(search, "not (name < ? or not email.email like ?) ", pos_args = ["popp02", "popi%"]
).add_conditions(base_query).all()).symmetric_difference(
set(session.query(people_class.id).outerjoin(["email"]).\
filter(not_(or_(people_class.name < u"popp02", or_(email_class.email == None, not_(email_class.email.like(u"popi%")))))).all())) == set()
开发者ID:kindly,项目名称:reformed,代码行数:28,代码来源:search_param_test.py
示例6: get_tagged_addrs
def get_tagged_addrs(user):
"""Generate a list of tagged addresses for a user"""
query1 = Session.query(Message.to_address)
query2 = Session.query(Message.from_address)
addrs = [addr.address for addr in user.addresses
if '+*' not in addr.address and '-*' not in addr.address]
addrs.append(user.email)
tagged_addrs = [addr.address for addr in user.addresses
if '+*' in addr.address or '-*' in addr.address]
if tagged_addrs:
tagged_opts1 = func._(or_(*[Message.to_address
.like(TAGGED_RE.sub(r'\g<one>%', taddr))
for taddr in tagged_addrs]))
tagged_opts2 = func._(or_(*[Message.from_address
.like(TAGGED_RE.sub(r'\g<one>%', taddr))
for taddr in tagged_addrs]))
query1 = query1.filter(func._(
or_(tagged_opts1, Message.to_address.in_(addrs))))
query2 = query2.filter(func._(
or_(tagged_opts2, Message.from_address.in_(addrs))))
else:
query1 = query1.filter(Message.to_address.in_(addrs))
query2 = query2.filter(Message.from_address.in_(addrs))
query1 = query1.distinct()
query2 = query2.distinct()
to_addrs = [val.to_address for val in query1]
from_addrs = [val.from_address for val in query2]
all_addrs = set(to_addrs + from_addrs)
return [str(crc32(val)) for val in all_addrs]
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:29,代码来源:query.py
示例7: __init__
def __init__(self, dbsession, user):
self.dbsession = dbsession
self.user = user
self.query = self.dbsession.query(
func.count(Message.id).label('total'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.nameinfected == 0, Message.otherinfected == 0,
Message.spam == 0, Message.highspam == 0), 1)],
else_=0)).label('clean'),
func.sum(case([(Message.virusinfected > 0, 1)],
else_=0)).label('virii'),
func.sum(case([(and_(Message.highspam == 0,
Message.spam == 0, Message.virusinfected == 0,
or_(Message.nameinfected > 0, Message.otherinfected > 0)), 1)],
else_=0)).label('infected'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
or_(Message.spam > 0, Message.highspam > 0)), 1)],
else_=0)).label('spam'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
Message.spam > 0, Message.highspam == 0), 1)],
else_=0)).label('lowspam'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
Message.highspam > 0), 1)],
else_=0)).label('highspam'))\
.filter(Message.date == now().date())
开发者ID:TetraAsh,项目名称:baruwa2,代码行数:28,代码来源:query.py
示例8: filter
def filter(self):
"Set filters"
if self.user.is_domain_admin:
dquery = self.dbsession.query(Domain.name).join(downs,
(oa, downs.c.organization_id == oa.c.organization_id))\
.filter(Domain.status == True)\
.filter(oa.c.user_id == self.user.id).all()
domains = [domain.name for domain in dquery]
if not domains:
domains.append('xx')
if self.direction and self.direction == 'in':
self.query = self.query\
.filter(self.model.to_domain.in_(domains))
elif self.direction and self.direction == 'out':
self.query = self.query\
.filter(self.model.from_domain.in_(domains))
else:
self.query = self.query.filter(
func._(or_(self.model.to_domain.in_(domains),
self.model.from_domain.in_(domains))))
if self.user.is_peleb:
addrs = [addr.address for addr in self.user.addresses]
addrs.append(self.user.email)
if self.direction and self.direction == 'in':
self.query = self.query\
.filter(self.model.to_address.in_(addrs))
elif self.direction and self.direction == 'out':
self.query = self.query\
.filter(self.model.from_address.in_(addrs))
else:
self.query = self.query\
.filter(func._(
or_(self.model.to_address.in_(addrs),
self.model.from_address.in_(addrs))))
return self.query
开发者ID:TetraAsh,项目名称:baruwa2,代码行数:35,代码来源:query.py
示例9: search
def search():
search_res= request.args.get('query')
search_res=search_res.replace("'","")
search_res=search_res.replace("!","")
search_res=search_res.replace("(","")
search_res=search_res.replace(")","")
search_res=search_res.replace(":","")
temp_val = search_res.split(" ")
search_list=[]
for search in temp_val:
if search.isdigit():
search_data = conn.execute(select([my_cards]).where(or_(
func.to_tsvector('english', my_cards.c.text).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.name).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.cardType).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.subType).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.family).match(search, postgresql_regconfig='english'),
my_cards.c.attack==int(search),
my_cards.c.defense==int(search))))
else:
search_data = conn.execute(select([my_cards]).where(or_(
func.to_tsvector('english', my_cards.c.text).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.name).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.cardType).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.subType).match(search, postgresql_regconfig='english'),
func.to_tsvector('english', my_cards.c.family).match(search, postgresql_regconfig='english'))))
search_list+=format_list(search_data)
return render_template('searchTemplate.html',search_data=search_list)
开发者ID:mlaux,项目名称:cs373-idb,代码行数:29,代码来源:__init__.py
示例10: query_database
def query_database(query, raven_client):
macs = [lookup.mac for lookup in query.wifi]
if not macs: # pragma: no cover
return []
result = []
today = util.utcnow().date()
temp_blocked = today - TEMPORARY_BLOCKLIST_DURATION
try:
load_fields = ('lat', 'lon', 'radius')
shards = defaultdict(list)
for mac in macs:
shards[WifiShard.shard_model(mac)].append(mac)
for shard, shard_macs in shards.items():
rows = (
query.session.query(shard)
.filter(shard.mac.in_(shard_macs))
.filter(shard.lat.isnot(None))
.filter(shard.lon.isnot(None))
.filter(or_(
shard.block_count.is_(None),
shard.block_count <
PERMANENT_BLOCKLIST_THRESHOLD))
.filter(or_(
shard.block_last.is_(None),
shard.block_last < temp_blocked))
.options(load_only(*load_fields))
).all()
result.extend(list(rows))
except Exception:
raven_client.captureException()
return result
开发者ID:therewillbecode,项目名称:ichnaea,代码行数:34,代码来源:wifi.py
示例11: check_geometry
def check_geometry(r, feature, o):
# we need both the "original" and "new" geometry to be
# within the restriction area
geom_attr, srid = self._get_geom_col_info(layer)
geom_attr = getattr(o, geom_attr)
geom = feature.geometry
allowed = DBSession.query(func.count(RestrictionArea.id))
allowed = allowed.join(RestrictionArea.roles)
allowed = allowed.join(RestrictionArea.layers)
allowed = allowed.filter(RestrictionArea.readwrite.is_(True))
allowed = allowed.filter(Role.id == self.request.user.role.id)
allowed = allowed.filter(Layer.id == layer.id)
allowed = allowed.filter(or_(
RestrictionArea.area.is_(None),
RestrictionArea.area.ST_Contains(geom_attr)
))
spatial_elt = None
if geom and not isinstance(geom, geojson.geometry.Default):
shape = asShape(geom)
spatial_elt = from_shape(shape, srid=srid)
allowed = allowed.filter(or_(
RestrictionArea.area.is_(None),
RestrictionArea.area.ST_Contains(spatial_elt)
))
if allowed.scalar() == 0:
raise HTTPForbidden()
# check is geometry is valid
self._validate_geometry(spatial_elt)
开发者ID:eleu,项目名称:c2cgeoportal,代码行数:29,代码来源:layers.py
示例12: security_cb
def security_cb(r, feature, o):
# we need both the "original" and "new" geometry to be
# within the restriction area
geom_attr, srid = self._get_geom_col_info(layer)
geom_attr = getattr(o, geom_attr)
geom = feature.geometry
allowed = DBSession.query(func.count(RestrictionArea.id))
allowed = allowed.join(RestrictionArea.roles)
allowed = allowed.join(RestrictionArea.layers)
allowed = allowed.filter(RestrictionArea.readwrite == True)
allowed = allowed.filter(Role.id == self.request.user.role.id)
allowed = allowed.filter(Layer.id == layer.id)
none = None # the only way I found to remove the pep8 warning
allowed = allowed.filter(or_(
RestrictionArea.area == none,
RestrictionArea.area.gcontains(geom_attr)
))
if geom and not isinstance(geom, geojson.geometry.Default):
shape = asShape(geom)
spatial_elt = WKBSpatialElement(buffer(shape.wkb), srid=srid)
allowed = allowed.filter(or_(
RestrictionArea.area == none,
RestrictionArea.area.gcontains(spatial_elt)
))
if allowed.scalar() == 0:
raise HTTPForbidden()
开发者ID:CDTRH,项目名称:c2cgeoportal,代码行数:26,代码来源:layers.py
示例13: __init__
def __init__(self, dbsession, user):
self.dbsession = dbsession
self.user = user
self.query = self.dbsession.query(
func.count(Message.id).label('total'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.nameinfected == 0, Message.otherinfected == 0,
Message.spam == 0, Message.highspam == 0), 1)],
else_=0)).label('clean'),
func.sum(case([(Message.virusinfected > 0, 1)],
else_=0)).label('virii'),
func.sum(case([(and_(Message.highspam == 0,
Message.spam == 0, Message.virusinfected == 0,
or_(Message.nameinfected > 0, Message.otherinfected > 0)), 1)],
else_=0)).label('infected'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
or_(Message.spam > 0, Message.highspam > 0)), 1)],
else_=0)).label('spam'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
Message.spam > 0, Message.highspam == 0), 1)],
else_=0)).label('lowspam'),
func.sum(case([(and_(Message.virusinfected == 0,
Message.otherinfected == 0, Message.nameinfected == 0,
Message.highspam > 0), 1)],
else_=0)).label('highspam'))\
.filter(Message.timestamp.between(
ustartday(self.user.timezone),
uendday(self.user.timezone)))
开发者ID:baruwaproject,项目名称:baruwa2,代码行数:30,代码来源:query.py
示例14: list
def list(self, datasets=None, locations = None, key='vid'):
"""
:param datasets: If specified, must be a dict, which the internal dataset data will be
put into.
:return: vnames of the datasets in the library.
"""
from ..orm import Dataset, Partition
from .files import Files
from sqlalchemy.sql import or_
if datasets is None:
datasets = {}
q1 = (self.session.query(Dataset, Partition).join(Partition)
.filter(Dataset.vid != ROOT_CONFIG_NAME_V))
q2 = (self.session.query(Dataset)
.filter(Dataset.vid != ROOT_CONFIG_NAME_V))
if locations:
if not isinstance(locations,(list, tuple)):
locations=[locations]
terms = [ Dataset.location == location for location in locations]
q1 = q1.filter(or_(*terms))
q2 = q2.filter(or_(*terms))
for d,p in (q1.all() + [ (d,None) for d in q2.all()]):
ck = getattr(d.identity, key)
if ck not in datasets:
dsid = d.identity
datasets[ck] = dsid
else:
dsid = datasets[ck]
# The dataset locations are linked to the identity locations
dsid.locations.set(d.location)
if p and ( not datasets[ck].partitions or p.vid not in datasets[ck].partitions):
pident = p.identity
pident.locations.set(d.location)
datasets[ck].add_partition(pident)
if d.location == Files.TYPE.SOURCE:
files = Files(self)
f = files.query.type(Files.TYPE.SOURCE).ref(dsid.vid).one_maybe
if f:
dsid.bundle_state = f.state
return datasets
开发者ID:kball,项目名称:ambry,代码行数:59,代码来源:database.py
示例15: get_by_date
def get_by_date(
cls, session, calendar, start_date, stop_date,
full_day=None, no_recursive=False,
name=None):
""" Retrieve the list of meetings between two date.
We include the start date and exclude the stop date.
:kwarg full_day: Can be True, False or None. True will
restrict to only meetings which take up the full day. False will
only select meetings which do not take the full day. None will
not restrict. Default to None
:kwarg no_recursive: a boolean specifying whether the list of
meetings returned should exclude recursive meetings.
Default to False, if True recursive meetings will be excluded.
:kwarg name: Defaults to None, if set the meetings returned will be
filtered for this string in their name.
"""
query = session.query(
cls
).filter(
Meeting.calendar == calendar
).filter(
or_(
and_(
(Meeting.meeting_date >= start_date),
(Meeting.meeting_date <= stop_date),
),
and_(
(Meeting.meeting_date_end >= start_date),
(Meeting.meeting_date_end <= stop_date),
),
and_(
(Meeting.meeting_date <= start_date),
(Meeting.meeting_date_end >= stop_date),
),
)
).order_by(
Meeting.meeting_date,
Meeting.meeting_time_start,
Meeting.meeting_name)
if full_day is not None:
query = query.filter(Meeting.full_day == full_day)
if no_recursive:
query = query.filter(Meeting.recursion_frequency == None)
else:
query = query.filter(
or_(
(Meeting.recursion_ends >= Meeting.meeting_date),
(Meeting.recursion_frequency == None),
)
)
if name:
query = query.filter(
Meeting.meeting_name.ilike('%%%s%%' % name)
)
return query.all()
开发者ID:anjali-92,项目名称:fedocal,代码行数:59,代码来源:model.py
示例16: createView
def createView(self):
# make sure the relevant database tables exist!
metadata = Base.metadata
metadata.create_all(self.db.engine, checkfirst=True)
# filter indexes
catalog = self.env.catalog.index_catalog
xmlindex_list = catalog.getIndexes(package_id='seismology',
resourcetype_id='station')
filter = ['network_id', 'location_id', 'station_id', 'channel_id',
'latitude', 'longitude', 'start_datetime', 'end_datetime']
xmlindex_list = [x for x in xmlindex_list if x.label in filter]
if not xmlindex_list:
return
# build up query
query, joins = catalog._createIndexView(xmlindex_list, compact=True)
options = [
sql.functions.max(WaveformChannel.endtime).label("latest_activity"),
# XXX: not UTC!!!!
(sql.func.now() - sql.functions.max(WaveformChannel.endtime)).label("latency"),
(sql.literal_column("end_datetime.keyval") == None).label("active"),
sql.func.random().label("random"),
sql.func.GeomFromText(
sql.text("'POINT(' || longitude.keyval || ' ' || " + \
"latitude.keyval || ')', 4326")).label('geom')
]
for option in options:
query.append_column(option)
oncl = WaveformChannel.network == sql.literal_column("network_id.keyval")
oncl = sql.and_(oncl, WaveformChannel.station == sql.literal_column("station_id.keyval"))
oncl = sql.and_(oncl, WaveformChannel.channel == sql.literal_column("channel_id.keyval"))
oncl = sql.and_(oncl, sql.or_(
WaveformChannel.location == sql.literal_column("location_id.keyval"),
sql.and_(
WaveformChannel.location == None,
sql.literal_column("location_id.keyval") == None
)))
oncl = sql.and_(oncl, WaveformChannel.endtime > sql.literal_column("start_datetime.keyval"))
oncl = sql.and_(oncl, sql.or_(
WaveformChannel.endtime <= sql.literal_column("end_datetime.keyval"),
sql.literal_column("end_datetime.keyval") == None
))
#joins = joins.join("default_waveform_channels", onclause=oncl)
query = query.select_from(joins).group_by(
document_tab.c['id'],
document_meta_tab.c['datetime'],
sql.literal_column("station_id.keyval"),
sql.literal_column("channel_id.keyval"),
sql.literal_column("network_id.keyval"),
sql.literal_column("location_id.keyval"),
sql.literal_column("latitude.keyval"),
sql.literal_column("longitude.keyval"),
sql.literal_column("start_datetime.keyval"),
sql.literal_column("end_datetime.keyval"))
return util.compileStatement(query)
开发者ID:barsch,项目名称:seishub.plugins.exupery,代码行数:57,代码来源:seismic.py
示例17: between
def between(column):
return sql.and_(
sql.or_(
column >= sql.bindparam("start_date"),
sql.bindparam("start_date") == None),
sql.or_(
column <= sql.bindparam("end_date"),
sql.bindparam("end_date") == None),
)
开发者ID:kohsah,项目名称:bungeni-portal,代码行数:9,代码来源:daterange.py
示例18: make_sa_expression
def make_sa_expression(self, search, root_table):
self.search = search
self.root_table = root_table
if self.table:
table = self.table
else:
table = root_table
self.table_class = self.search.name_to_alias[table]
database = self.search.database
# FIXME This hack gets the search working again but
# should be fixed further back in the code
if self.search.aliased_name_path[table] == 'root':
rtable = database[table]
else:
rtable = database[self.search.aliased_name_path[table].table]
if self.field == "id":
self.type = sa.Integer
else:
self.type = rtable.columns[self.field].type
self.parsed_values = []
self.parse_values()
field = getattr(self.table_class, self.field)
table_class = self.table_class
val = self.parsed_values
if self.operator == "<":
return or_(field < val[0], table_class.id == None)
if self.operator == "<=":
return or_(field <= val[0], table_class.id == None)
if self.operator == "<>":
return or_(field <> val[0], table_class.id == None)
if self.operator == "=":
return and_( field == val[0], table_class.id <> None)
if self.operator == ">":
return and_(field > val[0], table_class.id <> None)
if self.operator == ">=":
return and_(field >= val[0], table_class.id <> None)
if self.operator == "in":
return and_(field.in_(list(val)), table_class.id <> None)
if self.operator == "between":
return and_(field.between(val[0], val[1]), table_class.id <> None)
if self.operator == "like":
return and_(field.ilike(val[0]), table_class.id <> None)
if self.operator == "is" and val[0]:
return field == None
if self.operator == "is" and not val[0]:
return field <> None
开发者ID:kindly,项目名称:reformed,代码行数:56,代码来源:search.py
示例19: query_overrides
def query_overrides(request):
db = request.db
data = request.validated
query = db.query(BuildrootOverride)
expired = data.get('expired')
if expired is not None:
if expired:
query = query.filter(BuildrootOverride.expired_date!=None)
else:
query = query.filter(BuildrootOverride.expired_date==None)
packages = data.get('packages')
if packages is not None:
query = query.join(BuildrootOverride.build).join(Build.package)
query = query.filter(or_(*[Package.name==pkg.name for pkg in packages]))
releases = data.get('releases')
if releases is not None:
query = query.join(BuildrootOverride.build).join(Build.release)
query = query.filter(or_(*[Release.name==r.name for r in releases]))
like = data.get('like')
if like is not None:
query = query.join(BuildrootOverride.build)
query = query.filter(or_(*[
Build.nvr.like('%%%s%%' % like)
]))
submitter = data.get('user')
if submitter is not None:
query = query.filter(BuildrootOverride.submitter==submitter)
query = query.order_by(BuildrootOverride.submission_date.desc())
# We can't use ``query.count()`` here because it is naive with respect to
# all the joins that we're doing above.
count_query = query.with_labels().statement\
.with_only_columns([func.count(distinct(BuildrootOverride.id))])\
.order_by(None)
total = db.execute(count_query).scalar()
page = data.get('page')
rows_per_page = data.get('rows_per_page')
pages = int(math.ceil(total / float(rows_per_page)))
query = query.offset(rows_per_page * (page - 1)).limit(rows_per_page)
return dict(
overrides=query.all(),
page=page,
pages=pages,
rows_per_page=rows_per_page,
total=total,
chrome=data.get('chrome'),
display_user=data.get('display_user'),
)
开发者ID:Debjeeti20,项目名称:bodhi,代码行数:56,代码来源:overrides.py
示例20: delete
def delete(self):
db.session.delete(self)
db.session.commit()
# 删除时如果城市没有其他路线取消发布状态
if not MapRelationship.query.filter(or_(MapRelationship.city_from == self.city_from, MapRelationship.city_to == self.city_from)).first():
tmp_city = MapCity.query.filter_by(identify=self.city_from).first()
tmp_city.xor_publish()
if not MapRelationship.query.filter(or_(MapRelationship.city_from == self.city_to, MapRelationship.city_to == self.city_to)).first():
tmp_city = MapCity.query.filter_by(identify=self.city_to).first()
tmp_city.xor_publish()
开发者ID:franciumzh,项目名称:blog,代码行数:10,代码来源:maps.py
注:本文中的sqlalchemy.sql.or_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论