本文整理汇总了Python中sqlalchemy.func.count函数的典型用法代码示例。如果您正苦于以下问题:Python count函数的具体用法?Python count怎么用?Python count使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了count函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: match
def match(self, level, limit=10):
'''
Returns a list of UserSkillMatch objects, in descending order of number
of skills matched for each user.
'''
skills_to_learn = [
s.name for s in
self.skills if s.level == LEVELS['LEVEL_I_WANT_TO_LEARN']['score']
]
if skills_to_learn:
matched_users = User.query_in_deployment().\
add_column(func.string_agg(UserSkill.name, ',')).\
add_column(func.count(UserSkill.id)).\
filter(UserSkill.name.in_(skills_to_learn)).\
filter(User.id == UserSkill.user_id).\
filter(UserSkill.level == level).\
filter(UserSkill.user_id != self.id).\
group_by(User).\
order_by(func.count().desc()).\
limit(limit)
else:
matched_users = []
for user, question_ids_by_comma, count in matched_users:
yield UserSkillMatch(user, question_ids_by_comma.split(','))
开发者ID:codybousc,项目名称:noi2,代码行数:26,代码来源:models.py
示例2: test_generic_count
def test_generic_count(self):
assert isinstance(func.count().type, sqltypes.Integer)
self.assert_compile(func.count(), 'count(*)')
self.assert_compile(func.count(1), 'count(:count_1)')
c = column('abc')
self.assert_compile(func.count(c), 'count(abc)')
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:7,代码来源:test_functions.py
示例3: post
def post(self):
import pluricent as pl
from sqlalchemy import func, distinct
import numpy as np
import json
args = {}
p = pl.Pluricent(pl.global_settings()['database'])
datatypes = [e[0] for e in p.session.query(distinct(pl.models.Processing.datatype)).all()]
table = []
headers = ['subject', 't1image']
q = []
q.append(dict(p.session.query(pl.models.Subject.identifier, func.count(pl.models.T1Image.path)).filter(pl.models.T1Image.subject_id == pl.models.Subject.id).group_by(pl.models.Subject.identifier).all()))
for each in datatypes:
headers.append(each)
res = p.session.query(pl.models.Subject.identifier, func.count(pl.models.Processing.path)).join(pl.models.T1Image).filter(pl.models.Processing.input_id == pl.models.T1Image.id).filter(pl.models.T1Image.subject_id == pl.models.Subject.id).filter(pl.models.Processing.datatype==each).group_by(pl.models.Subject.identifier).all()
q.append(dict(res))
subjects = [e[0] for e in p.session.query(pl.models.Subject.identifier).all()]
table.append(headers)
for s in subjects:
table.append([s])
table[-1].extend([each.get(s, 0) for each in q])
#print t1images
args['images'] = table
res = json.dumps(args)
self.write(res)
return None
开发者ID:xgrg,项目名称:pluricent,代码行数:31,代码来源:__init__.py
示例4: index
def index(self, id=None):
LIMIT = 20
if not self.authorizer.am_authorized(c, model.Action.USER_READ, model.System):
abort(401, _('Not authorized to see this page'))
page = int(request.params.get('page', 1))
c.q = request.params.get('q', '')
c.order_by = request.params.get('order_by', 'name')
query = model.Session.query(model.User, func.count(model.User.id))
if c.q:
query = model.User.search(c.q, query)
if c.order_by == 'edits':
query = query.join((model.Revision, or_(
model.Revision.author==model.User.name,
model.Revision.author==model.User.openid
)))
query = query.group_by(model.User)
query = query.order_by(desc(func.count(model.User.id)))
else:
query = query.group_by(model.User)
query = query.order_by(model.User.name)
c.page = h.Page(
collection=query,
page=page,
item_count=query.count(),
items_per_page=LIMIT
)
return render('user/list.html')
开发者ID:AdamJensen-dk,项目名称:ckan-drupal,代码行数:32,代码来源:user.py
示例5: get_query
def get_query(camp_idx, *args, group_by=None, area_idx=None):
'''
항목별 통계를 쉽게 뽑아내기 위한 메타쿼리
'''
if group_by is None:
base_query = db.session.query(func.count('*'), func.count(Member.payment), func.sum(Member.attend_yn))
else:
base_query = db.session.query(getattr(Member, group_by), func.count('*'), func.count(Member.payment), func.sum(Member.attend_yn))
base_query = base_query.select_from(Member).outerjoin(Member.payment).filter(Member.camp_idx == camp_idx, Member.cancel_yn == 0)
if area_idx is not None:
base_query = base_query.filter(Member.area_idx == area_idx)
filtered_query = base_query
for key, value in args:
if value in ['none', 'not_none']:
if value == 'none':
filtered_query = filtered_query.filter(getattr(Member, key).is_(None))
else:
filtered_query = filtered_query.filter(getattr(Member, key).isnot(None))
else:
filtered_query = filtered_query.filter(getattr(Member, key) == value)
if group_by is not None:
filtered_query = filtered_query.group_by(getattr(Member, group_by))
return filtered_query
开发者ID:INTERCP,项目名称:missioncamp,代码行数:27,代码来源:statistics.py
示例6: counts
def counts(self, terms, types=None, contact_id=None,
extra_params=None, extra_columns=None):
"""Prepare the search query, but return only count for each of
the requested objects."""
extra_params = extra_params or {}
extra_columns = extra_columns or {}
model_names = self._get_grouped_types(types, extra_params)
query = db.session.query(
self.record_type.type, func.count(distinct(
self.record_type.key)), literal(""))
query = query.filter(self.get_permissions_query(model_names))
query = query.filter(self._get_filter_query(terms))
query = self.search_get_owner_query(query, types, contact_id)
query = query.group_by(self.record_type.type)
all_extra_columns = dict(extra_columns.items() +
[(p, p) for p in extra_params
if p not in extra_columns])
if not all_extra_columns:
return query.all()
# Add extra_params and extra_colums:
for key, value in all_extra_columns.iteritems():
extra_q = db.session.query(self.record_type.type,
func.count(distinct(self.record_type.key)),
literal(key))
extra_q = extra_q.filter(self.get_permissions_query([value]))
extra_q = extra_q.filter(self._get_filter_query(terms))
extra_q = self.search_get_owner_query(extra_q, [value], contact_id)
extra_q = self._add_extra_params_query(extra_q,
value,
extra_params.get(key, None))
extra_q = extra_q.group_by(self.record_type.type)
query = query.union(extra_q)
return query.all()
开发者ID:zidarsk8,项目名称:ggrc-core,代码行数:34,代码来源:mysql.py
示例7: get_urls
def get_urls(self):
urls = []
session = db.create_scoped_session()
records = (
session
.query(Log.dashboard_id, func.count(Log.dashboard_id))
.filter(and_(
Log.dashboard_id.isnot(None),
Log.dttm >= self.since,
))
.group_by(Log.dashboard_id)
.order_by(func.count(Log.dashboard_id).desc())
.limit(self.top_n)
.all()
)
dash_ids = [record.dashboard_id for record in records]
dashboards = (
session
.query(Dashboard)
.filter(Dashboard.id.in_(dash_ids))
.all()
)
for dashboard in dashboards:
for chart in dashboard.slices:
urls.append(
get_url({'form_data': get_form_data(chart.id, dashboard)}))
return urls
开发者ID:tan31989,项目名称:caravel,代码行数:29,代码来源:cache.py
示例8: recommended_books_by_publisher
def recommended_books_by_publisher(books_read_by_user, recommended_books):
read_books_isbn_list = [book.book_isbn for book in books_read_by_user]
# Get publishers and the number of books read by the user of the publisher
# in descending order
publisher_count = db.session.query(
func.lower(Books.publisher),
func.count(Books.publisher)
).filter(
Books.isbn.in_(read_books_isbn_list)
).group_by(
func.lower(Books.publisher)
).order_by(
func.count(Books.publisher).desc()
).all()
publisher_count_dict = {str(publisher): count for (
publisher, count) in publisher_count}
unread_books_by_same_publishers = db.session.query(
Books.isbn,
func.lower(Books.publisher)
).filter(
and_(
func.lower(Books.publisher).in_([x[0] for x in publisher_count]),
~Books.isbn.in_(read_books_isbn_list)
)
).all()
# Gets the books of the above publishers which are not read by the user
for unread_book in unread_books_by_same_publishers:
isbn = unread_book[0]
publisher = unread_book[1]
weight = g.user.publisher_weight * publisher_count_dict[publisher]
if isbn not in recommended_books:
recommended_books.update({isbn: weight})
else:
recommended_books[isbn] += weight
开发者ID:sanchitgn,项目名称:books,代码行数:34,代码来源:api.py
示例9: recommended_books_by_genre
def recommended_books_by_genre(books_read_by_user, recommended_books):
read_books_isbn_list = [book.book_isbn for book in books_read_by_user]
# Get the genres, count of genres read by user
genre_count = db.session.query(
BooksHasGenre.genre,
func.count(BooksHasGenre.genre)
).filter(
BooksHasGenre.book_isbn.in_(read_books_isbn_list)
).group_by(
BooksHasGenre.genre
).order_by(
func.count(BooksHasGenre.genre).desc()
).all()
genre_count_dict = {genre: count for (genre, count) in genre_count}
# Find all the unread books having the same genre
unread_books_having_same_genre = db.session.query(
Books.isbn,
BooksHasGenre.genre
).filter(
Books.isbn == BooksHasGenre.book_isbn,
BooksHasGenre.genre.in_([x[0] for x in genre_count]),
~Books.isbn.in_(read_books_isbn_list)
).all()
for unread_book in unread_books_having_same_genre:
isbn = unread_book[0]
genre = unread_book[1]
weight = g.user.genre_weight * genre_count_dict[genre]
if isbn not in recommended_books:
recommended_books.update({isbn: weight})
else:
recommended_books[isbn] += weight
开发者ID:sanchitgn,项目名称:books,代码行数:31,代码来源:api.py
示例10: plot_gender
def plot_gender():
metadata, connection = setup_database()
tables = ["visits_10min"]
for table in tables:
consolidated = return_joined_table(table, metadata)
print connection.execute(select([func.count()], consolidated.c["demographics_gender"] == 2)).fetchall()
gender_checkins = []
for gender in (0, 1):
gender_checkins.append([])
for place_label in xrange(1, 11):
query = select([func.count()], and_(consolidated.c["visits_joined_places_place_label"] == place_label, consolidated.c["demographics_gender"] == gender + 1))
result = connection.execute(query).fetchall()
gender_checkins[gender].append(result[0][0])
fig, ax = plt.subplots()
width = 0.35
rects1 = ax.bar(xrange(1, 11), gender_checkins[0], width, color='r')
rects2 = ax.bar([i + width for i in xrange(1, 11)], gender_checkins[1], width, color='g')
ax.legend((rects1[0], rects2[0]), ('Men', 'Women'))
ax.set_ylabel("Count", fontsize = 24, fontweight = 'bold')
ax.set_xlabel("Place Category", fontsize=24, fontweight = 'bold')
ax.set_title("Visits Across Gender", fontsize=32, fontweight='bold')
xticks_values = [LABEL_PLACE_MAPPING[i] for i in xrange(1, 11)]
xticks_values = [textwrap.fill(text,10) for text in xticks_values]
ax.set_xticks([i + width for i in xrange(1, 11)])
ax.set_xticklabels(xticks_values)
#autolabel(rects1, gender_checkins[0])
#autolabel(rects2, gender_checkins[1])
plt.show()
开发者ID:siddharthsarda,项目名称:spams,代码行数:29,代码来源:plot_places.py
示例11: cup_list
def cup_list(request):
season = request.matchdict.get('season', None)
if season:
start = datetime.strptime(season.split('-')[0] + '-11-01', '%Y-%m-%d')
end = datetime.strptime(season.split('-')[1] + '-11-01', '%Y-%m-%d')
session = DBSession()
start_date = func.min(CupDate.date).label('start_date')
end_date = func.max(CupDate.date).label('end_date')
cups = session.query(Cup, func.count(CupDate.cup_id).label('total'),
func.count(CupDate.tournament_id).label('completed'),
start_date,
end_date) \
.join(CupDate)
if season:
cups = cups \
.group_by(Cup.id) \
.having(start_date >= start) \
.having(start_date <= end)
cups = cups \
.group_by(Cup) \
.order_by('end_date desc', Cup.name.desc()) \
.all()
return dict(cups=cups)
开发者ID:janerist,项目名称:dsjtournaments,代码行数:29,代码来源:view_cups.py
示例12: get_licenses
def get_licenses(session, suite=None):
""" Count files per license filtered by `suite`
"""
logging.debug('grouped by license summary')
if not suite:
q = (session.query(FileCopyright.license, Suite.suite,
sql_func.count(FileCopyright.id))
.join(File)
.join(Package)
.join(Suite)
.group_by(Suite.suite)
.group_by(FileCopyright.license)
.order_by(Suite.suite))
return q.all()
else:
q = (session.query(FileCopyright.license,
sql_func.count(FileCopyright.id))
.join(File)
.join(Package))
if suite != 'ALL':
q = q.join(Suite) \
.filter(Suite.suite == suite)
q = q.group_by(FileCopyright.license)
return dict(q.all())
开发者ID:chr7stos,项目名称:debsources,代码行数:25,代码来源:statistics.py
示例13: match
def match(self, level, limit=10):
'''
Returns a list of UserSkillMatch objects, in descending order of number
of skills matched for each user.
'''
if db.engine.name == 'sqlite':
agg = func.group_concat
elif db.engine.name == 'postgresql':
agg = func.string_agg
else:
raise Exception('Unknown aggregation function for DB {}'.format(
db.engine.name))
skills_to_learn = [
s.name for s in
self.skills if s.level == LEVELS['LEVEL_I_WANT_TO_LEARN']['score']
]
if skills_to_learn:
matched_users = User.query_in_deployment().\
add_column(agg(UserSkill.name, ',')).\
add_column(func.count(UserSkill.id)).\
filter(UserSkill.name.in_(skills_to_learn)).\
filter(User.id == UserSkill.user_id).\
filter(UserSkill.level == level).\
filter(UserSkill.user_id != self.id).\
group_by(User).\
order_by(func.count().desc()).\
limit(limit)
else:
matched_users = []
for user, question_ids_by_comma, count in matched_users:
yield UserSkillMatch(user, question_ids_by_comma.split(','))
开发者ID:yeehanchan,项目名称:noi2,代码行数:32,代码来源:models.py
示例14: nearest_neighbors
def nearest_neighbors(self, limit=10):
'''
Returns a list of (user, score) tuples with the closest matching
skills. If they haven't answered the equivalent skill question, we
consider that a very big difference (12).
Order is closest to least close, which is an ascending score.
'''
my_skills = aliased(UserSkill, name='my_skills', adapt_on_names=True)
their_skills = aliased(UserSkill, name='their_skills', adapt_on_names=True)
# difference we assume for user that has not answered question
unanswered_difference = (LEVELS['LEVEL_I_CAN_DO_IT']['score'] -
LEVELS['LEVEL_I_WANT_TO_LEARN']['score']) * 2
return User.query_in_deployment().\
add_column(((len(self.skills) - func.count(func.distinct(their_skills.id))) *
unanswered_difference) + \
func.sum(func.abs(their_skills.level - my_skills.level))).\
filter(their_skills.user_id != my_skills.user_id).\
filter(User.id == their_skills.user_id).\
filter(their_skills.name == my_skills.name).\
filter(my_skills.user_id == self.id).\
group_by(User).\
order_by(((len(self.skills) - func.count(func.distinct(their_skills.id)))
* unanswered_difference) + \
func.sum(func.abs(their_skills.level - my_skills.level))).\
limit(limit)
开发者ID:yeehanchan,项目名称:noi2,代码行数:28,代码来源:models.py
示例15: crawler_stats
def crawler_stats(cls, crawler_id):
stats = {}
col = func.count(func.distinct(cls.crawler_run))
q = db.session.query(col)
q = q.filter(cls.crawler_id == crawler_id)
stats['run_count'] = q.scalar()
last_run_id, last_run_time = cls.crawler_last_run(crawler_id)
# Check if the crawler was active very recently, if so, don't
# allow the user to execute a new run right now.
timeout = (datetime.utcnow() - CrawlerState.TIMEOUT)
stats['running'] = last_run_time > timeout if last_run_time else False
q = db.session.query(func.count(func.distinct(cls.foreign_id)))
q = q.filter(cls.crawler_id == crawler_id)
for section in ['last', 'all']:
data = {}
sq = q
if section == 'last':
sq = sq.filter(cls.crawler_run == last_run_id)
okq = sq.filter(cls.status == cls.STATUS_OK)
data['ok'] = okq.scalar() if last_run_id else 0
failq = sq.filter(cls.status == cls.STATUS_FAIL)
data['fail'] = failq.scalar() if last_run_id else 0
stats[section] = data
stats['last']['updated'] = last_run_time
stats['last']['run_id'] = last_run_id
return stats
开发者ID:andkamau,项目名称:aleph,代码行数:28,代码来源:crawler_state.py
示例16: render_receipt
def render_receipt(user, png=False, pdf=False):
tickets = (user.tickets
.filter_by(paid=True)
.join(TicketType)
.order_by(TicketType.order))
entrance_tts_counts = (tickets.filter(TicketType.admits.in_(['full', 'kid']))
.with_entities(TicketType, func.count(Ticket.id).label('ticket_count'))
.group_by(TicketType).all())
entrance_tickets_count = sum(c for tt, c in entrance_tts_counts)
vehicle_tickets = tickets.filter(TicketType.admits.in_(['car', 'campervan'])).all()
transferred_tickets = user.transfers_from.order_by('timestamp').all()
tees = (tickets.filter(TicketType.fixed_id.in_(range(14, 24)))
.with_entities(TicketType, func.count(Ticket.id).label('ticket_count'))
.group_by(TicketType).all()) # t-shirts
return render_template('receipt.html', user=user,
format_inline_qr=format_inline_qr,
format_inline_barcode=format_inline_barcode,
entrance_tts_counts=entrance_tts_counts,
entrance_tickets_count=entrance_tickets_count,
vehicle_tickets=vehicle_tickets,
transferred_tickets=transferred_tickets,
tees=tees,
pdf=pdf, png=png)
开发者ID:bfirsh,项目名称:Website,代码行数:28,代码来源:receipt.py
示例17: test_events
def test_events(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses))
})
sess = create_session()
u1 = User(name='jack')
a1 = Address(email_address='foo')
sess.add_all([u1, a1])
sess.flush()
eq_(testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)), 0)
u1 = sess.query(User).get(u1.id)
u1.addresses.append(a1)
sess.flush()
eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
[(a1.id, u1.id, 'foo')])
u1.addresses.remove(a1)
sess.flush()
eq_(testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)), 0)
u1.addresses.append(a1)
sess.flush()
eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
[(a1.id, u1.id, 'foo')])
a2 = Address(email_address='bar')
u1.addresses.remove(a1)
u1.addresses.append(a2)
sess.flush()
eq_(testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall(),
[(a2.id, u1.id, 'bar')])
开发者ID:gaguilarmi,项目名称:sqlalchemy,代码行数:33,代码来源:test_dynamic.py
示例18: view_filing
def view_filing(self):
req = self.request
ses = req.session
form = self.get_form(AddSchema)
page = {}
rowpage = 1
cpage = 'page' in req.POST and req.POST['page'] or 1
if cpage<1:
cpage = 1
page['current']=int(cpage)
offset = (page['current']-1) * rowpage
if 'data' in req.POST:
page['row'] = DBSession.query(func.count(Filing.id)).\
filter(or_(Filing.tag.like('%%%s%%' % req.POST['data']),
Filing.nama.like('%%%s%%' % req.POST['data'])),).scalar() or 0
rows = DBSession.query(Filing).\
filter(or_(Filing.tag.like('%%%s%%' % req.POST['data']),
Filing.nama.like('%%%s%%' % req.POST['data'])),).\
limit(rowpage).offset(offset)
else:
rows = DBSession.query(Filing).\
limit(rowpage).offset(offset)
page['row'] = DBSession.query(func.count(Filing.id)).scalar() or 0
count = page['row'] / int(rowpage)
page['count'] = count
if count < page['row']/float(rowpage):
page['count']=count+1
return dict(form=form, rows=rows, page=page)
开发者ID:aagusti,项目名称:osipkd-pdpt,代码行数:31,代码来源:__init__.py
示例19: top_tags
def top_tags(cls, limit=10, returned_tag_info='object'): # by package
assert returned_tag_info in ('name', 'id', 'object')
tag = table('tag')
package_tag = table('package_tag')
package = table('package')
if returned_tag_info == 'name':
from_obj = [package_tag.join(tag)]
tag_column = tag.c.name
else:
from_obj = None
tag_column = package_tag.c.tag_id
j = join(package_tag, package,
package_tag.c.package_id == package.c.id)
s = select([tag_column, func.count(package_tag.c.package_id)],
from_obj=from_obj).\
select_from(j).\
where(and_(package_tag.c.state=='active', package.c.private == False, package.c.state == 'active' ))
s = s.group_by(tag_column).\
order_by(func.count(package_tag.c.package_id).desc()).\
limit(limit)
res_col = model.Session.execute(s).fetchall()
if returned_tag_info in ('id', 'name'):
return res_col
elif returned_tag_info == 'object':
res_tags = [(model.Session.query(model.Tag).get(text_type(tag_id)), val) for tag_id, val in res_col]
return res_tags
开发者ID:PublicaMundi,项目名称:ckan,代码行数:26,代码来源:stats.py
示例20: get_story
def get_story(cls, limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[str] = None,
category_id: Optional[int] = None) -> list:
story_qs = Story.sa.query(
Story.sa.id,
Story.sa.name,
Story.sa.slug,
Story.sa.description,
Story.sa.content,
Category.sa.name.label('category_name')
).join(
Category.sa
)
if category_id:
total_count = Story.sa.query(func.count(Story.sa.id)).filter(Category.sa.id == category_id).scalar()
story_qs = story_qs.filter(
Category.sa.id == category_id
)
else:
total_count = Story.sa.query(func.count(Story.sa.id)).scalar()
story_qs = BaseRespository.filter_limit(story_qs, limit)
story_qs = BaseRespository.filter_offset(story_qs, offset)
story_qs = BaseRespository.sort_all(story_qs, sort, Story)
try:
result = story_qs.all()
except NoResultFound:
result = None
return result, total_count
开发者ID:tuanquanghpvn,项目名称:restful-exam,代码行数:29,代码来源:repository.py
注:本文中的sqlalchemy.func.count函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论