本文整理汇总了Python中sqlalchemy.orm.joinedload_all函数的典型用法代码示例。如果您正苦于以下问题:Python joinedload_all函数的具体用法是什么?Python joinedload_all怎么用?有哪些Python joinedload_all的使用例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
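在下文中一共展示了joinedload_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。请注意:joinedload_all自SQLAlchemy 0.9起已被弃用,并在1.4版本中被移除;在新代码中请改用链式调用的joinedload(),例如joinedload(A.b).joinedload(B.c)。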
示例1: admin_search_candidates
def admin_search_candidates(self):
    """Search candidates using the filters found in the request parameters.

    Reads ``status``, ``order``, ``q`` and ``tags`` from
    ``self.request.params``; each one that is present narrows or sorts the
    base query.  The value produced by ``run_paginated_query`` is returned
    unchanged.
    """
    params = self.request.params
    status_name = params.get('status')
    sort_spec = params.get('order')
    term = params.get('q')
    tag_names = split_strip(params.get('tags'))

    # Base query with the related collections loaded eagerly.
    candidates = DBSession.query(SearchResultCandidate).options(
        joinedload_all('languages.language'),
        joinedload_all('languages.proficiency'),
        joinedload_all('skills.skill'),
        joinedload_all('skills.level'),
        joinedload('preferred_locations'),
        joinedload('target_position'),
    )

    if status_name:
        wanted_status = get_by_name_or_raise(CandidateStatus, status_name)
        candidates = candidates.filter(Candidate.status == wanted_status)

    if term:
        # Case-insensitive prefix match on first name, last name,
        # "first last" and e-mail.
        prefix = term.lower()
        full_name = func.concat(Candidate.first_name, " ", Candidate.last_name)
        prefix_matches = [
            func.lower(Candidate.first_name).startswith(prefix),
            func.lower(Candidate.last_name).startswith(prefix),
            func.lower(full_name).startswith(prefix),
            func.lower(Candidate.email).startswith(prefix),
        ]
        candidates = candidates.filter(or_(*prefix_matches))

    if tag_names:
        candidates = (
            candidates.outerjoin(CandidateSkill)
            .join(Skill)
            .filter(Skill.name.in_(tag_names))
        )

    if sort_spec:
        candidates = add_sorting(candidates, sort_spec, CANDIDATE_SORTABLES)

    return run_paginated_query(
        self.request,
        candidates,
        counter=distinct_counter(SearchResultCandidate.id),
    )
开发者ID:iwein,项目名称:temp,代码行数:27,代码来源:views.py
示例2: by_well_tag
def by_well_tag(self):
    """Populate the template context with wells carrying the selected tag.

    The tag id and the ``group_by_plate`` flag come from
    ``self.form_result``.  Depending on the flag, either ``c.well_groups``
    (wells grouped per plate) or ``c.wells`` (flat list) is filled, in both
    cases ordered by ``host_datetime`` and then reversed.
    """
    form = self.form_result
    well_tag_field = fl.well_tag_field(str(form['well_tag']))
    c.group_by_plate = form['group_by_plate']
    c.tag_id = form['well_tag']
    c.tag_name = Session.query(WellTag).get(c.tag_id).name

    selected = {'well_tag': well_tag_field['value'],
                'group_by_plate': [u'1' if c.group_by_plate else u'0']}
    choices = {'well_tag': [('--','--')]+well_tag_field['options'],
               'group_by_plate': [(u'1', '')]}
    c.form = h.LiteralFormSelectPatch(value=selected, option=choices)

    tag_query = Session.query(WellTag).filter_by(id=c.tag_id).options(
        joinedload_all(WellTag.tag_wells, QLBWell.plate, QLBPlate.file, innerjoin=True),
        joinedload_all(WellTag.tag_wells, QLBWell.plate, QLBPlate.plate, innerjoin=True),
    )
    well_tags = tag_query.all()

    c.label_names = []
    if not well_tags:
        # Unknown tag: nothing to show in either layout.
        c.wells = []
        c.well_groups = []
    elif c.group_by_plate:
        # groupby() needs its input sorted so that wells of one plate are adjacent.
        ordered_wells = sorted(well_tags[0].wells,
                               key=lambda well: (well.plate_id, well.well_name))
        per_plate = []
        for plate, members in itertools.groupby(ordered_wells, lambda well: well.plate):
            per_plate.append((plate, list(members)))
        groups = sorted(per_plate, key=lambda tup: tup[0].host_datetime)
        groups.reverse()
        c.well_groups = groups
    else:
        flat = sorted(well_tags[0].wells, key=lambda well: well.host_datetime)
        flat.reverse()
        c.wells = flat
    return render('/box2/by_well_tag.html')
开发者ID:v-makarenko,项目名称:vtoolsmq,代码行数:33,代码来源:box2.py
示例3: reader_history
def reader_history(self, id=None, admin=True):
    """Render the change history (logs, status logs, fix logs) of one reader.

    All three kinds of entries are merged into ``c.changes`` as
    ``(time_effective, payload)`` tuples, sorted by time and then reversed.
    ``c.admin`` is true unless ``admin`` equals the string ``'False'``.
    """
    box2 = self.__setup_box2_context_by_code(id)
    c.admin = admin != 'False'

    logs = (Session.query(Box2Log).filter_by(box2_id=box2.id)
            .order_by('time_effective desc')
            .options(joinedload_all(Box2Log.circuit))
            .all())
    statuses = (Session.query(DRStatusLog).filter_by(box2_id=box2.id)
                .order_by('time_effective desc')
                .options(joinedload_all(DRStatusLog.reporter))
                .all())
    fixes = (Session.query(DRFixLog).filter_by(box2_id=box2.id)
             .order_by('time_effective desc')
             .all())

    # Pair every log with the entry that follows it in the result list
    # (None for the last one).
    last_index = len(logs) - 1
    log_pairs = []
    for index, entry in enumerate(logs):
        following = logs[index + 1] if index < last_index else None
        log_pairs.append((entry.time_effective, [entry, following]))

    # Attach the sorted labeled items of both entries as a third element.
    # NOTE(review): for the last log `following` is None, so labeleditems()
    # is called with None -- confirm that helper accepts it.
    for _, payload in log_pairs:
        current_items = sorted(box2log_mv.labeleditems(payload[0]).items())
        following_items = sorted(box2log_mv.labeleditems(payload[1]).items())
        payload.append((current_items, following_items))

    status_pairs = [(status.time_effective, status) for status in statuses]
    fix_pairs = [(fix.time_effective, fix) for fix in fixes]

    merged = sorted(log_pairs + status_pairs + fix_pairs,
                    key=operator.itemgetter(0))
    merged.reverse()
    c.changes = merged
    return render('/admin/reader_history.html')
开发者ID:v-makarenko,项目名称:vtoolsmq,代码行数:28,代码来源:admin.py
示例4: list
def list(self):
    """Return the offers of ``self.employer``, optionally only active ones.

    When the ``status`` request parameter equals ``'active'`` the query is
    additionally filtered with ``Offer.by_active(False)``.
    """
    eager = (
        joinedload_all('candidate.skills.skill'),
        joinedload_all('candidate.skills.level'),
    )
    query = (
        DBSession.query(EmployerOffer)
        .filter(EmployerOffer.employer_id == self.employer.id)
        .options(*eager)
    )
    only_active = self.request.params.get('status') == 'active'
    if only_active:
        query = query.filter(Offer.by_active(False))
    return query.all()
开发者ID:iwein,项目名称:temp,代码行数:7,代码来源:views.py
示例5: query
def query(self, req):
    """Build the language query and cache all domain elements on ``self``.

    Returns a query over ``Language`` ordered by id; value sets with their
    values and genus with its family are loaded eagerly.
    """
    # Loaded once up front and stored on the instance.
    self._domainelements = DBSession.query(DomainElement).all()

    languages = DBSession.query(Language).order_by(Language.id)
    return languages.options(
        joinedload_all(Language.valuesets, ValueSet.values),
        joinedload_all(WalsLanguage.genus, Genus.family),
    )
开发者ID:Maggi12,项目名称:wals3,代码行数:7,代码来源:adapters.py
示例6: base_query
def base_query(self, query):
    """Extend ``query`` with the joins, eager loads and filter for the table.

    The first of ``self.language``, ``self.parameter`` and
    ``self.contribution`` that is truthy decides which joins are added and
    which foreign key the result is filtered on.  Without any of them the
    query is joined to parameter and language and left unfiltered.
    """
    query = query.join(ValueSet).options(
        joinedload(Value.valueset),
        joinedload_all(Counterpart.references, CounterpartReference.source),
    )

    if self.language:
        by_language = query.join(ValueSet.parameter).join(ValueSet.contribution)
        by_language = by_language.options(
            joinedload(Value.valueset, ValueSet.contribution),
            joinedload(Value.valueset, ValueSet.parameter),
        )
        return by_language.filter(ValueSet.language_pk == self.language.pk)

    if self.parameter:
        by_parameter = query.join(ValueSet.language).outerjoin(LexibankLanguage.family)
        by_parameter = by_parameter.options(
            joinedload_all(Value.valueset, ValueSet.language, LexibankLanguage.family)
        )
        return by_parameter.filter(ValueSet.parameter_pk == self.parameter.pk)

    if self.contribution:
        by_contribution = query.join(ValueSet.parameter)
        return by_contribution.filter(ValueSet.contribution_pk == self.contribution.pk)

    unfiltered = query.join(ValueSet.parameter).join(ValueSet.language)
    return unfiltered.options(
        joinedload(Value.valueset, ValueSet.parameter),
        joinedload(Value.valueset, ValueSet.language),
    )
开发者ID:clld,项目名称:lexibank,代码行数:32,代码来源:datatables.py
示例7: get_one
def get_one(cls, id_):
    """Fetch one record by primary key with pool, storage and jobs preloaded.

    ``id_`` is converted with ``int()`` before the lookup; the loaded object
    is handed to the parent ``get_one`` as its ``query`` argument.
    """
    eager = [
        joinedload(cls.pool),
        joinedload(cls.storage),
        joinedload_all('jobs.status'),
        joinedload_all('jobs.client'),
    ]
    record = cls.query.options(*eager).get(int(id_))
    return super(Media, cls).get_one(query=record)
开发者ID:gypsymauro,项目名称:almir,代码行数:7,代码来源:models.py
示例8: refined_query
def refined_query(self, query, model, req):
    """Add model-specific eager-loading options to ``query``.

    Only ``common.Contribution`` and ``common.Parameter`` are handled; for
    any other model the query is returned untouched.
    """
    if model == common.Contribution:
        valuesets = common.Contribution.valuesets
        values = common.ValueSet.values
        query = query.options(
            joinedload_all(valuesets, common.ValueSet.parameter),
            joinedload_all(valuesets, values, common.Value.domainelement),
            joinedload_all(
                valuesets,
                values,
                common.Value.sentence_assocs,
                common.ValueSentence.sentence,
            ),
            joinedload(ApicsContribution.language),
        )

    if model == common.Parameter:
        valuesets = common.Parameter.valuesets
        query = query.options(
            joinedload_all(valuesets, common.ValueSet.values),
            joinedload_all(valuesets, common.ValueSet.language),
        )

    return query
开发者ID:SusanneMichaelis,项目名称:apics,代码行数:30,代码来源:__init__.py
示例9: show_ballot_room_editor
def show_ballot_room_editor(ballot_id, ballot_type_name, db):
    """Render the room editor for the ballot of one type and season.

    Args:
        ballot_id: Season year identifying the ballot.
        ballot_type_name: Ballot type name, matched case-insensitively.
        db: Database session used for all queries.

    Returns:
        The rendered ``ballot-event-edit-rooms`` template.

    Raises:
        HTTPError: 404 when there is no ballot event of this type for
            exactly the requested season -- including the case where the
            query returns no events at all.
    """
    ballot_type = (
        db.query(m.BallotType)
        .filter(func.lower(m.BallotType.name) == ballot_type_name.lower())
        .one()
    )

    # The newest two events up to the requested season: the ballot itself
    # and, if it exists, the one before it.
    ballot_eventsq = (db
        .query(m.BallotEvent)
        .join(m.BallotSeason)
        .filter(m.BallotEvent.type == ballot_type)
        .filter(m.BallotSeason.year <= ballot_id)
        .order_by(m.BallotSeason.year.desc())
        .limit(2)
    )
    ballot_events = ballot_eventsq.all()

    # An empty result used to fall through to ballot_events[0] and raise
    # IndexError; it is a "not found" just like a year mismatch.
    if not ballot_events or ballot_events[0].season.year != ballot_id:
        raise HTTPError(404, "No {} ballot for the {} season {} {}".format(ballot_type.name, ballot_id, ballot_eventsq, ballot_events))

    ballot = ballot_events[0]
    last_ballot = ballot_events[1] if len(ballot_events) == 2 else None

    # Eager-load rooms and their listings up to four levels of nested places.
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    root = db.query(m.Place).options(
        joinedload_all('children.rooms.listing_for'),
        joinedload_all('children.children.rooms.listing_for'),
        joinedload_all('children.children.children.rooms.listing_for'),
        joinedload_all('children.children.children.children.rooms.listing_for'),
    ).filter(m.Place.parent == None).one()

    return template('ballot-event-edit-rooms',
        ballot_event=ballot,
        last_ballot_event=last_ballot,
        root=root)
开发者ID:eric-wieser,项目名称:caius-rooms,代码行数:34,代码来源:ballotadmin.py
示例10: base_query
def base_query(self, query):
    """Extend the parent's base query when a parameter is selected.

    With ``self.parameter`` set, the query is joined to the contribution and
    the value set's language and contribution are loaded eagerly.
    """
    base = super(Phonemes, self).base_query(query)
    if not self.parameter:
        return base
    return base.join(ValueSet.contribution).options(
        joinedload_all(Value.valueset, ValueSet.language),
        joinedload_all(Value.valueset, ValueSet.contribution),
    )
开发者ID:clld,项目名称:phoible,代码行数:7,代码来源:datatables.py
示例11: _get_ideas_real
def _get_ideas_real(discussion, view_def=None, ids=None, user_id=None):
    """Return the JSON representations of a discussion's ideas.

    Ideas are optionally restricted to ``ids``.  Every returned dict that
    has a non-None ``widget_links`` entry also receives an
    ``active_widget_links`` list: its own links, followed by the links
    attached to the root idea, followed by the links inherited from
    ancestors that show a widget to their descendants.
    """
    user_id = user_id or Everyone
    # optimization: Recursive widget links.
    from assembl.models import (
        Widget, IdeaWidgetLink, IdeaDescendantsShowingWidgetLink)

    def _link_entry(link):
        # A fresh dict per call, so no entry is shared between lists.
        return {
            '@type': link.external_typename(),
            'widget': Widget.uri_generic(link.widget_id)}

    universal_widget_links = []
    by_idea_widget_links = defaultdict(list)
    link_query = discussion.db.query(IdeaWidgetLink).join(Widget).join(Discussion)
    link_query = link_query.filter(
        Widget.test_active(), Discussion.id == discussion.id,
        IdeaDescendantsShowingWidgetLink.polymorphic_filter())
    widget_links = link_query.options(joinedload_all(IdeaWidgetLink.idea)).all()

    for wlink in widget_links:
        if isinstance(wlink.idea, RootIdea):
            # Links on the root idea apply to every idea.
            universal_widget_links.append(_link_entry(wlink))
            continue
        for descendant_id in wlink.idea.get_all_descendants(True):
            bucket = by_idea_widget_links[Idea.uri_generic(descendant_id)]
            bucket.append(_link_entry(wlink))

    next_synthesis = discussion.get_next_synthesis()
    idea_query = discussion.db.query(Idea).filter_by(
        discussion_id=discussion.id)
    in_next_synthesis = and_(
        SubGraphIdeaAssociation.sub_graph_id==next_synthesis.id,
        SubGraphIdeaAssociation.idea_id==Idea.id)
    idea_query = idea_query.outerjoin(SubGraphIdeaAssociation, in_next_synthesis)
    idea_query = idea_query.outerjoin(
        IdeaLink, and_(IdeaLink.target_id==Idea.id))
    idea_query = idea_query.order_by(IdeaLink.order, Idea.creation_date)

    if ids:
        ids = [get_database_id("Idea", raw_id) for raw_id in ids]
        idea_query = idea_query.filter(Idea.id.in_(ids))

    # remove tombstones
    idea_query = idea_query.filter(and_(*Idea.base_conditions()))
    idea_query = idea_query.options(
        joinedload_all(Idea.source_links),
        joinedload_all(Idea.has_showing_widget_links),
        undefer(Idea.num_children))

    permissions = get_permissions(user_id, discussion.id)
    Idea.prepare_counters(discussion.id, True)

    retval = []
    for idea in idea_query:
        rendered = idea.generic_json(view_def, user_id, permissions)
        if rendered is not None:
            retval.append(rendered)

    for rendered in retval:
        if rendered.get('widget_links', None) is None:
            continue
        active = rendered['widget_links'][:]
        active.extend(universal_widget_links)
        active.extend(by_idea_widget_links[rendered['@id']])
        rendered['active_widget_links'] = active
    return retval
开发者ID:festrade,项目名称:assembl,代码行数:60,代码来源:idea.py
示例12: home
def home(fmt=None, page=1):
    """Show one page (50 entries) of spam flags, newest id first.

    Aborts with 404 when a page other than the first one is empty.  With
    ``fmt == "json"`` the flags are returned as JSON, otherwise the HTML
    template is rendered together with a paginator.
    """
    per_page = 50
    eager = (
        joinedload_all(SpamFlag.message, Message.chat),
        joinedload_all(SpamFlag.message, Message.chat_user),
        joinedload_all(SpamFlag.message, Message.user),
    )
    flag_query = g.db.query(SpamFlag).order_by(SpamFlag.id.desc()).options(*eager)
    flags = flag_query.offset((page - 1) * per_page).limit(per_page).all()

    if not flags and page != 1:
        abort(404)

    flag_count = g.db.query(func.count("*")).select_from(SpamFlag).scalar()

    if fmt == "json":
        return jsonify({
            "flags": [flag.to_dict() for flag in flags],
        })

    def page_url(page):
        # Keep the current query string while switching pages.
        return url_for("spamless2_home", page=page, **request.args)

    # The paginator only needs the total count; the items are passed to the
    # template separately.
    paginator = paginate.Page(
        [],
        page=page,
        items_per_page=per_page,
        item_count=flag_count,
        url_maker=page_url,
    )
    return render_template(
        "admin/spamless2/home.html",
        flags=flags,
        paginator=paginator,
    )
开发者ID:MSPARP,项目名称:newparp,代码行数:35,代码来源:spamless2.py
示例13: get_by_unique_key
def get_by_unique_key(self, unique_key, name, default=None):
    """Look up an item through its ``(unique_key, name)`` key row.

    Returns ``default`` when no such key exists or when the resource it
    points to has a different ``item_type`` than this collection; otherwise
    the resource wrapped in ``self.Item``.
    """
    session = DBSession()

    # Eager load related resources here.
    own_data = orm.joinedload_all(
        Key.resource,
        Resource.data,
        CurrentPropertySheet.propsheet,
        innerjoin=True,
    )
    linked_data = orm.joinedload_all(
        Key.resource,
        Resource.rels,
        Link.target,
        Resource.data,
        CurrentPropertySheet.propsheet,
    )
    key = session.query(Key).options(own_data, linked_data).get((unique_key, name))

    if key is None:
        return default
    resource = key.resource
    if resource.item_type != self.item_type:
        return default
    return self.Item(self, resource)
开发者ID:zhouyu,项目名称:encoded,代码行数:25,代码来源:contentbase.py
示例14: user
def user(request, info, session, userid):
    """Collect the 50 most recent sales, payments and annotations of a user.

    Raises ``Http404`` when no user has the given id.  Returns the template
    name together with its context dictionary.
    """
    u = session.query(User).get(int(userid))
    if not u:
        raise Http404

    sales_query = (
        session.query(Transline)
        .filter(Transline.user == u)
        .options(joinedload('transaction'),
                 joinedload_all('stockref.stockitem.stocktype.unit'))
        .order_by(desc(Transline.time))
    )
    payments_query = (
        session.query(Payment)
        .filter(Payment.user == u)
        .options(joinedload('transaction'),
                 joinedload('paytype'))
        .order_by(desc(Payment.time))
    )
    annotations_query = (
        session.query(StockAnnotation)
        .options(joinedload_all('stockitem.stocktype'),
                 joinedload('type'))
        .filter(StockAnnotation.user == u)
        .order_by(desc(StockAnnotation.time))
    )

    # Each query is sliced to its first 50 rows, in the original order.
    sales = sales_query[:50]
    payments = payments_query[:50]
    annotations = annotations_query[:50]

    context = {
        'tillobject': u,
        'tuser': u,
        'sales': sales,
        'payments': payments,
        'annotations': annotations,
    }
    return ('user.html', context)
开发者ID:sde1000,项目名称:quicktill,代码行数:35,代码来源:views.py
示例15: _floating_ip_get_by_address
def _floating_ip_get_by_address(context, address, session,
                                load_instances=True, use_first=True,
                                use_baked=False):
    """Port of nova.db.sqlalchemy.api._floating_ip_get_by_address.

    The flags switch between the behaviors currently found in that function
    and alternate, better optimized ones:

    * ``load_instances`` -- add the eager load of ``fixed_ip.instance``.
    * ``use_first`` -- fetch with ``first()`` instead of ``one()``.
    * ``use_baked`` -- build the query through the baked-query helper and
      fetch with ``all()``.

    Raises a plain ``Exception`` when no row matches ``address``.
    """
    if use_baked:
        result = model_query_baked(context, models.FloatingIp, session=session)
        # NOTE(review): the return value of .params() is discarded here;
        # presumably bake()/params() update `result` in place -- confirm.
        result.bake(
            lambda query: query.filter_by(address=bindparam('address'))
        ).params(address=address)
    else:
        result = model_query_ordinary(
            context, models.FloatingIp, session=session)
        result = result.filter_by(address=address)

    if load_instances:
        # The current source of _floating_ip_get_by_address always performs
        # a joinedload two levels deep.  For floating_ip_update, and most
        # likely floating_ip_fixed_ip_associate, the rows produced by these
        # joins are never used.
        #
        # That joinedload is by far the largest drag on the performance of
        # these two API functions: it multiplies the measured function call
        # count / time spent by a factor of twelve.  Dropping eager loads
        # nobody needs is therefore an easy and effective way to win back
        # speed; _floating_ip_get_by_address should take a flag saying
        # whether the extended ->fixed_ip->instance load is wanted.
        if use_baked:
            result.bake(
                lambda query: query.options(joinedload_all('fixed_ip.instance')))
        else:
            result = result.options(joinedload_all('fixed_ip.instance'))

    if use_baked:
        rows = result.all()
        if not rows:
            raise Exception("floating ip not found: %s" % address)
        return rows[0]

    if use_first:
        row = result.first()
        if not row:
            raise Exception("floating ip not found: %s" % address)
        return row

    try:
        row = result.one()
    except orm_exc.NoResultFound:
        raise Exception("floating ip not found: %s" % address)
    return row
开发者ID:zzzeek,项目名称:nova_poc,代码行数:58,代码来源:api.py
示例16: query
def query(self, req):
    """Build the language query and cache all domain elements on ``self``.

    Identifiers and countries are loaded with subqueries; value sets with
    their values and genus with its family are loaded with joins.
    """
    # Loaded once up front and stored on the instance.
    self._domainelements = DBSession.query(DomainElement).all()

    languages = DBSession.query(Language).order_by(Language.id)
    loader_options = [
        subqueryload_all('languageidentifier', 'identifier'),
        subqueryload_all('countries'),
        joinedload_all(Language.valuesets, ValueSet.values),
        joinedload_all(WalsLanguage.genus, Genus.family),
    ]
    return languages.options(*loader_options)
开发者ID:Castroyesid,项目名称:wals3,代码行数:9,代码来源:adapters.py
示例17: pubroot
def pubroot(request, info, session):
    """Assemble the context for the index page.

    The context holds business totals for the current week and the two
    weeks before it, the current session, the stock lines located at the
    bar, and the latest location annotation per text for unfinished stock
    items.  Returns the template name together with the context dictionary.
    """
    one_day = datetime.timedelta(1)
    one_week = datetime.timedelta(7)

    date = datetime.date.today()
    # If it's the early hours of the morning, it's more useful for us
    # to consider it still to be yesterday.
    if datetime.datetime.now().hour < 4:
        date = date - one_day

    # weekday() is 0 on Monday, so weeks run Monday to Sunday.
    thisweek_start = date - datetime.timedelta(date.weekday())
    thisweek_end = thisweek_start + datetime.timedelta(6)
    lastweek_start = thisweek_start - one_week
    lastweek_end = thisweek_end - one_week
    weekbefore_start = lastweek_start - one_week
    weekbefore_end = lastweek_end - one_week

    week_spans = [
        ("Current week", thisweek_start, thisweek_end),
        ("Last week", lastweek_start, lastweek_end),
        ("The week before last", weekbefore_start, weekbefore_end),
    ]
    weeks = [
        (label, start, end, business_totals(session, start, end))
        for label, start, end in week_spans
    ]

    currentsession = Session.current(session)

    bar_query = session.query(StockLine).filter(StockLine.location == "Bar")
    bar_query = bar_query.order_by(StockLine.dept_id, StockLine.name)
    bar_query = bar_query.options(joinedload_all("stockonsale.stocktype.unit"))
    barsummary = bar_query.options(undefer_group("qtys")).all()

    # Latest "location" annotation time per annotation text.
    latest_locations = select(
        [StockAnnotation.text, func.max(StockAnnotation.time)], StockAnnotation.atype == "location"
    ).group_by(StockAnnotation.text)

    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    stillage_query = (
        session.query(StockAnnotation)
        .join(StockItem)
        .outerjoin(StockLine)
        .filter(tuple_(StockAnnotation.text, StockAnnotation.time).in_(latest_locations))
        .filter(StockItem.finished == None)
        .order_by(StockLine.name != null(), StockAnnotation.time)
    )
    stillage_query = stillage_query.options(joinedload_all("stockitem.stocktype.unit"))
    stillage_query = stillage_query.options(joinedload_all("stockitem.stockline"))
    stillage = stillage_query.options(undefer_group("qtys")).all()

    context = {
        "currentsession": currentsession,
        "barsummary": barsummary,
        "stillage": stillage,
        "weeks": weeks,
    }
    return ("index.html", context)
开发者ID:sde1000,项目名称:quicktill,代码行数:55,代码来源:views.py
示例18: changes
def changes(request):
    """Collect the value changes of each edition, grouped by parameter.

    A value counts as changed within a period when either its own
    ``updated`` timestamp or the ``updated`` timestamp of a matching history
    row lies strictly inside that period.

    Reference SQL from the original implementation:

        select vs.id, v.updated, h.domainelement_pk, v.domainelement_pk
        from value_history as h, value as v, valueset as vs
        where h.pk = v.pk and v.valueset_pk = vs.pk;

    Returns:
        A dict with the keys ``changes2011``, ``changes2013`` and
        ``changes2014`` (each an ``itertools.groupby`` iterator yielding
        ``(parameter, valuesets)``) and ``removals2013`` (always an empty
        list for now).
    """
    # Period boundaries; all comparisons below are strict.
    E2009 = utc.localize(datetime(2009, 1, 1))
    E2012 = utc.localize(datetime(2012, 1, 1))
    E2014 = utc.localize(datetime(2014, 6, 30))
    E2015 = utc.localize(datetime(2015, 6, 30))

    history = inspect(Value.__history_mapper__).class_
    # Ordering by parameter keeps the value sets of one parameter adjacent,
    # which groupby() below relies on.
    query = DBSession.query(Value)\
        .outerjoin(history, Value.pk == history.pk)\
        .join(ValueSet)\
        .order_by(ValueSet.parameter_pk, ValueSet.language_pk)\
        .options(joinedload_all(Value.valueset, ValueSet.language),
                 joinedload_all(Value.valueset, ValueSet.parameter))

    # 2011 edition: updates between E2009 and E2012, restricted to
    # parameters whose id contains 'A' and is neither 143A nor 144A.
    # `!= None` is intentional: SQLAlchemy turns it into IS NOT NULL.
    changes2011 = query.join(ValueSet.parameter)\
        .filter(Parameter.id.contains('A'))\
        .filter(Parameter.id != '143A')\
        .filter(Parameter.id != '144A')\
        .filter(or_(
            and_(E2009 < Value.updated, Value.updated < E2012),
            and_(history.updated != None,
                 E2009 < history.updated, history.updated < E2012)))

    changes2013 = query.filter(or_(
        and_(E2012 < Value.updated, Value.updated < E2014),
        and_(E2012 < history.updated, history.updated < E2014)))

    changes2014 = query.filter(or_(
        and_(E2014 < Value.updated, Value.updated < E2015),
        and_(E2014 < history.updated, history.updated < E2015)))

    #
    # TODO:
    #
    # history = inspect(ValueSet.__history_mapper__).class_
    # current = DBSession.query(ValueSet.pk).subquery()
    # removals2013 = DBSession.query(Parameter.id, Parameter.name, count(history.pk))\
    # .filter(Parameter.pk == history.parameter_pk)\
    # .filter(not_(history.pk.in_(current)))\
    # .group_by(Parameter.pk, Parameter.id, Parameter.name)\
    # .order_by(Parameter.pk)

    def grouped(values):
        # Group the value sets of the query that was passed in.  The former
        # lambda ignored its argument and always iterated changes2011, so
        # all three result keys carried the 2011 data.
        return groupby([v.valueset for v in values],
                       lambda vs: vs.parameter)

    return {
        'changes2011': grouped(changes2011),
        'changes2013': grouped(changes2013),
        'changes2014': grouped(changes2014),
        'removals2013': []}
开发者ID:Maggi12,项目名称:wals3,代码行数:55,代码来源:views.py
示例19: battles
def battles(clan):
    """Render the table of all battles of a clan.

    Aborts with 404 when ``clan`` is not listed in ``config.CLAN_NAMES``.

    :param clan: name of the clan whose battles are listed
    :return: the rendered ``battles/battles.html`` template
    """
    if clan not in config.CLAN_NAMES:
        abort(404)

    battle_query = Battle.query.options(joinedload_all('battle_group.battles'))
    battle_query = battle_query.options(joinedload_all('attendances.player'))
    clan_battles = battle_query.filter_by(clan=clan).all()
    return render_template('battles/battles.html', clan=clan, battles=clan_battles)
开发者ID:dot360,项目名称:whyattend,代码行数:11,代码来源:webapp.py
示例20: _get_scenario
def _get_scenario(scenario_id, include_data=True, include_items=True):
    """Load a single scenario by id.

    ``resourcescenarios`` and ``resourcegroupitems`` are eager-loaded only
    when the corresponding flag is exactly ``True``.

    Raises ``ResourceNotFoundError`` when no scenario has the given id.
    """
    query = DBSession.query(Scenario).filter(Scenario.scenario_id==scenario_id)

    # The flags are compared by identity, so truthy non-bool values do not
    # enable the eager loads.
    optional_loads = (
        (include_data, 'resourcescenarios'),
        (include_items, 'resourcegroupitems'),
    )
    for flag, relationship_name in optional_loads:
        if flag is True:
            query = query.options(joinedload_all(relationship_name))

    try:
        return query.one()
    except NoResultFound:
        raise ResourceNotFoundError("Scenario %s does not exist."%(scenario_id))
开发者ID:UMWRG,项目名称:HydraPlatform,代码行数:11,代码来源:scenario.py
注:本文中的sqlalchemy.orm.joinedload_all函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论