本文整理汇总了Python中sqlalchemy.orm.joinedload函数的典型用法代码示例。如果您正苦于以下问题：Python joinedload函数的具体用法？Python joinedload怎么用？Python joinedload使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了joinedload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _process
def _process(self):
    """Render the dashboard page of ``self.user``.

    Switches on the user's ``suggest_categories`` setting, looks for events
    in the user's related categories that start today or later and that the
    user can access, gathers the events linked to the user together with
    per-event role flags, and hands everything to the ``dashboard.html``
    template.
    """
    self.user.settings.set('suggest_categories', True)
    tz = session.tzinfo
    # Only the first two components of the split UTC offset are displayed.
    hours, minutes = timedelta_split(tz.utcoffset(datetime.now()))[:2]
    categories = get_related_categories(self.user)

    categories_events = []
    if categories:
        category_ids = set(c['categ'].id for c in categories.itervalues())
        today = now_utc(False).astimezone(tz).date()
        criteria = (~Event.is_deleted,
                    Event.category_chain_overlaps(category_ids),
                    Event.start_dt.astimezone(session.tzinfo) >= today)
        event_columns = ('id', 'category_id', 'start_dt', 'end_dt', 'title', 'access_key',
                         'protection_mode', 'series_id', 'series_pos', 'series_count')
        query = Event.query.filter(*criteria)
        query = query.options(joinedload('category').load_only('id', 'title'),
                              joinedload('series'),
                              subqueryload('acl_entries'),
                              load_only(*event_columns))
        query = query.order_by(Event.start_dt, Event.id)
        # The access check runs in Python, on the loaded events.
        categories_events = get_n_matching(query, 10, lambda x: x.can_access(self.user))

    # One week back; hour/minute/second are reset to 0 by the singular arguments.
    from_dt = now_utc(False) - relativedelta(weeks=1, hour=0, minute=0, second=0)
    linked_events = []
    for event, roles in get_linked_events(self.user, from_dt, 10).iteritems():
        role_flags = {'management': bool(roles & self.management_roles),
                      'reviewing': bool(roles & self.reviewer_roles),
                      'attendance': bool(roles & self.attendance_roles)}
        linked_events.append((event, role_flags))

    offset = '{:+03d}:{:02d}'.format(hours, minutes)
    return WPUser.render_template('dashboard.html', 'dashboard',
                                  offset=offset, user=self.user,
                                  categories=categories,
                                  categories_events=categories_events,
                                  suggested_categories=get_suggested_categories(self.user),
                                  linked_events=linked_events)
开发者ID:jas01,项目名称:indico,代码行数:31,代码来源:controllers.py
示例2: get_by_uri
def get_by_uri(self, uri):
    '''Look up a concept or collection by its :term:`URI`.

    Only a :term:`URI` that is actually stored in the database is matched,
    and only within this provider's concept scheme. Something that has no
    stored :term:`URI`, but would get a matching one after generation,
    is not found.

    :param uri: The :term:`URI` to look for.
    :rtype: :class:`skosprovider.skos.Concept` or
        :class:`skosprovider.skos.Collection` or `False` if the concept or
        collection is unknown to the provider.
    '''
    eager = (
        joinedload('labels'),
        joinedload('notes'),
        joinedload('sources'),
    )
    query = self.session.query(Thing).options(*eager).filter(
        Thing.uri == uri,
        Thing.conceptscheme_id == self.conceptscheme_id
    )
    try:
        thing = query.one()
    except NoResultFound:
        return False
    return self._from_thing(thing)
开发者ID:koenedaele,项目名称:skosprovider_sqlalchemy,代码行数:26,代码来源:providers.py
示例3: test_weakref_with_cycles_o2o
def test_weakref_with_cycles_o2o(self):
    """Check identity-map retention for a one-to-one User <-> Address cycle.

    After the loaded objects are dropped and garbage collected, the
    identity map is expected to be empty when nothing was modified and to
    still hold both objects when one of them has a pending change.
    """
    User = self.classes.User
    Address = self.classes.Address
    users = self.tables.users
    addresses = self.tables.addresses

    sess = sessionmaker()()
    mapper(User, users, properties=dict(
        address=relationship(Address, backref="user", uselist=False)
    ))
    mapper(Address, addresses)

    sess.add(User(name="ed", address=Address(email_address="ed1")))
    sess.commit()

    # Unmodified objects: nothing may remain in the identity map afterwards.
    user = sess.query(User).options(joinedload(User.address)).one()
    user.address.user
    eq_(user, User(name="ed", address=Address(email_address="ed1")))
    del user
    gc_collect()
    assert len(sess.identity_map) == 0

    # Modified address: both objects are expected to stay in the map.
    user = sess.query(User).options(joinedload(User.address)).one()
    user.address.email_address = 'ed2'
    user.address.user  # lazyload
    del user
    gc_collect()
    assert len(sess.identity_map) == 2

    sess.commit()
    user = sess.query(User).options(joinedload(User.address)).one()
    eq_(user, User(name="ed", address=Address(email_address="ed2")))
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:33,代码来源:test_session.py
示例4: get_graphs_by_edges_and_nodes_and_names
def get_graphs_by_edges_and_nodes_and_names(db_session, group_ids=None, names=None, nodes=None, edges=None, tags=None,
                                            order=desc(Graph.updated_at), page=0, page_size=10, partial_matching=False,
                                            owner_email=None, is_public=None):
    """Return one page of graphs matching any of the given search criteria.

    ``is_public``, ``owner_email`` and ``group_ids`` restrict the candidate
    graphs and are combined with AND. The ``names``, ``nodes``, ``edges``
    and ``tags`` criteria are combined with OR: a graph qualifies when it
    matches at least one of them. Without any of those four criteria only
    the restricting filters apply.

    :param db_session: session used to build and run the query.
    :param group_ids: keep only graphs shared with a group whose id is listed.
    :param names: ILIKE patterns for the graph name.
    :param nodes: ILIKE patterns matched against node labels and node names.
    :param edges: ``(u, v)`` pattern pairs; an edge matches when its end
        nodes match ``u`` and ``v`` (both by name or both by label), in
        either direction.
    :param tags: tag names; always matched as ``%tag%`` substrings,
        independent of ``partial_matching``.
    :param order: expression passed to ``order_by``.
    :param page: zero-based page index.
    :param page_size: maximum number of graphs returned.
    :param partial_matching: when true, names, nodes and edges are wrapped
        in ``%...%`` before matching.
    :param owner_email: keep only graphs with this owner e-mail.
    :param is_public: keep only graphs with this ``is_public`` value.
    :returns: list of matching ``Graph`` objects for the requested page.
    """
    query = db_session.query(Graph)

    edges = [] if edges is None else edges
    nodes = [] if nodes is None else nodes
    names = [] if names is None else names
    tags = [] if tags is None else tags

    if partial_matching:
        edges = [('%%%s%%' % u, '%%%s%%' % v) for u, v in edges]
        nodes = ['%%%s%%' % node for node in nodes]
        names = ['%%%s%%' % name for name in names]
    tags = ['%%%s%%' % tag for tag in tags]

    graph_filter_group = []
    if is_public is not None:
        graph_filter_group.append(Graph.is_public == is_public)
    if owner_email is not None:
        graph_filter_group.append(Graph.owner_email == owner_email)
    if group_ids is not None:
        query = query.filter(Graph.shared_with_groups.any(Group.id.in_(group_ids)))
    if graph_filter_group:
        query = query.filter(*graph_filter_group)

    names_filter_group = [Graph.name.ilike(name) for name in names]
    tags_filter_group = [GraphTag.name.ilike(tag) for tag in tags]
    nodes_filter_group = [Node.label.ilike(node) for node in nodes]
    nodes_filter_group.extend(Node.name.ilike(node) for node in nodes)

    # Every (u, v) pair is tried by name and by label, in both edge directions.
    edges_filter_group = []
    for column in (Node.name, Node.label):
        for first, second in ((Edge.head_node, Edge.tail_node),
                              (Edge.tail_node, Edge.head_node)):
            edges_filter_group.extend(
                and_(first.has(column.ilike(u)), second.has(column.ilike(v)))
                for u, v in edges)

    options_group = []
    if nodes_filter_group:
        options_group.append(joinedload('nodes'))
    if edges_filter_group:
        options_group.append(joinedload('edges'))
    if options_group:
        query = query.options(*options_group)

    combined_filter_group = []
    if nodes_filter_group:
        combined_filter_group.append(Graph.nodes.any(or_(*nodes_filter_group)))
    if edges_filter_group:
        combined_filter_group.append(Graph.edges.any(or_(*edges_filter_group)))
    # extend(), not append(*...): append() takes exactly one argument, so the
    # unpacking form raised TypeError for more than one name or tag pattern.
    combined_filter_group.extend(names_filter_group)
    combined_filter_group.extend(tags_filter_group)
    if combined_filter_group:
        query = query.filter(or_(*combined_filter_group))

    return query.order_by(order).limit(page_size).offset(page * page_size).all()
开发者ID:jlaw9,项目名称:GraphSpace,代码行数:60,代码来源:dal.py
示例5: _get_ips_except_admin
def _get_ips_except_admin(self, node_id=None,
                          network_id=None, joined=False):
    """Fetch IP address records, leaving out the Admin Network ones.

    Records are ordered by id. Each filter is applied only when its
    argument is truthy.

    :param node_id: Node database ID.
    :type node_id: int
    :param network_id: Network database ID.
    :type network_id: int
    :param joined: When true, eagerly load ``network_data`` and its
        ``network_group`` together with every record.
    :type joined: bool
    :returns: List of IP addresses as SQLAlchemy objects.
    """
    query = db().query(IPAddr).order_by(IPAddr.id)

    if joined:
        eager = (joinedload('network_data'),
                 joinedload('network_data.network_group'))
        query = query.options(*eager)

    if node_id:
        query = query.filter_by(node=node_id)
    if network_id:
        query = query.filter_by(network=network_id)

    admin_net_id = self.get_admin_network_id(False)
    if admin_net_id:
        in_admin_network = IPAddr.network == admin_net_id
        query = query.filter(not_(in_admin_network))

    return query.all()
开发者ID:loles,项目名称:fuelweb,代码行数:28,代码来源:manager.py
示例6: image_get
def image_get(context, image_id, session=None):
    """Return the image with the given id, or raise.

    Raises ``exception.NotFound`` when ``image_id`` is not an integer or no
    matching image exists, and ``exception.NotAuthorized`` when the context
    is not allowed to see the image.
    """
    if not session:
        session = get_session()

    try:
        # NOTE(bcwaldon): this is to prevent false matches when mysql compares
        # an integer to a string that begins with that integer
        image_id = int(image_id)
    except (TypeError, ValueError):
        raise exception.NotFound("No image found")

    try:
        query = session.query(models.Image)
        query = query.options(joinedload(models.Image.properties))
        query = query.options(joinedload(models.Image.members))
        query = query.filter_by(deleted=_deleted(context))
        query = query.filter_by(id=image_id)
        image = query.one()
    except exc.NoResultFound:
        raise exception.NotFound("No image found with ID %s" % image_id)

    # Make sure they can look at it
    if context.is_image_visible(image):
        return image
    raise exception.NotAuthorized("Image not visible to you")
开发者ID:ashokcse,项目名称:openstack-bill,代码行数:25,代码来源:api.py
示例7: cycle_tasks_cache
def cycle_tasks_cache(notifications):
  """Fetch the tasks referenced by the given notifications.

  Only notifications whose object type is "CycleTaskGroupObjectTask" are
  considered. When there are none, no query is issued.

  Args:
    notifications: a list of Notification instances for which to fetch the
                   corresponding CycleTaskGroupObjectTask instances

  Returns:
    Dictionary mapping task IDs to the task instances.
  """
  task_ids = []
  for notification in notifications:
    if notification.object_type == "CycleTaskGroupObjectTask":
      task_ids.append(notification.object_id)

  if not task_ids:
    return {}

  query = db.session.query(CycleTaskGroupObjectTask)
  query = query.options(
      orm.joinedload("related_sources"),
      orm.joinedload("related_destinations"),
  )
  query = query.filter(CycleTaskGroupObjectTask.id.in_(task_ids))

  tasks_by_id = {}
  for task in query:
    tasks_by_id[task.id] = task
  return tasks_by_id
开发者ID:Smotko,项目名称:ggrc-core,代码行数:25,代码来源:data_handler.py
示例8: index
def index(request):
    """Collect the data for the index page.

    Returns a dict holding the five most recently created releases, one
    release per project for five projects picked by ``zscore`` (random
    order among equals), and the stored row counts for the project,
    release, file and user tables (0 when a count is missing).
    """
    top_projects = (
        request.db.query(Project.id)
        .order_by(Project.zscore.desc().nullslast(), func.random())
        .limit(5)
    )
    project_ids = [row[0] for row in top_projects.all()]

    # DISTINCT ON project_id keeps the first release per project in the
    # given ordering.
    release_subquery = (
        request.db.query(Release)
        .distinct(Release.project_id)
        .filter(Release.project_id.in_(project_ids))
        .order_by(
            Release.project_id,
            Release.is_prerelease.nullslast(),
            Release._pypi_ordering.desc(),
        )
        .subquery()
    )
    release_a = aliased(Release, release_subquery)

    trending_query = request.db.query(release_a).options(
        joinedload(release_a.project)
    )
    # Restore the order in which the project ids were selected above.
    trending_query = trending_query.order_by(
        func.array_idx(project_ids, release_a.project_id)
    )
    trending_projects = trending_query.all()

    latest_query = request.db.query(Release).options(joinedload(Release.project))
    latest_releases = latest_query.order_by(Release.created.desc()).limit(5).all()

    counted_tables = [
        Project.__tablename__,
        Release.__tablename__,
        File.__tablename__,
        User.__tablename__,
    ]
    count_rows = (
        request.db.query(RowCount.table_name, RowCount.count)
        .filter(RowCount.table_name.in_(counted_tables))
        .all()
    )
    counts = dict(count_rows)

    result = {
        "latest_releases": latest_releases,
        "trending_projects": trending_projects,
    }
    result["num_projects"] = counts.get(Project.__tablename__, 0)
    result["num_releases"] = counts.get(Release.__tablename__, 0)
    result["num_files"] = counts.get(File.__tablename__, 0)
    result["num_users"] = counts.get(User.__tablename__, 0)
    return result
开发者ID:craig5,项目名称:warehouse,代码行数:60,代码来源:views.py
示例9: get_for_linked_object
def get_for_linked_object(cls, linked_object, preload_event=True):
    """Return the non-deleted note attached to the given object, if any.

    Notes already cached in ``g.event_notes`` for the object's event are
    used directly. Otherwise the note is either read from the object
    itself or, with `preload_event`, all non-deleted notes of the event
    are loaded in one query and cached in ``g``.

    :param linked_object: An event, session, contribution or
                          subcontribution.
    :param preload_event: If all notes for the same event should
                          be pre-loaded and cached in the app
                          context.
    """
    event = linked_object.event
    try:
        return g.event_notes[event].get(linked_object)
    except (AttributeError, KeyError):
        # Nothing cached for this event yet.
        if not preload_event:
            note = linked_object.note
            if note and not note.is_deleted:
                return note
            return None

        if 'event_notes' not in g:
            g.event_notes = {}
        eager = (joinedload(EventNote.linked_event),
                 joinedload(EventNote.session),
                 joinedload(EventNote.contribution),
                 joinedload(EventNote.subcontribution))
        query = event.all_notes.filter_by(is_deleted=False).options(*eager)
        notes_by_object = dict((note.object, note) for note in query)
        g.event_notes[event] = notes_by_object
        return g.event_notes[event].get(linked_object)
开发者ID:DirkHoffmann,项目名称:indico,代码行数:27,代码来源:notes.py
示例10: test_get_node_networks_optimization
def test_get_node_networks_optimization(self):
    """Run ``get_node_networks_optimized`` for two nodes with preloaded data.

    Creates a cluster with two nodes, assigns management IPs, then feeds
    the pre-grouped IPs and networks to the optimized getter and checks
    that one result per node comes back.
    """
    node_specs = [
        {"pending_addition": True, "api": True},
        {"pending_addition": True, "api": True}
    ]
    self.env.create(cluster_kwargs={}, nodes_kwargs=node_specs)

    node_ids = [n.id for n in self.env.nodes]
    self.env.network_manager.assign_ips(node_ids, "management")

    nodes = self.db.query(Node).options(
        joinedload('cluster'),
        joinedload('interfaces'),
        joinedload('interfaces.assigned_networks')).all()

    ips_mapped = self.env.network_manager.get_grouped_ips_by_node()
    networks_grouped = self.env.network_manager.\
        get_networks_grouped_by_cluster()

    full_results = [
        self.env.network_manager.get_node_networks_optimized(
            node,
            ips_mapped.get(node.id, []),
            networks_grouped.get(node.cluster_id, []))
        for node in nodes
    ]
    self.assertEqual(len(full_results), 2)
开发者ID:loles,项目名称:fuelweb,代码行数:29,代码来源:test_network_manager.py
示例11: _get
def _get(context, artifact_id, session, type_name=None, type_version=None,
         show_level=ga.Showlevel.BASIC):
    """Load a single artifact by id, optionally narrowed by type.

    With ``show_level`` equal to ``ga.Showlevel.NONE`` only the tags are
    eagerly loaded; otherwise properties, tags and blobs (with their
    locations) are loaded as well.

    Raises ``exception.ArtifactNotFound`` when no artifact matches and
    ``exception.ArtifactForbidden`` when the visibility check fails.
    """
    values = {'id': artifact_id}
    if type_name is not None:
        values['type_name'] = type_name
    if type_version is not None:
        values['type_version'] = type_version
    _set_version_fields(values)

    try:
        query = session.query(models.Artifact)
        if show_level == ga.Showlevel.NONE:
            query = query.options(joinedload(models.Artifact.tags))
        else:
            blobs_with_locations = (
                joinedload(models.Artifact.blobs).
                joinedload(models.ArtifactBlob.locations))
            query = query.options(joinedload(models.Artifact.properties))
            query = query.options(joinedload(models.Artifact.tags))
            query = query.options(blobs_with_locations)
        artifact = query.filter_by(**values).one()
    except orm.exc.NoResultFound:
        LOG.warn(_LW("Artifact with id=%s not found") % artifact_id)
        raise exception.ArtifactNotFound(id=artifact_id)

    if _check_visibility(context, artifact):
        return artifact
    LOG.warn(_LW("Artifact with id=%s is not accessible") % artifact_id)
    raise exception.ArtifactForbidden(id=artifact_id)
开发者ID:windskyer,项目名称:glance,代码行数:31,代码来源:artifacts.py
示例12: question_dump
def question_dump():
    """Return all asks as a CSV response, newest question first.

    Each row carries the person's name, the mandate year, the question
    date and title, and the match score (empty when there is none).
    """
    cols = ['name', 'legislature', 'date', 'title', 'score']

    eager = (
        joinedload('question'),
        joinedload('mandate'),
        joinedload('mandate.person'),
        joinedload('match_row'),
    )
    ask_query = models.Ask.query.join(models.Ask.question)
    ask_query = ask_query.options(*eager)
    ask_query = ask_query.order_by(models.Question.date.desc())

    def as_row(ask):
        score = ask.match.score
        if score is None:
            score_text = ''
        else:
            score_text = str(score)
        return dict(
            name=ask.mandate.person.name,
            legislature=str(ask.mandate.year),
            date=str(ask.question.date),
            title=str(ask.question.title),
            score=score_text,
        )

    # Rows are produced lazily while the CSV is written.
    rows = (as_row(ask) for ask in ask_query.yield_per(10))
    data = buffer_on_disk(csv_lines(cols, rows))
    return flask.Response(data, mimetype='text/csv')
开发者ID:Cristianf,项目名称:mptracker,代码行数:25,代码来源:questions.py
示例13: unit_update_stat
def unit_update_stat():
    """Rebuild the ``UnitStat`` rows from the units' protocols.

    All existing ``UnitStat`` rows are deleted and one new row is added to
    the session per unit. A row records whether the unit has an official
    (``protocol_o``) and an independent (``protocol_i``) protocol. When
    both exist, the per-party vote counts are compared: ``diff`` is set if
    any count differs and ``diff_value`` is the sum of the absolute
    differences.

    The function does not commit; that is left to the caller or the
    surrounding transaction handling.
    """
    dbsession = DBSession()
    units = dbsession.query(Unit).options(joinedload(Unit.protocol_o), joinedload(Unit.protocol_i))
    dbsession.query(UnitStat).delete()

    for unit in units:
        stat = UnitStat(unit_id=unit.id)
        stat.official = unit.protocol_o is not None
        stat.independent = unit.protocol_i is not None
        stat.diff = False
        stat.diff_value = 0

        if unit.protocol_i and unit.protocol_o:
            official_votes = dict(
                (vote.party_id, vote.vote_c) for vote in unit.protocol_o.vote)
            # NOTE(review): a party present only in the independent protocol
            # raises KeyError here, as before - confirm that cannot happen.
            for vote in unit.protocol_i.vote:
                official_count = official_votes[vote.party_id]
                if official_count != vote.vote_c:
                    # Compare the two vote counts; the previous code subtracted
                    # the party id from the vote count.
                    stat.diff_value += abs(vote.vote_c - official_count)
                    stat.diff = True

        # A single add per unit is enough; adding the same object twice
        # has no further effect.
        dbsession.add(stat)
开发者ID:nextgis,项目名称:vote2map,代码行数:27,代码来源:models.py
示例14: _load_data
def _load_data(self, id):
    """Fill the template context ``c`` with data about a users group.

    Stores the group's repository and repository-group permissions
    (name -> permission name) on ``c.users_group.permissions`` and sets
    the member and available-member lists on ``c``.

    :param id: users group id used to filter the permission tables
    """
    repo_permissions = {}
    repo_group_permissions = {}
    c.users_group.permissions = {
        'repositories': repo_permissions,
        'repositories_groups': repo_group_permissions
    }

    repo_rows = UsersGroupRepoToPerm.query()\
        .options(joinedload(UsersGroupRepoToPerm.permission),
                 joinedload(UsersGroupRepoToPerm.repository))\
        .filter(UsersGroupRepoToPerm.users_group_id == id)\
        .all()
    for row in repo_rows:
        repo_permissions[row.repository.repo_name] = \
            row.permission.permission_name

    group_rows = UsersGroupRepoGroupToPerm.query()\
        .options(joinedload(UsersGroupRepoGroupToPerm.permission),
                 joinedload(UsersGroupRepoGroupToPerm.group))\
        .filter(UsersGroupRepoGroupToPerm.users_group_id == id)\
        .all()
    for row in group_rows:
        repo_group_permissions[row.group.group_name] = \
            row.permission.permission_name

    c.group_members_obj = [member.user for member in c.users_group.members]
    c.group_members = [(user.user_id, user.username)
                       for user in c.group_members_obj]
    c.available_members = [(user.user_id, user.username)
                           for user in User.query().all()]
开发者ID:break123,项目名称:rhodecode,代码行数:31,代码来源:users_groups.py
示例15: _paginate_offset
def _paginate_offset(self, clazz, schema, adapt_schema):
    """Return one batch of documents selected by `offset` and `limit`.

    `offset` defaults to 0 and `limit` to ``LIMIT_DEFAULT``; the limit is
    capped at ``LIMIT_MAX``. Documents are ordered by descending document
    id. The result is a dict with the serialized ``documents`` and the
    ``total`` number of documents of the given class.
    """
    validated = self.request.validated
    offset = validated.get('offset', 0)
    limit = min(validated.get('limit', LIMIT_DEFAULT), LIMIT_MAX)

    base_query = DBSession.query(clazz)
    page_query = base_query.options(
        joinedload(clazz.locales),
        joinedload(clazz.geometry))
    page_query = page_query.order_by(clazz.document_id.desc())
    documents = page_query.slice(offset, offset + limit).all()

    set_available_cultures(documents, loaded=True)
    lang = validated.get('lang')
    if lang is not None:
        set_best_locale(documents, lang)

    # The total is counted on the unpaginated query.
    total = base_query.count()

    serialized = []
    for doc in documents:
        doc_schema = adapt_schema(schema, doc) if adapt_schema else schema
        serialized.append(to_json_dict(doc, doc_schema))

    return {'documents': serialized, 'total': total}
开发者ID:mfournier,项目名称:v6_api,代码行数:33,代码来源:document.py
示例16: _image_get
def _image_get(context, image_id, session=None, force_show_deleted=False):
    """Return the image with the given id, or raise.

    Deleted images are excluded unless ``force_show_deleted`` is set or
    the context may see deleted images. Raises ``exception.NotFound`` when
    no image matches and ``exception.Forbidden`` when the image is not
    visible to the context.
    """
    _check_image_id(image_id)
    if not session:
        session = get_session()

    try:
        eager = (sa_orm.joinedload(models.Image.properties),
                 sa_orm.joinedload(models.Image.locations))
        query = session.query(models.Image).options(*eager)
        query = query.filter_by(id=image_id)

        # filter out deleted images if context disallows it
        show_deleted = force_show_deleted or _can_show_deleted(context)
        if not show_deleted:
            query = query.filter_by(deleted=False)

        image = query.one()
    except sa_orm.exc.NoResultFound:
        msg = "No image found with ID %s" % image_id
        LOG.debug(msg)
        raise exception.NotFound(msg)

    # Make sure they can look at it
    if is_image_visible(context, image):
        return image

    msg = "Forbidding request, image %s not visible" % image_id
    LOG.debug(msg)
    raise exception.Forbidden(msg)
开发者ID:AsherBond,项目名称:glance,代码行数:29,代码来源:api.py
示例17: drawlines
def drawlines(self, h):
    """Draw a table of stock lines without a capacity, sorted by name.

    Each row shows the line name and, for a line with stock on sale, the
    id, type, used and remaining amount of its first stock item. Drawing
    stops after the table row that reaches or passes row ``h``.
    """
    eager = (
        joinedload("stockonsale"),
        joinedload("stockonsale.stocktype"),
        undefer_group("qtys"),
    )
    query = td.s.query(StockLine)
    query = query.filter(StockLine.location.in_(self.locations))
    # "== None" is intentional: it builds the SQL "IS NULL" condition.
    query = query.filter(StockLine.capacity == None)
    stocklines = query.order_by(StockLine.name).options(*eager).all()

    f = ui.tableformatter("pl l L r rp")
    header = f("Line", "StockID", "Stock", "Used", "Remaining")

    def fields(stockline):
        if not stockline.stockonsale:
            return (stockline.name, "", "", "", "")
        sos = stockline.stockonsale[0]
        return (stockline.name, sos.id, sos.stocktype.format(), sos.used, sos.remaining)

    table_rows = [header]
    table_rows.extend(f(*fields(stockline)) for stockline in stocklines)

    y = 0
    for table_row in table_rows:
        for text in table_row.display(self.w):
            self.addstr(y, 0, text)
            y += 1
        if y >= h:
            break
开发者ID:sde1000,项目名称:quicktill,代码行数:28,代码来源:stockterminal.py
示例18: render
def render(self, session, **arguments):
    """Return every chassis with its related objects loaded up front.

    The result is ordered by the primary name's FQDN name, then the DNS
    domain name, then the chassis label. ``arguments`` is not used.
    """
    eager_options = [
        subqueryload('model'),
        joinedload('model.machine_specs'),
        subqueryload('location'),
        joinedload('slots'),
        subqueryload('slots.machine'),
        # A rare case when we don't need primary name/host
        lazyload('slots.machine.primary_name'),
        lazyload('slots.machine.host'),
        subqueryload('interfaces'),
        joinedload('interfaces.assignments'),
        joinedload('interfaces.assignments.network'),
        joinedload('interfaces.assignments.dns_records'),
    ]
    q = session.query(Chassis).options(*eager_options)

    # Prefer the primary name for ordering
    q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
                    DnsDomain)
    # The joined rows also populate the primary name relationships.
    primary_name_paths = ('primary_name',
                          'primary_name.fqdn',
                          'primary_name.fqdn.dns_domain')
    q = q.options(*[contains_eager(path) for path in primary_name_paths])

    return q.order_by(Fqdn.name, DnsDomain.name, Chassis.label).all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py
示例19: list_tasks
def list_tasks(self, limit=None, details=False, category=None,
               offset=None, status=None, sample_id=None, not_status=None):
    """Retrieve a list of tasks, newest first.

    @param limit: maximum number of entries to return.
    @param details: if the guest, errors and tags of every task should be
                    loaded together with it.
    @param category: filter by category
    @param offset: list offset
    @param status: filter by task status
    @param sample_id: filter tasks for a sample
    @param not_status: exclude this task status from filter
    @return: list of tasks; an empty list if a database error occurs.
    """
    session = self.Session()
    try:
        search = session.query(Task)

        if status:
            search = search.filter_by(status=status)
        if not_status:
            search = search.filter(Task.status != not_status)
        if category:
            search = search.filter_by(category=category)
        if details:
            search = search.options(joinedload("guest"), joinedload("errors"), joinedload("tags"))
        if sample_id is not None:
            search = search.filter_by(sample_id=sample_id)

        # Order by the mapped column instead of the raw string
        # "added_on desc": SQLAlchemy cannot re-apply raw text around the
        # subquery it builds for joinedload() + LIMIT, and newer versions
        # reject plain-string SQL in order_by().
        search = search.order_by(Task.added_on.desc())
        tasks = search.limit(limit).offset(offset).all()
    except SQLAlchemyError as e:
        log.debug("Database error listing tasks: {0}".format(e))
        return []
    finally:
        session.close()
    return tasks
开发者ID:IFGHou,项目名称:cuckoo,代码行数:34,代码来源:database.py
示例20: get_indexable_contents
def get_indexable_contents(session):
    """Yield the objects to index, one after another.

    Yields, in this order: the ideas that pass the tombstone condition and
    are not hidden, every agent profile, and the published, non-hidden
    posts that pass the tombstone condition. The extracts of a post are
    yielded just before the post itself.
    """
    from assembl.models import AgentProfile, Idea, Post
    from assembl.models.post import PublicationStates

    idea_query = session.query(Idea).filter(
        Idea.tombstone_condition()
    ).filter(
        Idea.hidden == False  # noqa: E712 - builds a SQL comparison
    ).options(
        joinedload(Idea.title).joinedload("entries"),
        joinedload(Idea.synthesis_title).joinedload("entries"),
        joinedload(Idea.description).joinedload("entries")
    )
    for idea in idea_query:
        yield idea

    for user in session.query(AgentProfile):
        yield user

    AllPost = with_polymorphic(Post, '*')
    post_query = session.query(AllPost).filter(
        AllPost.tombstone_condition()
    ).filter(
        AllPost.hidden == False  # noqa: E712 - builds a SQL comparison
    ).filter(
        AllPost.publication_state == PublicationStates.PUBLISHED
    ).options(
        joinedload(AllPost.subject).joinedload("entries"),
        joinedload(AllPost.body).joinedload("entries")
    )
    for post in post_query:
        for extract in post.extracts:
            yield extract
        yield post
开发者ID:assembl,项目名称:assembl,代码行数:34,代码来源:reindex.py
注：本文中的sqlalchemy.orm.joinedload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论