本文整理汇总了Python中sqlalchemy.sql.expression.func.count函数的典型用法代码示例。如果您正苦于以下问题:Python count函数的具体用法?Python count怎么用?Python count使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了count函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_migrate_batch_stureg
def test_migrate_batch_stureg(self):
batch_guid = '2bb942b9-75cf-4055-a67a-8b9ab53a9dfc'
batch = {UdlStatsConstants.REC_ID: '6',
UdlStatsConstants.BATCH_GUID: batch_guid, UdlStatsConstants.TENANT: self.__tenant,
UdlStatsConstants.SCHEMA_NAME: None, Constants.DEACTIVATE: False,
UdlStatsConstants.LOAD_TYPE: LoadType.STUDENT_REGISTRATION,
UdlStatsConstants.BATCH_OPERATION: 's',
UdlStatsConstants.SNAPSHOT_CRITERIA: '{"reg_system_id": "015247bd-058c-48cd-bb4d-f6cffe5b40c1", "academic_year": 2015}'}
self.insert_into_udl_stats(batch[UdlStatsConstants.REC_ID], batch_guid, self.__tenant, batch[UdlStatsConstants.LOAD_TYPE])
preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
count_to_source_query = select([func.count()]).select_from(preprod_conn.get_table(Constants.STUDENT_REG))
count_to_be_inserted = preprod_conn.execute(count_to_source_query).fetchall()[0][0]
self.assertEqual(10, count_to_be_inserted)
prod_conn = EdMigrateDestConnection(tenant=get_unittest_preprod_tenant_name())
student_reg_table = prod_conn.get_table(Constants.STUDENT_REG)
count_query = select([func.count()]).select_from(student_reg_table)
count_before = prod_conn.execute(count_query).fetchall()[0][0]
self.assertEqual(2581, count_before)
count_snapshot_query = select([func.count()], student_reg_table.c.academic_year == 2015).select_from(student_reg_table)
count_to_be_deleted = prod_conn.execute(count_snapshot_query).fetchall()[0][0]
self.assertEqual(1217, count_to_be_deleted)
rtn = migrate_batch(batch)
self.assertTrue(rtn)
expected_count_after = count_before - count_to_be_deleted + count_to_be_inserted
count_after = prod_conn.execute(count_query).fetchall()[0][0]
self.assertEqual(expected_count_after, count_after)
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py
示例2: get_community_tags
def get_community_tags(self, item=None, limit=None):
"""Returns community tags for an item."""
# Get item-tag association class.
item_class = item.__class__
item_tag_assoc_class = self.get_tag_assoc_class(item_class)
if not item_tag_assoc_class:
return []
# Build select statement.
cols_to_select = [item_tag_assoc_class.table.c.tag_id, func.count('*')]
from_obj = item_tag_assoc_class.table.join(item_class.table).join(galaxy.model.Tag.table)
where_clause = (self.get_id_col_in_item_tag_assoc_table(item_class) == item.id)
order_by = [func.count("*").desc()]
group_by = item_tag_assoc_class.table.c.tag_id
# Do query and get result set.
query = select(columns=cols_to_select,
from_obj=from_obj,
whereclause=where_clause,
group_by=group_by,
order_by=order_by,
limit=limit)
result_set = self.sa_session.execute(query)
# Return community tags.
community_tags = []
for row in result_set:
tag_id = row[0]
community_tags.append(self.get_tag_by_id(tag_id))
return community_tags
开发者ID:osallou,项目名称:galaxy,代码行数:27,代码来源:tags.py
示例3: dataset_counts
def dataset_counts(cls, datasets_q):
sq = datasets_q.subquery()
q = select([cls.code, func.count(cls.dataset_id)],
group_by=cls.code,
order_by=func.count(cls.dataset_id).desc())
q = q.where(cls.dataset_id == sq.c.id)
return db.session.bind.execute(q).fetchall()
开发者ID:CivicVision,项目名称:datahub,代码行数:7,代码来源:facets.py
示例4: clientstats
def clientstats(db):
startdatestring = request.json['startdate']
enddatestring = request.json['enddate']
dateformat = '%d-%m-%Y'
startdate = datetime.strptime(startdatestring, dateformat)
enddate = datetime.strptime(enddatestring, dateformat)
selectedRequestsBrowser = db.query(Request.browsername, func.count(Request.browsername)).filter_by(isPageview=True).filter(Request.datetime.between(startdate, enddate)).group_by(Request.browsername)
selectedRequestsPlatform = db.query(Request.platformname, func.count(Request.platformname)).filter_by(isPageview=True).filter(Request.datetime.between(startdate, enddate)).group_by(Request.platformname)
returnDict = {'postdata' : request.json, 'returndata' : {'browserstats' : [], 'platformstats' : []}}
for selectedRequestBrowser in selectedRequestsBrowser:
tempDict = {'browser' : selectedRequestBrowser[0],
'pageviews' : selectedRequestBrowser[1]}
returnDict['returndata']['browserstats'].append(tempDict)
for selectedRequestPlatform in selectedRequestsPlatform:
tempDict = {'platform' : selectedRequestPlatform[0],
'pageviews' : selectedRequestPlatform[1]}
returnDict['returndata']['platformstats'].append(tempDict)
return returnDict
开发者ID:olafeee,项目名称:logniter,代码行数:26,代码来源:apiserver.py
示例5: by_filter
def by_filter(cls, session, opts, **kwargs):
"""
Get packages from given filters.
:param session: SQLAlchemy session
:type session: :class:`sqlalchemy.Session`
:param opts: filtering options
:type opts: `dict
:return: package instances
:rtype: generator of :class:`pyshop.models.Package`
"""
where = []
if opts.get('local_only'):
where.append(cls.local == True)
if opts.get('names'):
where.append(cls.name.in_(opts['names']))
if opts.get('classifiers'):
ids = [c.id for c in opts.get('classifiers')]
cls_pkg = classifier__package
qry = session.query(cls_pkg.c.package_id,
func.count('*'))
qry = qry.filter(cls_pkg.c.classifier_id.in_(ids))
qry = qry.group_by(cls_pkg.c.package_id)
qry = qry.having(func.count('*') >= len(ids))
where.append(cls.id.in_([r[0] for r in qry.all()]))
return cls.find(session, where=where, **kwargs)
开发者ID:EasyPost,项目名称:pyshop,代码行数:32,代码来源:models.py
示例6: dataset_counts
def dataset_counts(cls, datasets):
ds_ids = [d.id for d in datasets]
if not len(ds_ids):
return []
q = select([cls.code, func.count(cls.dataset_id)],
cls.dataset_id.in_(ds_ids), group_by=cls.code,
order_by=func.count(cls.dataset_id).desc())
return db.session.bind.execute(q).fetchall()
开发者ID:ToroidalATLAS,项目名称:openspending,代码行数:8,代码来源:common.py
示例7: calculate_score
def calculate_score(self, educatives, seminar):
total_score = 0
score = self.score
ken_count = session.query(Educative.hug_id, Educative.ken_id, func.count(Educative.ken_id)).group_by(Educative.hug_id, Educative.ken_id).all()
for ken in ken_count:
total_score += (ken[2] ** 2) * score
second_ken_count = session.query(Educative.hug_id, Educative.second_ken_id, func.count(Educative.second_ken_id)).group_by(Educative.hug_id, Educative.second_ken_id).all()
for second_ken in second_ken_count:
total_score += (second_ken[2] ** 2) * score
return total_score
开发者ID:aviadnissel,项目名称:Liron,代码行数:10,代码来源:default_constraints.py
示例8: PageContent
def PageContent(self):
results = dict()
storySubselect = self.session.query(func.count(Story.idstory).label('storycount')).group_by(Story.categoryid).add_column(Story.categoryid).subquery()
charSubselect = self.session.query(func.count(Character.idcharacter).label('charactercount')).group_by(Character.categoryid).add_column(Character.categoryid).subquery()
query = self.session.query(Category, storySubselect.c.storycount, charSubselect.c.charactercount).order_by(Category.name)
query = query.join((storySubselect, Category.idcategory == storySubselect.c.categoryid))
query = query.join((charSubselect, Category.idcategory == charSubselect.c.categoryid))
results['categories'] = [(r[0].name, r[1], r[2], self.request.route_url("search", category=r[0].idcategory)) for r in query]
numPerRow = 3
results['numPerRow'] = numPerRow
results['rows'] = (len(results['categories'])/numPerRow) + 1
return results
开发者ID:moatra,项目名称:fffn1,代码行数:15,代码来源:searchselect.py
示例9: store_item
def store_item(self, item, spider): # {{{
item_table = self.table
dbobj = {}
for k,v in item.iteritems():
if isinstance(v, list) or isinstance(v, dict):
dbobj[k] = json.dumps(v)
else:
dbobj[k] = v
conn = self.engine.connect()
page_url = item['page_url']
where = item_table.c.page_url == page_url
sel = select([func.count(item_table.c.id)]).where(where)
cnt = conn.execute(sel).scalar()
if cnt:
assert cnt==1, 'More than one item with page_url %s' % page_url
upd = item_table.update().where(where)
conn.execute(upd, dbobj)
status = 'updated'
else:
ins = item_table.insert()
conn.execute(ins, dbobj)
status = 'inserted'
log.msg('Item %s into %s: %s' % (status, item_table.name, page_url), level=log.DEBUG, spider=spider)
conn.close()
开发者ID:1060460048,项目名称:sedemo-spider,代码行数:28,代码来源:store.py
示例10: get_sub_entries_count
def get_sub_entries_count(self, identity = None, exclude_identity = None):
"""
Returns the number of child entries of this instance.
:param offset: SQLAlchemy query offset
:param limit: SQLAlchemy query limit
:param identity: Count only DataLinker children of the given identity
:param exclude_identity: Count only DataLinker children not be of the given identity
:return: (int) Number of child entries
:since: v0.2.00
"""
if (self.log_handler is not None): self.log_handler.debug("#echo(__FILEPATH__)# -{0!r}.get_sub_entries_count()- (#echo(__LINE__)#)", self, context = "pas_datalinker")
if (identity is None and exclude_identity is None): _return = self.get_data_attributes("sub_entries")['sub_entries']
elif (identity is not None and exclude_identity is not None): raise ValueException("Defining both an identity and to exclude an identity is not supported")
else:
with self:
db_query = self.local.db_instance.rel_children.with_entities(sql.count(_DbDataLinker.id))
if (identity is not None): db_query = db_query.filter(_DbDataLinker.identity == identity)
elif (exclude_identity is not None): db_query = db_query.filter(_DbDataLinker.identity != exclude_identity)
db_query = DataLinker._db_apply_id_site_condition(db_query)
_return = db_query.scalar()
#
#
return _return
开发者ID:dNG-git,项目名称:pas_datalinker,代码行数:31,代码来源:data_linker.py
示例11: rank
def rank(self):
inner_team = aliased(Team)
return (DBSession.query(func.count('*') + 1).
select_from(inner_team).
filter(inner_team.score > Team.score).
correlate(Team).
label('rank'))
开发者ID:Immortalem,项目名称:fluxscoreboard,代码行数:7,代码来源:team.py
示例12: count
def count(self):
""" Get a count of the number of distinct objects. """
q = select(from_obj=self.join(self.alias))
q = self.filter(q, partial=True)
q = q.column(func.count(func.distinct(self.alias.c.id)).label('num'))
rp = db.session.execute(q)
return rp.fetchone().num
开发者ID:01-,项目名称:grano,代码行数:7,代码来源:__init__.py
示例13: get_stats
def get_stats(cls):
return dict(
DBSession.query(Provider.pk, func.count(cls.ref_pk).label('c'))
.filter(Provider.pk == cls.provider_pk)
.group_by(Provider.pk)
.order_by(desc('c'))
.all())
开发者ID:mitcho,项目名称:glottolog3,代码行数:7,代码来源:models.py
示例14: get_number_solved_subquery
def get_number_solved_subquery():
"""
Get a subquery that returns how many teams have solved a challenge.
Example usage:
.. code-block:: python
number_of_solved_subquery = get_number_solved_subquery()
challenge_query = (DBSession.query(Challenge,
number_of_solved_subquery)
Here we query for a list of all challenges and additionally fetch the
number of times it has been solved. This subquery will use the outer
challenge to correlate on, so make sure to provide one or this query
makes no sense.
"""
from fluxscoreboard.models import dynamic_challenges
query = (DBSession.query(func.count('*')).
filter(Challenge.id == Submission.challenge_id).
correlate(Challenge).as_scalar())
for name, module in dynamic_challenges.registry.items():
dyn_cnt = module.solved_count_query().filter(Challenge.module == name)
query = query + dyn_cnt.as_scalar()
return query.label("solved_count")
开发者ID:Immortalem,项目名称:fluxscoreboard,代码行数:25,代码来源:team.py
示例15: is_activated_by_callfilter_id
def is_activated_by_callfilter_id(session, callfilter_id):
return (session.query(func.count(Callfiltermember.active))
.join((Callfilter, Callfilter.id == Callfiltermember.callfilterid))
.filter(and_(Callfiltermember.callfilterid == callfilter_id,
Callfiltermember.bstype == 'secretary',
Callfiltermember.active == 1))
.first()[0])
开发者ID:jaunis,项目名称:xivo-dao,代码行数:7,代码来源:callfilter_dao.py
示例16: get_number_comments
def get_number_comments(self, status=None):
if not status:
return Session.query(sa.func.count(Comment.id)).filter_by(change_id=self.id).first()[0]
date = Session.query(func.max(CommentStatus.created_date).label('date'), Comment.id)
date = date.filter(CommentStatus.comment_id==Comment.id).filter(Comment.change_id==self.id)
date = date.group_by(CommentStatus.comment_id, Comment.id)
subq = date.subquery()
q = Session.query(func.count(Comment.id)).outerjoin((subq, subq.c.id==Comment.id))
q = q.outerjoin((CommentStatus, CommentStatus.comment_id==Comment.id))
q = q.filter(Comment.change_id==self.id).filter(Comment.status!=STATUS_REMOVED)
q = q.filter(Comment.in_reply_to_id==None)
if status == STATUS_OPEN:
q = q.filter(sa.or_(
CommentStatus.id==None,
sa.and_(CommentStatus.created_date==subq.columns.date, CommentStatus.status==status)
))
return q.scalar()
else:
q = q.filter(
sa.and_(CommentStatus.created_date==subq.columns.date, CommentStatus.status==status)
)
return q.scalar()
开发者ID:Nullicopter,项目名称:Desio,代码行数:26,代码来源:projects.py
示例17: bioconcept_interaction_starter
def bioconcept_interaction_starter():
nex_session = nex_session_maker()
id_to_bioentity = dict([(x.id, x) for x in nex_session.query(Locus).all()])
id_to_bioconcept = dict([(x.id, x) for x in nex_session.query(Bioconcept).all()])
bad_interactors = set([x.id for x in nex_session.query(Bioconcept).filter(Bioconcept.format_name.in_({'vegetative_growth',
'haploinsufficient',
'viable',
'heat_sensitivity',
'toxin_resistance',
'chronological_lifespan',
'competitive_fitness',
'desiccation_resistance',
'resistance_to_cycloheximide',
'resistance_to_methyl_methanesulfonate',
'resistance_to_sirolimus',
'vacuolar_morphology',
'inviable'})).all()])
#Go
for row in nex_session.query(Goevidence.locus_id, Goevidence.go_id, func.count(Goevidence.id)).filter(Goevidence.annotation_type != 'computational').group_by(Goevidence.locus_id, Goevidence.go_id).all():
go = id_to_bioconcept[row[1]]
locus = id_to_bioentity[row[0]]
if go.go_aspect == 'biological process' and go.id not in bad_interactors:
yield {'interaction_type': 'GO', 'evidence_count': row[2], 'bioentity': locus, 'interactor': go}
#Phenotype
for row in nex_session.query(Phenotypeevidence.locus_id, Phenotypeevidence.phenotype_id, func.count(Phenotypeevidence.id)).group_by(Phenotypeevidence.locus_id, Phenotypeevidence.phenotype_id).all():
observable = id_to_bioconcept[row[1]].observable
locus = id_to_bioentity[row[0]]
if observable.id not in bad_interactors:
yield {'interaction_type': 'PHENOTYPE', 'evidence_count': row[2], 'bioentity': locus, 'interactor': observable}
nex_session.close()
开发者ID:kkarra,项目名称:SGDBackend,代码行数:35,代码来源:auxiliary.py
示例18: get_staging_demographic_counts
def get_staging_demographic_counts(self):
demographics = ['hispanicorlatinoethnicity', 'americanindianoralaskanative', 'asian', 'blackorafricanamerican',
'nativehawaiianorotherpacificislander', 'white', 'demographicracetwoormoreraces',
'ideaindicator', 'lepstatus', 'section504status', 'economicdisadvantagestatus',
'migrantstatus']
results_dict = {}
with get_udl_connection() as conn:
stg_outcome = conn.get_table('stg_sbac_asmt_outcome')
for entry in demographics:
query = select([func.count(stg_outcome.c[entry])], from_obj=stg_outcome).where(stg_outcome.c[entry].in_(['Y', 'y', 'yes']))
result = conn.execute(query)
for row in result:
demo_count = row[0]
results_dict[entry] = demo_count
corrleated_results = {
'dmg_eth_hsp': results_dict['hispanicorlatinoethnicity'],
'dmg_eth_ami': results_dict['americanindianoralaskanative'],
'dmg_eth_asn': results_dict['asian'],
'dmg_eth_blk': results_dict['blackorafricanamerican'],
'dmg_eth_pcf': results_dict['nativehawaiianorotherpacificislander'],
'dmg_eth_wht': results_dict['white'],
'dmg_eth_2om': results_dict['demographicracetwoormoreraces'],
'dmg_prg_iep': results_dict['ideaindicator'],
'dmg_prg_lep': results_dict['lepstatus'],
'dmg_prg_504': results_dict['section504status'],
'dmg_sts_ecd': results_dict['economicdisadvantagestatus'],
'dmg_sts_mig': results_dict['migrantstatus'],
}
return corrleated_results
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:32,代码来源:util.py
示例19: test_migrate_student_reg
def test_migrate_student_reg(self):
Unittest_with_edcore_sqlite.setUpClass(EdMigrateDestConnection.get_datasource_name(TestMigrate.test_tenant),
use_metadata_from_db=False)
preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
prod_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
batch_guid = "0aa942b9-75cf-4055-a67a-8b9ab53a9dfc"
student_reg_table = preprod_conn.get_table(Constants.STUDENT_REG)
get_query = select([student_reg_table.c.student_reg_rec_id]).order_by(student_reg_table.c.student_reg_rec_id)
count_query = select([func.count().label('student_reg_rec_ids')],
student_reg_table.c.student_reg_rec_id.in_(range(15541, 15551)))
rset = preprod_conn.execute(get_query)
row = rset.fetchall()
self.assertEqual(10, len(row))
self.assertListEqual([(15541,), (15542,), (15543,), (15544,), (15545,), (15546,), (15547,), (15548,), (15549,), (15550,)],
row)
rset.close()
rset = prod_conn.execute(count_query)
row = rset.fetchone()
self.assertEqual(0, row['student_reg_rec_ids'])
rset.close()
delete_count, insert_count = migrate_table(batch_guid, None, preprod_conn, prod_conn, 'student_reg', False)
self.assertEqual(0, delete_count)
self.assertEqual(10, insert_count)
rset = prod_conn.execute(count_query)
row = rset.fetchone()
self.assertEqual(10, row['student_reg_rec_ids'])
rset.close()
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py
示例20: test_migrate_fact_asmt_outcome_vw
def test_migrate_fact_asmt_outcome_vw(self):
preprod_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
prod_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
batch_guid = "288220EB-3876-41EB-B3A7-F0E6C8BD013B"
fact_asmt_outcome_table = prod_conn.get_table(Constants.FACT_ASMT_OUTCOME)
query = select([func.count().label('asmt_outcome_vw_rec_ids')], fact_asmt_outcome_table.c.asmt_outcome_vw_rec_id.in_([1000000776, 1000001034, 1000001112]))
query_c = query.where(fact_asmt_outcome_table.c.rec_status == 'C')
query_d = query.where(fact_asmt_outcome_table.c.rec_status == 'D')
query_I = query.where(fact_asmt_outcome_table.c.rec_status == 'I')
rset = prod_conn.execute(query_c)
row = rset.fetchone()
self.assertEqual(3, row['asmt_outcome_vw_rec_ids'])
rset.close()
delete_count, insert_count = migrate_table(batch_guid, None, preprod_conn, prod_conn,
'fact_asmt_outcome_vw', False)
self.assertEqual(3, delete_count)
self.assertEqual(3, insert_count)
rset = prod_conn.execute(query_c)
row = rset.fetchone()
self.assertEqual(0, row['asmt_outcome_vw_rec_ids'])
rset.close()
rset = prod_conn.execute(query_d)
row = rset.fetchone()
self.assertEqual(3, row['asmt_outcome_vw_rec_ids'])
rset.close()
# The deactivation count will be always zero in unit test
rset = prod_conn.execute(query_I)
row = rset.fetchone()
self.assertEqual(0, row['asmt_outcome_vw_rec_ids'])
rset.close()
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:30,代码来源:test_migrate.py
注:本文中的sqlalchemy.sql.expression.func.count函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论