• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python orm.subqueryload函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.orm.subqueryload函数的典型用法代码示例。如果您正苦于以下问题:Python subqueryload函数的具体用法?Python subqueryload怎么用?Python subqueryload使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了subqueryload函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_subqueryload

    def test_subqueryload(self):
        self.assertEqual(
            str(self.db.query(Foo).subqueryload('bars')),
            str(self.db.query(Foo).options(orm.subqueryload('bars')))
        )

        self.assertEqual(
            str(self.db.query(Foo).subqueryload('bars', 'bazs')),
            str((self.db.query(Foo)
                 .options(orm.subqueryload('bars').subqueryload('bazs'))))
        )

        self.assertEqual(
            str(self.db.query(Foo).subqueryload(Foo.bars)),
            str(self.db.query(Foo).options(orm.subqueryload(Foo.bars)))
        )

        self.assertEqual(
            str(self.db.query(Foo).subqueryload(Foo.bars, Bar.bazs)),
            str((self.db.query(Foo)
                 .options(orm.subqueryload(Foo.bars).subqueryload(Bar.bazs))))
        )

        self.assertEqual(
            str((self.db.query(Foo)
                 .subqueryload(
                     'bars',
                     options=[LoadOption('subqueryload', 'bazs')]))),
            str((self.db.query(Foo)
                 .options(orm.subqueryload('bars').subqueryload('bazs'))))
        )
开发者ID:LeoKudrik,项目名称:alchy,代码行数:31,代码来源:test_query.py


示例2: render

    def render(self, session, **arguments):
        q = session.query(Chassis)

        q = q.options(subqueryload('model'),
                      joinedload('model.machine_specs'),
                      subqueryload('location'),
                      joinedload('slots'),
                      subqueryload('slots.machine'),

                      # A rare case when we don't need primary name/host
                      lazyload('slots.machine.primary_name'),
                      lazyload('slots.machine.host'),

                      subqueryload('interfaces'),
                      joinedload('interfaces.assignments'),
                      joinedload('interfaces.assignments.network'),
                      joinedload('interfaces.assignments.dns_records'))

        # Prefer the primary name for ordering
        q = q.outerjoin(DnsRecord, (Fqdn, DnsRecord.fqdn_id == Fqdn.id),
                        DnsDomain)
        q = q.options(contains_eager('primary_name'),
                      contains_eager('primary_name.fqdn'),
                      contains_eager('primary_name.fqdn.dns_domain'))
        q = q.order_by(Fqdn.name, DnsDomain.name, Chassis.label)
        return q.all()
开发者ID:jrha,项目名称:aquilon,代码行数:26,代码来源:show_chassis_all.py


示例3: render_params_for_submissions

    def render_params_for_submissions(self, query, page, page_size=50):
        """Add data about the requested submissions to r_params.

        query (sqlalchemy.orm.query.Query): the query giving back all
            interesting submissions.
        page (int): the index of the page to display.
        page_size(int): the number of submissions per page.

        """
        query = query\
            .options(subqueryload(Submission.task))\
            .options(subqueryload(Submission.participation))\
            .options(subqueryload(Submission.files))\
            .options(subqueryload(Submission.token))\
            .options(subqueryload(Submission.results)
                     .subqueryload(SubmissionResult.evaluations))\
            .order_by(Submission.timestamp.desc())

        offset = page * page_size
        count = query.count()

        if self.r_params is None:
            self.r_params = self.render_params()

        # A page showing paginated submissions can use these
        # parameters: total number of submissions, submissions to
        # display in this page, index of the current page, total
        # number of pages.
        self.r_params["submission_count"] = count
        self.r_params["submissions"] = \
            query.slice(offset, offset + page_size).all()
        self.r_params["submission_page"] = page
        self.r_params["submission_pages"] = \
            (count + page_size - 1) // page_size
开发者ID:artikz,项目名称:cms,代码行数:34,代码来源:base.py


示例4: render

    def render(self, session, cluster, **arguments):
        q = session.query(Cluster)
        vm_q = session.query(VirtualMachine)
        vm_q = vm_q.join(ClusterResource, Cluster)
        if cluster:
            q = q.filter_by(name=cluster)
            vm_q = vm_q.filter_by(name=cluster)

        vm_q = vm_q.options(joinedload('machine'),
                            joinedload('machine.primary_name'),
                            joinedload('machine.primary_name.fqdn'),
                            lazyload('machine.host'))

        q = q.options(subqueryload('_hosts'),
                      joinedload('_hosts.host'),
                      joinedload('_hosts.host.machine'),
                      subqueryload('_metacluster'),
                      joinedload('_metacluster.metacluster'),
                      joinedload('resholder'),
                      subqueryload('resholder.resources'),
                      subqueryload('service_bindings'),
                      subqueryload('allowed_personalities'))
        q = q.order_by(Cluster.name)
        dbclusters = q.all()
        if cluster and not dbclusters:
            raise NotFoundException("Cluster %s not found." % cluster)

        # Manual eager-loading of VM resources. All the code does is making sure
        # the data is pinned in the session's cache
        machines = {}
        for vm in vm_q:
            machines[vm.machine.machine_id] = vm

        return ClusterList(dbclusters)
开发者ID:jrha,项目名称:aquilon,代码行数:34,代码来源:show_cluster_all.py


示例5: eager_query

  def eager_query(cls):
    from sqlalchemy import orm

    query = super(Relatable, cls).eager_query()
    return cls.eager_inclusions(query, Relatable._include_links).options(
        orm.subqueryload('related_sources'),
        orm.subqueryload('related_destinations'))
开发者ID:VinnieJohns,项目名称:ggrc-core,代码行数:7,代码来源:relationship.py


示例6: eager_query

  def eager_query(cls):
    from sqlalchemy import orm

    query = super(ControlControl, cls).eager_query()
    return query.options(
        orm.subqueryload('control'),
        orm.subqueryload('implemented_control'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:control_control.py


示例7: finalize_query

    def finalize_query(self, query, fltr, session, qstring=None, order_by=None):
        search_query = None
        ranked = False
        if qstring is not None:
            ft_query = and_(SearchObjectIndex.so_uuid == ObjectInfoIndex.uuid, query)
            q = session.query(
                ObjectInfoIndex,
                func.ts_rank_cd(
                    SearchObjectIndex.search_vector,
                    func.plainto_tsquery(qstring)
                ).label('rank'))\
            .options(subqueryload(ObjectInfoIndex.search_object))\
            .options(subqueryload(ObjectInfoIndex.properties)).filter(ft_query)

            query_result = search(q, qstring, vector=SearchObjectIndex.search_vector, sort=order_by is None, regconfig='simple')
            ranked = True
        else:
            query_result = session.query(ObjectInfoIndex).options(subqueryload(ObjectInfoIndex.properties)).filter(query)

        if order_by is not None:
            query_result = query_result.order_by(order_by)
        elif ranked is True:
            query_result = query_result.order_by(
                desc(
                    func.ts_rank_cd(
                        SearchObjectIndex.search_vector,
                        func.to_tsquery(search_query)
                    )
                )
            )
        if 'limit' in fltr:
            query_result = query_result.limit(fltr['limit'])
        return query_result, ranked
开发者ID:gonicus,项目名称:gosa,代码行数:33,代码来源:methods.py


示例8: json_export

def json_export(outfile):
    db = Session()
    data = dict()
    data["materials"] = [
        it.to_dict(["locations"])
        for it in db.query(Material).options(subqueryload(Material.locations))
    ]
    data["materialTypes"] = [
        dict(label=it[0], value=it[0])
        for it in db.query(distinct(Material.type)).order_by(Material.type.asc())
        if it[0]
    ]
    data["blueprints"] = [
        it.to_dict(["ingredients"])
        for it in db.query(Blueprint)\
                .options(subqueryload(Blueprint.ingredients))\
                .options(subqueryload("ingredients.material"))
    ]
    data["blueprintTypes"] = [
        dict(label=it[0], value=it[0])
        for it in db.query(distinct(Blueprint.type)).order_by(Blueprint.type.asc())
        if it[0]
    ]
    with open(outfile, "w") as fp:
        fp.write('CollectorDroneData=')
        json.dump(data, fp)

    db.close()
开发者ID:fre-sch,项目名称:collector-drone,代码行数:28,代码来源:json_export.py


示例9: package

def package(pkg):
    db = current_app.config['DB']()
    collections = list(queries.collections(db))

    package = db.query(tables.Package).get(pkg)
    if package is None:
        abort(404)

    query = queries.dependencies(db, package)
    query = query.options(subqueryload('collection_packages'))
    query = query.options(subqueryload('collection_packages.links'))
    dependencies = list(query)

    dependents = list(queries.dependents(db, package))

    in_progress_deps = [p for p in dependencies if p.status == 'in-progress']

    return render_template(
        'package.html',
        breadcrumbs=(
            (url_for('hello'), 'Python 3 Porting Database'),
            (url_for('package', pkg=pkg), pkg),
        ),
        collections=collections,
        pkg=package,
        dependencies=list(dependencies),
        dependents=list(dependents),
        deptree=[(package, gen_deptree(dependencies))],
        in_progress_deps=in_progress_deps,
    )
开发者ID:sYnfo,项目名称:portingdb,代码行数:30,代码来源:htmlreport.py


示例10: family_query

 def family_query(self):
     query = self.request.db.query(tcg_tables.CardFamily)
     query = dbutil.order_by_name(query, tcg_tables.CardFamily)
     query = query.options(joinedload('names_local'))
     query = query.options(subqueryload('cards.prints'))
     query = query.options(subqueryload('cards'))
     return query
开发者ID:encukou,项目名称:ptcg-editor,代码行数:7,代码来源:prints.py


示例11: eager_query

  def eager_query(cls):
    from sqlalchemy import orm

    query = super(SystemSystem, cls).eager_query()
    return query.options(
        orm.subqueryload('parent'),
        orm.subqueryload('child'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:system_system.py


示例12: get_queryset

    def get_queryset(self, project=None, db=None, *args, **kwargs):
        malware_sha256 = self.kwargs["malware_sha256"]

        if malware_sha256:
            session = db.Session()
            malware = session.query(Malware).filter(Malware.sha256 == malware_sha256).one_or_none()

            if not malware:
                error = {"error": {"code": "NotFound",
                                   "message": "Malware not found: {} (Project: {})".format(malware_sha256, project)}}
                raise NotFound(detail=error)

            session = db.Session()

            malware = session.query(Malware) \
                .options(subqueryload(Malware.tag)) \
                .options(subqueryload(Malware.analysis)) \
                .options(subqueryload(Malware.note)).filter(Malware.sha256 == malware_sha256).one_or_none()

            queryset = getattr(malware, self.malware_relationship_field)

        else:
            session = db.Session()
            queryset = session.query(self.model)

        return queryset
开发者ID:cvandeplas,项目名称:viper,代码行数:26,代码来源:views.py


示例13: index

def index():
    query = Event.query() \
        .options(subqueryload('actor')) \
        .options(subqueryload('user')) \
        .options(subqueryload('club')) \
        .outerjoin(Event.flight) \
        .options(contains_eager(Event.flight)) \
        .filter(or_(Event.flight == None, Flight.is_rankable())) \
        .order_by(Event.time.desc())

    query = _filter_query(query, request.args)

    page = request.args.get('page', type=int, default=1)
    per_page = request.args.get('per_page', type=int, default=50)

    events = query.limit(per_page).offset((page - 1) * per_page).all()
    events_count = len(events)

    if request.args.get('grouped', True, type=str_to_bool):
        events = group_events(events)

    template_vars = dict(events=events, types=Event.Type)

    if page > 1:
        template_vars['prev_page'] = page - 1
    if events_count == per_page:
        template_vars['next_page'] = page + 1

    return render_template('timeline/list.jinja', **template_vars)
开发者ID:TobiasLohner,项目名称:SkyLines,代码行数:29,代码来源:timeline.py


示例14: eager_query

  def eager_query(cls):
    from sqlalchemy import orm

    query = super(ControlSection, cls).eager_query()
    return query.options(
        orm.subqueryload('control'),
        orm.subqueryload('section'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:control_section.py


示例15: eager_query

  def eager_query(cls):
    from sqlalchemy import orm

    query = super(UserRole, cls).eager_query()
    return query.options(
        orm.subqueryload('role'),
        orm.subqueryload('person'))
开发者ID:sriharshakappala,项目名称:ggrc-core,代码行数:7,代码来源:models.py


示例16: _get_db_tracks

 def _get_db_tracks(session, chunk):
     return (session.query(Track).
             options(subqueryload(Track.album),
                     subqueryload(Track.artist),
                     subqueryload(Track.lastfm_info)).
             filter(Track.filename.in_(chunk)).
             all())
开发者ID:thesquelched,项目名称:suggestive,代码行数:7,代码来源:mstat.py


示例17: lookup

    def lookup(self, department_id, start_time=None, end_time=None):
        """
        Returns a list of all shifts for the given department.

        Takes the department id as the first parameter. For a list of all
        department ids call the "dept.list" method.

        Optionally, takes a "start_time" and "end_time" to constrain the
        results to a given date range. Dates may be given in any format
        supported by the
        <a href="http://dateutil.readthedocs.io/en/stable/parser.html">
        dateutil parser</a>, plus the string "now".

        Unless otherwise specified, "start_time" and "end_time" are assumed
        to be in the local timezone of the event.
        """
        with Session() as session:
            query = session.query(Job).filter_by(department_id=department_id)
            if start_time:
                start_time = _parse_datetime(start_time)
                query = query.filter(Job.start_time >= start_time)
            if end_time:
                end_time = _parse_datetime(end_time)
                query = query.filter(Job.start_time <= end_time)
            query = query.options(
                    subqueryload(Job.department),
                    subqueryload(Job.shifts).subqueryload(Shift.attendee))
            return [job.to_dict(self.fields) for job in query]
开发者ID:magfest,项目名称:ubersystem,代码行数:28,代码来源:api.py


示例18: stop

 def stop(self, stop_id, feed_id="", prefetch_parent=True, prefetch_substops=True):
     query = self._session.query(Stop)
     if prefetch_parent:
         query = query.options(subqueryload('parent_station'))
     if prefetch_substops:
         query = query.options(subqueryload('sub_stops'))
     return query.get((feed_id, stop_id))
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:7,代码来源:dao.py


示例19: group

def group(grp):
    db = current_app.config['DB']()
    collections = list(queries.collections(db))

    group = db.query(tables.Group).get(grp)
    if group is None:
        abort(404)

    query = db.query(tables.Package)
    query = query.join(tables.Package.group_packages)
    query = query.join(tables.GroupPackage.group)
    query = query.join(tables.Package.status_obj)
    query = query.filter(tables.Group.ident == grp)
    query = query.order_by(-tables.Status.weight)
    query = queries.order_by_name(db, query)
    query = query.options(subqueryload('collection_packages'))
    query = query.options(subqueryload('collection_packages.links'))
    packages = list(query)

    query = query.filter(tables.GroupPackage.is_seed)
    seed_groups = query

    return render_template(
        'group.html',
        breadcrumbs=(
            (url_for('hello'), 'Python 3 Porting Database'),
            (url_for('group', grp=grp), group.name),
        ),
        collections=collections,
        grp=group,
        packages=packages,
        len_packages=len(packages),
        deptree=list(gen_deptree(seed_groups)),
        status_counts=get_status_counts(packages),
    )
开发者ID:rodrigc,项目名称:portingdb,代码行数:35,代码来源:htmlreport.py


示例20: query_factory

 def query_factory():
     query = self._session.query(Stop)
     if prefetch_parent:
         query = query.options(subqueryload('parent_station'))
         if prefetch_substops:
             query = query.options(subqueryload('sub_stops'))
     return query
开发者ID:pailakka,项目名称:gtfslib-python,代码行数:7,代码来源:dao.py



注:本文中的sqlalchemy.orm.subqueryload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python orm.subqueryload_all函数代码示例发布时间:2022-05-27
下一篇:
Python orm.sessionmaker函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap