本文整理汇总了Python中sqlalchemy.orm.noload函数的典型用法代码示例。如果您正苦于以下问题:Python noload函数的具体用法?Python noload怎么用?Python noload使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了noload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: replace_email_with_user
def replace_email_with_user(cls, user, relationship_attr):
"""
Replaces all email-based entries matching the user's email
addresses with user-based entries.
If the user is already in the ACL, the two entries are merged.
:param user: A User object.
:param relationship_attr: The name of the relationship pointing
to the object associated with the ACL
entry.
:return: The set of objects where the user has been added to
the ACL.
"""
assert cls.allow_emails
updated = set()
query = (cls
.find(cls.email.in_(user.all_emails))
.options(noload('user'), noload('local_group'), joinedload('event_new').load_only('id')))
for entry in query:
parent = getattr(entry, relationship_attr)
existing = (cls.query
.with_parent(parent, 'acl_entries')
.options(noload('user'), noload('local_group'))
.filter_by(principal=user)
.first())
if existing is None:
entry.principal = user
else:
existing.merge_privs(entry)
parent.acl_entries.remove(entry)
updated.add(parent)
db.session.flush()
return updated
开发者ID:MichelCordeiro,项目名称:indico,代码行数:33,代码来源:principals.py
示例2: get_profile_from_id
def get_profile_from_id(id, id_type="url_slug", show_secrets=False, include_products=True, include_product_relationships=True):
if include_products:
if include_product_relationships:
query_base = Profile.query
else:
query_base = db.session.query(Profile).options(orm.noload('*'), orm.subqueryload(Profile.products))
else:
query_base = db.session.query(Profile).options(orm.noload('*'))
if id_type == "id":
try:
profile = query_base.get(id)
except DataError: # id has to be an int
logger.debug(u"get_profile_from_id no profile found from profile id {id}".format(
id=id))
profile = None
elif id_type == "email":
profile = query_base.filter(func.lower(Profile.email) == func.lower(id)).first()
elif id_type == "url_slug":
profile = query_base.filter(func.lower(Profile.url_slug) == func.lower(id)).first()
if not show_secrets:
profile = hide_profile_secrets(profile)
return profile
开发者ID:dhalperi,项目名称:total-impact-webapp,代码行数:27,代码来源:profile.py
示例3: test_noload
def test_noload(self):
self.assertEqual(
str(self.db.query(Foo).noload('bars')),
str(self.db.query(Foo).options(orm.noload('bars')))
)
self.assertEqual(
str(self.db.query(Foo).noload('bars', 'bazs')),
str(self.db.query(Foo).options(orm.noload('bars').noload('bazs')))
)
self.assertEqual(
str(self.db.query(Foo).noload(Foo.bars)),
str(self.db.query(Foo).options(orm.noload(Foo.bars)))
)
self.assertEqual(
str(self.db.query(Foo).noload(Foo.bars, Bar.bazs)),
str((self.db.query(Foo)
.options(orm.noload(Foo.bars).noload(Bar.bazs))))
)
self.assertEqual(
str((self.db.query(Foo)
.noload('bars',
options=[LoadOption('noload', 'bazs')]))),
str(self.db.query(Foo).options(orm.noload('bars').noload('bazs')))
)
开发者ID:LeoKudrik,项目名称:alchy,代码行数:28,代码来源:test_query.py
示例4: get_application_by_id_admin
def get_application_by_id_admin(application_id):
# this function is copied from get_application_by_id and should replace it
# at a later point
application = (
Application
.query
.filter(
Application.id == application_id
)
.options(
joinedload("supplier"),
joinedload("supplier.domains"),
noload("supplier.domains.assessments")
)
.first_or_404()
)
if application.status == 'deleted':
abort(404)
# Maximum prices are used on the pricing page to encourage value for money
result = (
Domain
.query
.options(
noload('suppliers')
)
.all()
)
domains = {'prices': {'maximum': {}}}
domains['prices']['maximum'] = {domain.name: domain.price_maximum for domain in result}
return jsonify(application=application.serializable, domains=domains)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:33,代码来源:applications.py
示例5: get_events_managed_by
def get_events_managed_by(user, dt=None):
"""Gets the IDs of events where the user has management privs.
:param user: A `User`
:param dt: Only include events taking place on/after that date
:return: A set of event ids
"""
query = (user.in_event_acls
.join(Event)
.options(noload('user'), noload('local_group'), load_only('event_id'))
.filter(~Event.is_deleted, Event.ends_after(dt))
.filter(EventPrincipal.has_management_permission('ANY')))
return {principal.event_id for principal in query}
开发者ID:indico,项目名称:indico,代码行数:13,代码来源:util.py
示例6: get_events_managed_by
def get_events_managed_by(user, from_dt=None, to_dt=None):
"""Gets the IDs of events where the user has management privs.
:param user: A `User`
:param from_dt: The earliest event start time to look for
:param to_dt: The latest event start time to look for
:return: A set of event ids
"""
query = (user.in_event_acls
.join(Event)
.options(noload('user'), noload('local_group'), load_only('event_id'))
.filter(~Event.is_deleted, Event.starts_between(from_dt, to_dt))
.filter(EventPrincipal.has_management_role('ANY')))
return {principal.event_id for principal in query}
开发者ID:fph,项目名称:indico,代码行数:14,代码来源:util.py
示例7: update_application
def update_application(application_id):
application_json = get_application_json()
application = Application.query.options(
noload('supplier.*')
).get(application_id)
if application is None:
abort(404, "Application '{}' does not exist".format(application_id))
if application.status == 'submitted' and application_json.get('status') == 'saved':
db.session.add(AuditEvent(
audit_type=AuditTypes.revert_application,
user='',
data={},
db_object=application
))
publish_tasks.application.delay(
publish_tasks.compress_application(application),
'reverted'
)
application.update_from_json(application_json)
save_application(application)
errors = ApplicationValidator(application).validate_all()
return (
jsonify(
application=application.serializable,
application_errors=errors),
200)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:30,代码来源:applications.py
示例8: get_events_with_linked_sessions
def get_events_with_linked_sessions(user, dt=None):
"""Returns a dict with keys representing event_id and the values containing
data about the user rights for sessions within the event
:param user: A `User`
:param dt: Only include events taking place on/after that date
"""
query = (user.in_session_acls
.options(load_only('session_id', 'roles', 'full_access', 'read_access'))
.options(noload('*'))
.options(contains_eager(SessionPrincipal.session).load_only('event_id'))
.join(Session)
.join(Event, Event.id == Session.event_id)
.filter(~Session.is_deleted, ~Event.is_deleted, Event.ends_after(dt)))
data = defaultdict(set)
for principal in query:
roles = data[principal.session.event_id]
if 'coordinate' in principal.roles:
roles.add('session_coordinator')
if 'submit' in principal.roles:
roles.add('session_submission')
if principal.full_access:
roles.add('session_manager')
if principal.read_access:
roles.add('session_access')
return data
开发者ID:DirkHoffmann,项目名称:indico,代码行数:26,代码来源:util.py
示例9: get_supplier_frameworks_info
def get_supplier_frameworks_info(code):
supplier = Supplier.query.filter(
Supplier.code == code
).first_or_404()
service_counts = SupplierFramework.get_service_counts(code)
supplier_frameworks = (
SupplierFramework
.query
.options(
joinedload('framework'),
noload('framework.lots')
)
.filter(
SupplierFramework.supplier == supplier
)
.all()
)
return jsonify(frameworkInterest=[
framework.serialize({
'drafts_count': service_counts.get((framework.framework_id, 'not-submitted'), 0),
'complete_drafts_count': service_counts.get((framework.framework_id, 'submitted'), 0),
'services_count': service_counts.get((framework.framework_id, 'published'), 0)
})
for framework in supplier_frameworks]
)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:28,代码来源:suppliers.py
示例10: set_coauthors
def set_coauthors(self):
# comment out the commit. this means coauthors made during this commit session don't show up on this refresh
# but doing it because is so much faster
# safe_commit(db)
# now go for it
print u"running coauthors for {}".format(self.orcid_id)
coauthor_orcid_id_query = u"""select distinct orcid_id
from product
where doi in
(select doi from product where orcid_id='{}')""".format(self.orcid_id)
rows = db.engine.execute(text(coauthor_orcid_id_query))
# remove own orcid_id
orcid_ids = [row[0] for row in rows if row[0] if row[0] != self.id]
if not orcid_ids:
return
# don't load products or badges
coauthors = Person.query.filter(Person.orcid_id.in_(orcid_ids)).options(orm.noload('*')).all()
resp = {}
for coauthor in coauthors:
resp[coauthor.orcid_id] = {
"name": coauthor.full_name,
"id": coauthor.id,
"orcid_id": coauthor.orcid_id,
"num_posts": coauthor.num_posts,
}
self.coauthors = resp
开发者ID:ethanwhite,项目名称:impactstory-tng,代码行数:30,代码来源:person.py
示例11: get_scheduled_notes
def get_scheduled_notes(event):
"""Gets all notes of scheduled items inside an event"""
def _sort_note_by(note):
obj = note.object
if hasattr(obj, 'start_dt'):
return obj.start_dt, 0
else:
return obj.contribution.start_dt, obj.position
tt_entries = (event.timetable_entries
.filter(TimetableEntry.type != TimetableEntryType.BREAK)
.options(joinedload('session_block').joinedload('contributions').joinedload('subcontributions'))
.options(joinedload('contribution').joinedload('subcontributions'))
.options(noload('break_'))
.all())
# build a list of all the objects we need notes for. that way we can query
# all notes in a single go afterwards instead of making the already-huge
# timetable query even bigger.
objects = set()
for entry in tt_entries:
objects.add(entry.object)
if entry.type == TimetableEntryType.CONTRIBUTION:
objects.update(sc for sc in entry.object.subcontributions if not sc.is_deleted)
elif entry.type == TimetableEntryType.SESSION_BLOCK:
for contrib in entry.object.contributions:
objects.add(contrib)
objects.update(sc for sc in contrib.subcontributions if not sc.is_deleted)
notes = [x for x in event.all_notes.filter_by(is_deleted=False) if x.object in objects]
return sorted(notes, key=_sort_note_by)
开发者ID:DirkHoffmann,项目名称:indico,代码行数:29,代码来源:util.py
示例12: get_events_with_linked_sessions
def get_events_with_linked_sessions(user, from_dt=None, to_dt=None):
"""Returns a dict with keys representing event_id and the values containing
data about the user rights for sessions within the event
:param user: A `User`
:param from_dt: The earliest event start time to look for
:param to_dt: The latest event start time to look for
"""
query = (user.in_session_acls
.options(load_only('session_id', 'roles', 'full_access', 'read_access'))
.options(noload('*'))
.options(contains_eager(SessionPrincipal.session).load_only('event_id'))
.join(Session)
.join(Event, Event.id == Session.event_id)
.filter(~Session.is_deleted, ~Event.is_deleted, Event.starts_between(from_dt, to_dt)))
data = defaultdict(set)
for principal in query:
roles = data[principal.session.event_id]
if 'coordinate' in principal.roles:
roles.add('session_coordinator')
if 'submit' in principal.roles:
roles.add('session_submission')
if principal.full_access:
roles.add('session_manager')
if principal.read_access:
roles.add('session_access')
return data
开发者ID:belokop,项目名称:indico_bare,代码行数:27,代码来源:util.py
示例13: get_application_by_id
def get_application_by_id(application_id):
application = (
Application
.query
.filter(
Application.id == application_id
)
.options(
joinedload('supplier.domains'),
joinedload('supplier.domains.assessments'),
noload('supplier.domains.assessments.briefs')
)
.first_or_404()
)
if application.status == 'deleted':
abort(404)
# Maximum prices are used on the pricing page to encourage value for money
result = Domain.query.all()
domains = {'prices': {'maximum': {}}}
domains['prices']['maximum'] = {domain.name: domain.price_maximum for domain in result}
errors = ApplicationValidator(application).validate_all()
return jsonify(
application=application.serializable,
domains=domains,
application_errors=errors)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:28,代码来源:applications.py
示例14: get_events_with_paper_roles
def get_events_with_paper_roles(user, dt=None):
"""
Get the IDs and PR roles of events where the user has any kind
of paper reviewing privileges.
:param user: A `User`
:param dt: Only include events taking place on/after that date
:return: A dict mapping event IDs to a set of roles
"""
paper_roles = {'paper_manager', 'paper_judge', 'paper_content_reviewer', 'paper_layout_reviewer'}
role_criteria = [EventPrincipal.has_management_role(role, explicit=True) for role in paper_roles]
query = (user.in_event_acls
.join(Event)
.options(noload('user'), noload('local_group'), load_only('event_id', 'roles'))
.filter(~Event.is_deleted, Event.ends_after(dt))
.filter(db.or_(*role_criteria)))
return {principal.event_id: set(principal.roles) & paper_roles for principal in query}
开发者ID:DirkHoffmann,项目名称:indico,代码行数:17,代码来源:util.py
示例15: get_profile_stubs_from_url_slug
def get_profile_stubs_from_url_slug(url_slug):
# query_base = db.session.query(Profile).options(orm.lazyload('*'), orm.subqueryload(Profile.products))
# query_base = db.session.query(Profile).options(orm.noload('*'), subqueryload("products").subqueryload("alias_rows"))
query_base = Profile.query.options( orm.noload('*'),
orm.subqueryload(Profile.products),
orm.subqueryload(Profile.products, Product.biblio_rows),
orm.subqueryload(Profile.products, Product.alias_rows))
profile = query_base.filter(func.lower(Profile.url_slug) == func.lower(url_slug)).first()
return profile
开发者ID:Impactstory,项目名称:total-impact-webapp,代码行数:10,代码来源:profile.py
示例16: get_supplier
def get_supplier(code):
supplier = (
Supplier
.query
.filter(
Supplier.code == code,
Supplier.status != 'deleted'
)
.options(
joinedload('domains.domain'),
noload('domains.supplier'),
noload('domains.assessments'),
joinedload('domains.recruiter_info')
)
.first_or_404()
)
supplier.get_service_counts()
return jsonify(supplier=supplier.serializable)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:19,代码来源:suppliers.py
示例17: applications_list_response
def applications_list_response(with_task_status=False, status=None):
if status:
applications = Application.query.options(
joinedload('supplier'),
noload('supplier.domains')
).filter(Application.status == status)
else:
applications = Application.query.filter(Application.status != 'deleted')
return format_applications(applications, with_task_status)
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:10,代码来源:applications.py
示例18: update_refsets
def update_refsets():
from models.person import Person
print u"getting the badge percentile refsets...."
# only get out the badge objects
q = db.session.query(Person).options(
Load(Person).load_only("campaign", "orcid_id"))
q = q.options(orm.noload('*'))
q = q.options(orm.subqueryload("badges"))
# limit to just what we want for the refset
q = refine_refset_query(q)
# and do the get
rows = q.all()
print u"query finished, now set the values in the lists"
refset_list_dict = defaultdict(list)
for person in rows:
for badge in person.badges:
# print "BADGE", badge
# handle the nones below, with the zeros
if badge.value != None:
refset_list_dict[badge.name].append(badge.value)
num_in_refset = num_people_in_refset()
for name, unsorted_values in refset_list_dict.iteritems():
print u"refreshing refset {}".format(name)
assigner = get_badge_assigner(name)
if assigner.pad_percentiles_with_zeros:
# pad with zeros for all the people who didn't get the badge
unsorted_values.extend([0] * (num_in_refset - len(unsorted_values)))
# now sort
refset_list_dict[name] = sorted(unsorted_values)
# now pick out the cutoffs, minimum value at each of 100
cutoffs = []
for sublist in chunk_into_n_sublists(refset_list_dict[name], 100):
sublist_values = sublist
if sublist_values:
cutoffs.append(min(sublist_values))
this_badge_refset = Refset(name=name, cutoffs=cutoffs)
print u"saving refset {} with cutoffs {}".format(name, cutoffs)
db.session.merge(this_badge_refset)
# and finally save it all
safe_commit(db)
开发者ID:Impactstory,项目名称:impactstory-tng,代码行数:55,代码来源:refset.py
示例19: auth_user
def auth_user():
json_payload = get_json_from_request()
json_has_required_keys(json_payload, ["authUsers"])
json_payload = json_payload["authUsers"]
validate_user_auth_json_or_400(json_payload)
email_address = json_payload.get('email_address', None)
if email_address is None:
# will remove camel case email address with future api
email_address = json_payload.get('emailAddress', None)
user = User.query.options(
joinedload('supplier'),
noload('supplier.*'),
joinedload('application'),
noload('application.*'),
noload('*')
).filter(
User.email_address == email_address.lower()
).first()
if user is None or (user.supplier and user.supplier.status == 'deleted'):
return jsonify(authorization=False), 404
elif encryption.authenticate_user(json_payload['password'], user) and user.active:
user.logged_in_at = datetime.utcnow()
user.failed_login_count = 0
db.session.add(user)
db.session.commit()
validation_result = None
if user.role == 'supplier':
messages = supplier_business.get_supplier_messages(user.supplier_code, False)
validation_result = (
messages._asdict() if messages else None
)
return jsonify(users=user.serialize(), validation_result=validation_result), 200
else:
user.failed_login_count += 1
db.session.add(user)
db.session.commit()
return jsonify(authorization=False), 403
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-api,代码行数:42,代码来源:users.py
示例20: validate_user_password
def validate_user_password(cls, username, password):
user = DBSession.query(cls).options(noload(cls.groups)).filter(cls.username == username.lower()).first()
if user is None:
return None
manager = BCRYPTPasswordManager()
if manager.check(user.credentials, password):
return user
return None
开发者ID:bertjwregeer,项目名称:defcne,代码行数:11,代码来源:user.py
注:本文中的sqlalchemy.orm.noload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论