• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python func.count函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.expression.func.count函数的典型用法代码示例。如果您正苦于以下问题:Python count函数的具体用法?Python count怎么用?Python count使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了count函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_migrate_batch_stureg

    def test_migrate_batch_stureg(self):
        batch_guid = '2bb942b9-75cf-4055-a67a-8b9ab53a9dfc'
        batch = {UdlStatsConstants.REC_ID: '6',
                 UdlStatsConstants.BATCH_GUID: batch_guid, UdlStatsConstants.TENANT: self.__tenant,
                 UdlStatsConstants.SCHEMA_NAME: None, Constants.DEACTIVATE: False,
                 UdlStatsConstants.LOAD_TYPE: LoadType.STUDENT_REGISTRATION,
                 UdlStatsConstants.BATCH_OPERATION: 's',
                 UdlStatsConstants.SNAPSHOT_CRITERIA: '{"reg_system_id": "015247bd-058c-48cd-bb4d-f6cffe5b40c1", "academic_year": 2015}'}
        self.insert_into_udl_stats(batch[UdlStatsConstants.REC_ID], batch_guid, self.__tenant, batch[UdlStatsConstants.LOAD_TYPE])

        preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
        count_to_source_query = select([func.count()]).select_from(preprod_conn.get_table(Constants.STUDENT_REG))
        count_to_be_inserted = preprod_conn.execute(count_to_source_query).fetchall()[0][0]
        self.assertEqual(10, count_to_be_inserted)

        prod_conn = EdMigrateDestConnection(tenant=get_unittest_preprod_tenant_name())
        student_reg_table = prod_conn.get_table(Constants.STUDENT_REG)
        count_query = select([func.count()]).select_from(student_reg_table)
        count_before = prod_conn.execute(count_query).fetchall()[0][0]
        self.assertEqual(2581, count_before)

        count_snapshot_query = select([func.count()], student_reg_table.c.academic_year == 2015).select_from(student_reg_table)
        count_to_be_deleted = prod_conn.execute(count_snapshot_query).fetchall()[0][0]
        self.assertEqual(1217, count_to_be_deleted)

        rtn = migrate_batch(batch)
        self.assertTrue(rtn)

        expected_count_after = count_before - count_to_be_deleted + count_to_be_inserted
        count_after = prod_conn.execute(count_query).fetchall()[0][0]
        self.assertEqual(expected_count_after, count_after)
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py


示例2: get_community_tags

 def get_community_tags(self, item=None, limit=None):
     """Returns community tags for an item."""
     # Get item-tag association class.
     item_class = item.__class__
     item_tag_assoc_class = self.get_tag_assoc_class(item_class)
     if not item_tag_assoc_class:
         return []
     # Build select statement.
     cols_to_select = [item_tag_assoc_class.table.c.tag_id, func.count('*')]
     from_obj = item_tag_assoc_class.table.join(item_class.table).join(galaxy.model.Tag.table)
     where_clause = (self.get_id_col_in_item_tag_assoc_table(item_class) == item.id)
     order_by = [func.count("*").desc()]
     group_by = item_tag_assoc_class.table.c.tag_id
     # Do query and get result set.
     query = select(columns=cols_to_select,
                    from_obj=from_obj,
                    whereclause=where_clause,
                    group_by=group_by,
                    order_by=order_by,
                    limit=limit)
     result_set = self.sa_session.execute(query)
     # Return community tags.
     community_tags = []
     for row in result_set:
         tag_id = row[0]
         community_tags.append(self.get_tag_by_id(tag_id))
     return community_tags
开发者ID:osallou,项目名称:galaxy,代码行数:27,代码来源:tags.py


示例3: dataset_counts

 def dataset_counts(cls, datasets_q):
     sq = datasets_q.subquery()
     q = select([cls.code, func.count(cls.dataset_id)],
                group_by=cls.code,
                order_by=func.count(cls.dataset_id).desc())
     q = q.where(cls.dataset_id == sq.c.id)
     return db.session.bind.execute(q).fetchall()
开发者ID:CivicVision,项目名称:datahub,代码行数:7,代码来源:facets.py


示例4: clientstats

    def clientstats(db):
        startdatestring = request.json['startdate']
        enddatestring = request.json['enddate']
        dateformat = '%d-%m-%Y'

        startdate = datetime.strptime(startdatestring, dateformat)
        enddate = datetime.strptime(enddatestring, dateformat)

        selectedRequestsBrowser = db.query(Request.browsername, func.count(Request.browsername)).filter_by(isPageview=True).filter(Request.datetime.between(startdate, enddate)).group_by(Request.browsername)
        selectedRequestsPlatform = db.query(Request.platformname, func.count(Request.platformname)).filter_by(isPageview=True).filter(Request.datetime.between(startdate, enddate)).group_by(Request.platformname)

        returnDict = {'postdata' : request.json, 'returndata' : {'browserstats' : [], 'platformstats' : []}}

        for selectedRequestBrowser in selectedRequestsBrowser:
            tempDict = {'browser' : selectedRequestBrowser[0],
            'pageviews' : selectedRequestBrowser[1]}

            returnDict['returndata']['browserstats'].append(tempDict)

        for selectedRequestPlatform in selectedRequestsPlatform:
            tempDict = {'platform' : selectedRequestPlatform[0],
            'pageviews' : selectedRequestPlatform[1]}

            returnDict['returndata']['platformstats'].append(tempDict)

        return returnDict
开发者ID:olafeee,项目名称:logniter,代码行数:26,代码来源:apiserver.py


示例5: by_filter

    def by_filter(cls, session, opts, **kwargs):
        """
        Get packages from given filters.

        :param session: SQLAlchemy session
        :type session: :class:`sqlalchemy.Session`

        :param opts: filtering options
        :type opts: `dict

        :return: package instances
        :rtype: generator of :class:`pyshop.models.Package`
        """
        where = []

        if opts.get('local_only'):
            where.append(cls.local == True)

        if opts.get('names'):
            where.append(cls.name.in_(opts['names']))

        if opts.get('classifiers'):
            ids = [c.id for c in opts.get('classifiers')]
            cls_pkg = classifier__package
            qry = session.query(cls_pkg.c.package_id,
                                func.count('*'))
            qry = qry.filter(cls_pkg.c.classifier_id.in_(ids))
            qry = qry.group_by(cls_pkg.c.package_id)
            qry = qry.having(func.count('*') >= len(ids))
            where.append(cls.id.in_([r[0] for r in qry.all()]))

        return cls.find(session, where=where, **kwargs)
开发者ID:EasyPost,项目名称:pyshop,代码行数:32,代码来源:models.py


示例6: dataset_counts

 def dataset_counts(cls, datasets):
     ds_ids = [d.id for d in datasets]
     if not len(ds_ids):
         return []
     q = select([cls.code, func.count(cls.dataset_id)],
                cls.dataset_id.in_(ds_ids), group_by=cls.code,
                order_by=func.count(cls.dataset_id).desc())
     return db.session.bind.execute(q).fetchall()
开发者ID:ToroidalATLAS,项目名称:openspending,代码行数:8,代码来源:common.py


示例7: calculate_score

 def calculate_score(self, educatives, seminar):
     total_score = 0
     score = self.score
     ken_count = session.query(Educative.hug_id, Educative.ken_id, func.count(Educative.ken_id)).group_by(Educative.hug_id, Educative.ken_id).all()
     for ken in ken_count:
         total_score += (ken[2] ** 2) * score
     second_ken_count = session.query(Educative.hug_id, Educative.second_ken_id, func.count(Educative.second_ken_id)).group_by(Educative.hug_id, Educative.second_ken_id).all()
     for second_ken in second_ken_count:
         total_score += (second_ken[2] ** 2) * score
     return total_score
开发者ID:aviadnissel,项目名称:Liron,代码行数:10,代码来源:default_constraints.py


示例8: PageContent

    def PageContent(self):
        results = dict()
        
        storySubselect = self.session.query(func.count(Story.idstory).label('storycount')).group_by(Story.categoryid).add_column(Story.categoryid).subquery()
        charSubselect = self.session.query(func.count(Character.idcharacter).label('charactercount')).group_by(Character.categoryid).add_column(Character.categoryid).subquery()

        query = self.session.query(Category, storySubselect.c.storycount, charSubselect.c.charactercount).order_by(Category.name)
        query = query.join((storySubselect, Category.idcategory == storySubselect.c.categoryid))
        query = query.join((charSubselect, Category.idcategory == charSubselect.c.categoryid))
        
        results['categories'] = [(r[0].name, r[1], r[2], self.request.route_url("search", category=r[0].idcategory)) for r in query]
        numPerRow = 3
        results['numPerRow'] = numPerRow
        results['rows'] = (len(results['categories'])/numPerRow) + 1
        return results
开发者ID:moatra,项目名称:fffn1,代码行数:15,代码来源:searchselect.py


示例9: store_item

    def store_item(self, item, spider): # {{{
        item_table = self.table

        dbobj = {}
        for k,v in item.iteritems():
            if isinstance(v, list) or isinstance(v, dict):
                dbobj[k] = json.dumps(v)
            else:
                dbobj[k] = v

        conn = self.engine.connect()

        page_url = item['page_url']
        where = item_table.c.page_url == page_url
        sel = select([func.count(item_table.c.id)]).where(where)
        cnt = conn.execute(sel).scalar()
        if cnt:
            assert cnt==1, 'More than one item with page_url %s' % page_url
            upd = item_table.update().where(where)
            conn.execute(upd, dbobj)
            status = 'updated'
        else:
            ins = item_table.insert()
            conn.execute(ins, dbobj)
            status = 'inserted'
        log.msg('Item %s into %s: %s' % (status, item_table.name, page_url), level=log.DEBUG, spider=spider)

        conn.close()
开发者ID:1060460048,项目名称:sedemo-spider,代码行数:28,代码来源:store.py


示例10: get_sub_entries_count

    def get_sub_entries_count(self, identity = None, exclude_identity = None):
        """
Returns the number of child entries of this instance.

:param offset: SQLAlchemy query offset
:param limit: SQLAlchemy query limit
:param identity: Count only DataLinker children of the given identity
:param exclude_identity: Count only DataLinker children not be of the given identity

:return: (int) Number of child entries
:since:  v0.2.00
        """

        if (self.log_handler is not None): self.log_handler.debug("#echo(__FILEPATH__)# -{0!r}.get_sub_entries_count()- (#echo(__LINE__)#)", self, context = "pas_datalinker")

        if (identity is None and exclude_identity is None): _return = self.get_data_attributes("sub_entries")['sub_entries']
        elif (identity is not None and exclude_identity is not None): raise ValueException("Defining both an identity and to exclude an identity is not supported")
        else:
            with self:
                db_query = self.local.db_instance.rel_children.with_entities(sql.count(_DbDataLinker.id))

                if (identity is not None): db_query = db_query.filter(_DbDataLinker.identity == identity)
                elif (exclude_identity is not None): db_query = db_query.filter(_DbDataLinker.identity != exclude_identity)

                db_query = DataLinker._db_apply_id_site_condition(db_query)

                _return = db_query.scalar()
            #
        #

        return _return
开发者ID:dNG-git,项目名称:pas_datalinker,代码行数:31,代码来源:data_linker.py


示例11: rank

 def rank(self):
     inner_team = aliased(Team)
     return (DBSession.query(func.count('*') + 1).
             select_from(inner_team).
             filter(inner_team.score > Team.score).
             correlate(Team).
             label('rank'))
开发者ID:Immortalem,项目名称:fluxscoreboard,代码行数:7,代码来源:team.py


示例12: count

 def count(self):
     """ Get a count of the number of distinct objects. """
     q = select(from_obj=self.join(self.alias))
     q = self.filter(q, partial=True)
     q = q.column(func.count(func.distinct(self.alias.c.id)).label('num'))
     rp = db.session.execute(q)
     return rp.fetchone().num
开发者ID:01-,项目名称:grano,代码行数:7,代码来源:__init__.py


示例13: get_stats

 def get_stats(cls):
     return dict(
         DBSession.query(Provider.pk, func.count(cls.ref_pk).label('c'))
         .filter(Provider.pk == cls.provider_pk)
         .group_by(Provider.pk)
         .order_by(desc('c'))
         .all())
开发者ID:mitcho,项目名称:glottolog3,代码行数:7,代码来源:models.py


示例14: get_number_solved_subquery

def get_number_solved_subquery():
    """
    Get a subquery that returns how many teams have solved a challenge.

    Example usage:

        .. code-block:: python

            number_of_solved_subquery = get_number_solved_subquery()
            challenge_query = (DBSession.query(Challenge,
                                               number_of_solved_subquery)

    Here we query for a list of all challenges and additionally fetch the
    number of times it has been solved. This subquery will use the outer
    challenge to correlate on, so make sure to provide one or this query
    makes no sense.
    """
    from fluxscoreboard.models import dynamic_challenges
    query = (DBSession.query(func.count('*')).
             filter(Challenge.id == Submission.challenge_id).
             correlate(Challenge).as_scalar())
    for name, module in dynamic_challenges.registry.items():
        dyn_cnt = module.solved_count_query().filter(Challenge.module == name)
        query = query + dyn_cnt.as_scalar()
    return query.label("solved_count")
开发者ID:Immortalem,项目名称:fluxscoreboard,代码行数:25,代码来源:team.py


示例15: is_activated_by_callfilter_id

def is_activated_by_callfilter_id(session, callfilter_id):
    return (session.query(func.count(Callfiltermember.active))
            .join((Callfilter, Callfilter.id == Callfiltermember.callfilterid))
            .filter(and_(Callfiltermember.callfilterid == callfilter_id,
                         Callfiltermember.bstype == 'secretary',
                         Callfiltermember.active == 1))
            .first()[0])
开发者ID:jaunis,项目名称:xivo-dao,代码行数:7,代码来源:callfilter_dao.py


示例16: get_number_comments

 def get_number_comments(self, status=None):
     if not status:
         return Session.query(sa.func.count(Comment.id)).filter_by(change_id=self.id).first()[0]
     
     
     date = Session.query(func.max(CommentStatus.created_date).label('date'), Comment.id)
     date = date.filter(CommentStatus.comment_id==Comment.id).filter(Comment.change_id==self.id)
     date = date.group_by(CommentStatus.comment_id, Comment.id)
     subq = date.subquery()
     
     q = Session.query(func.count(Comment.id)).outerjoin((subq, subq.c.id==Comment.id))
     q = q.outerjoin((CommentStatus, CommentStatus.comment_id==Comment.id))
     q = q.filter(Comment.change_id==self.id).filter(Comment.status!=STATUS_REMOVED)
     q = q.filter(Comment.in_reply_to_id==None)
     
     if status == STATUS_OPEN:
         q = q.filter(sa.or_(
             CommentStatus.id==None,
             sa.and_(CommentStatus.created_date==subq.columns.date, CommentStatus.status==status)
         ))
         return q.scalar()
     else:
         q = q.filter(
             sa.and_(CommentStatus.created_date==subq.columns.date, CommentStatus.status==status)
         )
         return q.scalar()
开发者ID:Nullicopter,项目名称:Desio,代码行数:26,代码来源:projects.py


示例17: bioconcept_interaction_starter

    def bioconcept_interaction_starter():
        nex_session = nex_session_maker()

        id_to_bioentity = dict([(x.id, x) for x in nex_session.query(Locus).all()])
        id_to_bioconcept = dict([(x.id, x) for x in nex_session.query(Bioconcept).all()])

        bad_interactors = set([x.id for x in nex_session.query(Bioconcept).filter(Bioconcept.format_name.in_({'vegetative_growth',
                                                                                                                'haploinsufficient',
                                                                                                                'viable',
                                                                                                                'heat_sensitivity',
                                                                                                                'toxin_resistance',
                                                                                                                'chronological_lifespan',
                                                                                                                'competitive_fitness',
                                                                                                                'desiccation_resistance',
                                                                                                                'resistance_to_cycloheximide',
                                                                                                                'resistance_to_methyl_methanesulfonate',
                                                                                                                'resistance_to_sirolimus',
                                                                                                                'vacuolar_morphology',
                                                                                                                'inviable'})).all()])

        #Go
        for row in nex_session.query(Goevidence.locus_id, Goevidence.go_id, func.count(Goevidence.id)).filter(Goevidence.annotation_type != 'computational').group_by(Goevidence.locus_id, Goevidence.go_id).all():
            go = id_to_bioconcept[row[1]]
            locus = id_to_bioentity[row[0]]
            if go.go_aspect == 'biological process' and go.id not in bad_interactors:
                yield {'interaction_type': 'GO', 'evidence_count': row[2], 'bioentity': locus, 'interactor': go}

        #Phenotype
        for row in nex_session.query(Phenotypeevidence.locus_id, Phenotypeevidence.phenotype_id, func.count(Phenotypeevidence.id)).group_by(Phenotypeevidence.locus_id, Phenotypeevidence.phenotype_id).all():
            observable = id_to_bioconcept[row[1]].observable
            locus = id_to_bioentity[row[0]]
            if observable.id not in bad_interactors:
                yield {'interaction_type': 'PHENOTYPE', 'evidence_count': row[2], 'bioentity': locus, 'interactor': observable}

        nex_session.close()
开发者ID:kkarra,项目名称:SGDBackend,代码行数:35,代码来源:auxiliary.py


示例18: get_staging_demographic_counts

    def get_staging_demographic_counts(self):
        demographics = ['hispanicorlatinoethnicity', 'americanindianoralaskanative', 'asian', 'blackorafricanamerican',
                        'nativehawaiianorotherpacificislander', 'white', 'demographicracetwoormoreraces',
                        'ideaindicator', 'lepstatus', 'section504status', 'economicdisadvantagestatus',
                        'migrantstatus']
        results_dict = {}
        with get_udl_connection() as conn:
            stg_outcome = conn.get_table('stg_sbac_asmt_outcome')
            for entry in demographics:
                query = select([func.count(stg_outcome.c[entry])], from_obj=stg_outcome).where(stg_outcome.c[entry].in_(['Y', 'y', 'yes']))
                result = conn.execute(query)
                for row in result:
                    demo_count = row[0]

                results_dict[entry] = demo_count

        corrleated_results = {
            'dmg_eth_hsp': results_dict['hispanicorlatinoethnicity'],
            'dmg_eth_ami': results_dict['americanindianoralaskanative'],
            'dmg_eth_asn': results_dict['asian'],
            'dmg_eth_blk': results_dict['blackorafricanamerican'],
            'dmg_eth_pcf': results_dict['nativehawaiianorotherpacificislander'],
            'dmg_eth_wht': results_dict['white'],
            'dmg_eth_2om': results_dict['demographicracetwoormoreraces'],
            'dmg_prg_iep': results_dict['ideaindicator'],
            'dmg_prg_lep': results_dict['lepstatus'],
            'dmg_prg_504': results_dict['section504status'],
            'dmg_sts_ecd': results_dict['economicdisadvantagestatus'],
            'dmg_sts_mig': results_dict['migrantstatus'],
        }

        return corrleated_results
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:32,代码来源:util.py


示例19: test_migrate_student_reg

    def test_migrate_student_reg(self):
        Unittest_with_edcore_sqlite.setUpClass(EdMigrateDestConnection.get_datasource_name(TestMigrate.test_tenant),
                                               use_metadata_from_db=False)
        preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
        prod_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
        batch_guid = "0aa942b9-75cf-4055-a67a-8b9ab53a9dfc"
        student_reg_table = preprod_conn.get_table(Constants.STUDENT_REG)
        get_query = select([student_reg_table.c.student_reg_rec_id]).order_by(student_reg_table.c.student_reg_rec_id)
        count_query = select([func.count().label('student_reg_rec_ids')],
                             student_reg_table.c.student_reg_rec_id.in_(range(15541, 15551)))

        rset = preprod_conn.execute(get_query)
        row = rset.fetchall()
        self.assertEqual(10, len(row))
        self.assertListEqual([(15541,), (15542,), (15543,), (15544,), (15545,), (15546,), (15547,), (15548,), (15549,), (15550,)],
                             row)
        rset.close()

        rset = prod_conn.execute(count_query)
        row = rset.fetchone()
        self.assertEqual(0, row['student_reg_rec_ids'])
        rset.close()

        delete_count, insert_count = migrate_table(batch_guid, None, preprod_conn, prod_conn, 'student_reg', False)
        self.assertEqual(0, delete_count)
        self.assertEqual(10, insert_count)

        rset = prod_conn.execute(count_query)
        row = rset.fetchone()
        self.assertEqual(10, row['student_reg_rec_ids'])
        rset.close()
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py


示例20: test_migrate_fact_asmt_outcome_vw

 def test_migrate_fact_asmt_outcome_vw(self):
     preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
     prod_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
     batch_guid = "288220EB-3876-41EB-B3A7-F0E6C8BD013B"
     fact_asmt_outcome_table = prod_conn.get_table(Constants.FACT_ASMT_OUTCOME)
     query = select([func.count().label('asmt_outcome_vw_rec_ids')], fact_asmt_outcome_table.c.asmt_outcome_vw_rec_id.in_([1000000776, 1000001034, 1000001112]))
     query_c = query.where(fact_asmt_outcome_table.c.rec_status == 'C')
     query_d = query.where(fact_asmt_outcome_table.c.rec_status == 'D')
     query_I = query.where(fact_asmt_outcome_table.c.rec_status == 'I')
     rset = prod_conn.execute(query_c)
     row = rset.fetchone()
     self.assertEqual(3, row['asmt_outcome_vw_rec_ids'])
     rset.close()
     delete_count, insert_count = migrate_table(batch_guid, None, preprod_conn, prod_conn,
                                                'fact_asmt_outcome_vw', False)
     self.assertEqual(3, delete_count)
     self.assertEqual(3, insert_count)
     rset = prod_conn.execute(query_c)
     row = rset.fetchone()
     self.assertEqual(0, row['asmt_outcome_vw_rec_ids'])
     rset.close()
     rset = prod_conn.execute(query_d)
     row = rset.fetchone()
     self.assertEqual(3, row['asmt_outcome_vw_rec_ids'])
     rset.close()
     # The deactivation count will be always zero in unit test
     rset = prod_conn.execute(query_I)
     row = rset.fetchone()
     self.assertEqual(0, row['asmt_outcome_vw_rec_ids'])
     rset.close()
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:30,代码来源:test_migrate.py



注:本文中的sqlalchemy.sql.expression.func.count函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python func.lower函数代码示例发布时间:2022-05-27
下一篇:
Python expression.tuple_函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap