本文整理汇总了Python中sqlalchemy.orm.lazyload函数的典型用法代码示例。如果您正苦于以下问题：Python lazyload函数的具体用法？Python lazyload怎么用？Python lazyload使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了lazyload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_recipient_users
def get_recipient_users(self):
    """Return the set of confirmed users this email alert should go to.

    Two optional groups are combined, depending on the form data:

    * users subscribed to the daily schedule, when
      ``self.daily_schedule_subscribers.data`` is truthy;
    * users with an alert on any committee in ``self.committee_ids.data``,
      when that list is non-empty.

    Only users with a non-null ``confirmed_at`` are included. The result is
    a ``set``, so a user matching both groups appears once.
    """
    def confirmed_users(criterion):
        # Both queries share the same loader options and confirmation
        # filter; only the selection criterion differs.
        return User.query\
            .options(
                lazyload(User.organisation),
                lazyload(User.committee_alerts),
            )\
            .filter(criterion)\
            .filter(User.confirmed_at != None)\
            .all()  # noqa: E711 - SQLAlchemy renders "!= None" as IS NOT NULL

    groups = []

    if self.daily_schedule_subscribers.data:
        log.info("Email recipients includes daily schedule subscribers")
        groups.append(confirmed_users(User.subscribe_daily_schedule == True))  # noqa: E712

    if self.committee_ids.data:
        # Pass the value as a logging argument instead of pre-formatting:
        # formatting is deferred, and a tuple value cannot break the "%s".
        log.info("Email recipients includes subscribers for these committees: %s",
                 self.committee_ids.data)
        rows = db.session\
            .query(distinct(user_committee_alerts.c.user_id))\
            .filter(user_committee_alerts.c.committee_id.in_(self.committee_ids.data))\
            .all()
        user_ids = [row[0] for row in rows]
        groups.append(confirmed_users(User.id.in_(user_ids)))

    return set(chain.from_iterable(groups))
开发者ID:Code4SA,项目名称:pmg-cms-2,代码行数:32,代码来源:email_alerts.py
示例2: stockcheck
def stockcheck(request, info, session):
    """Show the stock-check form and, after a valid POST, a buy list.

    For the chosen department, every stock type whose average daily sales
    over the look-back period exceed ``minimum_sold`` is listed with the
    daily sales figure and the quantity to buy to cover ``weeks_ahead``
    weeks. ``info`` is not used here.

    Returns a ``(template_name, context)`` tuple.
    """
    departments = session.query(Department).order_by(Department.id).all()
    buylist = []

    if request.method != "POST":
        form = StockCheckForm(departments)
        return ("stockcheck.html", {"form": form, "buylist": buylist})

    form = StockCheckForm(departments, request.POST)
    if form.is_valid():
        data = form.cleaned_data
        ahead = datetime.timedelta(days=data["weeks_ahead"] * 7)
        # A month is approximated as 30.4 days.
        behind = datetime.timedelta(days=data["months_behind"] * 30.4)
        min_sale = data["minimum_sold"]
        dept = int(data["department"])

        query = session.query(StockType, func.sum(StockOut.qty) / behind.days)
        query = query.join(StockItem).join(StockOut)
        query = query.options(
            lazyload(StockType.department),
            lazyload(StockType.unit),
            undefer(StockType.instock),
        )
        query = query.filter(StockOut.removecode_id == "sold")
        query = query.filter((func.now() - StockOut.time) < behind)
        query = query.filter(StockType.dept_id == dept)
        query = query.having(func.sum(StockOut.qty) / behind.days > min_sale)
        query = query.group_by(StockType)

        buylist = []
        for stocktype, sold in query.all():
            to_buy = sold * ahead.days - stocktype.instock
            buylist.append(
                (stocktype, "{:0.1f}".format(sold), "{:0.1f}".format(to_buy)))
        # Largest purchase first; the sort key is the formatted quantity.
        buylist.sort(key=lambda entry: float(entry[2]), reverse=True)

    return ("stockcheck.html", {"form": form, "buylist": buylist})
开发者ID:sde1000,项目名称:quicktill,代码行数:30,代码来源:views.py
示例3: test_state_noload_to_lazy
def test_state_noload_to_lazy(self):
    """Track when the per-instance lazy-loader callable for ``addresses``
    is present on a User whose relationship is mapped with ``lazy="noload"``
    but queried with a ``lazyload`` option."""
    users = self.tables.users
    Address = self.classes.Address
    addresses = self.tables.addresses
    User = self.classes.User

    mapper(User, users, properties={"addresses": relationship(Address, lazy="noload")})
    mapper(Address, addresses)

    def has_lazy_callable(obj):
        state_callables = attributes.instance_state(obj).callables
        return isinstance(state_callables["addresses"], strategies.LoadLazyAttribute)

    def has_no_callable(obj):
        return "addresses" not in attributes.instance_state(obj).callables

    sess = create_session()
    u1 = sess.query(User).options(lazyload(User.addresses)).first()
    assert has_lazy_callable(u1)

    # expiring the whole object keeps the callable
    sess.expire(u1)
    assert has_lazy_callable(u1)

    # a plain load over the same object removes the callable
    sess.query(User).first()
    assert has_no_callable(u1)

    sess.expunge_all()
    u1 = sess.query(User).options(lazyload(User.addresses)).first()
    sess.expire(u1, ["addresses"])
    assert has_lazy_callable(u1)

    # loading the attribute itself removes the callable
    u1.addresses
    assert has_no_callable(u1)
示例4: _finish_print_pricelist
def _finish_print_pricelist(dept_id, include_all):
    """Print a price list of stock types, grouped by department.

    Stock types are selected through stock items that are not finished.
    When ``dept_id`` is truthy only that department is listed; unless
    ``include_all`` is true, only items with a ``stocklineid`` are used.
    """
    query = (
        td.s.query(StockType)
        .select_from(StockItem)
        .filter(StockItem.finished == None)
        .join(StockType)
        .options(lazyload(StockType.department), lazyload(StockType.unit))
        .group_by(StockType)
        .order_by(StockType.dept_id, StockType.manufacturer, StockType.name)
    )
    if dept_id:
        query = query.filter(StockType.dept_id == dept_id)
    if not include_all:
        query = query.filter(StockItem.stocklineid != None)
    stocktypes = query.all()

    with printer.driver as d:
        d.printline("\t{}".format(tillconfig.pubname), emph=1)
        d.printline()
        d.printline("\tPrice List", colour=1)
        d.printline()
        current_dept = None
        for st in stocktypes:
            dept = st.department
            if dept != current_dept:
                # Blank line between departments, but not before the first.
                if current_dept is not None:
                    d.printline()
                current_dept = dept
                d.printline(current_dept.description, emph=1)
            price_line = "{}\t\t{}{}".format(
                st.descriptions[0], tillconfig.currency, st.pricestr)
            d.printline(price_line)
        d.printline()
        d.printline("\tEnd of list")
示例5: enter
def enter(self):
    """Read the form fields and pop up the list of stock to buy.

    Shows an error popup and returns if any of the three text fields is
    empty. Otherwise dismisses the form, computes average daily sales per
    stock type over the look-back period (optionally limited to the
    selected department) and lists the types selling more than the
    minimum, sorted by quantity to buy, largest first.
    """
    incomplete = (self.wfield.f == '' or self.mfield.f == ''
                  or self.minfield.f == '')
    if incomplete:
        ui.infopopup(["You must fill in all three fields."], title="Error")
        return

    weeks_ahead = int(self.wfield.f)
    months_behind = int(self.mfield.f)
    min_sale = float(self.minfield.f)
    ahead = datetime.timedelta(days=weeks_ahead * 7)
    # A month is approximated as 30.4 days.
    behind = datetime.timedelta(days=months_behind * 30.4)
    dept = self.deptfield.read()
    self.dismiss()

    query = (
        td.s.query(StockType, func.sum(StockOut.qty) / behind.days)
        .join(StockItem)
        .join(StockOut)
        .options(
            lazyload(StockType.department),
            lazyload(StockType.unit),
            undefer(StockType.instock),
        )
        .filter(StockOut.removecode_id == 'sold')
        .filter((func.now() - StockOut.time) < behind)
        .having(func.sum(StockOut.qty) / behind.days > min_sale)
        .group_by(StockType)
    )
    if dept:
        query = query.filter(StockType.dept_id == dept.id)

    f = ui.tableformatter(' l r r r ')
    lines = []
    for st, sold in query.all():
        to_buy = sold * ahead.days - st.instock
        lines.append(f(st.format(), '{:0.1f}'.format(sold), st.instock,
                       '{:0.1f}'.format(to_buy)))
    # Field 3 is the formatted "Buy" column.
    lines.sort(key=lambda line: float(line.fields[3]), reverse=True)

    header = [f('Name', 'Sold per day', 'In stock', 'Buy')]
    title = "Stock to buy for next {} weeks".format(weeks_ahead)
    ui.listpopup(lines, header=header, title=title,
                 colour=ui.colour_info, show_cursor=False,
                 dismiss=keyboard.K_CASH)
示例6: test_default_forms
def test_default_forms(session):
    """Verify that every pokemon has exactly one default form and every
    species has exactly one default pokemon."""
    # Count default forms per pokemon.
    pokemon_query = (
        session.query(tables.Pokemon)
        .join(tables.PokemonForm)
        .filter(tables.PokemonForm.is_default==True)
        .options(lazyload('*'))
        .group_by(tables.Pokemon)
        .add_columns(func.count(tables.PokemonForm.id))
    )
    for pokemon, form_count in pokemon_query:
        if form_count == 0:
            pytest.fail("pokemon %s has no default forms" % pokemon.name)
        elif form_count > 1:
            pytest.fail("pokemon %s has %d default forms" % (pokemon.name, form_count))

    # Count default pokemon per species.
    species_query = (
        session.query(tables.PokemonSpecies)
        .join(tables.Pokemon)
        .filter(tables.Pokemon.is_default==True)
        .options(lazyload('*'))
        .group_by(tables.PokemonSpecies)
        .add_columns(func.count(tables.Pokemon.id))
    )
    for species, pokemon_count in species_query:
        if pokemon_count == 0:
            pytest.fail("species %s has no default pokemon" % species.name)
        elif pokemon_count > 1:
            pytest.fail("species %s has %d default pokemon" % (species.name, pokemon_count))
开发者ID:DragoonBoots,项目名称:pokedex,代码行数:29,代码来源:test_database_sanity.py
示例7: page
def page(cls, _db, title, url, annotator=None,
         use_materialized_works=True):
    """Create a feed of content to preload on devices.

    The works are those whose primary identifier is listed in the
    ``Configuration.PRELOADED_CONTENT`` policy.

    :param _db: database session used for the queries.
    :param title: passed through to the feed constructor.
    :param url: passed through to the feed constructor.
    :param annotator: optional annotator; passed to the feed constructor
        and, when given, asked to annotate the finished feed.
    :param use_materialized_works: query ``MaterializedWork`` rather than
        ``Work``.
    :return: the feed rendered with ``unicode()``.
    """
    configured_content = Configuration.policy(Configuration.PRELOADED_CONTENT)
    identifiers = [Identifier.parse_urn(_db, urn)[0] for urn in configured_content]
    identifier_ids = [identifier.id for identifier in identifiers]

    if use_materialized_works:
        from core.model import MaterializedWork
        q = _db.query(MaterializedWork)
        q = q.filter(MaterializedWork.primary_identifier_id.in_(identifier_ids))
        # Avoid eager loading of objects that are contained in the
        # materialized view.
        q = q.options(
            lazyload(MaterializedWork.license_pool, LicensePool.data_source),
            lazyload(MaterializedWork.license_pool, LicensePool.identifier),
            lazyload(MaterializedWork.license_pool, LicensePool.edition),
        )
    else:
        q = _db.query(Work).join(Work.primary_edition)
        q = q.filter(Edition.primary_identifier_id.in_(identifier_ids))

    works = q.all()
    feed = cls(_db, title, url, works, annotator)
    # ``annotator`` defaults to None; calling a method on it unconditionally
    # raised AttributeError for callers relying on the default.
    if annotator is not None:
        annotator.annotate_feed(feed, None)
    content = unicode(feed)
    return content
开发者ID:datalogics-tsmith,项目名称:circulation,代码行数:31,代码来源:opds.py
示例8: render
def render(self, session, **arguments):
    """Return all chassis, ordered by primary name and then label, with
    their related objects loaded according to the options below."""
    loader_options = [
        subqueryload('model'),
        joinedload('model.machine_specs'),
        subqueryload('location'),
        joinedload('slots'),
        subqueryload('slots.machine'),
        # A rare case when we don't need primary name/host
        lazyload('slots.machine.primary_name'),
        lazyload('slots.machine.host'),
        subqueryload('interfaces'),
        joinedload('interfaces.assignments'),
        joinedload('interfaces.assignments.network'),
        joinedload('interfaces.assignments.dns_records'),
    ]
    # The primary name is populated from the explicit outer join below.
    primary_name_options = [
        contains_eager('primary_name'),
        contains_eager('primary_name.fqdn'),
        contains_eager('primary_name.fqdn.dns_domain'),
    ]

    query = session.query(Chassis).options(*loader_options)
    # Prefer the primary name for ordering
    query = query.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
                            DnsDomain)
    query = query.options(*primary_name_options)
    query = query.order_by(Fqdn.name, DnsDomain.name, Chassis.label)
    return query.all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py
示例9: users_iterate
def users_iterate(self, include_unsubscribed=False):
    """Yield users in batches of 10 rows, with all relationships lazy.

    Users whose ``unsubscribed`` flag is set are skipped unless
    ``include_unsubscribed`` is true.
    """
    query = self.session.query(User).options(lazyload('*'))
    if not include_unsubscribed:
        query = query.filter(User.unsubscribed == False)
    for user in query.yield_per(10):
        yield user
开发者ID:syncloud,项目名称:redirect,代码行数:7,代码来源:storage.py
示例10: test_lazyload
def test_lazyload(self):
    """The query's ``lazyload()`` shortcut must render the same SQL as the
    equivalent ``options(orm.lazyload(...))`` call."""
    # single relationship, by name
    shortcut = self.db.query(Foo).lazyload('bars')
    explicit = self.db.query(Foo).options(orm.lazyload('bars'))
    self.assertEqual(str(shortcut), str(explicit))

    # chained relationships, by name
    shortcut = self.db.query(Foo).lazyload('bars', 'bazs')
    explicit = self.db.query(Foo).options(
        orm.lazyload('bars').lazyload('bazs'))
    self.assertEqual(str(shortcut), str(explicit))

    # single relationship, by attribute
    shortcut = self.db.query(Foo).lazyload(Foo.bars)
    explicit = self.db.query(Foo).options(orm.lazyload(Foo.bars))
    self.assertEqual(str(shortcut), str(explicit))

    # chained relationships, by attribute
    shortcut = self.db.query(Foo).lazyload(Foo.bars, Bar.bazs)
    explicit = self.db.query(Foo).options(
        orm.lazyload(Foo.bars).lazyload(Bar.bazs))
    self.assertEqual(str(shortcut), str(explicit))

    # nested option passed through the ``options`` keyword
    shortcut = self.db.query(Foo).lazyload(
        'bars', options=[LoadOption('lazyload', 'bazs')])
    explicit = self.db.query(Foo).options(
        orm.lazyload('bars').lazyload('bazs'))
    self.assertEqual(str(shortcut), str(explicit))
开发者ID:LeoKudrik,项目名称:alchy,代码行数:29,代码来源:test_query.py
示例11: render
def render(self, session, cluster, **arguments):
    """Return the clusters named ``cluster``, with related objects loaded.

    Raises NotFoundException when ``cluster`` is given and nothing matches.
    """
    cluster_q = session.query(self.query_class).filter_by(name=cluster)
    cluster_q = cluster_q.options(
        subqueryload('_hosts'),
        lazyload('_hosts.cluster'),
        joinedload('_hosts.host'),
        joinedload('_hosts.host.hardware_entity'),
        subqueryload('_metacluster'),
        joinedload('_metacluster.metacluster'),
        joinedload('resholder'),
        subqueryload('resholder.resources'),
        subqueryload('service_bindings'),
        subqueryload('allowed_personalities'),
    )
    cluster_q = cluster_q.order_by(self.query_class.name)

    vm_q = session.query(VirtualMachine).join(ClusterResource, Cluster)
    vm_q = vm_q.filter_by(name=cluster)
    vm_q = vm_q.options(
        joinedload('machine'),
        joinedload('machine.primary_name'),
        joinedload('machine.primary_name.fqdn'),
        lazyload('machine.host'),
    )

    dbclusters = cluster_q.all()
    if cluster and not dbclusters:
        raise NotFoundException("Cluster %s not found." % cluster)

    # Manual eager-loading of VM resources. Iterating the query is the
    # point: it pins the data in the session's cache. The mapping itself
    # is not used afterwards.
    machines = {}
    for vm in vm_q:
        machines[vm.machine.machine_id] = vm

    return dbclusters
开发者ID:piojo,项目名称:aquilon,代码行数:35,代码来源:show_cluster_cluster.py
示例12: test_invocation_systemwide_loaders
def test_invocation_systemwide_loaders(self):
    """``BakedLazyLoader._emit_lazyload`` is called while lazy loaders are
    baked system-wide, and not called once they are unbaked again."""
    baked.bake_lazy_loaders()
    try:
        User, Address = self._o2m_fixture()
        session = Session()
        query = session.query(User).options(lazyload(User.addresses))
        with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
            user = query.first()
            user.addresses
            # invoked: second positional argument of the first call is
            # the instance state of the loaded user
            first_call_args = emit.mock_calls[0][1]
            is_(first_call_args[1], user._sa_instance_state)
    finally:
        baked.unbake_lazy_loaders()

    clear_mappers()
    User, Address = self._o2m_fixture()
    session = Session()
    query = session.query(User).options(lazyload(User.addresses))
    with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
        user = query.first()
        user.addresses
        # not invoked
        eq_(emit.mock_calls, [])
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:28,代码来源:test_baked.py
示例13: get_query
def get_query(self):
    """Build the Document query: sources, fairness and medium are joined
    in, while each source's person, unnamed gender and unnamed race stay
    lazy."""
    loader_options = (
        joinedload(Document.sources),
        joinedload(Document.fairness),
        joinedload(Document.medium),
        lazyload('sources.person'),
        lazyload('sources.unnamed_gender'),
        lazyload('sources.unnamed_race'),
    )
    return Document.query.options(*loader_options)
开发者ID:gabelula,项目名称:mma-dexter,代码行数:9,代码来源:bias.py
示例14: minister_questions_combined
def minister_questions_combined():
    """
    Mixture of old QuestionReplies and new CommitteeQuestion objects
    folded together in date order to support pagination.

    To make pagination possible, a combined list of (id, date, type) rows
    is built and paginated first; the full objects for that page are then
    fetched, merged and sorted newest-first.

    Returns the API response produced by ``send_api_response``.
    """
    filters = get_filters()

    def apply_filters(q):
        # Apply every request filter to the given query.
        for f in filters:
            q = q.filter_by(**f)
        return q

    # get a combined list of IDs, tagged with the type they came from
    q1 = apply_filters(db.session.query(
        CommitteeQuestion.id,
        CommitteeQuestion.date.label("date"),
        literal_column("'cq'").label("type")))
    q2 = apply_filters(db.session.query(
        QuestionReply.id,
        QuestionReply.start_date.label("date"),
        literal_column("'qr'").label("type")))

    combined = q1.union_all(q2).order_by(desc("date"))
    # ``next_url`` avoids shadowing the ``next`` builtin.
    page, count, next_url = paginate_request_query(combined)

    # Materialise the page once: it was previously iterated once per type,
    # which runs the statement twice if ``page`` is a lazy query.
    rows = list(page)

    # pull out the IDs we want
    cq_ids = [row[0] for row in rows if row[2] == 'cq']
    qr_ids = [row[0] for row in rows if row[2] == 'qr']

    # get committee questions
    cq_query = CommitteeQuestion.list()\
        .filter(CommitteeQuestion.id.in_(cq_ids))\
        .order_by(CommitteeQuestion.date.desc())\
        .options(
            lazyload('committee'),
            lazyload('minister'),
            joinedload('asked_by_member'),
            lazyload('asked_by_member.memberships'))
    objects = apply_filters(cq_query).all()

    # get question reply objects
    qr_query = QuestionReply.list()\
        .filter(QuestionReply.id.in_(qr_ids))\
        .order_by(QuestionReply.start_date.desc())\
        .options(
            lazyload('committee'),
            lazyload('minister'))

    # mash them together
    objects.extend(apply_filters(qr_query).all())

    # sort newest first, using ``date`` when present, else ``start_date``
    objects.sort(key=lambda x: getattr(x, 'date', getattr(x, 'start_date', None)), reverse=True)

    out = serializers.queryset_to_json(objects, count=count, next=next_url)
    return send_api_response(out)
开发者ID:kollinsayz,项目名称:pmg-cms-2,代码行数:56,代码来源:api.py
示例15: render
def render(self, session, hostname, machine, cpuname, cpuvendor, cpuspeed,
           cpucount, memory, cluster, share, fullinfo, style, **arguments):
    """Search machines by the given criteria.

    Returns full Machine objects when ``fullinfo`` is set or ``style`` is
    not 'raw'; otherwise returns a StringAttributeList of labels.
    Raises NotFoundException when ``share`` matches no shares.
    """
    want_objects = fullinfo or style != 'raw'

    # Select whole entities only when they will actually be returned.
    target = Machine if want_objects else Machine.label
    query = search_hardware_entity_query(session, target, **arguments)

    if machine:
        query = query.filter_by(label=machine)

    if hostname:
        dns_rec = DnsRecord.get_unique(session, fqdn=hostname, compel=True)
        query = query.filter(Machine.primary_name_id == dns_rec.id)

    if cpuname or cpuvendor or cpuspeed is not None:
        cpu_subq = Cpu.get_matching_query(session, name=cpuname,
                                          vendor=cpuvendor, speed=cpuspeed,
                                          compel=True)
        query = query.filter(Machine.cpu_id.in_(cpu_subq))

    if cpucount is not None:
        query = query.filter_by(cpu_quantity=cpucount)

    if memory is not None:
        query = query.filter_by(memory=memory)

    if cluster:
        dbcluster = Cluster.get_unique(session, cluster, compel=True)
        if isinstance(dbcluster, MetaCluster):
            query = query.join('vm_container', ClusterResource, Cluster)
            query = query.filter_by(metacluster=dbcluster)
        else:
            query = query.join('vm_container', ClusterResource)
            query = query.filter_by(cluster=dbcluster)
        query = query.reset_joinpoint()

    if share:
        share_ids = session.query(Share.id).filter_by(name=share)
        if not share_ids.count():
            raise NotFoundException("No shares found with name {0}."
                                    .format(share))
        NasAlias = aliased(VirtualNasDisk)
        query = query.join('disks', (NasAlias, NasAlias.id == Disk.id))
        query = query.filter(NasAlias.share_id.in_(share_ids.subquery()))
        query = query.reset_joinpoint()

    if not want_objects:
        return StringAttributeList(query.all(), "label")

    query = query.options(
        joinedload('location'),
        subqueryload('interfaces'),
        lazyload('interfaces.hardware_entity'),
        joinedload('interfaces.assignments'),
        joinedload('interfaces.assignments.dns_records'),
        joinedload('chassis_slot'),
        subqueryload('chassis_slot.chassis'),
        subqueryload('disks'),
        subqueryload('host'),
        lazyload('host.hardware_entity'),
        subqueryload('host.services_used'),
        subqueryload('host._cluster'),
        lazyload('host._cluster.host'),
    )
    return query.all()
开发者ID:piojo,项目名称:aquilon,代码行数:56,代码来源:search_machine.py
示例16: render
def render(self, session, **arguments):
    """Return every DNS domain with its comments, DNS maps and NS records
    loaded according to the options below."""
    loader_options = (
        undefer('comments'),
        subqueryload('dns_maps'),
        lazyload('dns_maps.dns_domain'),
        subqueryload('_ns_records'),
        lazyload('_ns_records.dns_domain'),
        joinedload('_ns_records.a_record.fqdn'),
        joinedload('_ns_records.a_record.fqdn.dns_domain'),
    )
    return session.query(DnsDomain).options(*loader_options).all()
开发者ID:piojo,项目名称:aquilon,代码行数:10,代码来源:show_dns_domain_all.py
示例17: edit_subscription_route
def edit_subscription_route():
    """Update the comment and commitment of a subscription from the form.

    On a valid submission the subscription identified by the
    ``subscriptionid`` form field is updated and committed, and the client
    is redirected to the event page. An invalid submission is rejected
    with HTTP 400 instead of falling through without a response.
    """
    from flask import abort

    form = SubscriptionForm()
    if not form.validate_on_submit():
        # No subscription has been loaded here, so there is nothing to
        # redirect to; fail explicitly rather than with a server error.
        abort(400)

    # ``.one()`` raises if the id matches no row or more than one row.
    subscription = Subscription.query \
        .filter(Subscription.id == request.form.get('subscriptionid')) \
        .options(lazyload('user')) \
        .options(lazyload('event')).one()
    subscription.comment = request.form.get('comment')
    subscription.commitment = request.form.get('commitment')
    db.session.commit()
    flash('Subscription updated')
    return redirect(url_for('event_route', id=subscription.event_id))
开发者ID:alexd2580,项目名称:evelyn,代码行数:14,代码来源:views.py
示例18: minister_questions
def minister_questions(minister_id):
    """
    List the committee questions put to the given minister, newest first.
    """
    # don't eager load duplicate committee details
    loader_options = (
        lazyload('committee'),
        lazyload('minister'),
        joinedload('asked_by_member'),
        lazyload('asked_by_member.memberships'),
    )
    questions = CommitteeQuestion.list()
    questions = questions.filter(CommitteeQuestion.minister_id == minister_id)
    questions = questions.order_by(CommitteeQuestion.date.desc())
    questions = questions.options(*loader_options)
    return api_resource_list(questions)
开发者ID:kollinsayz,项目名称:pmg-cms-2,代码行数:15,代码来源:api.py
示例19: _job_get_next_by_action
def _job_get_next_by_action(session, now, action):
    """Return the least recently updated pending job for ``action``.

    A job qualifies when its status is not a finished one and it either
    has no worker or its timeout is at or before ``now``. Returns None
    when there is no such job. The job's metadata is loaded before
    returning.
    """
    # Round off 'now' to minute precision to allow the SQL query cache to
    # do more work
    now_round_off = now.replace(second=0, microsecond=0)
    statuses = ['DONE', 'CANCELLED', 'HARD_TIMED_OUT', 'MAX_RETRIED']

    # Testing showed that lazyload is apparently fastest in our specific
    # case since we only fetch a single job here and there's only one
    # child table, hence only two simple queries vs. subqueryload which
    # issues a second more complex query or joinedload which issues
    # a single more complex join
    job_ref = session.query(models.Job)\
        .options(sa_orm.lazyload('job_metadata'))\
        .filter_by(action=action)\
        .filter(~models.Job.status.in_(statuses))\
        .filter(sa_sql.or_(models.Job.worker_id.is_(None),
                           models.Job.timeout <= now_round_off))\
        .order_by(models.Job.updated_at.asc())\
        .first()

    # Force loading of the job_metadata
    if job_ref is not None:
        m = job_ref['job_metadata']
        # Interpolate AFTER translating: formatting inside _() made the
        # lookup key the already-formatted string, which never matches
        # the message catalogue.
        LOG.info(_('Job Metatdata forcefully loaded: %s') % (m,))

    return job_ref
开发者ID:coreywright,项目名称:qonos,代码行数:26,代码来源:api.py
示例20: test_invocation_per_mapper
def test_invocation_per_mapper(self):
    """With the relationship mapped as ``lazy="baked_select"``,
    BakedLazyLoader is used by default but not when the query overrides
    the strategy with a ``lazyload`` option.
    """
    User, Address = self._o2m_fixture(lazy="baked_select")

    session = Session()
    query = session.query(User).options(lazyload(User.addresses))
    with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
        user = query.first()
        user.addresses
        # not invoked
        eq_(emit.mock_calls, [])

    session = Session()
    query = session.query(User)
    with mock.patch.object(BakedLazyLoader, "_emit_lazyload") as emit:
        user = query.first()
        user.addresses
        # invoked: second positional argument of the first call is the
        # instance state of the loaded user
        first_call_args = emit.mock_calls[0][1]
        is_(first_call_args[1], user._sa_instance_state)
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:26,代码来源:test_baked.py
注：本文中的sqlalchemy.orm.lazyload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论