本文整理汇总了 Python 中 sqlalchemy.orm.undefer_group 函数的典型用法代码示例。如果您正苦于以下问题:undefer_group 函数的具体用法是什么?undefer_group 怎么用?有哪些使用 undefer_group 的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了undefer_group函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: go
def go():
    """Run ``q`` with the 'primary' and 'secondary' groups undeferred.

    The third result is then checked field by field.
    """
    orders = q.options(
        undefer_group('primary'),
        undefer_group('secondary'),
    ).all()
    third_order = orders[2]
    eq_(third_order.opened, 1)
    eq_(third_order.userident, 7)
    eq_(third_order.description, 'order 3')
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:7,代码来源:test_deferred.py
示例2: load_user_roles
def load_user_roles(user, permissions):
    """Load all DB-managed roles of a user and collect their permissions.

    Args:
        user (Person): Person object whose roles are loaded.
        permissions (dict): dict where the permissions will be stored; it is
            handed to ``collect_permissions`` for every role whose
            permissions are a dict.

    Returns:
        dict: ``source_contexts_to_rolenames``, mapping each context id to
        the list of role names the user holds in that context.
    """
    # Add permissions from all DB-managed roles
    user_roles = (
        db.session.query(UserRole)
        .options(
            orm.undefer_group('UserRole_complete'),
            orm.undefer_group('Role_complete'),
            orm.joinedload('role'),
        )
        .filter(UserRole.person_id == user.id)
        .order_by(UserRole.updated_at.desc())
        .all()
    )

    source_contexts_to_rolenames = {}
    for user_role in user_roles:
        source_contexts_to_rolenames.setdefault(
            user_role.context_id, []).append(user_role.role.name)
        if isinstance(user_role.role.permissions, dict):
            collect_permissions(
                user_role.role.permissions, user_role.context_id, permissions)

    # The documented return value: previously the mapping was built and
    # then silently dropped.
    return source_contexts_to_rolenames
开发者ID:egorhm,项目名称:ggrc-core,代码行数:26,代码来源:__init__.py
示例3: person
def person(id):
    """Look up a person by id with the 'extra' and 'profile' groups undeferred.

    Returns:
        The rendered 'not-found.html' template with a 404 status when no row
        matches ``id``.
    """
    # NOTE(review): nothing is done with the found person in the visible
    # code -- confirm the success path against the full view.
    try:
        person = Person.query\
            .filter_by(id=id)\
            .options(undefer_group('extra'),
                     undefer_group('profile'))\
            .one()
    except NoResultFound:
        # The exception object was never used, so no name is bound; this
        # form is valid on both Python 2 and Python 3.
        return render_template('not-found.html'), 404
开发者ID:lifthrasiir,项目名称:pokr.kr,代码行数:9,代码来源:person.py
示例4: pubroot
def pubroot(request, info, session):
    """Build the context for the index page.

    Returns:
        tuple: ``("index.html", context)`` where the context holds the
        current session, the bar stock lines, the stillage annotations and
        the business totals for the current and the two preceding weeks.
    """
    today = datetime.date.today()
    # If it's the early hours of the morning, it's more useful for us
    # to consider it still to be yesterday.
    if datetime.datetime.now().hour < 4:
        today = today - datetime.timedelta(1)

    one_week = datetime.timedelta(7)
    thisweek_start = today - datetime.timedelta(today.weekday())
    thisweek_end = thisweek_start + datetime.timedelta(6)

    # Each entry is (label, first day, last day, totals), stepping back one
    # week at a time from the current week.
    weeks = []
    week_labels = ("Current week", "Last week", "The week before last")
    for weeks_back, label in enumerate(week_labels):
        start = thisweek_start - one_week * weeks_back
        end = thisweek_end - one_week * weeks_back
        weeks.append((label, start, end, business_totals(session, start, end)))

    currentsession = Session.current(session)

    bar_query = session.query(StockLine)
    bar_query = bar_query.filter(StockLine.location == "Bar")
    bar_query = bar_query.order_by(StockLine.dept_id, StockLine.name)
    bar_query = bar_query.options(joinedload_all("stockonsale.stocktype.unit"))
    bar_query = bar_query.options(undefer_group("qtys"))
    barsummary = bar_query.all()

    # (text, latest time) pairs of the "location" annotations, one per text
    latest_locations = select(
        [StockAnnotation.text, func.max(StockAnnotation.time)],
        StockAnnotation.atype == "location",
    ).group_by(StockAnnotation.text)

    stillage_query = (
        session.query(StockAnnotation)
        .join(StockItem)
        .outerjoin(StockLine)
        .filter(
            tuple_(StockAnnotation.text, StockAnnotation.time).in_(
                latest_locations))
        .filter(StockItem.finished == None)  # noqa: E711 (SQL "IS NULL")
        .order_by(StockLine.name != null(), StockAnnotation.time)
        .options(joinedload_all("stockitem.stocktype.unit"))
        .options(joinedload_all("stockitem.stockline"))
        .options(undefer_group("qtys"))
    )
    stillage = stillage_query.all()

    context = {
        "currentsession": currentsession,
        "barsummary": barsummary,
        "stillage": stillage,
        "weeks": weeks,
    }
    return ("index.html", context)
开发者ID:sde1000,项目名称:quicktill,代码行数:55,代码来源:views.py
示例5: relate_ca
def relate_ca(assessment, template):
    """Copy a template's custom attribute definitions onto an assessment.

    Args:
        assessment (model instance): Assessment model the new definitions
            are attached to.
        template: Assessment Template instance (may be None).

    Returns:
        list: the newly created definitions, each already added to
        ``db.session``; ``None`` when no template is given.
    """
    if not template:
        return None

    cad_model = all_models.CustomAttributeDefinition
    template_definitions = (
        cad_model.query
        .options(orm.undefer_group('CustomAttributeDefinition_complete'))
        .filter_by(
            definition_id=template.id,
            definition_type="assessment_template",
        )
        .order_by(cad_model.id)
    )

    created_cads = []
    for source in template_definitions:
        clone = cad_model(
            title=source.title,
            definition=assessment,
            attribute_type=source.attribute_type,
            multi_choice_options=source.multi_choice_options,
            multi_choice_mandatory=source.multi_choice_mandatory,
            mandatory=source.mandatory,
            helptext=source.helptext,
            placeholder=source.placeholder,
        )
        db.session.add(clone)
        created_cads.append(clone)
    return created_cads
开发者ID:google,项目名称:ggrc-core,代码行数:33,代码来源:assessment.py
示例6: department
def department(request, info, session, departmentid, as_spreadsheet=False):
    """List the stock items of one department, newest id first.

    Finished items are hidden unless the ``show_finished`` GET parameter is
    "on". Raises ``Http404`` when the department does not exist. With
    ``as_spreadsheet`` the result of ``spreadsheets.stock`` is returned
    instead of the template/context pair.
    """
    dept = session.query(Department).get(int(departmentid))
    if dept is None:
        raise Http404

    include_finished = request.GET.get("show_finished", "off") == "on"

    items = (
        session.query(StockItem)
        .join(StockType)
        .filter(StockType.department == dept)
        .order_by(desc(StockItem.id))
        .options(
            joinedload_all('stocktype.unit'),
            undefer_group('qtys'),
            joinedload('stockline'),
            joinedload('delivery'),
            joinedload('finishcode'),
        )
    )
    if not include_finished:
        items = items.filter(StockItem.finished == None)  # noqa: E711

    if as_spreadsheet:
        rows = items.all()
        filename = "{}-dept{}-stock.ods".format(info.tillname, departmentid)
        return spreadsheets.stock(
            session, rows, tillname=info.tillname, filename=filename)

    pager = Pager(request, items, preserve_query_parameters=["show_finished"])
    context = {
        'tillobject': dept,
        'department': dept,
        'pager': pager,
        'include_finished': include_finished,
    }
    return ('department.html', context)
开发者ID:sde1000,项目名称:quicktill,代码行数:33,代码来源:views.py
示例7: update_cycle_task_group_parent_state
def update_cycle_task_group_parent_state(objs):
    """Update cycle status for sent cycle task group

    For every cycle referenced by ``objs`` the status, start/end dates and
    next due date are recomputed from all task groups of that cycle, and a
    ``Signals.status_change`` signal is sent for the cycles whose status
    changed.

    Args:
        objs: cycle task groups whose parent cycles should be refreshed;
            nothing happens when this is empty.
    """
    if not objs:
        return

    cycles_dict = {}
    cycle_groups_dict = collections.defaultdict(set)
    for obj in objs:
        cycle_groups_dict[obj.cycle].add(obj)
        cycles_dict[obj.cycle.id] = obj.cycle

    # collect all groups that are in same cycles that group from sent list
    groups = models.CycleTaskGroup.query.filter(
        models.CycleTaskGroup.cycle_id.in_([c.id for c in cycle_groups_dict]),
    ).options(
        orm.undefer_group("CycleTaskGroup_complete")
    ).distinct().with_for_update().all()
    for group in groups:
        cycle_groups_dict[cycles_dict[group.cycle_id]].add(group)

    updated_cycles = []
    # values() instead of the Python-2-only itervalues(): same iteration,
    # works on both major versions.
    for cycle in cycles_dict.values():
        cycle_groups = cycle_groups_dict[cycle]
        old_status = cycle.status
        _update_parent_status(cycle, {g.status for g in cycle_groups})
        cycle.start_date, cycle.end_date = _get_date_range(cycle_groups)
        cycle.next_due_date = _get_min_next_due_date(cycle_groups)
        if old_status != cycle.status:
            updated_cycles.append(Signals.StatusChangeSignalObjectContext(
                instance=cycle, old_status=old_status, new_status=cycle.status))

    if updated_cycles:
        Signals.status_change.send(models.Cycle, objs=updated_cycles)
开发者ID:egorhm,项目名称:ggrc-core,代码行数:32,代码来源:__init__.py
示例8: meeting_dialogue
def meeting_dialogue(id):
    """Load a meeting by id with its 'extra' column group undeferred.

    Aborts with HTTP 404 when no meeting has the given id.
    """
    # NOTE(review): glossary_js and meeting are not used in the visible
    # code -- confirm how the full view consumes them.
    glossary_js = generate_glossary_js()
    try:
        meeting = Meeting.query.filter_by(id=id)\
            .options(undefer_group('extra')).one()
    except NoResultFound:
        # The exception object was never used, so no name is bound; this
        # form is valid on both Python 2 and Python 3.
        abort(404)
开发者ID:mkim0710,项目名称:pokr.kr,代码行数:7,代码来源:meeting.py
示例9: request_find_password
def request_find_password(user_login):
    """Mail a password-change link to the user, if an address is on file.

    Returns:
        The rendered 'user/request_find_password' response, with status 201
        after the mail was handed to ``current_app.mail``, or 403 when the
        user has no email address.
    """
    user = get_user(user_login, orm.undefer_group('profile'))

    if not user.email:
        result = Result(user=user, error='Has no email address')
        status_code = 403
    else:
        token, expired_at = generate_token(user)
        url = url_for(
            '.change_password_form',
            user_login=user.login,
            token=token,
            _external=True,
        )
        expired_at = datetime.datetime.utcfromtimestamp(expired_at)

        subject = '[LangDev.org] Change your password: ' + user.login
        msg = Message(subject, recipients=[user.email])
        # dedent() strips the common indentation, so the mail body starts
        # every line at column 0.
        body_template = textwrap.dedent('''
            You can change your password through the following link:
            {url}
            But the above link will be expired at {expired_at} UTC.
        ''')
        msg.body = body_template.format(url=url, expired_at=expired_at)
        current_app.mail.send(msg)

        email = hide_email(user.email)
        result = Result(user=user, email=email)
        status_code = 201

    response = render('user/request_find_password', result, **result)
    response.status_code = status_code
    return response
开发者ID:hyeshik,项目名称:langdev.org,代码行数:25,代码来源:user.py
示例10: eager_query
def eager_query(cls):
    """Extend the parent eager query for events.

    The result is ordered by descending id, undefers the
    'Revision_complete' group and subquery-loads 'revisions'.
    """
    from sqlalchemy import orm

    by_id_desc = super(Event, cls).eager_query().order_by(cls.id.desc())
    revision_options = (
        orm.undefer_group('Revision_complete'),
        orm.subqueryload('revisions'),
    )
    return by_id_desc.options(*revision_options)
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:event.py
示例11: get
def get(self, course_id):
    """Return the questions of a course, most recently created first.

    Callers without MANAGE permission only get questions whose
    ``answer_start`` is unset or not in the future.

    Returns:
        dict: ``{"questions": [...]}`` with the marshalled questions.
    """
    course = Courses.query.get_or_404(course_id)
    require(READ, course)

    # Placeholder objects for this course, handed to the permission checks
    post = Posts(courses_id=course_id)
    question = PostsForQuestions(post=post)

    # Get all questions for this course, default order is most recent first
    base_query = (
        PostsForQuestions.query
        .options(joinedload("criteria").joinedload("criterion"))
        .options(joinedload("selfevaltype"))
        .options(undefer_group('counts'))
        .join(Posts)
        .options(
            contains_eager('post')
            .joinedload("user")
            .joinedload('usertypeforsystem'))
        .options(contains_eager('post').joinedload("files"))
        .filter(Posts.courses_id == course_id)
        .order_by(desc(Posts.created))
    )

    if allow(MANAGE, question):
        questions = base_query.all()
    else:
        now = datetime.datetime.utcnow()
        already_open = or_(
            PostsForQuestions.answer_start.is_(None),
            now >= PostsForQuestions.answer_start,
        )
        questions = base_query.filter(already_open).all()

    restrict_users = not allow(MANAGE, question)

    on_question_list_get.send(
        self,
        event_name=on_question_list_get.name,
        user=current_user,
        course_id=course_id,
    )

    fields = dataformat.get_posts_for_questions(
        restrict_users, include_answers=False)
    return {"questions": marshal(questions, fields)}
开发者ID:gitter-badger,项目名称:acj-versus,代码行数:34,代码来源:question.py
示例12: drawlines
def drawlines(self, h):
    """Draw a table of the stock lines at ``self.locations``.

    Only stock lines without a capacity are listed, ordered by name. One
    header row is followed by one table entry per stock line.

    Args:
        h: number of rows available; no row at index ``h`` or beyond is
            written.
    """
    sl = (
        td.s.query(StockLine)
        .filter(StockLine.location.in_(self.locations))
        .filter(StockLine.capacity == None)  # noqa: E711 (SQL "IS NULL")
        .order_by(StockLine.name)
        .options(joinedload("stockonsale"))
        .options(joinedload("stockonsale.stocktype"))
        .options(undefer_group("qtys"))
        .all()
    )
    f = ui.tableformatter("pl l L r rp")
    header = f("Line", "StockID", "Stock", "Used", "Remaining")

    def fl(line):
        # Describe the first item on sale; blank cells when the line is empty
        if line.stockonsale:
            sos = line.stockonsale[0]
            return (line.name, sos.id, sos.stocktype.format(),
                    sos.used, sos.remaining)
        return (line.name, "", "", "", "")

    ml = [header] + [f(*fl(line)) for line in sl]
    y = 0
    for entry in ml:
        for text in entry.display(self.w):
            # Check the limit before every write. A `break` here would only
            # leave one loop level and let later rows be drawn past h.
            if y >= h:
                return
            self.addstr(y, 0, text)
            y = y + 1
开发者ID:sde1000,项目名称:quicktill,代码行数:28,代码来源:stockterminal.py
示例13: _get_near_flights
def _get_near_flights(flight, location, time, max_distance=1000):
    """Find up to five other flights that were near a location at a time.

    Args:
        flight: the reference flight; it is excluded from the result.
        location: position to search around (``latitude`` is read and
            ``to_wkt_element()`` / ``geographic_distance()`` are called).
        time: moment of interest; candidates must have taken off at or
            before it and landed at or after it.
        max_distance: maximum distance to ``location``, compared against
            ``location.geographic_distance(...)`` and converted with
            ``METERS_PER_DEGREE`` for the database prefilter.

    Returns:
        list: at most five flights whose position at ``time`` is within
        ``max_distance`` of ``location``.
    """
    # calculate max_distance in degrees at the earth's sphere (approximate,
    # cutoff at +-85 deg)
    max_distance_deg = (max_distance / METERS_PER_DEGREE) / \
        math.cos(math.radians(min(abs(location.latitude), 85)))

    # the distance filter is geometric only, so max_distance must be given in
    # SRID units (which is degrees for WGS84). The filter will be more and more
    # inaccurate further to the poles. But it's a lot faster than the
    # geographic filter...
    result = Flight.query() \
        .options(undefer_group('path')) \
        .filter(Flight.id != flight.id) \
        .filter(Flight.takeoff_time <= time) \
        .filter(Flight.landing_time >= time) \
        .filter(func.ST_DWithin(Flight.locations,
                                location.to_wkt_element(),
                                max_distance_deg))
    result = _patch_query(result)

    flights = []
    # A distinct loop name: the original reused `flight` and shadowed the
    # parameter.
    for candidate in result:
        timestamps = candidate.timestamps

        # find point closest to given time
        closest = min(
            range(len(timestamps)),
            key=lambda x: abs((timestamps[x] - time).total_seconds()))
        trace = to_shape(candidate.locations).coords

        if (closest == 0 or closest == len(trace) - 1
                or timestamps[closest] == time):
            # at either end of the trace, or an exact hit: nothing to
            # interpolate
            point = trace[closest]
        else:
            # interpolate flight trace between the two fixes around `time`
            if timestamps[closest] < time:
                next_smaller, next_larger = closest, closest + 1
            else:
                next_smaller, next_larger = closest - 1, closest

            span = (timestamps[next_larger] -
                    timestamps[next_smaller]).total_seconds()
            if span == 0:
                # duplicate timestamps: avoid dividing by zero
                point = trace[closest]
            else:
                dx = (time - timestamps[next_smaller]).total_seconds() / span
                # Use the two bracketing fixes. Previously both endpoints
                # were trace[closest], so the interpolation did nothing.
                point_prev = trace[next_smaller]
                point_next = trace[next_larger]
                point = [
                    point_prev[0] + (point_next[0] - point_prev[0]) * dx,
                    point_prev[1] + (point_next[1] - point_prev[1]) * dx,
                ]

        point_distance = location.geographic_distance(
            Location(latitude=point[1], longitude=point[0]))
        if point_distance > max_distance:
            continue

        flights.append(candidate)

        # limit to 5 flights
        if len(flights) == 5:
            break

    return flights
开发者ID:kerel-fs,项目名称:skylines,代码行数:58,代码来源:flight.py
示例14: find_users
def find_users(emails):
    """Find or generate users for the given emails.

    If no Integration Service is configured, only the people already in the
    DB are returned. Otherwise every searched username that the service
    returns and that is not in the DB yet is created, and every resulting
    user with no system-wide access is given the Creator role.

    Args:
        emails: iterable of email addresses to look up.

    Returns:
        list: the people found in the DB plus any newly created ones.
    """
    # pylint: disable=too-many-locals
    if not settings.INTEGRATION_SERVICE_URL:
        return Person.query.filter(Person.email.in_(emails)).options(
            orm.undefer_group('Person_complete')).all()

    # Verify emails
    usernames = [email.split('@')[0] for email in emails
                 if is_authorized_domain(email) and
                 not is_external_app_user_email(email)]

    service = client.PersonClient()
    ldaps = service.search_persons(usernames)

    authorized_domain = getattr(settings, "AUTHORIZED_DOMAIN", "")
    # '%s@%s' restores the "<username>@<domain>" format string that had been
    # replaced by e-mail-obfuscation residue and raised when formatted.
    verified_emails = {'%s@%s' % (ldap['username'], authorized_domain)
                       for ldap in ldaps}

    # Find users in db
    users = Person.query.filter(Person.email.in_(emails)).all()
    found_emails = {user.email for user in users}

    # Create new users
    new_emails = verified_emails - found_emails
    # a set, because it is only used for membership tests below
    new_usernames = {email.split('@')[0] for email in new_emails}
    new_users = [('%s@%s' % (ldap['username'], authorized_domain),
                  '%s %s' % (ldap['firstName'], ldap['lastName']))
                 for ldap in ldaps if ldap['username'] in new_usernames]

    for email, name in new_users:
        user = create_user(email,
                           name=name,
                           modified_by_id=get_current_user_id())
        users.append(user)

    # bulk create people
    if new_users:
        log_event(db.session)
        db.session.commit()

    creator_role_granted = False
    # Grant Creator role to all users
    for user in users:
        if user.system_wide_role == SystemWideRoles.NO_ACCESS:
            add_creator_role(user)
            creator_role_granted = True

    # bulk create people roles
    if creator_role_granted:
        log_event(db.session)
        db.session.commit()

    return users
开发者ID:google,项目名称:ggrc-core,代码行数:58,代码来源:user_generator.py
示例15: stockline
def stockline(request, info, session, stocklineid):
    """Show one stock line together with the stock on sale on it.

    Raises ``Http404`` when no stock line has the given id.
    """
    lookup = (
        session.query(StockLine)
        .filter_by(id=int(stocklineid))
        .options(joinedload_all("stockonsale.stocktype.unit"))
        .options(undefer_group("qtys"))
    )
    try:
        line = lookup.one()
    except NoResultFound:
        raise Http404
    context = {"stockline": line}
    return ("stockline.html", context)
开发者ID:sde1000,项目名称:quicktill,代码行数:12,代码来源:views.py
示例16: delivery
def delivery(request, info, session, deliveryid):
    """Show one delivery together with its items.

    Raises ``Http404`` when no delivery has the given id.
    """
    lookup = (
        session.query(Delivery)
        .filter_by(id=int(deliveryid))
        .options(joinedload_all("items.stocktype.unit"))
        .options(joinedload_all("items.stockline"))
        .options(undefer_group("qtys"))
    )
    try:
        found = lookup.one()
    except NoResultFound:
        raise Http404
    context = {"delivery": found}
    return ("delivery.html", context)
开发者ID:sde1000,项目名称:quicktill,代码行数:13,代码来源:views.py
示例17: related_objects
def related_objects(self, id):
    """Return the JSON response for the assessment related_objects page.

    Raises ``Forbidden`` when the current user may not read the assessment.
    """
    # id name is used as a kw argument and can't be changed here
    # pylint: disable=invalid-name,redefined-builtin
    with benchmark("check assessment permissions"):
        complete_query = models.Assessment.query.options(
            orm.undefer_group("Assessment_complete"))
        assessment = complete_query.get(id)
        readable = permissions.is_allowed_read_for(assessment)
        if not readable:
            raise Forbidden()

    with benchmark("Get assessment related_objects data"):
        data = self._get_related_data(assessment)

    with benchmark("Make response"):
        return self.json_success_response(data, )
开发者ID:google,项目名称:ggrc-core,代码行数:14,代码来源:assessment.py
示例18: stocktype
def stocktype(request, info, session, stocktype_id):
    """Show one stock type and its stock items, highest item id first.

    Finished items are left out unless the ``show_finished`` GET parameter
    is "on". Raises ``Http404`` when the stock type does not exist.
    """
    try:
        s = session.query(StockType).filter_by(id=int(stocktype_id)).one()
    except NoResultFound:
        raise Http404

    include_finished = request.GET.get("show_finished", "off") == "on"

    item_query = session.query(StockItem).filter(StockItem.stocktype == s)
    item_query = item_query.options(undefer_group("qtys"))
    item_query = item_query.order_by(desc(StockItem.id))
    if not include_finished:
        item_query = item_query.filter(StockItem.finished == None)  # noqa: E711
    items = item_query.all()

    context = {
        "stocktype": s,
        "items": items,
        "include_finished": include_finished,
    }
    return ("stocktype.html", context)
开发者ID:sde1000,项目名称:quicktill,代码行数:16,代码来源:views.py
示例19: stock
def stock(request, info, session, stockid):
    """Show one stock item with its related records loaded up front.

    Raises ``Http404`` when no stock item has the given id.
    """
    lookup = session.query(StockItem).filter_by(id=int(stockid))

    # Relationship paths to load eagerly with joins, in this order
    joined_paths = (
        "stocktype.department",
        "stocktype.stockline_log.stockline",
        "delivery.supplier",
        "stockunit.unit",
        "annotations.type",
    )
    for path in joined_paths:
        lookup = lookup.options(joinedload_all(path))
    lookup = lookup.options(subqueryload_all("out.transline.transaction"))
    lookup = lookup.options(undefer_group("qtys"))

    try:
        item = lookup.one()
    except NoResultFound:
        raise Http404
    return ("stock.html", {"stock": item})
开发者ID:sde1000,项目名称:quicktill,代码行数:17,代码来源:views.py
示例20: get_custom_attribute_definitions
def get_custom_attribute_definitions(cls):
    """Build the query for the CA definitions applicable to this class.

    Definitions are matched by ``definition_type``: the underscored class
    name, and for "Assessment" additionally "assessment_template". The
    'CustomAttributeDefinition_complete' group is undeferred.
    """
    from ggrc.models.custom_attribute_definition import \
        CustomAttributeDefinition as cad

    own_type = utils.underscore_from_camelcase(cls.__name__)
    if cls.__name__ == "Assessment":
        type_filter = or_(
            cad.definition_type == own_type,
            cad.definition_type == "assessment_template",
        )
    else:
        type_filter = cad.definition_type == own_type

    matching = cad.query.filter(type_filter)
    return matching.options(
        orm.undefer_group('CustomAttributeDefinition_complete'))
开发者ID:Smotko,项目名称:ggrc-core,代码行数:17,代码来源:customattributable.py
注:本文中的 sqlalchemy.orm.undefer_group 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论