本文整理汇总了Python中sqlalchemy.orm.subqueryload函数的典型用法代码示例。如果您正苦于以下问题:Python subqueryload函数的具体用法?Python subqueryload怎么用?Python subqueryload使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了subqueryload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_subqueryload
def test_subqueryload(self):
self.assertEqual(
str(self.db.query(Foo).subqueryload('bars')),
str(self.db.query(Foo).options(orm.subqueryload('bars')))
)
self.assertEqual(
str(self.db.query(Foo).subqueryload('bars', 'bazs')),
str((self.db.query(Foo)
.options(orm.subqueryload('bars').subqueryload('bazs'))))
)
self.assertEqual(
str(self.db.query(Foo).subqueryload(Foo.bars)),
str(self.db.query(Foo).options(orm.subqueryload(Foo.bars)))
)
self.assertEqual(
str(self.db.query(Foo).subqueryload(Foo.bars, Bar.bazs)),
str((self.db.query(Foo)
.options(orm.subqueryload(Foo.bars).subqueryload(Bar.bazs))))
)
self.assertEqual(
str((self.db.query(Foo)
.subqueryload(
'bars',
options=[LoadOption('subqueryload', 'bazs')]))),
str((self.db.query(Foo)
.options(orm.subqueryload('bars').subqueryload('bazs'))))
)
开发者ID:LeoKudrik,项目名称:alchy,代码行数:31,代码来源:test_query.py
示例2: render
def render(self, session, **arguments):
q = session.query(Chassis)
q = q.options(subqueryload('model'),
joinedload('model.machine_specs'),
subqueryload('location'),
joinedload('slots'),
subqueryload('slots.machine'),
# A rare case when we don't need primary name/host
lazyload('slots.machine.primary_name'),
lazyload('slots.machine.host'),
subqueryload('interfaces'),
joinedload('interfaces.assignments'),
joinedload('interfaces.assignments.network'),
joinedload('interfaces.assignments.dns_records'))
# Prefer the primary name for ordering
q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
DnsDomain)
q = q.options(contains_eager('primary_name'),
contains_eager('primary_name.fqdn'),
contains_eager('primary_name.fqdn.dns_domain'))
q = q.order_by(Fqdn.name, DnsDomain.name, Chassis.label)
return q.all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py
示例3: render_params_for_submissions
def render_params_for_submissions(self, query, page, page_size=50):
"""Add data about the requested submissions to r_params.
query (sqlalchemy.orm.query.Query): the query giving back all
interesting submissions.
page (int): the index of the page to display.
page_size(int): the number of submissions per page.
"""
query = query\
.options(subqueryload(Submission.task))\
.options(subqueryload(Submission.participation))\
.options(subqueryload(Submission.files))\
.options(subqueryload(Submission.token))\
.options(subqueryload(Submission.results)
.subqueryload(SubmissionResult.evaluations))\
.order_by(Submission.timestamp.desc())
offset = page * page_size
count = query.count()
if self.r_params is None:
self.r_params = self.render_params()
# A page showing paginated submissions can use these
# parameters: total number of submissions, submissions to
# display in this page, index of the current page, total
# number of pages.
self.r_params["submission_count"] = count
self.r_params["submissions"] = \
query.slice(offset, offset + page_size).all()
self.r_params["submission_page"] = page
self.r_params["submission_pages"] = \
(count + page_size - 1) // page_size
开发者ID:artikz,项目名称:cms,代码行数:34,代码来源:base.py
示例4: render
def render(self, session, cluster, **arguments):
q = session.query(Cluster)
vm_q = session.query(VirtualMachine)
vm_q = vm_q.join(ClusterResource, Cluster)
if cluster:
q = q.filter_by(name=cluster)
vm_q = vm_q.filter_by(name=cluster)
vm_q = vm_q.options(joinedload('machine'),
joinedload('machine.primary_name'),
joinedload('machine.primary_name.fqdn'),
lazyload('machine.host'))
q = q.options(subqueryload('_hosts'),
joinedload('_hosts.host'),
joinedload('_hosts.host.machine'),
subqueryload('_metacluster'),
joinedload('_metacluster.metacluster'),
joinedload('resholder'),
subqueryload('resholder.resources'),
subqueryload('service_bindings'),
subqueryload('allowed_personalities'))
q = q.order_by(Cluster.name)
dbclusters = q.all()
if cluster and not dbclusters:
raise NotFoundException("Cluster %s not found." % cluster)
# Manual eager-loading of VM resources. All the code does is making sure
# the data is pinned in the session's cache
machines = {}
for vm in vm_q:
machines[vm.machine.machine_id] = vm
return ClusterList(dbclusters)
开发者ID:jrha,项目名称:aquilon,代码行数:34,代码来源:show_cluster_all.py
示例5: eager_query
def eager_query(cls):
from sqlalchemy import orm
query = super(Relatable, cls).eager_query()
return cls.eager_inclusions(query, Relatable._include_links).options(
orm.subqueryload('related_sources'),
orm.subqueryload('related_destinations'))
开发者ID:VinnieJohns,项目名称:ggrc-core,代码行数:7,代码来源:relationship.py
示例6: eager_query
def eager_query(cls):
from sqlalchemy import orm
query = super(ControlControl, cls).eager_query()
return query.options(
orm.subqueryload('control'),
orm.subqueryload('implemented_control'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:control_control.py
示例7: finalize_query
def finalize_query(self, query, fltr, session, qstring=None, order_by=None):
search_query = None
ranked = False
if qstring is not None:
ft_query = and_(SearchObjectIndex.so_uuid == ObjectInfoIndex.uuid, query)
q = session.query(
ObjectInfoIndex,
func.ts_rank_cd(
SearchObjectIndex.search_vector,
func.plainto_tsquery(qstring)
).label('rank'))\
.options(subqueryload(ObjectInfoIndex.search_object))\
.options(subqueryload(ObjectInfoIndex.properties)).filter(ft_query)
query_result = search(q, qstring, vector=SearchObjectIndex.search_vector, sort=order_by is None, regconfig='simple')
ranked = True
else:
query_result = session.query(ObjectInfoIndex).options(subqueryload(ObjectInfoIndex.properties)).filter(query)
if order_by is not None:
query_result = query_result.order_by(order_by)
elif ranked is True:
query_result = query_result.order_by(
desc(
func.ts_rank_cd(
SearchObjectIndex.search_vector,
func.to_tsquery(search_query)
)
)
)
if 'limit' in fltr:
query_result = query_result.limit(fltr['limit'])
return query_result, ranked
开发者ID:gonicus,项目名称:gosa,代码行数:33,代码来源:methods.py
示例8: json_export
def json_export(outfile):
db = Session()
data = dict()
data["materials"] = [
it.to_dict(["locations"])
for it in db.query(Material).options(subqueryload(Material.locations))
]
data["materialTypes"] = [
dict(label=it[0], value=it[0])
for it in db.query(distinct(Material.type)).order_by(Material.type.asc())
if it[0]
]
data["blueprints"] = [
it.to_dict(["ingredients"])
for it in db.query(Blueprint)\
.options(subqueryload(Blueprint.ingredients))\
.options(subqueryload("ingredients.material"))
]
data["blueprintTypes"] = [
dict(label=it[0], value=it[0])
for it in db.query(distinct(Blueprint.type)).order_by(Blueprint.type.asc())
if it[0]
]
with open(outfile, "w") as fp:
fp.write('CollectorDroneData=')
json.dump(data, fp)
db.close()
开发者ID:fre-sch,项目名称:collector-drone,代码行数:28,代码来源:json_export.py
示例9: package
def package(pkg):
db = current_app.config['DB']()
collections = list(queries.collections(db))
package = db.query(tables.Package).get(pkg)
if package is None:
abort(404)
query = queries.dependencies(db, package)
query = query.options(subqueryload('collection_packages'))
query = query.options(subqueryload('collection_packages.links'))
dependencies = list(query)
dependents = list(queries.dependents(db, package))
in_progress_deps = [p for p in dependencies if p.status == 'in-progress']
return render_template(
'package.html',
breadcrumbs=(
(url_for('hello'), 'Python 3 Porting Database'),
(url_for('package', pkg=pkg), pkg),
),
collections=collections,
pkg=package,
dependencies=list(dependencies),
dependents=list(dependents),
deptree=[(package, gen_deptree(dependencies))],
in_progress_deps=in_progress_deps,
)
开发者ID:sYnfo,项目名称:portingdb,代码行数:30,代码来源:htmlreport.py
示例10: family_query
def family_query(self):
query = self.request.db.query(tcg_tables.CardFamily)
query = dbutil.order_by_name(query, tcg_tables.CardFamily)
query = query.options(joinedload('names_local'))
query = query.options(subqueryload('cards.prints'))
query = query.options(subqueryload('cards'))
return query
开发者ID:encukou,项目名称:ptcg-editor,代码行数:7,代码来源:prints.py
示例11: eager_query
def eager_query(cls):
from sqlalchemy import orm
query = super(SystemSystem, cls).eager_query()
return query.options(
orm.subqueryload('parent'),
orm.subqueryload('child'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:system_system.py
示例12: get_queryset
def get_queryset(self, project=None, db=None, *args, **kwargs):
malware_sha256 = self.kwargs["malware_sha256"]
if malware_sha256:
session = db.Session()
malware = session.query(Malware).filter(Malware.sha256 == malware_sha256).one_or_none()
if not malware:
error = {"error": {"code": "NotFound",
"message": "Malware not found: {} (Project: {})".format(malware_sha256, project)}}
raise NotFound(detail=error)
session = db.Session()
malware = session.query(Malware) \
.options(subqueryload(Malware.tag)) \
.options(subqueryload(Malware.analysis)) \
.options(subqueryload(Malware.note)).filter(Malware.sha256 == malware_sha256).one_or_none()
queryset = getattr(malware, self.malware_relationship_field)
else:
session = db.Session()
queryset = session.query(self.model)
return queryset
开发者ID:cvandeplas,项目名称:viper,代码行数:26,代码来源:views.py
示例13: index
def index():
query = Event.query() \
.options(subqueryload('actor')) \
.options(subqueryload('user')) \
.options(subqueryload('club')) \
.outerjoin(Event.flight) \
.options(contains_eager(Event.flight)) \
.filter(or_(Event.flight == None, Flight.is_rankable())) \
.order_by(Event.time.desc())
query = _filter_query(query, request.args)
page = request.args.get('page', type=int, default=1)
per_page = request.args.get('per_page', type=int, default=50)
events = query.limit(per_page).offset((page - 1) * per_page).all()
events_count = len(events)
if request.args.get('grouped', True, type=str_to_bool):
events = group_events(events)
template_vars = dict(events=events, types=Event.Type)
if page > 1:
template_vars['prev_page'] = page - 1
if events_count == per_page:
template_vars['next_page'] = page + 1
return render_template('timeline/list.jinja', **template_vars)
开发者ID:TobiasLohner,项目名称:SkyLines,代码行数:29,代码来源:timeline.py
示例14: eager_query
def eager_query(cls):
from sqlalchemy import orm
query = super(ControlSection, cls).eager_query()
return query.options(
orm.subqueryload('control'),
orm.subqueryload('section'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:control_section.py
示例15: eager_query
def eager_query(cls):
from sqlalchemy import orm
query = super(UserRole, cls).eager_query()
return query.options(
orm.subqueryload('role'),
orm.subqueryload('person'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:models.py
示例16: _get_db_tracks
def _get_db_tracks(session, chunk):
return (session.query(Track).
options(subqueryload(Track.album),
subqueryload(Track.artist),
subqueryload(Track.lastfm_info)).
filter(Track.filename.in_(chunk)).
all())
开发者ID:thesquelched,项目名称:suggestive,代码行数:7,代码来源:mstat.py
示例17: lookup
def lookup(self, department_id, start_time=None, end_time=None):
"""
Returns a list of all shifts for the given department.
Takes the department id as the first parameter. For a list of all
department ids call the "dept.list" method.
Optionally, takes a "start_time" and "end_time" to constrain the
results to a given date range. Dates may be given in any format
supported by the
<a href="http://dateutil.readthedocs.io/en/stable/parser.html">
dateutil parser</a>, plus the string "now".
Unless otherwise specified, "start_time" and "end_time" are assumed
to be in the local timezone of the event.
"""
with Session() as session:
query = session.query(Job).filter_by(department_id=department_id)
if start_time:
start_time = _parse_datetime(start_time)
query = query.filter(Job.start_time >= start_time)
if end_time:
end_time = _parse_datetime(end_time)
query = query.filter(Job.start_time <= end_time)
query = query.options(
subqueryload(Job.department),
subqueryload(Job.shifts).subqueryload(Shift.attendee))
return [job.to_dict(self.fields) for job in query]
开发者ID:magfest,项目名称:ubersystem,代码行数:28,代码来源:api.py
示例18: stop
def stop(self, stop_id, feed_id="", prefetch_parent=True, prefetch_substops=True):
query = self._session.query(Stop)
if prefetch_parent:
query = query.options(subqueryload('parent_station'))
if prefetch_substops:
query = query.options(subqueryload('sub_stops'))
return query.get((feed_id, stop_id))
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:7,代码来源:dao.py
示例19: group
def group(grp):
db = current_app.config['DB']()
collections = list(queries.collections(db))
group = db.query(tables.Group).get(grp)
if group is None:
abort(404)
query = db.query(tables.Package)
query = query.join(tables.Package.group_packages)
query = query.join(tables.GroupPackage.group)
query = query.join(tables.Package.status_obj)
query = query.filter(tables.Group.ident == grp)
query = query.order_by(-tables.Status.weight)
query = queries.order_by_name(db, query)
query = query.options(subqueryload('collection_packages'))
query = query.options(subqueryload('collection_packages.links'))
packages = list(query)
query = query.filter(tables.GroupPackage.is_seed)
seed_groups = query
return render_template(
'group.html',
breadcrumbs=(
(url_for('hello'), 'Python 3 Porting Database'),
(url_for('group', grp=grp), group.name),
),
collections=collections,
grp=group,
packages=packages,
len_packages=len(packages),
deptree=list(gen_deptree(seed_groups)),
status_counts=get_status_counts(packages),
)
开发者ID:rodrigc,项目名称:portingdb,代码行数:35,代码来源:htmlreport.py
示例20: query_factory
def query_factory():
query = self._session.query(Stop)
if prefetch_parent:
query = query.options(subqueryload('parent_station'))
if prefetch_substops:
query = query.options(subqueryload('sub_stops'))
return query
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:7,代码来源:dao.py
注:本文中的sqlalchemy.orm.subqueryload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论