本文整理汇总了Python中sqlalchemy.orm.contains_eager函数的典型用法代码示例。如果您正苦于以下问题:Python contains_eager函数的具体用法?Python contains_eager怎么用?Python contains_eager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了contains_eager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get
def get(self, course_id):
course = Courses.query.get_or_404(course_id)
require(READ, course)
# Get all questions for this course, default order is most recent first
post = Posts(courses_id=course_id)
question = PostsForQuestions(post=post)
base_query = PostsForQuestions.query. \
options(joinedload("criteria").joinedload("criterion")). \
options(joinedload("selfevaltype")). \
options(undefer_group('counts')). \
join(Posts). \
options(contains_eager('post').joinedload("user").joinedload('usertypeforsystem')). \
options(contains_eager('post').joinedload("files")). \
filter(Posts.courses_id == course_id). \
order_by(desc(Posts.created))
if allow(MANAGE, question):
questions = base_query.all()
else:
now = datetime.datetime.utcnow()
questions = base_query. \
filter(or_(PostsForQuestions.answer_start.is_(None), now >= PostsForQuestions.answer_start)).\
all()
restrict_users = not allow(MANAGE, question)
on_question_list_get.send(
self,
event_name=on_question_list_get.name,
user=current_user,
course_id=course_id)
return {
"questions": marshal(questions, dataformat.get_posts_for_questions(restrict_users, include_answers=False))
}
开发者ID:gitter-badger,项目名称:acj-versus,代码行数:34,代码来源:question.py
示例2: _list
def _list():
query = (
Notification.query(recipient_id=request.user_id)
.join("event")
.options(contains_eager("event"))
.options(subqueryload("event.actor"))
.outerjoin(Event.flight)
.options(contains_eager("event.flight"))
.filter(or_(Event.flight == None, Flight.is_rankable()))
.order_by(Event.time.desc())
)
query = _filter_query(query, request.args)
page = request.args.get("page", type=int, default=1)
per_page = request.args.get("per_page", type=int, default=50)
query = query.limit(per_page)
query = query.offset((page - 1) * per_page)
def get_event(notification):
event = notification.event
event.unread = notification.time_read is None
return event
events = list(convert_event(get_event(notification)) for notification in query)
return jsonify(events=events)
开发者ID:skylines-project,项目名称:skylines,代码行数:28,代码来源:notifications.py
示例3: export
def export(request):
"""Handle exporting a user's bookmarks to file"""
rdict = request.matchdict
username = rdict.get("username")
if request.user is not None:
current_user = request.user.username
else:
current_user = None
bmark_list = (
Bmark.query.join(Bmark.tags)
.options(contains_eager(Bmark.tags))
.join(Bmark.hashed)
.options(contains_eager(Bmark.hashed))
.filter(Bmark.username == username)
.all()
)
BmarkLog.export(username, current_user)
request.response_content_type = "text/html"
headers = [("Content-Disposition", 'attachment; filename="bookie_export.html"')]
setattr(request, "response_headerlist", headers)
return {"bmark_list": bmark_list}
开发者ID:aldeka,项目名称:Bookie,代码行数:27,代码来源:utils.py
示例4: index
def index():
if 'application/json' not in request.headers.get('Accept', ''):
return render_template('ember-page.jinja', active_page='notifications')
query = Notification.query(recipient=g.current_user) \
.join('event') \
.options(contains_eager('event')) \
.options(subqueryload('event.actor')) \
.outerjoin(Event.flight) \
.options(contains_eager('event.flight')) \
.filter(or_(Event.flight == None, Flight.is_rankable())) \
.order_by(Event.time.desc())
query = _filter_query(query, request.args)
page = request.args.get('page', type=int, default=1)
per_page = request.args.get('per_page', type=int, default=50)
query = query.limit(per_page)
query = query.offset((page - 1) * per_page)
def get_event(notification):
event = notification.event
event.unread = (notification.time_read is None)
return event
events = map(get_event, query)
return jsonify(events=(map(convert_event, events)))
开发者ID:kerel-fs,项目名称:skylines,代码行数:29,代码来源:notifications.py
示例5: add_interfaces
def add_interfaces(session):
""" Add a default interface for all HW that has an IP """
q = session.query(HardwareEntity)
q = q.filter(~exists().where(Interface.hardware_entity_id == HardwareEntity.id))
q = q.outerjoin(PrimaryNameAssociation, System, DnsDomain)
q = q.options(contains_eager('_primary_name_asc'))
q = q.options(contains_eager('_primary_name_asc.dns_record'))
q = q.options(contains_eager('_primary_name_asc.dns_record.dns_domain'))
q = q.filter(System.ip != None)
hws = q.all()
count = 0
for hw in hws:
if hw.hardware_type == "machine":
interface = "eth0"
itype = "public"
elif hw.hardware_type == "switch":
interface = "xge"
itype = "oa"
else:
interface = "oa"
itype = "oa"
#print "Adding default interface for {0:l}".format(hw)
dbinterface = Interface(hardware_entity=hw, name=interface,
interface_type="oa",
comments="Created automatically by upgrade script")
session.add(dbinterface)
count += 1
session.flush()
print "Added %d interfaces" % count
开发者ID:jrha,项目名称:aquilon,代码行数:33,代码来源:address_fixes.py
示例6: render
def render(self, session, **arguments):
q = session.query(Switch)
q = q.options(subqueryload('location'),
subqueryload('interfaces'),
joinedload('interfaces.assignments'),
joinedload('interfaces.assignments.dns_records'),
joinedload('interfaces.assignments.network'),
subqueryload('observed_macs'),
undefer('observed_macs.creation_date'),
subqueryload('observed_vlans'),
undefer('observed_vlans.creation_date'),
joinedload('observed_vlans.network'),
subqueryload('model'),
# Switches don't have machine specs, but the formatter
# checks for their existence anyway
joinedload('model.machine_specs'))
# Prefer the primary name for ordering
q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
DnsDomain)
q = q.options(contains_eager('primary_name'),
contains_eager('primary_name.fqdn'),
contains_eager('primary_name.fqdn.dns_domain'))
q = q.reset_joinpoint()
q = q.order_by(Fqdn.name, DnsDomain.name, Switch.label)
return q.all()
开发者ID:jrha,项目名称:aquilon,代码行数:28,代码来源:show_switch_all.py
示例7: LoadInfo
def LoadInfo():
d = {}
d['localtime'] = localtime
d['site_url'] = config.site_url
d['site_generator'] = config.site_generator
d['site_name'] = config.site_name;
d['site_subname'] = config.site_subname;
d['site_title'] = config.site_name + ' - ' + config.site_subname;
d['site_keywords'] = config.site_keywords;
d['site_description'] = config.site_description;
d['myhtml'] = myhtml;
#get latest items
session = Session()
try:
d['catlist'] = session.query(ItemCat).\
order_by(ItemCat.orders.asc()).\
all()
d['newitemlist'] = session.query(Item).\
join(Item.itemcat).\
filter(Item.pubdate < datetime.datetime.utcnow() ).\
filter(Item.ispass == True).\
options(contains_eager(Item.itemcat)).\
order_by(Item.pubdate.desc())\
[:15]
d['newcommentlist'] = session.query(ItemComment).\
join(ItemComment.item).\
filter(Item.pubdate < datetime.datetime.utcnow() ).\
filter(ItemComment.ispass == True).\
options(contains_eager(ItemComment.item)).\
order_by(ItemComment.adddate.desc())\
[:15]
finally:
session.close()
return d
开发者ID:xiexiao,项目名称:blogPy,代码行数:35,代码来源:pages.py
示例8: GET
def GET(self, page = 1 ):
d = LoadInfo()
pageSize = 5
page = int(page)
startIndex = pageSize*(page-1)
session = Session()
try:
d['itemlist'] = session.query(Item).\
join(Item.itemcat).\
filter(Item.pubdate < datetime.datetime.utcnow() ).\
filter(Item.ispass == True).\
options(contains_eager(Item.itemcat)).\
order_by(Item.pubdate.desc())\
[startIndex:startIndex+pageSize]
if page==0: page=1
#total
d['recodecount'] = session.query(Item).\
join(Item.itemcat).\
filter(Item.pubdate < datetime.datetime.utcnow() ).\
filter(Item.ispass == True).\
options(contains_eager(Item.itemcat)).\
count()
d['pagecount'] = int(math.ceil( d['recodecount'] / float(pageSize)));
if d['pagecount']==0: d['pagecount']=1
d['page'] = page
d['itemcat'] = session.query(ItemCat).all()
finally:
session.close()
#local not show stat
if web.ctx.env.get('HTTP_HOST','').startswith('localhost'):
d['debug'] = True
return render.index(**d)
开发者ID:xiexiao,项目名称:blogPy,代码行数:32,代码来源:pages.py
示例9: add_addresses
def add_addresses(session):
""" Add an AddressAssignment record for every PrimaryNameAssociation """
q = session.query(PrimaryNameAssociation)
q = q.join(System, DnsDomain)
q = q.filter(System.ip != None)
q = q.filter(~exists().where(AddressAssignment.ip == System.ip))
q = q.options(contains_eager('dns_record'))
q = q.options(contains_eager('dns_record.dns_domain'))
q = q.options(subqueryload_all('hardware_entity.interfaces.vlans.assignments'))
q = q.options(subqueryload_all('hardware_entity.interfaces._vlan_ids'))
count = 0
pnas = q.all()
for pna in pnas:
hw = pna.hardware_entity
if len(hw.interfaces) != 1:
print "{0} has an unexpected number of interfaces, skipping: " \
"{1}".format(hw, len(hw.interfaces))
continue
iface = hw.interfaces[0]
if len(iface.vlans[0].addresses):
print "{0} already has addresses, skipping".format(iface)
continue
#print "Adding AddressAssignment record for {0:l}".format(hw)
iface.vlans[0].addresses.append(pna.dns_record.ip)
count += 1
session.flush()
print "Added %d AddressAssignment records" % count
开发者ID:jrha,项目名称:aquilon,代码行数:29,代码来源:address_fixes.py
示例10: render
def render(self, session, **arguments):
q = session.query(Chassis)
q = q.options(subqueryload('model'),
joinedload('model.machine_specs'),
subqueryload('location'),
joinedload('slots'),
subqueryload('slots.machine'),
# A rare case when we don't need primary name/host
lazyload('slots.machine.primary_name'),
lazyload('slots.machine.host'),
subqueryload('interfaces'),
joinedload('interfaces.assignments'),
joinedload('interfaces.assignments.network'),
joinedload('interfaces.assignments.dns_records'))
# Prefer the primary name for ordering
q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
DnsDomain)
q = q.options(contains_eager('primary_name'),
contains_eager('primary_name.fqdn'),
contains_eager('primary_name.fqdn.dns_domain'))
q = q.order_by(Fqdn.name, DnsDomain.name, Chassis.label)
return q.all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py
示例11: _poll
def _poll(self, limit, max_age):
if not self.link:
self.link = url(
controller='forum', action='threads', forum_id=self.forum_id)
thread_q = meta.Session.query(forum_model.Thread) \
.filter_by(forum_id=self.forum_id) \
.join((forum_model.Post, forum_model.Thread.first_post)) \
.options(
contains_eager(forum_model.Thread.first_post, alias=forum_model.Post),
contains_eager(forum_model.Thread.first_post, forum_model.Post.thread, alias=forum_model.Thread),
joinedload(forum_model.Thread.first_post, forum_model.Post.author),
)
if max_age:
thread_q = thread_q.filter(forum_model.Post.posted_time >= max_age)
threads = thread_q \
.order_by(forum_model.Post.posted_time.desc()) \
[:limit]
updates = []
for thread in threads:
update = FrontPageThread(
source = self,
time = thread.first_post.posted_time,
post = thread.first_post,
)
updates.append(update)
return updates
开发者ID:encukou,项目名称:spline,代码行数:31,代码来源:frontpage_sources.py
示例12: list
def list():
query = Notification.query(recipient_id=request.user_id) \
.join('event') \
.options(contains_eager('event')) \
.options(subqueryload('event.actor')) \
.outerjoin(Event.flight) \
.options(contains_eager('event.flight')) \
.filter(or_(Event.flight == None, Flight.is_rankable())) \
.order_by(Event.time.desc())
query = _filter_query(query, request.args)
page = request.args.get('page', type=int, default=1)
per_page = request.args.get('per_page', type=int, default=50)
query = query.limit(per_page)
query = query.offset((page - 1) * per_page)
def get_event(notification):
event = notification.event
event.unread = (notification.time_read is None)
return event
events = map(get_event, query)
return jsonify(events=(map(convert_event, events)))
开发者ID:GliderGeek,项目名称:skylines,代码行数:26,代码来源:notifications.py
示例13: type_browse
def type_browse(context, request):
types = (
session.query(t.Type)
.join(t.Type.names_local)
# Force inner join here to strip out e.g. Shadow, which has no
# efficacy
.join(t.Type.damage_efficacies)
.order_by(t.Type.names_table.name)
.options(
contains_eager(t.Type.names_local),
contains_eager(t.Type.damage_efficacies),
)
.all()
)
efficacy_map = {}
for attacking_type in types:
submap = efficacy_map[attacking_type] = {}
for efficacy in attacking_type.damage_efficacies:
submap[efficacy.target_type] = efficacy.damage_factor
template_ns = dict(
types=types,
efficacy_map=efficacy_map,
)
return template_ns
开发者ID:nobitis,项目名称:veekun-pokedex,代码行数:28,代码来源:type.py
示例14: test_outerouterjoin_eager
def test_outerouterjoin_eager(self):
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager('bars')),
str(self.db.query(Foo).outerjoin('bars').options(orm.contains_eager('bars'))),
'it should outerjoin eager on single string entity'
)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager(Foo.bars)),
str(self.db.query(Foo).outerjoin(Foo.bars).options(orm.contains_eager(Foo.bars))),
'it should outerjoin eager on single model entity'
)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager('bars', 'bazs')),
str(
self.db.query(Foo).outerjoin('bars', 'bazs').options(
orm.contains_eager('bars').contains_eager('bazs')
)
),
'it should outerjoin eager on multiple string entities'
)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager(Foo.bars, Bar.bazs)),
str(
self.db.query(Foo).outerjoin(Foo.bars, Bar.bazs).options(
orm.contains_eager(Foo.bars).contains_eager(Bar.bazs)
)
),
'it should outerjoin eager on multiple model entities'
)
开发者ID:jeffknupp,项目名称:alchy,代码行数:32,代码来源:test_query.py
示例15: query
def query(self):
query = self.request.dbsession.query(
distinct(Estimation.id),
Estimation,
)
query = query.outerjoin(Task.company)
query = query.outerjoin(Task.customer)
query = query.options(
contains_eager(Task.customer).load_only(
Customer.id,
Customer.label,
)
)
query = query.options(
contains_eager(Task.company).load_only(
Company.id,
Company.name
)
)
query = query.options(
load_only(
"name",
"internal_number",
"status",
"signed_status",
"geninv",
"date",
"description",
"ht",
"tva",
"ttc",
)
)
return query
开发者ID:CroissanceCommune,项目名称:autonomie,代码行数:34,代码来源:lists.py
示例16: test_outerouterjoin_eager_with_alias
def test_outerouterjoin_eager_with_alias(self):
bar_alias = orm.aliased(Bar)
baz_alias = orm.aliased(Baz)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager('bars', alias=bar_alias)),
str((self.db.query(Foo)
.outerjoin(bar_alias, 'bars')
.options(orm.contains_eager('bars', alias=bar_alias)))),
'it should outerjoin eager on alias and string entity'
)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager(Foo.bars, alias=bar_alias)),
str((self.db.query(Foo)
.outerjoin(bar_alias, Foo.bars)
.options(orm.contains_eager(Foo.bars, alias=bar_alias)))),
'it should outerjoin eager on alias and model entity'
)
self.assertEqual(
str(self.db.query(Foo).outerjoin_eager('bars',
'bazs',
alias={'bars': bar_alias,
'bazs': baz_alias})),
str((self.db.query(Foo)
.outerjoin((bar_alias, 'bars'), (baz_alias, 'bazs'))
.options((orm.contains_eager('bars', alias=bar_alias)
.contains_eager('bazs', alias=baz_alias))))),
'it should join eager on multiple aliases'
)
开发者ID:LeoKudrik,项目名称:alchy,代码行数:31,代码来源:test_query.py
示例17: export
def export(request):
"""Handle exporting a user's bookmarks to file"""
rdict = request.matchdict
username = rdict.get('username')
if request.user is not None:
current_user = request.user.username
else:
current_user = None
bmark_list = Bmark.query.join(Bmark.tags).\
options(
contains_eager(Bmark.tags)
).\
join(Bmark.hashed).\
options(
contains_eager(Bmark.hashed)
).\
filter(Bmark.username == username).all()
BmarkLog.export(username, current_user)
request.response_content_type = 'text/html'
headers = [('Content-Disposition',
'attachment; filename="bookie_export.html"')]
setattr(request, 'response_headerlist', headers)
return {
'bmark_list': bmark_list,
}
开发者ID:Cfhansen,项目名称:Bookie,代码行数:31,代码来源:utils.py
示例18: get
def get(video_id):
"""Get a video identified by `video_id`.
**Example request:**
.. sourcecode:: http
GET /videos/1/ HTTP/1.1
Accept: application/json
**Example response:**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"name": "Joe Schmoe",
"first_name": "Joe",
"second_name": "Schmoe",
"image_url": "abc"
}
:statuscode 200: success
:statuscode 404: video does not exist
"""
columns = ['Video.name', 'Video.image_url',
'Video.video_sources', 'VideoSource.name', 'VideoSource.url',
'VideoSource.source_id',
'Video.products', 'Product.model', 'Product.uri', 'Product.brand',
'Product.product_images', 'ProductImage.url',
'Product.product_style', 'RefProductStyle.name']
try:
message = 'success'
data = Video.query.outerjoin(Video.video_sources, Video.products,
Product.product_style, Product.product_images,
).options(
contains_eager(Video.video_sources),
contains_eager(Video.products),
).filter(Video.id==video_id
).order_by(RefProductStyle.name).limit(100).all()[0]
except IndexError as error:
message = "'%s' record does not exist." % video_id
return jsonify(message=message, success=False, error=404), 404
except Exception as error:
message = '%s: %s' % (error.__class__.__name__, error)
return jsonify(message=message, success=False, error=500), 500
if data is None:
message = "'%s' record does not exist." % video_id
return jsonify(message=message, success=False, error=404), 404
else:
## need to use the JSONEncoder class for datetime objects
response = make_response(json.dumps(dict(data=data, message=message,
success=True), cls=json_encoder(False, columns)))
response.headers['Content-Type'] = 'application/json'
response.headers['mimetype'] = 'application/json'
return response
开发者ID:phriscage,项目名称:inspired,代码行数:59,代码来源:views.py
示例19: duplicate_run_model
def duplicate_run_model(self, model_id, user):
"""
Duplicate the run model and all its parameters to a new run model
:param model_id: model run id to duplicate
:param user: the user duplicating the model
:return: nothing
"""
id_for_user_upload_driving_dataset = self._dataset_service.get_id_for_user_upload_driving_dataset()
self.delete_model_run_being_created(user)
with self.transaction_scope() as session:
model_run_to_duplicate = session\
.query(ModelRun)\
.join(ModelRunStatus)\
.outerjoin(ParameterValue)\
.outerjoin(LandCoverAction) \
.filter(ModelRun.id == model_id) \
.filter(or_(
ModelRun.user_id == user.id,
ModelRunStatus.name == constants.MODEL_RUN_STATUS_PUBLISHED,
ModelRunStatus.name == constants.MODEL_RUN_STATUS_PUBLIC,)) \
.options(contains_eager(ModelRun.parameter_values)) \
.options(contains_eager(ModelRun.land_cover_actions)) \
.one()
new_model_run_name = model_run_to_duplicate.name
if self._is_duplicate_name(model_run_to_duplicate.name, session, user):
new_model_run_name = "{} (Copy)".format(model_run_to_duplicate.name)
copy_id = 1
while self._is_duplicate_name(new_model_run_name, session, user):
copy_id += 1
new_model_run_name = "{} (Copy {})".format(model_run_to_duplicate.name, copy_id)
new_model_run = ModelRun()
new_model_run.duplicate_from(model_run_to_duplicate)
new_model_run.name = new_model_run_name
new_model_run.user = user
new_model_run.change_status(session, constants.MODEL_RUN_STATUS_CREATED)
for parameter_value in model_run_to_duplicate.parameter_values:
new_parameter = ParameterValue()
new_parameter.duplicate_from(parameter_value)
new_parameter.model_run = new_model_run
for land_cover_action in model_run_to_duplicate.land_cover_actions:
new_land_cover_action = LandCoverAction()
new_land_cover_action.duplicate_from(land_cover_action)
new_land_cover_action.model_run = new_model_run
session.add(new_model_run)
if model_run_to_duplicate.driving_dataset_id == id_for_user_upload_driving_dataset:
try:
self._job_runner_client.duplicate_uploaded_driving_data(model_run_to_duplicate.id, new_model_run.id)
except ServiceException:
self.delete_model_run_being_created(user)
raise ServiceException("Could not duplicate the model run because "
"the user uploaded data can not be duplicated")
开发者ID:NERC-CEH,项目名称:jules-jasmin,代码行数:59,代码来源:model_run_service.py
示例20: query
def query(self):
query = DBSESSION().query(Task)
query = query.with_polymorphic([Invoice, CancelInvoice])
query = query.outerjoin(Invoice.payments)
query = query.outerjoin(Task.customer)
query = query.options(contains_eager(Invoice.payments).load_only(Payment.id, Payment.date, Payment.mode))
query = query.options(contains_eager(Task.customer).load_only(Customer.name, Customer.code, Customer.id))
return query
开发者ID:CroissanceCommune,项目名称:autonomie,代码行数:8,代码来源:company_invoice.py
注:本文中的sqlalchemy.orm.contains_eager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论