• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.joinedload函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.orm.joinedload函数的典型用法代码示例。如果您正苦于以下问题:Python joinedload函数的具体用法?Python joinedload怎么用?Python joinedload使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了joinedload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _process

 def _process(self):
     """Render the dashboard page for ``self.user``.

     Sets the ``suggest_categories`` user setting to ``True``, queries the
     non-deleted events of the user's related categories starting today or
     later (asking ``get_n_matching`` for 10 events the user can access),
     collects the user's linked events together with per-event role flags
     and renders the ``dashboard.html`` template with all of it.
     """
     self.user.settings.set('suggest_categories', True)
     tz = session.tzinfo
     # Only the first two components of the split offset are used for display.
     offset_parts = timedelta_split(tz.utcoffset(datetime.now()))
     hours, minutes = offset_parts[:2]

     categories = get_related_categories(self.user)
     categories_events = []
     if categories:
         category_ids = set(entry['categ'].id for entry in categories.itervalues())
         today = now_utc(False).astimezone(tz).date()
         query = Event.query.filter(
             ~Event.is_deleted,
             Event.category_chain_overlaps(category_ids),
             Event.start_dt.astimezone(session.tzinfo) >= today
         )
         # Restrict the loaded columns and eager-load the relationships
         # listed here so they come along with the main query.
         query = query.options(
             joinedload('category').load_only('id', 'title'),
             joinedload('series'),
             subqueryload('acl_entries'),
             load_only('id', 'category_id', 'start_dt', 'end_dt', 'title', 'access_key',
                       'protection_mode', 'series_id', 'series_pos', 'series_count')
         )
         query = query.order_by(Event.start_dt, Event.id)

         def _user_can_access(event):
             return event.can_access(self.user)

         categories_events = get_n_matching(query, 10, _user_can_access)

     from_dt = now_utc(False) - relativedelta(weeks=1, hour=0, minute=0, second=0)
     linked_events = []
     for event, roles in get_linked_events(self.user, from_dt, 10).iteritems():
         role_flags = {'management': bool(roles & self.management_roles),
                       'reviewing': bool(roles & self.reviewer_roles),
                       'attendance': bool(roles & self.attendance_roles)}
         linked_events.append((event, role_flags))

     utc_offset = '{:+03d}:{:02d}'.format(hours, minutes)
     return WPUser.render_template('dashboard.html', 'dashboard',
                                   offset=utc_offset,
                                   user=self.user,
                                   categories=categories,
                                   categories_events=categories_events,
                                   suggested_categories=get_suggested_categories(self.user),
                                   linked_events=linked_events)
开发者ID:jas01,项目名称:indico,代码行数:31,代码来源:controllers.py


示例2: get_by_uri

    def get_by_uri(self, uri):
        '''Look up a concept or collection by its stored :term:`URI`.

        Only rows whose :term:`URI` column equals `uri` within this
        provider's concept scheme are considered; a :term:`URI` that would
        only match after being generated is not found.

        :param uri: The :term:`URI` to search for.
        :rtype: :class:`skosprovider.skos.Concept` or
            :class:`skosprovider.skos.Collection` or `False` if nothing
            matches.
        '''
        try:
            query = self.session.query(Thing)
            # Eager-load the related collections in the same query.
            for relation in ('labels', 'notes', 'sources'):
                query = query.options(joinedload(relation))
            query = query.filter(
                Thing.uri == uri,
                Thing.conceptscheme_id == self.conceptscheme_id
            )
            thing = query.one()
        except NoResultFound:
            return False
        return self._from_thing(thing)
开发者ID:koenedaele,项目名称:skosprovider_sqlalchemy,代码行数:26,代码来源:providers.py


示例3: test_weakref_with_cycles_o2o

    def test_weakref_with_cycles_o2o(self):
        """Check identity-map contents for a one-to-one User/Address cycle.

        Maps ``User.address`` as a scalar relationship with a ``user``
        backref, then asserts how many objects the session's identity map
        holds after the only local reference is deleted and garbage is
        collected: none when nothing was modified, two when the address
        was modified before the reference was dropped.
        """
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        s = sessionmaker()()
        # uselist=False makes User.address a scalar; the backref gives
        # Address.user, so the two objects reference each other.
        mapper(User, users, properties={
            "address": relationship(Address, backref="user", uselist=False)
        })
        mapper(Address, addresses)
        s.add(User(name="ed", address=Address(email_address="ed1")))
        s.commit()

        user = s.query(User).options(joinedload(User.address)).one()
        # Touch the backref so the User <-> Address cycle is populated.
        user.address.user
        eq_(user, User(name="ed", address=Address(email_address="ed1")))

        # Nothing was modified: after dropping the reference and collecting
        # garbage the identity map is expected to be empty.
        del user
        gc_collect()
        assert len(s.identity_map) == 0

        user = s.query(User).options(joinedload(User.address)).one()
        user.address.email_address = 'ed2'
        user.address.user  # lazyload

        # The address was modified this time: both objects are expected to
        # remain in the identity map (presumably held because of the pending
        # change -- confirm against the session implementation).
        del user
        gc_collect()
        assert len(s.identity_map) == 2

        # The modification must be persisted by the commit.
        s.commit()
        user = s.query(User).options(joinedload(User.address)).one()
        eq_(user, User(name="ed", address=Address(email_address="ed2")))
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:33,代码来源:test_session.py


示例4: get_graphs_by_edges_and_nodes_and_names

def get_graphs_by_edges_and_nodes_and_names(db_session, group_ids=None, names=None, nodes=None, edges=None, tags=None,
                                            order=desc(Graph.updated_at), page=0, page_size=10, partial_matching=False,
                                            owner_email=None, is_public=None):
    """Return one page of graphs matching ownership filters and search terms.

    ``is_public``, ``owner_email`` and ``group_ids`` narrow the result (they
    are ANDed onto the query). The search terms (``names``, ``tags``,
    ``nodes``, ``edges``) are ORed together: a graph is returned when it
    matches at least one of them. When no search term is given only the
    ownership filters apply.

    :param db_session: SQLAlchemy session used to run the query.
    :param group_ids: If not None, only graphs shared with one of these groups.
    :param names: Graph name patterns, matched case-insensitively (ILIKE).
    :param nodes: Node patterns, matched against node label or node name.
    :param edges: ``(u, v)`` pairs of node patterns; an edge matches in either
        direction, by node name or by node label.
    :param tags: Tag name patterns; always wrapped in ``%`` wildcards.
    :param order: ORDER BY expression for the result.
    :param page: Zero-based page index.
    :param page_size: Maximum number of graphs returned.
    :param partial_matching: If True, wrap names, nodes and edges in ``%``
        wildcards so they match as substrings.
    :param owner_email: If not None, only graphs owned by this email.
    :param is_public: If not None, only graphs with this ``is_public`` value.
    :returns: List of ``Graph`` objects.
    """
    query = db_session.query(Graph)

    edges = [] if edges is None else edges
    nodes = [] if nodes is None else nodes
    names = [] if names is None else names
    tags = [] if tags is None else tags

    if partial_matching:
        edges = [('%%%s%%' % u, '%%%s%%' % v) for u, v in edges]
        nodes = ['%%%s%%' % node for node in nodes]
        names = ['%%%s%%' % name for name in names]
    # NOTE(review): tags are wildcard-wrapped regardless of partial_matching;
    # kept as in the original -- confirm this asymmetry is intended.
    tags = ['%%%s%%' % tag for tag in tags]

    graph_filter_group = []
    if is_public is not None:
        graph_filter_group.append(Graph.is_public == is_public)
    if owner_email is not None:
        graph_filter_group.append(Graph.owner_email == owner_email)
    if group_ids is not None:
        query = query.filter(Graph.shared_with_groups.any(Group.id.in_(group_ids)))
    if graph_filter_group:
        query = query.filter(*graph_filter_group)

    names_filter_group = [Graph.name.ilike(name) for name in names]
    tags_filter_group = [GraphTag.name.ilike(tag) for tag in tags]

    # A node term may match either the label or the name.
    nodes_filter_group = [Node.label.ilike(node) for node in nodes]
    nodes_filter_group.extend(Node.name.ilike(node) for node in nodes)

    # An edge term matches head->tail or tail->head, by name or by label.
    edges_filter_group = [
        and_(Edge.head_node.has(Node.name.ilike(u)), Edge.tail_node.has(Node.name.ilike(v)))
        for u, v in edges]
    edges_filter_group.extend(
        and_(Edge.tail_node.has(Node.name.ilike(u)), Edge.head_node.has(Node.name.ilike(v)))
        for u, v in edges)
    edges_filter_group.extend(
        and_(Edge.head_node.has(Node.label.ilike(u)), Edge.tail_node.has(Node.label.ilike(v)))
        for u, v in edges)
    edges_filter_group.extend(
        and_(Edge.tail_node.has(Node.label.ilike(u)), Edge.head_node.has(Node.label.ilike(v)))
        for u, v in edges)

    # Eager-load only the collections that are actually searched.
    options_group = []
    if nodes_filter_group:
        options_group.append(joinedload('nodes'))
    if edges_filter_group:
        options_group.append(joinedload('edges'))
    if options_group:
        query = query.options(*options_group)

    combined_filter_group = []
    if nodes_filter_group:
        combined_filter_group.append(Graph.nodes.any(or_(*nodes_filter_group)))
    if edges_filter_group:
        combined_filter_group.append(Graph.edges.any(or_(*edges_filter_group)))
    # extend(), not append(*...): list.append accepts exactly one argument,
    # so unpacking more than one name/tag into it raised TypeError.
    combined_filter_group.extend(names_filter_group)
    combined_filter_group.extend(tags_filter_group)

    if combined_filter_group:
        query = query.filter(or_(*combined_filter_group))

    return query.order_by(order).limit(page_size).offset(page * page_size).all()
开发者ID:jlaw9,项目名称:GraphSpace,代码行数:60,代码来源:dal.py


示例5: _get_ips_except_admin

    def _get_ips_except_admin(self, node_id=None,
                              network_id=None, joined=False):
        """Return IP address rows, leaving out those of the admin network.

        :param node_id: Node database ID; the node filter is applied only
                        when this is truthy.
        :type  node_id: int
        :param network_id: Network database ID; the network filter is
                           applied only when this is truthy.
        :type  network_id: int
        :param joined: When true, eager-load ``network_data`` and its
                       ``network_group`` along with each address.
        :returns: List of IPAddr objects ordered by ID.
        """
        query = db().query(IPAddr).order_by(IPAddr.id)
        if joined:
            eager_options = (
                joinedload('network_data'),
                joinedload('network_data.network_group'),
            )
            query = query.options(*eager_options)
        if node_id:
            query = query.filter_by(node=node_id)
        if network_id:
            query = query.filter_by(network=network_id)

        admin_net_id = self.get_admin_network_id(False)
        if not admin_net_id:
            # No admin network ID available: nothing to exclude.
            return query.all()

        not_admin = not_(IPAddr.network == admin_net_id)
        return query.filter(not_admin).all()
开发者ID:loles,项目名称:fuelweb,代码行数:28,代码来源:manager.py


示例6: image_get

def image_get(context, image_id, session=None):
    """Return the image with the given ID.

    Raises ``exception.NotFound`` when ``image_id`` is not an integer or no
    matching image exists, and ``exception.NotAuthorized`` when the context
    reports that the image is not visible.
    """
    session = session or get_session()
    try:
        # NOTE(bcwaldon): coerce to int to prevent false matches when mysql
        # compares an integer to a string that begins with that integer
        image_id = int(image_id)
    except (TypeError, ValueError):
        raise exception.NotFound("No image found")

    try:
        query = session.query(models.Image)
        query = query.options(joinedload(models.Image.properties))
        query = query.options(joinedload(models.Image.members))
        query = query.filter_by(deleted=_deleted(context))
        query = query.filter_by(id=image_id)
        image = query.one()
    except exc.NoResultFound:
        raise exception.NotFound("No image found with ID %s" % image_id)

    # Visibility is checked only after the image has been loaded.
    if context.is_image_visible(image):
        return image
    raise exception.NotAuthorized("Image not visible to you")
开发者ID:ashokcse,项目名称:openstack-bill,代码行数:25,代码来源:api.py


示例7: cycle_tasks_cache

def cycle_tasks_cache(notifications):
  """Fetch the tasks referenced by the given notifications, keyed by task ID.

  Only notifications whose ``object_type`` is "CycleTaskGroupObjectTask" are
  considered. No query is issued when none of them qualifies.

  Args:
    notifications: an iterable of Notification instances; their
      ``object_type`` and ``object_id`` attributes are read
  Returns:
    Dictionary mapping task IDs to CycleTaskGroupObjectTask instances
    (empty if no notification refers to such a task).
  """
  task_ids = []
  for notification in notifications:
    if notification.object_type == "CycleTaskGroupObjectTask":
      task_ids.append(notification.object_id)

  if not task_ids:
    return {}

  query = db.session.query(CycleTaskGroupObjectTask)
  # Eager-load both relationship directions together with the tasks.
  query = query.options(
      orm.joinedload("related_sources"),
      orm.joinedload("related_destinations")
  )
  query = query.filter(CycleTaskGroupObjectTask.id.in_(task_ids))

  tasks_by_id = {}
  for task in query:
    tasks_by_id[task.id] = task
  return tasks_by_id
开发者ID:Smotko,项目名称:ggrc-core,代码行数:25,代码来源:data_handler.py


示例8: index

def index(request):
    """Collect the data shown on the index page.

    Runs four queries against ``request.db``: the IDs of five projects
    (ordered by z-score descending, NULLs last, then randomly), one release
    per selected project, the five most recently created releases, and the
    stored row counts for the project, release, file and user tables.

    Returns a dict with the keys ``latest_releases``, ``trending_projects``,
    ``num_projects``, ``num_releases``, ``num_files`` and ``num_users``.
    """
    top_project_rows = (
        request.db.query(Project.id)
        .order_by(Project.zscore.desc().nullslast(), func.random())
        .limit(5)
        .all()
    )
    project_ids = [row[0] for row in top_project_rows]

    # DISTINCT on project_id combined with this ordering keeps a single
    # release row per selected project.
    release_subquery = (
        request.db.query(Release)
        .distinct(Release.project_id)
        .filter(Release.project_id.in_(project_ids))
        .order_by(
            Release.project_id,
            Release.is_prerelease.nullslast(),
            Release._pypi_ordering.desc(),
        )
        .subquery()
    )
    release_a = aliased(Release, release_subquery)

    trending_query = request.db.query(release_a)
    trending_query = trending_query.options(joinedload(release_a.project))
    # Order the releases by the position of their project in project_ids.
    trending_query = trending_query.order_by(
        func.array_idx(project_ids, release_a.project_id)
    )
    trending_projects = trending_query.all()

    latest_query = request.db.query(Release)
    latest_query = latest_query.options(joinedload(Release.project))
    latest_query = latest_query.order_by(Release.created.desc()).limit(5)
    latest_releases = latest_query.all()

    counted_tables = [
        Project.__tablename__,
        Release.__tablename__,
        File.__tablename__,
        User.__tablename__,
    ]
    count_rows = (
        request.db.query(RowCount.table_name, RowCount.count)
        .filter(RowCount.table_name.in_(counted_tables))
        .all()
    )
    counts = dict(count_rows)

    # Tables without a stored count are reported as 0.
    result = {}
    result["latest_releases"] = latest_releases
    result["trending_projects"] = trending_projects
    result["num_projects"] = counts.get(Project.__tablename__, 0)
    result["num_releases"] = counts.get(Release.__tablename__, 0)
    result["num_files"] = counts.get(File.__tablename__, 0)
    result["num_users"] = counts.get(User.__tablename__, 0)
    return result
开发者ID:craig5,项目名称:warehouse,代码行数:60,代码来源:views.py


示例9: get_for_linked_object

    def get_for_linked_object(cls, linked_object, preload_event=True):
        """Return the non-deleted note attached to the given object.

        The per-event cache in ``g.event_notes`` is consulted first. On a
        cache miss the note is either read straight from the object
        (``preload_event`` false) or all non-deleted notes of the object's
        event are loaded and cached before the lookup is repeated.

        :param linked_object: An event, session, contribution or
                              subcontribution.
        :param preload_event: If all notes for the same event should
                              be pre-loaded and cached in the app
                              context.
        :return: The matching note or ``None``.
        """
        event = linked_object.event
        try:
            return g.event_notes[event].get(linked_object)
        except (AttributeError, KeyError):
            # AttributeError: no cache on ``g`` yet; KeyError: this event
            # has not been cached so far.
            if not preload_event:
                note = linked_object.note
                if note and not note.is_deleted:
                    return note
                return None

            if 'event_notes' not in g:
                g.event_notes = {}

            query = event.all_notes.filter_by(is_deleted=False)
            query = query.options(joinedload(EventNote.linked_event),
                                  joinedload(EventNote.session),
                                  joinedload(EventNote.contribution),
                                  joinedload(EventNote.subcontribution))
            notes_by_object = {}
            for note in query:
                notes_by_object[note.object] = note
            g.event_notes[event] = notes_by_object
            return g.event_notes[event].get(linked_object)
开发者ID:DirkHoffmann,项目名称:indico,代码行数:27,代码来源:notes.py


示例10: test_get_node_networks_optimization

    def test_get_node_networks_optimization(self):
        """Build networks for two nodes from pre-fetched IP and network maps.

        Creates an environment with two nodes, assigns "management" IPs,
        loads the nodes with their cluster and interfaces eagerly and checks
        that ``get_node_networks_optimized`` yields one result per node.
        """
        node_specs = [
            {"pending_addition": True, "api": True},
            {"pending_addition": True, "api": True},
        ]
        self.env.create(cluster_kwargs={}, nodes_kwargs=node_specs)

        self.env.network_manager.assign_ips(
            [node.id for node in self.env.nodes],
            "management"
        )

        node_query = self.db.query(Node).options(
            joinedload('cluster'),
            joinedload('interfaces'),
            joinedload('interfaces.assigned_networks'))
        nodes = node_query.all()

        # Fetch the lookup tables once; they are reused for every node below.
        ips_mapped = self.env.network_manager.get_grouped_ips_by_node()
        networks_grouped = (
            self.env.network_manager.get_networks_grouped_by_cluster())

        full_results = [
            self.env.network_manager.get_node_networks_optimized(
                node,
                ips_mapped.get(node.id, []),
                networks_grouped.get(node.cluster_id, []))
            for node in nodes
        ]
        self.assertEqual(len(full_results), 2)
开发者ID:loles,项目名称:fuelweb,代码行数:29,代码来源:test_network_manager.py


示例11: _get

def _get(context, artifact_id, session, type_name=None, type_version=None,
         show_level=ga.Showlevel.BASIC):
    """Load a single artifact by ID, optionally constrained by type.

    With ``show_level == ga.Showlevel.NONE`` only the tags are eager-loaded;
    for every other level properties, tags and blobs (with their locations)
    are eager-loaded as well.

    Raises ``exception.ArtifactNotFound`` when no artifact matches and
    ``exception.ArtifactForbidden`` when the visibility check fails.
    """
    values = {'id': artifact_id}
    if type_name is not None:
        values['type_name'] = type_name
    if type_version is not None:
        values['type_version'] = type_version
    _set_version_fields(values)

    try:
        query = session.query(models.Artifact)
        if show_level == ga.Showlevel.NONE:
            query = query.options(joinedload(models.Artifact.tags))
        else:
            query = query.options(joinedload(models.Artifact.properties))
            query = query.options(joinedload(models.Artifact.tags))
            # Chained loader: blobs first, then each blob's locations.
            blob_loader = joinedload(models.Artifact.blobs)
            query = query.options(
                blob_loader.joinedload(models.ArtifactBlob.locations))
        query = query.filter_by(**values)

        artifact = query.one()
    except orm.exc.NoResultFound:
        LOG.warn(_LW("Artifact with id=%s not found") % artifact_id)
        raise exception.ArtifactNotFound(id=artifact_id)

    if _check_visibility(context, artifact):
        return artifact

    LOG.warn(_LW("Artifact with id=%s is not accessible") % artifact_id)
    raise exception.ArtifactForbidden(id=artifact_id)
开发者ID:windskyer,项目名称:glance,代码行数:31,代码来源:artifacts.py


示例12: question_dump

def question_dump():
    """Return a CSV response with one row per Ask, newest question first.

    Columns: name, legislature, date, title, score. Rows are produced
    lazily from the query, which is fetched in batches of 10.
    """
    cols = ['name', 'legislature', 'date', 'title', 'score']

    ask_query = models.Ask.query.join(models.Ask.question)
    ask_query = ask_query.options(
        joinedload('question'),
        joinedload('mandate'),
        joinedload('mandate.person'),
        joinedload('match_row'),
    )
    ask_query = ask_query.order_by(models.Question.date.desc())

    def make_row(ask):
        score = ask.match.score
        # A missing score is exported as an empty cell.
        if score is None:
            score_text = ''
        else:
            score_text = str(score)
        return {
            'name': ask.mandate.person.name,
            'legislature': str(ask.mandate.year),
            'date': str(ask.question.date),
            'title': str(ask.question.title),
            'score': score_text,
        }

    batched_asks = ask_query.yield_per(10)
    rows = (make_row(ask) for ask in batched_asks)
    data = buffer_on_disk(csv_lines(cols, rows))
    return flask.Response(data, mimetype='text/csv')
开发者ID:Cristianf,项目名称:mptracker,代码行数:25,代码来源:questions.py


示例13: unit_update_stat

def unit_update_stat():
    """Rebuild the UnitStat rows from the protocols attached to every Unit.

    All existing UnitStat rows are deleted, then one UnitStat per Unit is
    added to the session with:

    * ``official`` / ``independent`` -- whether the unit has an official
      (``protocol_o``) / independent (``protocol_i``) protocol;
    * ``diff`` -- True when both protocols exist and at least one party's
      vote count differs between them;
    * ``diff_value`` -- sum of the absolute per-party differences between
      the independent and the official vote counts.

    The function does not commit; that is left to the caller.

    Raises ``KeyError`` if the independent protocol lists a party that the
    official protocol does not.
    """
    dbsession = DBSession()

    units = dbsession.query(Unit).options(
        joinedload(Unit.protocol_o), joinedload(Unit.protocol_i))
    dbsession.query(UnitStat).delete()

    for unit in units:
        official = unit.protocol_o
        independent = unit.protocol_i

        stat = UnitStat(unit_id=unit.id)
        stat.official = official is not None
        stat.independent = independent is not None
        stat.diff = False
        stat.diff_value = 0

        if independent and official:
            # Official vote count per party, used as the reference.
            official_votes = {}
            for vote in official.vote:
                official_votes[vote.party_id] = vote.vote_c

            for vote in independent.vote:
                official_count = official_votes[vote.party_id]
                if official_count != vote.vote_c:
                    # Compare against the official count for the same party
                    # (the previous code subtracted the party ID instead).
                    stat.diff_value += abs(vote.vote_c - official_count)
                    stat.diff = True

        dbsession.add(stat)
    # No add() after the loop: every stat is already added above, and the
    # old trailing call failed on an unbound ``stat`` when there were no units.
开发者ID:nextgis,项目名称:vote2map,代码行数:27,代码来源:models.py


示例14: _load_data

    def _load_data(self, id):
        """Fill the template context with permissions and member lists.

        Stores, on ``c.users_group.permissions``, the permission name per
        repository and per repository group granted to the users group with
        the given ``id``, and sets ``c.group_members_obj``,
        ``c.group_members`` and ``c.available_members``.
        """
        c.users_group.permissions = dict(repositories={},
                                         repositories_groups={})

        repo_perm_query = UsersGroupRepoToPerm.query()
        repo_perm_query = repo_perm_query.options(
            joinedload(UsersGroupRepoToPerm.permission))
        repo_perm_query = repo_perm_query.options(
            joinedload(UsersGroupRepoToPerm.repository))
        repo_perm_query = repo_perm_query.filter(
            UsersGroupRepoToPerm.users_group_id == id)
        ugroup_repo_perms = repo_perm_query.all()

        for repo_perm in ugroup_repo_perms:
            c.users_group.permissions['repositories'][
                repo_perm.repository.repo_name
            ] = repo_perm.permission.permission_name

        group_perm_query = UsersGroupRepoGroupToPerm.query()
        group_perm_query = group_perm_query.options(
            joinedload(UsersGroupRepoGroupToPerm.permission))
        group_perm_query = group_perm_query.options(
            joinedload(UsersGroupRepoGroupToPerm.group))
        group_perm_query = group_perm_query.filter(
            UsersGroupRepoGroupToPerm.users_group_id == id)
        ugroup_group_perms = group_perm_query.all()

        for group_perm in ugroup_group_perms:
            c.users_group.permissions['repositories_groups'][
                group_perm.group.group_name
            ] = group_perm.permission.permission_name

        c.group_members_obj = [member.user
                               for member in c.users_group.members]
        # (user_id, username) pairs for current members and for all users.
        c.group_members = [(user.user_id, user.username)
                           for user in c.group_members_obj]
        c.available_members = [(user.user_id, user.username)
                               for user in User.query().all()]
开发者ID:break123,项目名称:rhodecode,代码行数:31,代码来源:users_groups.py


示例15: _paginate_offset

    def _paginate_offset(self, clazz, schema, adapt_schema):
        """Return one batch of documents selected by `offset` and `limit`.

        `offset` defaults to 0 and `limit` to `LIMIT_DEFAULT`; the limit is
        capped at `LIMIT_MAX`. Documents are ordered by descending
        `document_id`. The result is a dict with the serialized `documents`
        and the `total` number of rows of `clazz`.
        """
        validated = self.request.validated
        offset = validated.get('offset', 0)
        limit = min(validated.get('limit', LIMIT_DEFAULT), LIMIT_MAX)

        base_query = DBSession.query(clazz)

        page_query = base_query.options(joinedload(clazz.locales))
        page_query = page_query.options(joinedload(clazz.geometry))
        page_query = page_query.order_by(clazz.document_id.desc())
        documents = page_query.slice(offset, offset + limit).all()
        set_available_cultures(documents, loaded=True)

        lang = validated.get('lang')
        if lang is not None:
            set_best_locale(documents, lang)

        # Counted on the unsliced base query, so it covers all rows.
        total = base_query.count()

        serialized = []
        for doc in documents:
            # A falsy adapt_schema means the schema is used unchanged.
            if adapt_schema:
                doc_schema = adapt_schema(schema, doc)
            else:
                doc_schema = schema
            serialized.append(to_json_dict(doc, doc_schema))

        return {'documents': serialized, 'total': total}
开发者ID:mfournier,项目名称:v6_api,代码行数:33,代码来源:document.py


示例16: _image_get

def _image_get(context, image_id, session=None, force_show_deleted=False):
    """Return the image with the given ID.

    Deleted images are filtered out unless ``force_show_deleted`` is set or
    the context is allowed to see them. Raises ``exception.NotFound`` when
    no image matches and ``exception.Forbidden`` when the image is not
    visible for the context.
    """
    _check_image_id(image_id)
    session = session or get_session()

    try:
        query = session.query(models.Image)
        query = query.options(sa_orm.joinedload(models.Image.properties))
        query = query.options(sa_orm.joinedload(models.Image.locations))
        query = query.filter_by(id=image_id)

        # Deleted images stay hidden unless explicitly forced or the
        # context may see them.
        show_deleted = force_show_deleted or _can_show_deleted(context)
        if not show_deleted:
            query = query.filter_by(deleted=False)

        image = query.one()

    except sa_orm.exc.NoResultFound:
        msg = "No image found with ID %s" % image_id
        LOG.debug(msg)
        raise exception.NotFound(msg)

    # Visibility is checked only after the image has been loaded.
    if is_image_visible(context, image):
        return image

    msg = "Forbidding request, image %s not visible" % image_id
    LOG.debug(msg)
    raise exception.Forbidden(msg)
开发者ID:AsherBond,项目名称:glance,代码行数:29,代码来源:api.py


示例17: drawlines

    def drawlines(self, h):
        """Draw a table of stock lines and the stock on sale on each.

        Lists the stock lines at ``self.locations`` whose capacity is NULL,
        ordered by name, one table row per line, writing display lines from
        the top until ``h`` rows have been reached.
        """
        query = td.s.query(StockLine)
        query = query.filter(StockLine.location.in_(self.locations))
        # "== None" is a SQLAlchemy column comparison (rendered as IS NULL);
        # it must not be rewritten as "is None".
        query = query.filter(StockLine.capacity == None)
        query = query.order_by(StockLine.name)
        query = query.options(joinedload("stockonsale"))
        query = query.options(joinedload("stockonsale.stocktype"))
        query = query.options(undefer_group("qtys"))
        sl = query.all()

        f = ui.tableformatter("pl l L r rp")
        header = f("Line", "StockID", "Stock", "Used", "Remaining")

        def fl(stockline):
            # Lines with nothing on sale get blank cells after the name.
            if not stockline.stockonsale:
                return (stockline.name, "", "", "", "")
            sos = stockline.stockonsale[0]
            return (stockline.name, sos.id, sos.stocktype.format(),
                    sos.used, sos.remaining)

        ml = [header]
        for stockline in sl:
            ml.append(f(*fl(stockline)))

        y = 0
        for row in ml:
            for text in row.display(self.w):
                self.addstr(y, 0, text)
                y += 1
            # Checked once per table row, after all of its lines are drawn.
            if y >= h:
                break
开发者ID:sde1000,项目名称:quicktill,代码行数:28,代码来源:stockterminal.py


示例18: render

    def render(self, session, **arguments):
        """Return all chassis with their related objects loaded up front.

        The result is ordered by the primary name (FQDN name, then DNS
        domain name) and finally by the chassis label.
        """
        q = session.query(Chassis)

        loader_options = [
            subqueryload('model'),
            joinedload('model.machine_specs'),
            subqueryload('location'),
            joinedload('slots'),
            subqueryload('slots.machine'),

            # A rare case when we don't need primary name/host
            lazyload('slots.machine.primary_name'),
            lazyload('slots.machine.host'),

            subqueryload('interfaces'),
            joinedload('interfaces.assignments'),
            joinedload('interfaces.assignments.network'),
            joinedload('interfaces.assignments.dns_records'),
        ]
        q = q.options(*loader_options)

        # Prefer the primary name for ordering
        q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
                        DnsDomain)
        # The primary name chain is populated from the joins above rather
        # than loaded by separate queries.
        q = q.options(contains_eager('primary_name'),
                      contains_eager('primary_name.fqdn'),
                      contains_eager('primary_name.fqdn.dns_domain'))
        ordered = q.order_by(Fqdn.name, DnsDomain.name, Chassis.label)
        return ordered.all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py


示例19: list_tasks

    def list_tasks(self, limit=None, details=False, category=None,
                   offset=None, status=None, sample_id=None, not_status=None):
        """Retrieve a list of tasks, newest first.
        @param limit: maximum number of entries to return.
        @param details: if true, eager-load guest, errors and tags.
        @param category: only tasks of this category
        @param offset: list offset
        @param status: only tasks with this status
        @param sample_id: only tasks for this sample
        @param not_status: exclude tasks with this status
        @return: list of tasks; an empty list on a database error.
        """
        session = self.Session()
        try:
            search = session.query(Task)

            # Falsy arguments (other than sample_id, which is compared
            # against None) leave the corresponding filter out.
            if status:
                search = search.filter_by(status=status)
            if not_status:
                search = search.filter(Task.status != not_status)
            if category:
                search = search.filter_by(category=category)
            if details:
                eager = [joinedload(name)
                         for name in ("guest", "errors", "tags")]
                search = search.options(*eager)
            if sample_id is not None:
                search = search.filter_by(sample_id=sample_id)

            search = search.order_by("added_on desc")
            search = search.limit(limit).offset(offset)
            return search.all()
        except SQLAlchemyError as e:
            log.debug("Database error listing tasks: {0}".format(e))
            return []
        finally:
            # Runs on every path, including the returns above.
            session.close()
开发者ID:IFGHou,项目名称:cuckoo,代码行数:34,代码来源:database.py


示例20: get_indexable_contents

def get_indexable_contents(session):
    """Yield the objects to index, one after another.

    Order of the stream: visible, non-tombstoned ideas; all agent profiles;
    then, for each visible, non-tombstoned, published post, its extracts
    followed by the post itself.
    """
    from assembl.models import AgentProfile, Idea, Post
    from assembl.models.post import PublicationStates

    # "== False" is a SQLAlchemy column comparison and must stay as it is.
    idea_query = session.query(Idea)
    idea_query = idea_query.filter(Idea.tombstone_condition())
    idea_query = idea_query.filter(Idea.hidden == False)
    idea_query = idea_query.options(
        joinedload(Idea.title).joinedload("entries"),
        joinedload(Idea.synthesis_title).joinedload("entries"),
        joinedload(Idea.description).joinedload("entries"),
    )
    for idea in idea_query:
        yield idea

    for user in session.query(AgentProfile):
        yield user

    AllPost = with_polymorphic(Post, '*')
    post_query = session.query(AllPost)
    post_query = post_query.filter(AllPost.tombstone_condition())
    post_query = post_query.filter(AllPost.hidden == False)
    post_query = post_query.filter(
        AllPost.publication_state == PublicationStates.PUBLISHED)
    post_query = post_query.options(
        joinedload(AllPost.subject).joinedload("entries"),
        joinedload(AllPost.body).joinedload("entries"),
    )
    for post in post_query:
        # Extracts are emitted before the post they belong to.
        for extract in post.extracts:
            yield extract

        yield post
开发者ID:assembl,项目名称:assembl,代码行数:34,代码来源:reindex.py



注:本文中的sqlalchemy.orm.joinedload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.joinedload_all函数代码示例发布时间:2022-05-27
下一篇:
Python orm.join函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap