本文整理汇总了Python中sqlalchemy.or_函数的典型用法代码示例。如果您正苦于以下问题:Python or_函数的具体用法?Python or_怎么用?Python or_使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了or_函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: fix_escaped_quotes
def fix_escaped_quotes(dummy_data, admin_override=False):
if admin_override is False and (not current_user.is_mod()):
return getResponse(error=True, message="You have to have moderator privileges to do that!")
# SELECT * FROM series WHERE title LIKE E'%\\\'%';
bad_title = 0
bad_desc = 0
q = Story.query.filter(or_(Story.title.like(r"%'%"), Story.title.like(r"%’%"), Story.title.like(r"%‘%"), Story.title.like(r"%“%"), Story.title.like(r"%”%")))
items = q.all()
print("Name fixing processing query resulted in %s items" % len(items))
for item in items:
old = item.title
new = old
while any([r"\"" in new, r"\'" in new, "’" in new, "‘" in new, "“" in new, "”" in new]):
new = new.replace(r"\'", "'")
new = new.replace(r'\"', '"')
new = new.replace(r"’", "'")
new = new.replace(r"‘", "'")
new = new.replace(r"“", '"')
new = new.replace(r"”", '"')
have = Story.query.filter(Story.title == new).scalar()
if old != new:
if have:
print("Duplicate item!", (old, new), old==new)
merge_series_ids(have.id, item.id)
else:
print("Fixing title.")
item.title = new
db.session.commit()
bad_title += 1
# FUCK ALL SMART QUOTE BULLSHITS EVER
q = Story.query.filter(or_(Story.description.like(r"%'%"), Story.description.like(r"%’%"), Story.description.like(r"%‘%"), Story.description.like(r"%“%"), Story.description.like(r"%”%")))
items = q.all()
print("Series description processing query resulted in %s items" % len(items))
for item in items:
old = item.description
new = old
while any([r"\"" in new, r"\'" in new, "’" in new, "‘" in new, "“" in new, "”" in new]):
new = new.replace(r"\'", "'")
new = new.replace(r'\"', '"')
new = new.replace(r"’", "'")
new = new.replace(r"‘", "'")
new = new.replace(r"“", '"')
new = new.replace(r"”", '"')
if old != new:
print("Fixing description smart-quotes and over-escapes for series: %s" % item.id)
item.description = new
db.session.commit()
bad_desc += 1
print("Update complete.")
return getResponse("%s main titles, %s descriptions required fixing. %s" % (bad_title, bad_desc, conflicts), error=False)
开发者ID:herp-a-derp,项目名称:tob2,代码行数:60,代码来源:api_handlers_admin.py
示例2: chg_crime
def chg_crime():
# Step Seven: Find updates
dat_crime_table = Table('dat_chicago_crimes_all', Base.metadata,
autoload=True, autoload_with=engine, extend_existing=True)
src_crime_table = Table('src_chicago_crimes_all', Base.metadata,
autoload=True, autoload_with=engine, extend_existing=True)
chg_crime_table = Table('chg_chicago_crimes_all', Base.metadata,
Column('id', Integer, primary_key=True),
extend_existing=True)
chg_crime_table.drop(bind=engine, checkfirst=True)
chg_crime_table.create(bind=engine)
src_cols = [c for c in src_crime_table.columns if c.name not in ['id', 'start_date', 'end_date']]
dat_cols = [c for c in dat_crime_table.columns if c.name not in ['id', 'start_date', 'end_date']]
and_args = []
for s, d in zip(src_cols, dat_cols):
ors = or_(s != None, d != None)
ands = and_(ors, s != d)
and_args.append(ands)
ins = chg_crime_table.insert()\
.from_select(
['id'],
select([src_crime_table.c.id])\
.select_from(src_crime_table.join(dat_crime_table,
src_crime_table.c.id == dat_crime_table.c.id))\
.where(or_(
and_(dat_crime_table.c.current_flag == True,
and_(or_(src_crime_table.c.id != None, dat_crime_table.c.id != None),
src_crime_table.c.id != dat_crime_table.c.id)),
*and_args))
)
conn = engine.contextual_connect()
conn.execute(ins)
return 'Changes found'
开发者ID:EmilyWebber,项目名称:plenario,代码行数:33,代码来源:crime_helpers.py
示例3: _get_relate_filter
def _get_relate_filter(cls, predicate, related_type):
"""Used for filtering by related_assignee.
Returns:
Boolean stating whether such an assignee exists.
"""
# pylint: disable=invalid-name
# The upper case variables are allowed here to shorthand the class names.
Rel = relationship.Relationship
RelAttr = relationship.RelationshipAttr
Person = person.Person
return db.session.query(Rel).join(RelAttr).join(
Person,
or_(and_(
Rel.source_id == Person.id,
Rel.source_type == Person.__name__
), and_(
Rel.destination_id == Person.id,
Rel.destination_type == Person.__name__
))
).filter(and_(
RelAttr.attr_value.contains(related_type),
RelAttr.attr_name == "AssigneeType",
or_(and_(
Rel.source_type == Person.__name__,
Rel.destination_type == cls.__name__,
Rel.destination_id == cls.id
), and_(
Rel.destination_type == Person.__name__,
Rel.source_type == cls.__name__,
Rel.source_id == cls.id
)),
or_(predicate(Person.name), predicate(Person.email))
)).exists()
开发者ID:VinnieJohns,项目名称:ggrc-core,代码行数:34,代码来源:assignable.py
示例4: _apply_filters_to_query
def _apply_filters_to_query(self, query, model, filters, context=None):
if filters:
for key, value in six.iteritems(filters):
column = getattr(model, key, None)
# NOTE(kevinbenton): if column is a hybrid property that
# references another expression, attempting to convert to
# a boolean will fail so we must compare to None.
# See "An Important Expression Language Gotcha" in:
# docs.sqlalchemy.org/en/rel_0_9/changelog/migration_06.html
if column is not None:
if not value:
query = query.filter(sql.false())
return query
if isinstance(column, associationproxy.AssociationProxy):
# association proxies don't support in_ so we have to
# do multiple equals matches
query = query.filter(
or_(*[column == v for v in value]))
else:
query = query.filter(column.in_(value))
elif key == 'shared' and hasattr(model, 'rbac_entries'):
# translate a filter on shared into a query against the
# object's rbac entries
query = query.outerjoin(model.rbac_entries)
rbac = model.rbac_entries.property.mapper.class_
matches = [rbac.target_tenant == '*']
if context:
matches.append(rbac.target_tenant == context.tenant_id)
# any 'access_as_shared' records that match the
# wildcard or requesting tenant
is_shared = and_(rbac.action == 'access_as_shared',
or_(*matches))
if not value[0]:
# NOTE(kevinbenton): we need to find objects that don't
# have an entry that matches the criteria above so
# we use a subquery to exclude them.
# We can't just filter the inverse of the query above
# because that will still give us a network shared to
# our tenant (or wildcard) if it's shared to another
# tenant.
# This is the column joining the table to rbac via
# the object_id. We can't just use model.id because
# subnets join on network.id so we have to inspect the
# relationship.
join_cols = model.rbac_entries.property.local_columns
oid_col = list(join_cols)[0]
is_shared = ~oid_col.in_(
query.session.query(rbac.object_id).
filter(is_shared)
)
query = query.filter(is_shared)
for _nam, hooks in six.iteritems(self._model_query_hooks.get(model,
{})):
result_filter = hooks.get('result_filters', None)
if isinstance(result_filter, six.string_types):
result_filter = getattr(self, result_filter, None)
if result_filter:
query = result_filter(query, filters)
return query
开发者ID:21atlas,项目名称:neutron,代码行数:60,代码来源:common_db_mixin.py
示例5: web_view_user_posts
def web_view_user_posts(request, environment, session, username, page=1,
posts_per_page=15):
"""
returns the <page> <posts_per_page> posts created by <username> as 'posts',
<username>'s user object as 'user', an empty array if there aren't any.
Possible errortype values are:
* 'NoSuchUser' if <username> is unknown to the system.
May raise the following Exceptions:
* Exception('NoSuchUser')
"""
u = get_user_obj(username, session)
own = session.query(post.id).filter(post.owner == u.identity).subquery()
reposts = session.query(post.id).filter(
post.reposters.contains(u.identity)).subquery()
total_num = session.query(model.post).filter(or_(post.id.in_(reposts), post.id.in_(own))).count()
allposts = session.query(model.post).filter(
or_(post.id.in_(reposts), post.id.in_(own))).order_by(desc(post.timestamp)).offset((page-1)*posts_per_page).limit(posts_per_page).all()
posts = [p.downcast() for p in allposts]
return render_template("web_view_user_posts.htmljinja", environment,
posts=posts, page_num=page, total_num=total_num,
posts_per_page=posts_per_page, user=u)
开发者ID:georgemiler,项目名称:wurstgulasch,代码行数:27,代码来源:views.py
示例6: whereInEquipement
def whereInEquipement(self,fullQueryJoin,criteria):
sensorObj = list(filter(lambda x:'FK_Sensor'==x['Column'], criteria))[0]
sensor = sensorObj['Value']
table = Base.metadata.tables['MonitoredSiteEquipment']
joinTable = outerjoin(table,Sensor, table.c['FK_Sensor'] == Sensor.ID)
if sensorObj['Operator'].lower() in ['is','is not'] and sensorObj['Value'].lower() == 'null':
subSelect = select([table.c['FK_MonitoredSite']]
).select_from(joinTable).where(
and_(MonitoredSite.ID== table.c['FK_MonitoredSite']
,or_(table.c['EndDate'] >= func.now(),table.c['EndDate'] == None)
))
if sensorObj['Operator'].lower() == 'is':
fullQueryJoin = fullQueryJoin.where(~exists(subSelect))
else :
fullQueryJoin = fullQueryJoin.where(exists(subSelect))
else :
subSelect = select([table.c['FK_MonitoredSite']]
).select_from(joinTable).where(
and_(MonitoredSite.ID== table.c['FK_MonitoredSite']
,and_(eval_.eval_binary_expr(Sensor.UnicIdentifier,sensorObj['Operator'],sensor)
,or_(table.c['EndDate'] >= func.now(),table.c['EndDate'] == None))
))
fullQueryJoin = fullQueryJoin.where(exists(subSelect))
return fullQueryJoin
开发者ID:FredericBerton,项目名称:ecoReleve-Data,代码行数:26,代码来源:List.py
示例7: _secure_query
def _secure_query(model, *columns):
query = b.model_query(model, columns)
if not issubclass(model, mb.MistralSecureModelBase):
return query
shared_res_ids = []
res_type = RESOURCE_MAPPING.get(model, '')
if res_type:
shared_res = _get_accepted_resources(res_type)
shared_res_ids = [res.resource_id for res in shared_res]
query_criterion = sa.or_(
model.project_id == security.get_project_id(),
model.scope == 'public'
)
# NOTE(kong): Include IN_ predicate in query filter only if shared_res_ids
# is not empty to avoid sqlalchemy SAWarning and wasting a db call.
if shared_res_ids:
query_criterion = sa.or_(
query_criterion,
model.id.in_(shared_res_ids)
)
query = query.filter(query_criterion)
return query
开发者ID:anilyadav,项目名称:mistral,代码行数:29,代码来源:api.py
示例8: get_permissions_query
def get_permissions_query(model_names, permission_type='read',
permission_model=None):
"""Prepare the query based on the allowed contexts and resources for
each of the required objects(models).
"""
type_queries = []
for model_name in model_names:
contexts, resources = query_helpers.get_context_resource(
model_name=model_name,
permission_type=permission_type,
permission_model=permission_model
)
if contexts is not None:
if resources:
resource_sql = and_(
MysqlRecordProperty.type == model_name,
MysqlRecordProperty.key.in_(resources))
else:
resource_sql = false()
type_query = or_(
and_(
MysqlRecordProperty.type == model_name,
context_query_filter(MysqlRecordProperty.context_id, contexts)
),
resource_sql)
type_queries.append(type_query)
return and_(
MysqlRecordProperty.type.in_(model_names),
or_(*type_queries))
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:31,代码来源:mysql.py
示例9: getContacts
def getContacts(search,baoxiang):
start=0
limit=30
if search!='':
search="%"+search+"%"
if baoxiang!="":
baoxiang="%"+baoxiang+"%"
#objs = Contact.objects.filter((Q(hetongbh__icontains=search) | Q(yiqibh__icontains=search)) & Q(baoxiang=baoxiang)).order_by('-yujifahuo_date')[start:start+limit]
objs=session.query(PartsContact).filter(
and_(
or_(PartsContact.hetongbh.like(search),PartsContact.yiqibh.like(search)),
PartsContact.baoxiang.like(baoxiang)
)
).order_by(desc(PartsContact.yujifahuo_date))#[start:start+limit]
else:
objs=session.query(PartsContact).filter(
or_(PartsContact.hetongbh.like(search),PartsContact.yiqibh.like(search))
).order_by(desc(PartsContact.yujifahuo_date))#[start:start+limit] # Contact.objects.filter(Q(hetongbh__icontains=search) | Q(yiqibh__icontains=search)).order_by('-yujifahuo_date')[start:start+limit]
else:
if baoxiang!="":
baoxiang="%"+baoxiang+"%"
objs=session.query(PartsContact).filter(
PartsContact.baoxiang.like(baoxiang)
).order_by(desc(PartsContact.yujifahuo_date))#[start:start+limit]
else:
objs=session.query(PartsContact).order_by(desc(PartsContact.yujifahuo_date))#[start:start+limit]
return objs
开发者ID:mahongquan,项目名称:parts,代码行数:27,代码来源:backend_alchemy.py
示例10: get_loan_info
def get_loan_info(db, movie_id, volume_id=None, collection_id=None):
"""Returns current collection/volume/movie loan data"""
from sqlalchemy import and_, or_
movie = db.Movie.get_by(movie_id=movie_id)
if movie is None:
return False
# fix or add volume/collection data:
if movie.collection_id is not None:
collection_id = movie.collection_id
if movie.volume_id is not None:
volume_id = movie.volume_id
if collection_id>0 and volume_id>0:
return db.Loan.get_by(
and_(or_(db.Loan.c.collection_id==collection_id,
db.Loan.c.volume_id==volume_id,
db.Loan.c.movie_id==movie_id),
db.Loan.c.return_date==None))
elif collection_id>0:
return db.Loan.get_by(
and_(or_(db.Loan.c.collection_id==collection_id,
db.Loan.c.movie_id==movie_id)),
db.Loan.c.return_date==None)
elif volume_id>0:
return db.Loan.get_by(and_(or_(db.Loan.c.volume_id==volume_id,
db.Loan.c.movie_id==movie_id)),
db.Loan.c.return_date==None)
else:
return db.Loan.get_by(db.Loan.c.movie_id==movie_id,db.Loan.c.return_date==None)
开发者ID:BackupTheBerlios,项目名称:griffith-svn,代码行数:30,代码来源:loan.py
示例11: item_search
def item_search(classes=None, bodygroups=None, equip_regions=None, item_name=None):
items_query = TF2Item.query.filter_by(inactive=False)
wildcards = ["%", "_"]
if item_name:
if any([w in item_name for w in wildcards]):
return
items_query = items_query.filter(TF2Item.item_name.contains(item_name))
if len(classes) > 0:
for class_name in classes:
items_query = items_query.filter(TF2Item.class_model.any(TF2ClassModel.class_name == class_name))
sq = db.session.query(TF2ClassModel.defindex, func.count(TF2ClassModel).label("class_count")).group_by(TF2ClassModel.defindex).subquery()
items_query = items_query.join(sq, TF2Item.defindex == sq.c.defindex)
if len(classes) == 9:
pass
elif len(classes) > 1:
items_query = items_query.filter(sq.c.class_count > 1).filter(sq.c.class_count < 9)
elif len(classes) == 1:
items_query = items_query.filter(sq.c.class_count == 1)
else:
return
if equip_regions:
items_query = items_query.filter(TF2Item.equip_regions.any(or_(*[TF2EquipRegion.equip_region == equip_region for equip_region in equip_regions])))
if bodygroups:
items_query = items_query.filter(TF2Item.bodygroups.any(or_(*[TF2BodyGroup.bodygroup == bodygroup for bodygroup in bodygroups])))
bodygroup_count = db.session.query(schema_bodygroup.c.defindex, func.count('*').label("bg_count")).group_by(schema_bodygroup.c.defindex).subquery()
items_query = items_query.join(bodygroup_count, TF2Item.defindex == bodygroup_count.c.defindex).filter(bodygroup_count.c.bg_count == len(bodygroups))
else:
items_query = items_query.filter(TF2Item.bodygroups == None)
return items_query
开发者ID:grynjin,项目名称:mods.tf,代码行数:29,代码来源:views.py
示例12: filter_single_by_time
def filter_single_by_time(cls, type, objects, year=None, week_number=None, day=None):
assert (week_number and year) or day
start_date, end_date = None, None
if year and week_number:
start_date, end_date = get_start_and_end_date_from_week_and_year(
year,
week_number
)
if day:
start_date = day
end_date = day
objects = objects.filter(
or_(
tuple_(
cast(type.start_time, Date), cast(type.end_time, Date)
).op('overlaps')(
tuple_(
start_date, end_date
)
),
or_(
# First range ends on the start date of the second
cast(type.end_time, Date) == start_date,
# Second range ends on the start date of the first
end_date == cast(type.start_time, Date)
)
)
)
return objects
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:33,代码来源:SlotsForWeekResource.py
示例13: filter_repeating_by_time
def filter_repeating_by_time(cls, type, objects, year=None, week_number=None, day=None):
assert (week_number and year) or day
start_date, end_date = None, None
if year and week_number:
start_date, end_date = get_start_and_end_date_from_week_and_year(
year,
week_number
)
if day:
start_date = day
end_date = day
objects = objects.filter(
type.week_day == day.isoweekday()
)
objects = objects.filter(
or_(
tuple_(
type.start_date, type.end_date
).op('overlaps')(
tuple_(
start_date, end_date
)
),
or_(
# First range ends on the start date of the second
type.end_date == start_date,
# Second range ends on the start date of the first
end_date == type.start_date
)
)
)
return objects
开发者ID:Trondheim-kommune,项目名称:Bookingbasen,代码行数:35,代码来源:SlotsForWeekResource.py
示例14: main
def main(confdir="/etc/cslbot") -> None:
config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
with open(path.join(confdir, 'config.cfg')) as f:
config.read_file(f)
session = get_session(config)()
channel = '#tjhsst'
type_filter = or_(Log.type == 'privmsg', Log.type == 'pubmsg', Log.type == 'action')
users = session.query(Log.source).filter(Log.target == channel, type_filter).having(func.count(Log.id) > 500).group_by(Log.source).all()
freq = []
for user in users:
lines = session.query(Log.msg).filter(Log.target == channel, Log.source == user[0],
or_(Log.type == 'privmsg', Log.type == 'pubmsg', Log.type == 'action')).all()
text = '\n'.join([x[0] for x in lines])
with open('/tmp/foo', 'w') as f:
f.write(text)
try:
output = subprocess.check_output(['zpaq', 'add', 'foo.zpaq', '/tmp/foo', '-test', '-summary', '1', '-method', '5'],
stderr=subprocess.STDOUT,
universal_newlines=True)
sizes = output.splitlines()[-2]
match = re.match(r'.*\((.*) -> .* -> (.*)\).*', sizes)
if not match:
raise Exception('oh no')
before, after = match.groups()
# 8 bits = 1 byte
count = 1024 * 1024 * 8 * float(after) / len(text)
freq.append((user[0], len(lines), float(after) / float(before) * 100, count))
except subprocess.CalledProcessError as e:
print(e.stdout)
raise e
with open('freq.json', 'w') as f:
json.dump(freq, f, indent=True)
for x in sorted(freq, key=lambda x: x[2]):
print("%s: (%d lines) (%f%% compressed) (%f bits per char)" % x)
开发者ID:tjcsl,项目名称:cslbot,代码行数:34,代码来源:entropy.py
示例15: build_initial_query
def build_initial_query(self, trans, **kwd):
clause_list = []
tool_shed_repository_ids = kwd.get('tool_shed_repository_ids', None)
if tool_shed_repository_ids:
if isinstance(tool_shed_repository_ids, string_types):
try:
# kwd['tool_shed_repository_ids'] may be a json dump of repo ids like u'['aebaa141e7243ebf']'
tool_shed_repository_ids = json.loads(tool_shed_repository_ids)
except ValueError:
pass
tool_shed_repository_ids = util.listify(tool_shed_repository_ids)
for tool_shed_repository_id in tool_shed_repository_ids:
clause_list.append(self.model_class.table.c.id == trans.security.decode_id(tool_shed_repository_id))
if clause_list:
return trans.install_model.context.query(self.model_class) \
.filter(or_(*clause_list))
for tool_shed_repository in trans.install_model.context.query(self.model_class) \
.filter(self.model_class.table.c.deleted == false()):
if tool_shed_repository.status in [trans.install_model.ToolShedRepository.installation_status.NEW,
trans.install_model.ToolShedRepository.installation_status.CLONING,
trans.install_model.ToolShedRepository.installation_status.SETTING_TOOL_VERSIONS,
trans.install_model.ToolShedRepository.installation_status.INSTALLING_TOOL_DEPENDENCIES,
trans.install_model.ToolShedRepository.installation_status.LOADING_PROPRIETARY_DATATYPES]:
clause_list.append(self.model_class.table.c.id == tool_shed_repository.id)
if clause_list:
return trans.install_model.context.query(self.model_class) \
.filter(or_(*clause_list))
return trans.install_model.context.query(self.model_class) \
.filter(self.model_class.table.c.status == trans.install_model.ToolShedRepository.installation_status.NEW)
开发者ID:bwlang,项目名称:galaxy,代码行数:29,代码来源:admin_toolshed_grids.py
示例16: removeMinimum
def removeMinimum(self, m, commit=True):
"""remove a minimum from the database
Remove a minimum and any objects (TransitionState or Distance)
pointing to that minimum.
"""
#delete any distance objects pointing to min2
candidates = self.session.query(Distance).\
filter(or_(Distance.minimum1 == m,
Distance.minimum2 == m))
candidates = list(candidates)
for d in candidates:
self.session.delete(d)
#delete any transition states objects pointing to min2
candidates = self.session.query(TransitionState).\
filter(or_(TransitionState.minimum1 == m,
TransitionState.minimum2 == m))
candidates = list(candidates)
for ts in candidates:
self.on_ts_removed(ts)
self.session.delete(ts)
self.on_minimum_removed(m)
#delete the minimum
self.session.delete(m)
if commit:
self.session.commit()
开发者ID:lsmeeton,项目名称:PyGMIN,代码行数:28,代码来源:database.py
示例17: get_tokens
def get_tokens(self, client_id=None, user_id=None, scopes=(),
exclude_revoked=False, exclude_expired=False):
tokens = self.tokens
if exclude_expired:
tokens = tokens.filter(AccessToken.expires_at > datetime.now())
if exclude_revoked:
tokens = tokens.filter(AccessToken.base_token_id == BaseToken._id)
tokens = tokens.filter(BaseToken.revoked == 0)
if client_id:
tokens = tokens.filter(AccessToken.base_token_id == BaseToken._id)
tokens = tokens.filter(BaseToken.client_id == Client.client_id)
tokens = tokens.filter(or_(Client.client_id == client_id,
Client.alias == client_id))
if user_id:
tokens = tokens.filter(AccessToken.base_token_id == BaseToken._id)
tokens = tokens.filter(BaseToken.user_id == User.user_id)
tokens = tokens.filter(or_(User.email == user_id,
User.user_id == user_id))
for scope in scopes:
tokens = tokens.filter(AccessToken.base_token_id == BaseToken._id)
tokens = tokens.filter(BaseToken.scope_objects.any(value=scope))
return tokens
开发者ID:mete0r,项目名称:goauthc,代码行数:28,代码来源:repo.py
示例18: get
def get(key, cls, attrs=(), page=0, per_page=50, local=True):
#
# Local is a flag that determines whether we only return objects local to the
# calling key's namespace, or whether we will permit global objects with identical names
# to local objects in the response.
#
if page and per_page:
if key.systemwide:
return cls.query.filter(or_(cls.key == None, cls.key == key)).paginate(page, per_page).items
return cls.query.filter(cls.key == key).paginate(page,per_page).items
if attrs:
(attr, identifier) = attrs
attribute = getattr(cls, attr)
if attribute:
if key.systemwide:
item = cls.query.filter(
or_(and_(attribute==identifier, cls.key == None),
and_(attribute==identifier, cls.key == key))
).all()
if local:
if len(item) == 1: return item[0]
for i in item:
if i.key == key: return i
return item
else:
item = cls.query.filter(and_(attribute==identifier, cls.key == key)).first()
return item
raise Exception('Unrecognised attribute "%s" of %s.' % (attr, repr(cls)))
if key.systemwide:
return cls.query.filter(or_(cls.key == None, cls.key == key)).all()
return cls.query.filter(cls.key == key).all()
开发者ID:nivertech,项目名称:Emissary,代码行数:34,代码来源:utils.py
示例19: search_name
def search_name(
self, name=None, count=None, page=None, label_from_instance=None, name_fields=None, *args, exact=False,
**kwargs
):
params = kwargs.get('params')
join = []
if name:
if name_fields is None:
name_fields = chain(*(_resolve_fk_search(f, join) for f in self._meta.get_name_fields()))
if exact:
q = [sa.or_(*[fld.column == name for fld in name_fields])]
else:
q = [sa.or_(*[fld.column.ilike('%' + name + '%') for fld in name_fields])]
if params:
q.append(params)
kwargs = {'params': q}
kwargs['join'] = join
qs = self._search(*args, **kwargs)
if count:
count = qs.count()
if page:
page = int(page)
qs = qs[(page - 1) * CHOICES_PAGE_LIMIT:page * CHOICES_PAGE_LIMIT]
else:
qs = qs[:CHOICES_PAGE_LIMIT]
if isinstance(label_from_instance, list):
label_from_instance = lambda obj, label_from_instance=label_from_instance: (obj.pk, ' - '.join([str(getattr(obj, f, '')) for f in label_from_instance if f in self._meta.fields_dict]))
if callable(label_from_instance):
res = [label_from_instance(obj) for obj in qs]
else:
res = [obj._get_instance_label() for obj in qs]
return {
'count': count,
'items': res,
}
开发者ID:katrid,项目名称:orun,代码行数:35,代码来源:base.py
示例20: evidences
def evidences(cls): # pylint: disable=no-self-argument
"""Return evidences related for that instance."""
return db.relationship(
Evidence,
primaryjoin=lambda: sa.or_(
sa.and_(
cls.id == Relationship.source_id,
Relationship.source_type == cls.__name__,
Relationship.destination_type == "Evidence",
),
sa.and_(
cls.id == Relationship.destination_id,
Relationship.destination_type == cls.__name__,
Relationship.source_type == "Evidence",
)
),
secondary=Relationship.__table__,
secondaryjoin=lambda: sa.or_(
sa.and_(
Evidence.id == Relationship.source_id,
Relationship.source_type == "Evidence",
),
sa.and_(
Evidence.id == Relationship.destination_id,
Relationship.destination_type == "Evidence",
)
),
viewonly=True,
)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:29,代码来源:with_evidence.py
注:本文中的sqlalchemy.or_函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论