本文整理汇总了Python中stackalytics.dashboard.parameters.get_parameter函数的典型用法代码示例。如果您正苦于以下问题:Python get_parameter函数的具体用法?Python get_parameter怎么用?Python get_parameter使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_parameter函数的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_modules_json
def get_modules_json(record_ids, **kwargs):
    """Return the modules related to the given records, sorted by name.

    The result is a list of ``{'id', 'text', 'tag'}`` dicts ordered by
    ``'text'`` (the module group name). When the ``tag``/``tags`` parameter
    is set, only modules whose tag is among the requested ones are kept.

    :param record_ids: ids of the records whose modules are collected
    :param kwargs: accepted for interface compatibility; not used here
    """
    index = vault.get_vault()['module_id_index']
    wanted_tags = parameters.get_parameter({}, 'tag', 'tags')

    # Modules mentioned directly in the records.
    selected = vault.get_memory_storage().get_index_keys_by_record_ids(
        'module', record_ids)

    # Add every indexed entry that shares at least one member module with
    # the selection. The extra ids are computed first and merged afterwards
    # so the selection does not grow while it is being tested against.
    related = set(entry['id'] for entry in six.itervalues(index)
                  if set(entry['modules']) & selected)
    selected |= related

    # Keep only known modules carrying one of the requested tags.
    if wanted_tags:
        selected = set(
            module_id for module_id in selected
            if module_id in index and
            index[module_id].get('tag') in wanted_tags)

    entries = (index[module_id] for module_id in selected)
    rows = [{'id': entry['id'],
             'text': entry['module_group_name'],
             'tag': entry['tag']}
            for entry in entries]
    rows.sort(key=operator.itemgetter('text'))
    return rows
开发者ID:dlundquist,项目名称:stackalytics,代码行数:29,代码来源:web.py
示例2: test_parameters_get_parameter
def test_parameters_get_parameter(self, get_default, flask_request):
    """Check get_parameter against kwargs, request args and defaults.

    ``get_default`` and ``flask_request`` are mocks passed in by the
    caller; their side effects are rewired before each assertion.
    """
    flask_request.args = mock.Mock()
    flask_request.args.get = mock.Mock(side_effect=lambda x: x)

    def lookup_in(mapping=None):
        # Build a side-effect callable that reads from ``mapping`` and
        # yields None for unknown names or when no mapping is given.
        def lookup(name):
            if not mapping:
                return None
            return mapping.get(name, None)
        return lookup

    def set_request_args(mapping):
        flask_request.args.get.side_effect = lookup_in(mapping)

    # No defaults configured for the first group of checks.
    get_default.side_effect = lookup_in()

    # Value present in the kwargs passed to get_parameter.
    set_request_args({'param': 'foo'})
    found = parameters.get_parameter({'param': 'foo'}, 'param')
    self.assertEqual(['foo'], found)

    # Value only available through the request arguments.
    set_request_args({'param': 'foo'})
    found = parameters.get_parameter({}, 'param')
    self.assertEqual(['foo'], found)

    # Unknown name with defaults disabled gives an empty list.
    set_request_args({'param': 'foo'})
    found = parameters.get_parameter({}, 'other', use_default=False)
    self.assertEqual([], found)

    # Value supplied under the plural name.
    set_request_args({'params': 'foo'})
    found = parameters.get_parameter({}, 'param', plural_name='params')
    self.assertEqual(['foo'], found)

    # Nothing in the request: the default is used when one exists.
    set_request_args({})
    get_default.side_effect = lookup_in({'param': 'foo'})
    self.assertEqual(['foo'], parameters.get_parameter({}, 'param'))
    self.assertEqual([], parameters.get_parameter({}, 'other'))
开发者ID:Tan0,项目名称:stackalytics,代码行数:30,代码来源:test_web_utils.py
示例3: make_link
def make_link(title, uri=None, options=None):
    """Render an HTML anchor that carries the current filter parameters.

    The values of ``release``, ``project_type``, ``module``, ``company``,
    ``user_id`` and ``metric`` are looked up via
    ``parameters.get_parameter`` and appended to ``uri`` as a query string;
    entries in ``options`` are merged on top and override them.

    :param title: link text, inserted into the markup as-is
    :param uri: base address of the link; ``None`` is treated as an empty
        base when a query string has to be appended
    :param options: optional dict of extra or overriding query parameters
    :returns: the ``<a href="...">...</a>`` markup as a string
    """
    param_names = ('release', 'project_type', 'module', 'company', 'user_id',
                   'metric')
    param_values = {}
    for param_name in param_names:
        value = parameters.get_parameter({}, param_name)
        if value:
            param_values[param_name] = ','.join(value)
    if options:
        param_values.update(options)
    if param_values:
        query = '&'.join(['%s=%s' % (n, utils.safe_encode(v))
                          for n, v in six.iteritems(param_values)])
        # ``uri`` defaults to None, and concatenating onto None raised
        # TypeError; fall back to an empty base in that case only.
        base = '' if uri is None else uri
        uri = base + '?' + query
    # NOTE(review): title and uri are interpolated without HTML escaping;
    # confirm that callers only pass trusted values.
    return '<a href="%(uri)s">%(title)s</a>' % {'uri': uri, 'title': title}
开发者ID:Mirantis,项目名称:stackalytics,代码行数:14,代码来源:helpers.py
示例4: get_engineers
def get_engineers(records, metric_filter, finalize_handler, **kwargs):
    """Aggregate records per user and mark each row with its core status.

    :param records: records to aggregate
    :param metric_filter: passed through to ``_get_aggregated_stats``
    :param finalize_handler: optional callable applied to every aggregated
        row before the ``core`` field is attached
    :param kwargs: accepted for interface compatibility; not used here
    :returns: whatever ``_get_aggregated_stats`` returns
    """
    requested = parameters.get_parameter({}, 'module', 'modules')
    resolved = vault.resolve_modules(requested, [''])
    # Only the module part of each resolved (module, release) pair is used.
    modules = set(name for name, _release in resolved)

    def attach_core_flag(row):
        # Run the caller's finalizer first, then add the core marker.
        row = finalize_handler(row) if finalize_handler else row
        owner = vault.get_user_from_runtime_storage(row['id'])
        row['core'] = get_core_engineer_branch(owner, modules)
        return row

    user_ids = vault.get_memory_storage().get_user_ids()
    return _get_aggregated_stats(records, metric_filter, user_ids,
                                 'user_id', 'author_name',
                                 finalize_handler=attach_core_flag)
开发者ID:dlundquist,项目名称:stackalytics,代码行数:15,代码来源:web.py
示例5: get_engineers_extended
def get_engineers_extended(records, **kwargs):
    """Build per-user activity counters, ordered by metric descending.

    Every record increments the counter named after its ``record_type``
    for the record's user; ``mark`` records are additionally passed to
    ``decorators.mark_filter``. Users with no mark, review, commit, email
    or patch activity after finalization are dropped.

    :param records: records to count
    :param kwargs: accepted for interface compatibility; not used here
    :returns: list of per-user dicts after ``utils.add_index`` was applied
    """
    requested = parameters.get_parameter({}, 'module', 'modules')
    resolved = vault.resolve_modules(requested, [''])
    modules = set(name for name, _release in resolved)

    activity_fields = ('mark', 'review', 'commit', 'email', 'patch')

    def finalize(row):
        row = decorators.mark_finalize(row)
        if not any(row[field] for field in activity_fields):
            # No activity at all: the row is filtered out by the caller.
            return None
        owner = vault.get_user_from_runtime_storage(row['id'])
        # The last entry of the user's company list is reported.
        row['company'] = owner['companies'][-1]['company_name']
        row['core'] = get_core_engineer_branch(owner, modules)
        return row

    stats = {}
    for rec in records:
        uid = rec['user_id']
        if uid not in stats:
            stats[uid] = {'id': uid, 'mark': 0, 'review': 0,
                          'commit': 0, 'email': 0, 'patch': 0,
                          'metric': 0}

        row = stats[uid]
        kind = rec['record_type']
        row[kind] = row.get(kind, 0) + 1
        if kind == 'mark':
            decorators.mark_filter(stats, rec, 'user_id', {})

        stats[uid]['name'] = rec['author_name']

    response = []
    for row in stats.values():
        finished = finalize(row)
        if finished:
            response.append(finished)

    response = sorted(response, key=lambda x: x['metric'], reverse=True)
    utils.add_index(response)
    return response
开发者ID:dlundquist,项目名称:stackalytics,代码行数:39,代码来源:web.py
示例6: _prepare_params
def _prepare_params(kwargs, ignore):
    """Collect the filter parameters for the current request.

    The collected dict is stored in ``kwargs['_params']`` and reused on
    later calls that receive the same ``kwargs``.

    :param kwargs: keyword arguments of the decorated call; updated in
        place with the ``'_params'`` entry
    :param ignore: collection of parameter names to blank out, or a falsy
        value to return everything
    :returns: the parameter dict itself, or a copy in which every ignored
        name maps to an empty list
    """
    params = kwargs.get('_params')

    if not params:
        params = {'action': flask.request.path}
        for name in parameters.FILTER_PARAMETERS:
            params[name] = parameters.get_parameter(kwargs, name, name)

        # Only the first date value is kept, rounded by
        # utils.round_timestamp_to_day.
        for date_key in ('start_date', 'end_date'):
            stamps = params[date_key]
            if stamps:
                params[date_key] = [
                    utils.round_timestamp_to_day(stamps[0])]

        kwargs['_params'] = params

    if not ignore:
        return params

    return dict((key, [] if key in ignore else value)
                for key, value in six.iteritems(params))
开发者ID:dims,项目名称:stackalytics,代码行数:22,代码来源:decorators.py
示例7: timeline
def timeline(records, **kwargs):
    """Compute weekly series for the timeline plot.

    :param records: records to distribute over weeks
    :param kwargs: request keyword arguments from which ``metric``,
        ``start_date``, ``end_date`` and ``release`` are read
    :returns: ``[commits, highlighted_commits, loc]`` where each item is a
        list of ``[week_label, value]`` pairs, one per week in range
    """
    metric = parameters.get_parameter(kwargs, 'metric')
    start_date = int(
        parameters.get_single_parameter(kwargs, 'start_date') or 0)
    release_name = (
        parameters.get_single_parameter(kwargs, 'release') or 'all')
    releases = vault.get_vault()['releases']

    # Find the week span of the selection: either the release bounds or
    # the requested start/end dates.
    if 'all' not in release_name:
        release = releases[release_name]
        first_week = utils.timestamp_to_week(release['start_date'])
        last_week = utils.timestamp_to_week(release['end_date'])
    else:
        first_week = _get_week(kwargs, 'start_date')
        last_week = _get_week(kwargs, 'end_date')
    start_week, end_week = first_week, last_week

    now = utils.timestamp_to_week(int(time.time())) + 1

    # Spans shorter than 52 weeks are widened to exactly 52 weeks, without
    # running past the current week.
    span = last_week - first_week
    if span < 52:
        expansion = (52 - span) // 2
        if last_week + expansion < now:
            end_week += expansion
        else:
            end_week = now
        start_week = end_week - 52

    # Zeroed counters for every week in range.
    weeks = range(start_week, end_week)
    week_stat_loc = dict.fromkeys(weeks, 0)
    week_stat_commits = dict.fromkeys(weeks, 0)
    week_stat_commits_hl = dict.fromkeys(weeks, 0)

    # Lines of code are summed only for the commits and loc metrics.
    counts_loc = ('commits' in metric) or ('loc' in metric)

    if 'person-day' in metric:
        # Person-day effort: count distinct users per day, then fold the
        # per-day counts into weeks.
        release_stat = collections.defaultdict(set)
        all_stat = collections.defaultdict(set)
        for rec in records:
            if rec['record_type'] in ['commit', 'member']:
                continue
            if rec['week'] not in weeks:
                continue

            day = utils.timestamp_to_day(rec['date'])
            user = vault.get_user_from_runtime_storage(rec['user_id'])
            if rec['release'] == release_name:
                release_stat[day].add(user['seq'])
            all_stat[day].add(user['seq'])

        for per_day, per_week in ((release_stat, week_stat_commits_hl),
                                  (all_stat, week_stat_commits)):
            for day, users in six.iteritems(per_day):
                week = utils.timestamp_to_week(day * 24 * 3600)
                per_week[week] += len(users)
    else:
        by_members = 'members' in metric
        for rec in records:
            week = rec['week']
            if week not in weeks:
                continue

            week_stat_loc[week] += rec['loc'] if counts_loc else 0
            week_stat_commits[week] += 1

            # Highlight by date for the members metric, by release
            # otherwise.
            if by_members:
                highlighted = rec['date'] >= start_date
            else:
                highlighted = rec['release'] == release_name
            if highlighted:
                week_stat_commits_hl[week] += 1

    # With all releases selected the highlighted series equals the full
    # one, except for the members metric.
    if release_name == 'all' and 'members' not in metric:
        week_stat_commits_hl = week_stat_commits

    # Shape the data as [label, value] pairs for the timeline plugin.
    labelled = [(week, utils.week_to_date(week)) for week in weeks]
    array_loc = [[label, week_stat_loc[week]]
                 for week, label in labelled]
    array_commits = [[label, week_stat_commits[week]]
                     for week, label in labelled]
    array_commits_hl = [[label, week_stat_commits_hl[week]]
                        for week, label in labelled]

    return [array_commits, array_commits_hl, array_loc]
开发者ID:dlundquist,项目名称:stackalytics,代码行数:89,代码来源:web.py
示例8: record_filter_decorated_function
def record_filter_decorated_function(*args, **kwargs):
    """Narrow the record set by the request filters and call ``f``.

    The filters (release, project type, module, user, company, metric,
    blueprint and date range) are applied one after another. The resulting
    ids and records are handed to the wrapped callable as the
    ``record_ids`` and ``records`` keyword arguments.

    ``f`` and ``ignore`` are not defined in this block; presumably they
    come from the enclosing decorator scope (not visible here).

    :returns: the result of ``f(*args, **kwargs)``
    """
    memory_storage_inst = vault.get_memory_storage()
    # None means "no restriction applied yet".
    record_ids = None

    params = _prepare_params(kwargs, ignore)

    release = params['release']
    if release:
        if 'all' not in release:
            record_ids = (
                memory_storage_inst.get_record_ids_by_releases(
                    c.lower() for c in release))

    # Project types and modules are both resolved to module sets and
    # intersected with each other before they restrict the records.
    project_type = params['project_type']
    mr = None
    if project_type:
        mr = set(vault.resolve_modules(vault.resolve_project_types(
            project_type), release))

    module = params['module']
    if module:
        mr = _intersect(mr, set(vault.resolve_modules(
            module, release)))

    if mr is not None:
        record_ids = _intersect(
            record_ids, _filter_records_by_modules(
                memory_storage_inst, mr))

    # Keep only the user ids found in the runtime storage.
    user_id = params['user_id']
    user_id = [u for u in user_id
               if vault.get_user_from_runtime_storage(u)]
    if user_id:
        record_ids = _intersect(
            record_ids,
            memory_storage_inst.get_record_ids_by_user_ids(user_id))

    company = params['company']
    if company:
        record_ids = _intersect(
            record_ids,
            memory_storage_inst.get_record_ids_by_companies(company))

    # A distinct loop variable keeps ``metrics`` bound to the full list.
    # The original ``for metric in metric`` rebound the name to the last
    # element, turning the 'tm_marks' check below into a substring test on
    # a single string.
    metrics = params['metric']
    if 'all' not in metrics:
        for metric_name in metrics:
            if metric_name in parameters.METRIC_TO_RECORD_TYPE:
                record_ids = _intersect(
                    record_ids,
                    memory_storage_inst.get_record_ids_by_type(
                        parameters.METRIC_TO_RECORD_TYPE[metric_name]))

    if 'tm_marks' in metrics:
        # Keep only records whose parent review number does not exceed
        # the requested ``review_nth``.
        filtered_ids = []
        review_nth = int(parameters.get_parameter(
            kwargs, 'review_nth')[0])
        for record in memory_storage_inst.get_records(record_ids):
            parent = memory_storage_inst.get_record_by_primary_key(
                record['review_id'])
            if (parent and ('review_number' in parent) and
                    (parent['review_number'] <= review_nth)):
                filtered_ids.append(record['record_id'])
        record_ids = filtered_ids

    blueprint_id = params['blueprint_id']
    if blueprint_id:
        record_ids = _intersect(
            record_ids,
            memory_storage_inst.get_record_ids_by_blueprint_ids(
                blueprint_id))

    start_date = params['start_date']
    end_date = params['end_date']
    if start_date or end_date:
        record_ids = _intersect(
            record_ids, _filter_records_by_days(start_date, end_date,
                                                memory_storage_inst))

    kwargs['record_ids'] = record_ids
    kwargs['records'] = memory_storage_inst.get_records(record_ids)

    return f(*args, **kwargs)
开发者ID:dims,项目名称:stackalytics,代码行数:84,代码来源:decorators.py
注:本文中的stackalytics.dashboard.parameters.get_parameter函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论