本文整理汇总了Python中sqlalchemy.orm.load_only函数的典型用法代码示例。如果您正苦于以下问题:Python load_only函数的具体用法?Python load_only怎么用?Python load_only使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了load_only函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: quarterly_review
def quarterly_review(quarter):
quarter = str(quarter)
sortingDictionary = {0: Attendee.id, 1: Attendee.year, 2: Attendee.first_name, 3: Attendee.last_name }
siftingDictionary = {1: "freshman", 2: "sophomore", 3: "junior", 4: "senior", 5: "other" }
# get params
sort = request.args.get('sort', 0, type=int)
sift = request.args.get('sift', 0, type=int)
weeks = [0 for i in range(10)]
if (sift == 0):
users = db.session.query(Attendee).order_by(sortingDictionary[sort]).options(load_only("id", "first_name", "last_name", "year"))
userCount = db.session.query(Attendee).order_by(sortingDictionary[sort]).options(load_only("id", "first_name", "last_name", "year")).count()
else:
users = db.session.query(Attendee).filter_by(year=siftingDictionary[sift]).order_by(sortingDictionary[sort]).options(load_only("id", "first_name", "last_name", "year"))
userCount = db.session.query(Attendee).filter_by(year=siftingDictionary[sift]).order_by(sortingDictionary[sort]).options(load_only("id", "first_name", "last_name", "year")).count()
attendanceArray = [[0 for i in range(10)] for j in range(userCount)]
sumArray = [0 for i in range(userCount)]
weekDB = db.session.query(LargeGroup).filter_by(quarter=quarter).options(load_only("id"))
# set up full quarter week array with db ID's if exists, 0 otherwise
for week in weekDB:
try:
weeks[int(week.weekNumber)-1] = week.id
except ValueError,e:
print str(e)
开发者ID:phouse512,项目名称:arkaios,代码行数:28,代码来源:app.py
示例2: main
def main():
html_tag_regex = '<[a-zA-Z]+.*>'
contributions = (Contribution.query
.filter(Contribution.description.op('~')(html_tag_regex))
.options(load_only('id', 'description'))
.all())
subcontributions = (SubContribution.query
.filter(SubContribution.description.op('~')(html_tag_regex))
.options(load_only('id', 'description'))
.all())
categories = (Category.query
.filter(Category.description.op('~')(html_tag_regex))
.options(load_only('id', 'description'))
.all())
def as_dict(objs):
return {x.id: x.description for x in objs}
def format_table(model):
return model.__table__.fullname
object_descriptions = {
format_table(Contribution): as_dict(contributions),
format_table(SubContribution): as_dict(subcontributions),
format_table(Category): as_dict(categories)
}
env = Environment(loader=FileSystemLoader(os.path.dirname(__file__)))
template = env.get_template('fix_descriptions_template.html')
print(template.render(object_descriptions=htmlsafe_dumps(object_descriptions)))
开发者ID:ThiefMaster,项目名称:indico,代码行数:31,代码来源:generate_processing_page.py
示例3: post
def post(self, course_id, question_id, answer_id, comment_id):
"""
Create an answer comment
"""
Courses.exists_or_404(course_id)
PostsForQuestions.query.options(load_only('id')).get_or_404(question_id)
PostsForAnswers.query.options(load_only('id')).get_or_404(answer_id)
comment = PostsForComments.query.get_or_404(comment_id)
require(EDIT, comment)
params = existing_comment_parser.parse_args()
# make sure the comment id in the rul and the id matches
if params['id'] != comment.id:
return {"error": "Comment id does not match URL."}, 400
# modify comment according to new values, preserve original values if values not passed
comment.content = params.get("content")
if not comment.content:
return {"error": "The comment content is empty!"}, 400
comment.answer_assoc.type = params.get("type")
db.session.add(comment)
on_answer_comment_modified.send(
self,
event_name=on_answer_comment_modified.name,
user=current_user,
course_id=course_id,
data=get_model_changes(comment))
db.session.commit()
return marshal(comment, dataformat.get_answer_comment())
开发者ID:gitter-badger,项目名称:acj-versus,代码行数:29,代码来源:comment.py
示例4: autocomplete
def autocomplete():
search = unicode(request.args.get('q'))
products = Product.query.options(load_only("title", "id")).filter(Product.title.startswith(search), Product.status == 'publish').limit(5).all()
products2 = Product.query.options(load_only("title", "id")).filter(Product.title.contains('%' + search + '%'), Product.status == 'publish').limit(5).all()
p = {}
q = []
for product in products:
# p.extend(['{ title:'+product.title+', image: '+product.image+'}'])
p = {"label": product.title, "url": "/p/" + str(product.id)}
q.extend([p])
for product in products2:
r = {"label": product.title, "url": "/p/" + str(product.id)}
q.extend([r])
seen = set() # http://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python
l = []
for d in q:
t = tuple(d.items())
if t not in seen:
seen.add(t)
l.append(d)
products = json.dumps(l)
response = Response(products, mimetype='application/json')
return response
开发者ID:OSPK,项目名称:salink,代码行数:25,代码来源:views.py
示例5: _relevant_to_snapshot
def _relevant_to_snapshot(object_name, ids):
"""Filter by relevant object over snapshot"""
snapshot_qs = models.Snapshot.query.filter(
models.Snapshot.parent_type == models.Audit.__name__,
models.Snapshot.child_type == object_name,
models.Snapshot.child_id.in_(ids),
).options(
load_only(models.Snapshot.id),
).distinct(
).subquery(
"snapshot"
)
dest_qs = models.Relationship.query.filter(
models.Relationship.destination_id == snapshot_qs.c.id,
models.Relationship.destination_type == models.Snapshot.__name__,
models.Relationship.source_type == object_class.__name__,
).options(
load_only("source_id")
).distinct()
source_qs = models.Relationship.query.filter(
models.Relationship.source_id == snapshot_qs.c.id,
models.Relationship.source_type == models.Snapshot.__name__,
models.Relationship.destination_type == object_class.__name__,
).options(
load_only("destination_id")
).distinct()
ids_qs = dest_qs.union(source_qs).distinct().subquery("ids")
return object_class.id == ids_qs.c.relationships_source_id
开发者ID:VinnieJohns,项目名称:ggrc-core,代码行数:28,代码来源:query_helper.py
示例6: post
def post(self, course_id, question_id, criteria_id):
Courses.exists_or_404(course_id)
PostsForQuestions.query.options(load_only('id')).get_or_404(question_id)
Criteria.query.options(load_only('id')).get_or_404(criteria_id)
question = PostsForQuestions(post=Posts(courses_id=course_id))
criteria_question = CriteriaAndPostsForQuestions(question=question)
require(CREATE, criteria_question)
criteria_question = CriteriaAndPostsForQuestions.query.filter_by(criteria_id=criteria_id). \
filter_by(questions_id=question_id).first()
if criteria_question:
criteria_question.active = True
else:
criteria_question = CriteriaAndPostsForQuestions()
criteria_question.criteria_id = criteria_id
criteria_question.questions_id = question_id
db.session.add(criteria_question)
on_question_criteria_create.send(
self,
event_name=on_question_criteria_create.name,
user=current_user,
course_id=course_id,
data={'question_id': question_id, 'criteria_id': criteria_id})
db.session.commit()
return {'criterion': marshal(criteria_question, dataformat.get_criteria_and_posts_for_questions())}
开发者ID:gitter-badger,项目名称:acj-versus,代码行数:30,代码来源:criteria.py
示例7: render
def render(self, ctx, req):
if req.params.get('sEcho'):
# triggered from a datatable, thus potentially filtered and sorted
items = ctx.get_query(limit=1000)
else:
# triggered without any filter parameters
items = ctx.rdf_index_query(req.db.query(ctx.db_model()).order_by(ctx.db_model().pk))
if isinstance(ctx.model.name, property):
items = [(item.id, None) for item in items.options(load_only('id'))]
else:
items = [(item.id, item.name)
for item in items.options(load_only('id', 'name'))]
return convert(super(RdfIndex, self).render(items, req), 'xml', self.rdflibname)
开发者ID:clld,项目名称:clld,代码行数:13,代码来源:rdf.py
示例8: get_events_with_linked_sessions
def get_events_with_linked_sessions(user, from_dt=None, to_dt=None):
"""Returns a dict with keys representing event_id and the values containing
data about the user rights for sessions within the event
:param user: A `User`
:param from_dt: The earliest event start time to look for
:param to_dt: The latest event start time to look for
"""
query = (user.in_session_acls
.options(load_only('session_id', 'roles', 'full_access', 'read_access'))
.options(noload('*'))
.options(contains_eager(SessionPrincipal.session).load_only('event_id'))
.join(Session)
.join(Event, Event.id == Session.event_id)
.filter(~Session.is_deleted, ~Event.is_deleted, Event.starts_between(from_dt, to_dt)))
data = defaultdict(set)
for principal in query:
roles = data[principal.session.event_id]
if 'coordinate' in principal.roles:
roles.add('session_coordinator')
if 'submit' in principal.roles:
roles.add('session_submission')
if principal.full_access:
roles.add('session_manager')
if principal.read_access:
roles.add('session_access')
return data
开发者ID:belokop,项目名称:indico_bare,代码行数:27,代码来源:util.py
示例9: serialize_category_ical
def serialize_category_ical(category, user, event_filter):
"""Export the events in a category to iCal
:param category: The category to export
:param user: The user who needs to be able to access the events
:param event_filter: A SQLalchemy criterion to restrict which
events will be returned. Usually something
involving the start/end date of the event.
"""
own_room_strategy = joinedload('own_room')
own_room_strategy.load_only('building', 'floor', 'number', 'name')
own_room_strategy.lazyload('owner')
own_venue_strategy = joinedload('own_venue').load_only('name')
query = (Event.query
.filter(Event.category_chain.contains([int(category.getId())]),
~Event.is_deleted,
event_filter)
.options(load_only('id', 'start_dt', 'end_dt', 'title', 'description', 'own_venue_name',
'own_room_name', 'protection_mode'),
subqueryload('acl_entries'),
joinedload('person_links'),
own_room_strategy,
own_venue_strategy)
.order_by(Event.start_dt))
events = [e for e in query if e.can_access(user)]
cal = ical.Calendar()
cal.add('version', '2.0')
cal.add('prodid', '-//CERN//INDICO//EN')
now = now_utc(False)
for event in events:
url = url_for('event.conferenceDisplay', confId=event.id, _external=True)
location = ('{} ({})'.format(event.room_name, event.venue_name)
if event.venue_name and event.room_name
else (event.venue_name or event.room_name))
cal_event = ical.Event()
cal_event.add('uid', u'indico-event-{}@cern.ch'.format(event.id))
cal_event.add('dtstamp', now)
cal_event.add('dtstart', event.start_dt)
cal_event.add('dtend', event.end_dt)
cal_event.add('url', url)
cal_event.add('summary', event.title)
cal_event.add('location', location)
description = []
if event.person_links:
speakers = [u'{} ({})'.format(x.full_name, x.affiliation) if x.affiliation else x.full_name
for x in event.person_links]
description.append(u'Speakers: {}'.format(u', '.join(speakers)))
if event.description:
desc_text = unicode(event.description) or u'<p/>' # get rid of RichMarkup
try:
description.append(unicode(html.fromstring(desc_text).text_content()))
except ParserError:
# this happens e.g. if desc_text contains only a html comment
pass
description.append(url)
cal_event.add('description', u'\n'.join(description))
cal.add_component(cal_event)
return BytesIO(cal.to_ical())
开发者ID:belokop,项目名称:indico_bare,代码行数:60,代码来源:util.py
示例10: serialize_category_atom
def serialize_category_atom(category, url, user, event_filter):
"""Export the events in a category to Atom
:param category: The category to export
:param url: The URL of the feed
:param user: The user who needs to be able to access the events
:param event_filter: A SQLalchemy criterion to restrict which
events will be returned. Usually something
involving the start/end date of the event.
"""
query = (Event.query
.filter(Event.category_chain.contains([int(category.getId())]),
~Event.is_deleted,
event_filter)
.options(load_only('id', 'start_dt', 'title', 'description', 'protection_mode'),
subqueryload('acl_entries'))
.order_by(Event.start_dt))
events = [e for e in query if e.can_access(user)]
feed = AtomFeed(feed_url=url, title='Indico Feed [{}]'.format(to_unicode(category.getTitle())))
for event in events:
feed.add(title=event.title,
summary=unicode(event.description), # get rid of RichMarkup
url=url_for('event.conferenceDisplay', confId=event.id, _external=True),
updated=event.start_dt)
return BytesIO(feed.to_string().encode('utf-8'))
开发者ID:belokop,项目名称:indico_bare,代码行数:26,代码来源:util.py
示例11: monitor_api_key_limits
def monitor_api_key_limits(self):
result = {}
try:
today = util.utcnow().strftime('%Y%m%d')
keys = self.redis_client.keys('apilimit:*:' + today)
if keys:
values = self.redis_client.mget(keys)
keys = [k.split(':')[1] for k in keys]
else:
values = []
names = {}
if keys:
with self.db_session(commit=False) as session:
query = (ApiKey.querykeys(session, keys)
.options(load_only('valid_key', 'shortname')))
for api_key in query.all():
names[api_key.valid_key] = api_key.name
result = {}
for k, v in zip(keys, values):
name = names.get(k, k)
value = int(v)
result[name] = value
self.stats_client.gauge('apilimit.' + name, value)
except Exception: # pragma: no cover
# Log but ignore the exception
self.raven_client.captureException()
return result
开发者ID:condemoreloer,项目名称:ichnaea,代码行数:29,代码来源:tasks.py
示例12: conflict_create
def conflict_create(order_id):
"""
Renders conflict create page.
"""
order = Order.query.get(order_id)
form = CreateConflict(formdata=request.form)
form.user_connected.choices = [
(user.username, user.username) for user in User.query.options(load_only("username")).all()
]
form.user_connected.choices.append(("None", "None"))
form.user_connected.default = ("None", "None")
if request.method == "POST":
conflict = Conflict()
conflict.did_order_come = request.form.get("did_order_come") == "y"
conflict.i_know_who = request.form.get("i_know_who") == "y"
conflict.user_connected = request.form.get("user_connected")
conflict.order_connected = order.id
conflict.created_by_user = current_user.username
db.session.add(conflict)
db.session.commit()
if conflict.i_know_who:
new_conflict = Conflict.query.order_by(Conflict.date_added.desc()).first()
conflict_url = server_url() + url_for("conflict_resolve", conf_id=new_conflict.id)
msg = Message("Lunch app new conflict", recipients=[conflict.user_connected])
msg.body = (
"You were chosen as the one who ate my lunch! "
"Please use the link below to respond"
" \n\n {}".format(conflict_url)
)
mail.send(msg)
flash("Conflict created")
return redirect("my_orders")
return render_template("conflict_create.html", form=form)
开发者ID:alazaro,项目名称:lunch-app,代码行数:33,代码来源:views.py
示例13: get_all_report_hashes
def get_all_report_hashes(
db, date_from=None, date_to=None, opsys=None, opsys_releases=None, limit_from=None, limit_to=None
):
"""
Return ReportHash instance if there is at least one bug in database for selected date range
"""
query = db.session.query(ReportHash).join(Report).options(load_only("hash"))
if opsys and opsys != "*":
if opsys == "rhel":
opsys = "Red Hat Enterprise Linux"
query = query.join(ReportOpSysRelease).join(OpSysRelease).join(OpSys).filter(OpSys.name == opsys)
if opsys_releases and opsys_releases != "*":
query = query.filter(OpSysRelease.version == opsys_releases)
if date_from and date_from != "*":
query = query.filter(Report.last_occurrence >= date_from)
if date_to and date_to != "*":
query = query.filter(Report.last_occurrence <= date_to)
if limit_from is not None and limit_to is not None:
query = query.slice(limit_from, limit_to)
return query.all()
开发者ID:abrt,项目名称:faf,代码行数:27,代码来源:queries.py
示例14: get_events_with_submitted_surveys
def get_events_with_submitted_surveys(user, from_dt=None, to_dt=None):
"""Gets the IDs of events where the user submitted a survey.
:param user: A `User`
:param from_dt: The earliest event start time to look for
:param to_dt: The latest event start time to look for
:return: A set of event ids
"""
event_date_filter = True
if from_dt and to_dt:
event_date_filter = IndexedEvent.start_date.between(from_dt, to_dt)
elif from_dt:
event_date_filter = IndexedEvent.start_date >= from_dt
elif to_dt:
event_date_filter = IndexedEvent.start_date <= to_dt
# Survey submissions are not stored in links anymore, so we need to get them directly
query = (
user.survey_submissions.options(load_only("survey_id"))
.options(joinedload(SurveySubmission.survey).load_only("event_id"))
.join(Survey)
.join(Event)
.join(IndexedEvent, IndexedEvent.id == Survey.event_id)
.filter(~Survey.is_deleted, ~Event.is_deleted)
.filter(event_date_filter)
)
return {submission.survey.event_id for submission in query}
开发者ID:MichelCordeiro,项目名称:indico,代码行数:26,代码来源:util.py
示例15: validate_section_id
def validate_section_id(self, field):
session = self.get_session()
field.data = field.data if field.data > 0 else None
if field.data is None:
return
if field.data not in [x.id for x in session.query(Section).options(load_only("id")).all()]:
raise ValueError(u'Неверный раздел')
开发者ID:Impish-,项目名称:echoba,代码行数:7,代码来源:forms.py
示例16: query_database
def query_database(query, raven_client):
macs = [lookup.mac for lookup in query.wifi]
if not macs: # pragma: no cover
return []
result = []
today = util.utcnow().date()
temp_blocked = today - TEMPORARY_BLOCKLIST_DURATION
try:
load_fields = ('lat', 'lon', 'radius')
shards = defaultdict(list)
for mac in macs:
shards[WifiShard.shard_model(mac)].append(mac)
for shard, shard_macs in shards.items():
rows = (
query.session.query(shard)
.filter(shard.mac.in_(shard_macs))
.filter(shard.lat.isnot(None))
.filter(shard.lon.isnot(None))
.filter(or_(
shard.block_count.is_(None),
shard.block_count <
PERMANENT_BLOCKLIST_THRESHOLD))
.filter(or_(
shard.block_last.is_(None),
shard.block_last < temp_blocked))
.options(load_only(*load_fields))
).all()
result.extend(list(rows))
except Exception:
raven_client.captureException()
return result
开发者ID:therewillbecode,项目名称:ichnaea,代码行数:34,代码来源:wifi.py
示例17: get_problem_info
def get_problem_info(shortname):
"""Serve the PDF description of a problem"""
pid = (database.session.query(Problem)
.options(load_only('pid', 'shortname'))
.filter(Problem.shortname == shortname)
.first().pid)
return serve_info_pdf(str(pid))
开发者ID:AuburnACM,项目名称:auacm,代码行数:7,代码来源:views.py
示例18: bounding_box_query
def bounding_box_query(ne_lat, ne_lng, sw_lat, sw_lng, start_date, end_date,
fatal, severe, light, inaccurate, is_thin=False, yield_per=None):
# example:
# ne_lat=32.36292402647484&ne_lng=35.08873443603511&sw_lat=32.29257266524761&sw_lng=34.88445739746089
# >>> m = Marker.bounding_box_query(32.36, 35.088, 32.292, 34.884)
# >>> m.count()
# 250
accurate = not inaccurate
markers = Marker.query \
.filter(Marker.longitude <= ne_lng) \
.filter(Marker.longitude >= sw_lng) \
.filter(Marker.latitude <= ne_lat) \
.filter(Marker.latitude >= sw_lat) \
.filter(Marker.created >= start_date) \
.filter(Marker.created < end_date) \
.order_by(desc(Marker.created))
if yield_per:
markers = markers.yield_per(yield_per)
if accurate:
markers = markers.filter(Marker.locationAccuracy == 1)
if not fatal:
markers = markers.filter(Marker.severity != 1)
if not severe:
markers = markers.filter(Marker.severity != 2)
if not light:
markers = markers.filter(Marker.severity != 3)
if is_thin:
markers = markers.options(load_only("id", "longitude", "latitude"))
return markers
开发者ID:alexz3,项目名称:anyway,代码行数:29,代码来源:models.py
示例19: test_load_only_path_specific
def test_load_only_path_specific(self):
User = self.classes.User
Address = self.classes.Address
Order = self.classes.Order
users = self.tables.users
addresses = self.tables.addresses
orders = self.tables.orders
mapper(User, users, properties=util.OrderedDict([
("addresses", relationship(Address, lazy="joined")),
("orders", relationship(Order, lazy="joined"))
]))
mapper(Address, addresses)
mapper(Order, orders)
sess = create_session()
q = sess.query(User).options(
load_only("name").defaultload("addresses").load_only("id", "email_address"),
defaultload("orders").load_only("id")
)
# hmmmm joinedload seems to be forcing users.id into here...
self.assert_compile(
q,
"SELECT users.id AS users_id, users.name AS users_name, "
"addresses_1.id AS addresses_1_id, "
"addresses_1.email_address AS addresses_1_email_address, "
"orders_1.id AS orders_1_id FROM users "
"LEFT OUTER JOIN addresses AS addresses_1 "
"ON users.id = addresses_1.user_id "
"LEFT OUTER JOIN orders AS orders_1 ON users.id = orders_1.user_id"
)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:35,代码来源:test_deferred.py
示例20: update_linked_route_titles
def update_linked_route_titles(waypoint, update_types, user_id):
"""When a waypoint is the main waypoint of a route, the field
`title_prefix`, which caches the waypoint name, has to be updated.
This method takes care of updating all routes, that the waypoint is
"main waypoint" of.
"""
if UpdateType.LANG not in update_types:
# if the locales did not change, no need to continue
return
linked_routes = DBSession.query(Route). \
filter(Route.main_waypoint_id == waypoint.document_id). \
options(joinedload(Route.locales).load_only(
RouteLocale.lang, RouteLocale.id)). \
options(load_only(Route.document_id)). \
all()
if linked_routes:
waypoint_locales = waypoint.locales
waypoint_locales_index = {
locale.lang: locale for locale in waypoint_locales}
for route in linked_routes:
set_route_title_prefix(
route, waypoint_locales, waypoint_locales_index)
开发者ID:arnaud-morvan,项目名称:v6_api,代码行数:25,代码来源:waypoint.py
注:本文中的sqlalchemy.orm.load_only函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论