本文整理汇总了Python中sqlalchemy.orm.subqueryload_all函数的典型用法代码示例。如果您正苦于以下问题：Python subqueryload_all函数的具体用法？Python subqueryload_all怎么用？Python subqueryload_all使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了subqueryload_all函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: add_addresses
def add_addresses(session):
    """Add an AddressAssignment record for every PrimaryNameAssociation.

    Selects the associations whose system has an IP that no existing
    AddressAssignment uses.  For each one, the DNS record's IP is appended
    to the addresses of the first VLAN on the hardware entity's only
    interface.  Entities whose interface count is not exactly one, or whose
    first VLAN already has addresses, are reported and skipped.  The session
    is flushed once after the loop and the number of additions is printed.
    """
    query = (
        session.query(PrimaryNameAssociation)
        .join(System, DnsDomain)
        # SQLAlchemy overloads `!=`, so this must stay a comparison with None.
        .filter(System.ip != None)  # noqa: E711
        .filter(~exists().where(AddressAssignment.ip == System.ip))
        .options(contains_eager('dns_record'))
        .options(contains_eager('dns_record.dns_domain'))
        .options(subqueryload_all('hardware_entity.interfaces.vlans.assignments'))
        .options(subqueryload_all('hardware_entity.interfaces._vlan_ids'))
    )

    added = 0
    for association in query.all():
        hardware = association.hardware_entity
        if len(hardware.interfaces) != 1:
            print("{0} has an unexpected number of interfaces, skipping: "
                  "{1}".format(hardware, len(hardware.interfaces)))
            continue

        interface = hardware.interfaces[0]
        first_vlan = interface.vlans[0]
        if len(first_vlan.addresses):
            print("{0} already has addresses, skipping".format(interface))
            continue

        first_vlan.addresses.append(association.dns_record.ip)
        added += 1

    session.flush()
    print("Added %d AddressAssignment records" % added)
开发者ID:jrha,项目名称:aquilon,代码行数:29,代码来源:address_fixes.py
示例2: try_get_projects_and_repository
def try_get_projects_and_repository(args):
    """Given a set of HTTP POST arguments, try and find the appropriate
    projects and repository.

    Possible inputs:
        project
            Returns: (a list containing only this project, its repository)
        repository
            Returns: (all active projects for this repo, the repo)
        repository living at key 'repository[phabricator.callsign]'
            Returns: (all active projects for this repo, the repo)

    When none of these inputs is set, returns (None, None).
    """
    if args.project:
        repository = Repository.query.get(args.project.repository_id)
        return [args.project], repository

    # Both repository inputs lead to the same lookup, so resolve whichever
    # one is present and run the active-project query in a single place.
    repository = args.repository or args["repository[phabricator.callsign]"]
    if not repository:
        return None, None

    projects = list(
        Project.query.options(subqueryload_all("plans")).filter(
            Project.status == ProjectStatus.active,
            Project.repository_id == repository.id,
        )
    )
    return projects, repository
开发者ID:mr-justin,项目名称:changes,代码行数:35,代码来源:build_index.py
示例3: eager_query
def eager_query(cls):
    """Return the parent class's eager query with Program loader options.

    Adds loader options for ``program_directives.directive``, ``cycles``
    and ``program_controls.control``.
    """
    from sqlalchemy import orm

    base_query = super(Program, cls).eager_query()
    loader_options = (
        orm.subqueryload_all('program_directives.directive'),
        orm.subqueryload('cycles'),
        orm.subqueryload_all('program_controls.control'),
    )
    return base_query.options(*loader_options)
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:8,代码来源:program.py
示例4: eager_query
def eager_query(cls):
    """Return the parent class's eager query with Objective loader options.

    Every relationship path listed below is added through
    ``orm.subqueryload_all``.
    """
    from sqlalchemy import orm

    base_query = super(Objective, cls).eager_query()
    relationship_paths = (
        'section_objectives.section',
        'objective_controls.control',
        'objective_objects',
    )
    return base_query.options(
        *[orm.subqueryload_all(path) for path in relationship_paths])
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:8,代码来源:objective.py
示例5: eager_query
def eager_query(cls):
    """Return the parent class's eager query with Risk loader options.

    Adds loader options for ``control_risks.control`` and
    ``risk_risky_attributes.risky_attribute``.
    """
    from sqlalchemy import orm

    base_query = super(Risk, cls).eager_query()
    loader_options = [
        orm.subqueryload_all('control_risks.control'),
        # FIXME: make eager-loading work for categorizations
        # (the 'categorizations.categories' path is left out until then)
        orm.subqueryload_all('risk_risky_attributes.risky_attribute'),
    ]
    return base_query.options(*loader_options)
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:9,代码来源:risk.py
示例6: query
def query(self, req):
    """Build the Language query, ordered by id, with relationship loaders.

    Before the query is built, all DomainElement rows are fetched and stored
    on ``self._domainelements``.  ``req`` is accepted but not used here.
    """
    self._domainelements = DBSession.query(DomainElement).all()

    languages = DBSession.query(Language).order_by(Language.id)
    loader_options = [
        subqueryload_all('languageidentifier', 'identifier'),
        subqueryload_all('countries'),
        joinedload_all(Language.valuesets, ValueSet.values),
        joinedload_all(WalsLanguage.genus, Genus.family),
    ]
    return languages.options(*loader_options)
开发者ID:Castroyesid,项目名称:wals3,代码行数:9,代码来源:adapters.py
示例7: get_all_syntheses
def get_all_syntheses(self):
    """Return all Synthesis rows whose ``discussion_id`` equals ``self.id``.

    Loader options are applied for the ``idea_assocs.idea`` and
    ``idealink_assocs.idea_link`` paths and for
    ``Synthesis.published_in_post``.
    """
    from .idea_graph_view import Synthesis

    loader_options = (
        subqueryload_all('idea_assocs.idea'),
        subqueryload_all('idealink_assocs.idea_link'),
        subqueryload_all(Synthesis.published_in_post),
    )
    syntheses = self.db.query(Synthesis).options(*loader_options)
    return syntheses.filter(Synthesis.discussion_id == self.id).all()
开发者ID:mydigilife,项目名称:assembl,代码行数:11,代码来源:discussion.py
示例8: eager_query
def eager_query(cls):
    """Return the parent class's eager query with Directive loader options.

    ``audit_frequency`` and ``audit_duration`` use joined loading; the
    remaining relationships use subquery loading.
    """
    from sqlalchemy import orm

    base_query = super(Directive, cls).eager_query()
    loader_options = (
        orm.joinedload('audit_frequency'),
        orm.joinedload('audit_duration'),
        orm.subqueryload('controls'),
        orm.subqueryload_all('program_directives.program'),
        orm.subqueryload_all('directive_controls'),
        orm.subqueryload('sections'),
    )
    return base_query.options(*loader_options)
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:11,代码来源:directive.py
示例9: eager_query
def eager_query(cls):
    """Return the parent class's eager query with System loader options.

    ``type`` and ``network_zone`` use joined loading; the remaining
    relationships use subquery loading.
    """
    from sqlalchemy import orm

    base_query = super(System, cls).eager_query()
    loader_options = (
        orm.joinedload('type'),
        orm.joinedload('network_zone'),
        orm.subqueryload('responses'),
        orm.subqueryload_all('system_controls.control'),
        orm.subqueryload_all('sub_system_systems.child'),
        orm.subqueryload_all('super_system_systems.parent'),
        orm.subqueryload('transactions'),
    )
    return base_query.options(*loader_options)
开发者ID:alaeddine10,项目名称:ggrc-core,代码行数:12,代码来源:system.py
示例10: eager_nodes_handlers
def eager_nodes_handlers(cls, iterable):
    """Eager load the object instances that are used in the nodes handler.

    :param iterable: iterable (SQLAlchemy query)
    :returns: iterable (SQLAlchemy query)
    """
    network_paths = (
        'nic_interfaces.assigned_networks_list',
        'bond_interfaces.assigned_networks_list',
        'ip_addrs.network_data',
    )
    # The cluster is joined; every network path gets its own subquery load.
    options = (joinedload('cluster'),) + tuple(
        subqueryload_all(path) for path in network_paths
    )
    return cls.eager_base(iterable, options)
开发者ID:gdyuldin,项目名称:fuel-web,代码行数:13,代码来源:node.py
示例11: get
def get(self, job_id):
    """Respond with the serialized details of the job ``job_id``.

    Returns ``('', 404)`` when no such job exists.  Otherwise the serialized
    job is extended with its phases, at most 25 failed test groups that have
    ``num_leaves == 0`` (together with the total count of such groups), the
    job's log sources, its parentless test groups sorted by name, and up to
    ``NUM_PREVIOUS_RUNS`` earlier finished jobs of the same project.
    """
    job_query = Job.query.options(
        subqueryload_all(Job.phases),
        joinedload('project', innerjoin=True),
    )
    job = job_query.get(job_id)
    if job is None:
        return '', 404

    earlier_finished = Job.query.filter(
        Job.project == job.project,
        Job.date_created < job.date_created,
        Job.status == Status.finished,
        Job.id != job.id,
    )
    previous_runs = earlier_finished.order_by(
        Job.date_created.desc())[:NUM_PREVIOUS_RUNS]

    # find all parent groups (root trees)
    root_groups = TestGroup.query.filter(
        TestGroup.job_id == job.id,
        TestGroup.parent_id == None,  # NOQA: we have to use == here
    )
    test_groups = sorted(root_groups, key=lambda x: x.name)

    failure_query = TestGroup.query.options(
        joinedload('parent'),
    ).filter(
        TestGroup.job_id == job.id,
        TestGroup.result == Result.failed,
        TestGroup.num_leaves == 0,
    ).order_by(TestGroup.name.asc())

    # Count the full result set before it is cut down to the first 25 rows.
    num_test_failures = failure_query.count()
    test_failures = failure_query[:25]

    if test_failures:
        failure_origins = find_failure_origins(job, test_failures)
        for test_failure in test_failures:
            test_failure.origin = failure_origins.get(test_failure)

    extended_serializers = {
        TestGroup: TestGroupWithOriginSerializer(),
    }

    log_query = LogSource.query.options(
        joinedload('step'),
    ).filter(
        LogSource.job_id == job.id,
    )
    log_sources = list(log_query.order_by(LogSource.date_created.asc()))

    context = self.serialize(job)
    context.update({
        'phases': job.phases,
        'testFailures': {
            'total': num_test_failures,
            'testGroups': self.serialize(test_failures, extended_serializers),
        },
        'logs': log_sources,
        'testGroups': test_groups,
        'previousRuns': previous_runs,
    })
    return self.respond(context)
开发者ID:ghotiv,项目名称:changes,代码行数:60,代码来源:job_details.py
示例12: get_failure_stats_for_project
def get_failure_stats_for_project(self, project, start_period, end_period):
    """Count the failed builds of ``project`` by failure kind.

    Only finished, failed builds whose source has no patch and whose
    creation date satisfies ``start_period <= date_created < end_period``
    are examined.

    Returns a dict with two counters: 'Test Failures' (builds with a truthy
    ``test_failures`` stat) and 'Missing Tests' (builds with a truthy
    ``tests_missing`` stat).
    """
    # TODO(dcramer): we should embed this logic into the job/build results
    criteria = (
        Source.patch_id == None,  # NOQA
        Build.project_id == project.id,
        Build.status == Status.finished,
        Build.result == Result.failed,
        Build.date_created >= start_period,
        Build.date_created < end_period,
    )
    failing_builds = Build.query.join(
        Source, Source.id == Build.source_id,
    ).filter(*criteria).options(
        subqueryload_all('stats'),
    )

    test_failure_builds = 0
    missing_test_builds = 0
    for build in failing_builds:
        values_by_name = dict((stat.name, stat.value) for stat in build.stats)
        if values_by_name.get('test_failures', 0):
            test_failure_builds += 1
        if values_by_name.get('tests_missing', 0):
            missing_test_builds += 1

    return {
        'Test Failures': test_failure_builds,
        'Missing Tests': missing_test_builds,
    }
开发者ID:zbyufei,项目名称:changes,代码行数:25,代码来源:build.py
示例13: get_people
def get_people(filters, page_size=25):
    """Return ``(total_count, people)`` for the given leaderboard filters.

    Each filter key is matched for equality against the Person attribute of
    the same name.  The "tags" key is ignored, and "host" is translated to
    ``main_language`` with its value passed through ``make_language``.

    ``total_count`` is taken before the impact filter, ordering and limit
    are applied.  ``people`` holds at most ``page_size`` persons that have
    an impact, ordered by impact descending.
    """
    query = add_person_leaderboard_filters(Person.query).options(
        orm.subqueryload_all(
            Person.contributions,
            Contribution.package
        )
    )

    for name, value in filters.iteritems():
        if name == "tags":
            # don't do anything for these for now for people
            continue
        if name == "host":
            name = "main_language"
            value = make_language(value)
        query = query.filter(getattr(Person, name) == value)

    total_count = query.count()

    ranked = (
        query.filter(Person.impact != None)  # noqa: E711
        .order_by(Person.impact.desc())
        .limit(page_size)
    )
    return (total_count, ranked.all())
开发者ID:cfirmo33,项目名称:depsy,代码行数:27,代码来源:package_jobs.py
示例14: create_job
def create_job(job_id):
    """Execute the first plan step of job ``job_id`` and queue ``sync_job``.

    Does nothing when the job cannot be found.  If the job has no job plan,
    its plan has no steps, or an UnrecoverableException is raised while the
    step is executed, the job is marked finished/aborted, the exception is
    logged, and no sync task is queued.
    """
    job = Job.query.get(job_id)
    if not job:
        return

    plan_query = JobPlan.query.options(
        subqueryload_all('plan.steps')
    ).filter(
        JobPlan.job_id == job.id,
    )
    job_plan = plan_query.join(Plan).first()

    try:
        if not job_plan:
            raise UnrecoverableException(
                'Got create_job task without job plan: %s' % (job_id,))

        try:
            first_step = job_plan.plan.steps[0]
        except IndexError:
            raise UnrecoverableException('Missing steps for plan')

        first_step.get_implementation().execute(job=job)
    except UnrecoverableException:
        job.status = Status.finished
        job.result = Result.aborted
        current_app.logger.exception('Unrecoverable exception creating %s', job_id)
        return

    sync_job.delay(
        job_id=job.id.hex,
        task_id=job.id.hex,
        parent_task_id=job.build_id.hex,
    )
开发者ID:davej,项目名称:changes,代码行数:33,代码来源:create_job.py
示例15: go
def go():
    """Assert that the Company query with loader options equals ``expected``."""
    loader = subqueryload_all(
        Company.employees.of_type(Engineer),
        Engineer.machines)
    companies = sess.query(Company).options(loader).all()
    eq_(companies, expected)
开发者ID:afeide,项目名称:LuoYunCloud,代码行数:7,代码来源:test_polymorphic_rel.py
示例16: contest_move_index
def contest_move_index(context, request):
    """An alternate move index, displaying contest data instead of battle
    data.

    Returns a dict with all contest supercategories (ordered by id) and the
    'pure-points' moves grouped by their ``(appeal, jam)`` pair.
    """
    supercategory_query = db.DBSession.query(db.ContestSupercategory)
    supercategory_query = supercategory_query.order_by(
        db.ContestSupercategory.id)
    supercategories = supercategory_query.options(
        subqueryload_all('categories.moves')).all()

    move_query = db.DBSession.query(db.Move).join(db.ContestCategory)
    move_query = move_query.filter(
        db.ContestCategory.identifier == 'pure-points')
    # The sort order starts with (appeal, jam), which is also the group key.
    sorted_moves = move_query.order_by(
        db.Move.appeal.desc(), db.Move.jam, db.Move.name).all()

    grouped_moves = itertools.groupby(
        sorted_moves, lambda move: (move.appeal, move.jam))

    return {'supercategories': supercategories,
            'pure_points_moves': grouped_moves}
开发者ID:Zhorken,项目名称:tcod-asb,代码行数:25,代码来源:move.py
示例17: get
def get(cls, id):
    """Get the CirculationObject associated with the given id.

    The stored ``_data`` payload is decoded, and attributes are then
    populated on the loaded object from two sources: the optional
    ``cls._construction_schema`` mapping (attribute name -> function of the
    decoded data) and the keys returned by the ``get_entity`` signal.

    :param id: identifier of the object to load.
    :returns: the loaded object with its extra attributes set.
    :raises Exception: if no object with the given id exists.
    """
    obj = cls.query.options(subqueryload_all('*')).get(id)
    if obj is None:
        msg = "A {0} object with id {1} doesn't exist"
        raise Exception(msg.format(cls.__name__, id))

    # NOTE(review): jsonpickle.decode can instantiate arbitrary objects;
    # this assumes obj._data is trusted -- confirm.
    data = jsonpickle.decode(obj._data)

    if hasattr(cls, '_construction_schema'):
        for key, func in cls._construction_schema.items():
            try:
                setattr(obj, key, func(data))
            except AttributeError:
                # Best effort: an AttributeError from the builder or from
                # the assignment leaves this attribute unset.
                pass

    # Getting data for other modules
    from invenio_circulation.views.utils import (
        send_signal, flatten)
    from invenio_circulation.signals import get_entity

    construction_data = flatten(send_signal(get_entity,
                                            cls.__name__, None))
    if construction_data:
        for key in construction_data:
            try:
                setattr(obj, key, data[key])
            except KeyError:
                # Best effort: keys missing from the decoded data are skipped.
                pass

    return obj
开发者ID:mvesper,项目名称:invenio-circulation,代码行数:30,代码来源:models.py
示例18: eager_query
def eager_query(cls):
    """Return the parent class's eager query with Section loader options.

    ``directive`` uses joined loading; ``control_sections.control`` uses
    subquery loading.
    """
    from sqlalchemy import orm

    base_query = super(Section, cls).eager_query()
    loader_options = (
        orm.joinedload('directive'),
        orm.subqueryload_all('control_sections.control'),
    )
    return base_query.options(*loader_options)
开发者ID:FlowGR,项目名称:ggrc-core,代码行数:7,代码来源:section.py
示例19: _lookup_asset
def _lookup_asset(self, asset_id):
    """Return the first Asset whose id equals ``asset_id``, or None.

    Loader options for ``Asset.gateway_assets`` and ``GatewayAsset.gateway``
    are applied to the query.  A new session is opened for the lookup and
    is always closed before returning.

    :param asset_id: id of the asset to look up.
    :returns: the matching Asset, or None when there is no match.
    """
    session = Session()
    try:
        return (
            session.query(Asset)
            .filter(Asset.id == asset_id)
            .options(subqueryload_all(Asset.gateway_assets, GatewayAsset.gateway))
            .first()
        )
    finally:
        # Close even when the query raises so the session is not leaked.
        session.close()
开发者ID:tombnorwood,项目名称:feedserver,代码行数:8,代码来源:server.py
示例20: get
def get(self, job_id):
    """Respond with the serialized phases of the job ``job_id``.

    Returns ``('', 404)`` when no such job exists.  Phases are ordered by
    start date, then creation date, and serialized with
    ``JobPhaseWithStepsSerializer``.
    """
    job_query = Job.query.options(
        subqueryload_all(Job.phases),
        joinedload('project', innerjoin=True),
    )
    job = job_query.get(job_id)
    if job is None:
        return '', 404

    phase_query = JobPhase.query.options(
        subqueryload_all(JobPhase.steps, JobStep.node),
    ).filter(
        JobPhase.job_id == job.id,
    )
    phase_list = list(phase_query.order_by(
        JobPhase.date_started.asc(), JobPhase.date_created.asc()))

    serializers = {
        JobPhase: JobPhaseWithStepsSerializer(),
    }
    return self.respond(self.serialize(phase_list, serializers))
开发者ID:Aaron1011,项目名称:changes,代码行数:17,代码来源:jobphase_index.py
注：本文中的sqlalchemy.orm.subqueryload_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论