本文整理汇总了Python中six.viewvalues函数的典型用法代码示例。如果您正苦于以下问题:Python viewvalues函数的具体用法?Python viewvalues怎么用?Python viewvalues使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了viewvalues函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: reinitializeAll
def reinitializeAll():
"""
Force all models to reconnect/rebuild indices (needed for testing).
"""
for pluginModels in list(six.viewvalues(_modelInstances)):
for model in list(six.viewvalues(pluginModels)):
model.reconnect()
开发者ID:0x414A,项目名称:girder,代码行数:7,代码来源:model_importer.py
示例2: __call__
def __call__(self):
while True:
try:
header, payload, __ = recv(self.socket)
except GreenletExit:
break
except zmq.ZMQError:
exc = TaskClosed('Collector socket closed')
for results in viewvalues(self.results):
for result in viewvalues(results):
result.set_exception(exc)
break
except:
# TODO: warn MalformedMessage
continue
method, call_id, task_id = header
method = ord(method)
reply = Reply(method, call_id, task_id)
if method & ACK:
value = payload or None
else:
value = self.unpack(payload)
self.trace and self.trace(method, (call_id, task_id, value))
del header, payload, method, call_id, task_id
try:
self.dispatch_reply(reply, value)
except KeyError:
# TODO: warning
continue
finally:
del reply, value
开发者ID:sublee,项目名称:zeronimo,代码行数:31,代码来源:core.py
示例3: setup_params
def setup_params(self, data):
params = self.params.copy()
valid_scale = ('area', 'count', 'width')
if params['scale'] not in valid_scale:
msg = "Parameter scale should be one of {}"
raise PlotnineError(msg.format(valid_scale))
lookup = {
'biweight': 'biw',
'cosine': 'cos',
'cosine2': 'cos2',
'epanechnikov': 'epa',
'gaussian': 'gau',
'triangular': 'tri',
'triweight': 'triw',
'uniform': 'uni'}
with suppress(KeyError):
params['kernel'] = lookup[params['kernel'].lower()]
if params['kernel'] not in six.viewvalues(lookup):
msg = ("kernel should be one of {}. "
"You may use the abbreviations {}")
raise PlotnineError(msg.format(six.viewkeys(lookup),
six.viewvalues()))
missing_params = (six.viewkeys(stat_density.DEFAULT_PARAMS) -
six.viewkeys(params))
for key in missing_params:
params[key] = stat_density.DEFAULT_PARAMS[key]
return params
开发者ID:jwhendy,项目名称:plotnine,代码行数:33,代码来源:stat_ydensity.py
示例4: _dpdk_devs_current
def _dpdk_devs_current(dpdk_devices):
devs_exist = all(_dev_exists(devinfo)
for devinfo in six.viewvalues(dpdk_devices))
unlisted_devices = _unlisted_devices(
[devinfo['pci_addr'] for devinfo in six.viewvalues(dpdk_devices)])
return devs_exist and not unlisted_devices
开发者ID:nirs,项目名称:vdsm,代码行数:7,代码来源:dpdk.py
示例5: validate_column_specs
def validate_column_specs(events, next_value_columns, previous_value_columns):
"""
Verify that the columns of ``events`` can be used by an EventsLoader to
serve the BoundColumns described by ``next_value_columns`` and
``previous_value_columns``.
"""
required = {
TS_FIELD_NAME,
SID_FIELD_NAME,
EVENT_DATE_FIELD_NAME,
}.union(
# We also expect any of the field names that our loadable columns
# are mapped to.
viewvalues(next_value_columns),
viewvalues(previous_value_columns),
)
received = set(events.columns)
missing = required - received
if missing:
raise ValueError(
"EventsLoader missing required columns {missing}.\n"
"Got Columns: {received}\n"
"Expected Columns: {required}".format(
missing=sorted(missing),
received=sorted(received),
required=sorted(required),
)
)
开发者ID:jeyoor,项目名称:zipline,代码行数:28,代码来源:events.py
示例6: _normalize_qos_config
def _normalize_qos_config(qos):
for value in six.viewvalues(qos):
for attrs in six.viewvalues(value):
if attrs.get('m1') == 0:
del attrs['m1']
if attrs.get('d') == 0:
del attrs['d']
return qos
开发者ID:EdDev,项目名称:vdsm,代码行数:8,代码来源:netfunctestlib.py
示例7: required_event_fields
def required_event_fields(next_value_columns, previous_value_columns):
"""
Compute the set of resource columns required to serve
``next_value_columns`` and ``previous_value_columns``.
"""
# These metadata columns are used to align event indexers.
return {TS_FIELD_NAME, SID_FIELD_NAME, EVENT_DATE_FIELD_NAME}.union(
# We also expect any of the field names that our loadable columns
# are mapped to.
viewvalues(next_value_columns),
viewvalues(previous_value_columns),
)
开发者ID:quantopian,项目名称:zipline,代码行数:12,代码来源:events.py
示例8: mockBitbucketApiUser
def mockBitbucketApiUser(url, request):
try:
for account in six.viewvalues(providerInfo['accounts']):
if 'Bearer %s' % account['access_token'] == \
request.headers['Authorization']:
break
else:
self.fail()
except AssertionError as e:
return {
'status_code': 401,
'content': json.dumps({
'message': repr(e)
})
}
return json.dumps({
"created_on": "2011-12-20T16:34:07.132459+00:00",
"uuid": account['user']['oauth']['id'],
"location": "Santa Monica, CA",
"links": {},
"website": "https://tutorials.bitbucket.org/",
"username": account['user']['login'],
"display_name": '%s %s' % (account['user']['firstName'],
account['user']['lastName'])
})
开发者ID:0x414A,项目名称:girder,代码行数:25,代码来源:oauth_test.py
示例9: _computeHash
def _computeHash(file, progress=noProgress):
"""
Computes all supported checksums on a given file. Downloads the
file data and stream-computes all required hashes on it, saving
the results in the file document.
In the case of assetstore impls that already compute the sha512,
and when sha512 is the only supported algorithm, we will not download
the file to the server.
"""
toCompute = SUPPORTED_ALGORITHMS - set(file)
toCompute = {alg: getattr(hashlib, alg)() for alg in toCompute}
if not toCompute:
return
fileModel = FileModel()
with fileModel.open(file) as fh:
while True:
chunk = fh.read(_CHUNK_LEN)
if not chunk:
break
for digest in six.viewvalues(toCompute):
digest.update(chunk)
progress.update(increment=len(chunk))
digests = {alg: digest.hexdigest() for alg, digest in six.viewitems(toCompute)}
fileModel.update({'_id': file['_id']}, update={
'$set': digests
}, multi=False)
return digests
开发者ID:girder,项目名称:girder,代码行数:32,代码来源:__init__.py
示例10: calculate_cum_reward
def calculate_cum_reward(policy):
"""Calculate cumulative reward with respect to time.
Parameters
----------
policy: bandit object
The bandit algorithm you want to evaluate.
Return
---------
cum_reward: dict
The dict stores {history_id: cumulative reward} .
cum_n_actions: dict
The dict stores
{history_id: cumulative number of recommended actions}.
"""
cum_reward = {-1: 0.0}
cum_n_actions = {-1: 0}
for i in range(policy.history_storage.n_histories):
reward = policy.history_storage.get_history(i).rewards
cum_n_actions[i] = cum_n_actions[i - 1] + len(reward)
cum_reward[i] = cum_reward[i - 1] + sum(six.viewvalues(reward))
return cum_reward, cum_n_actions
开发者ID:ntucllab,项目名称:striatum,代码行数:25,代码来源:rewardplot.py
示例11: _update_dhcp_info
def _update_dhcp_info(nets_info, devices_info):
"""Update DHCP info for both networks and devices"""
net_ifaces = {net_info['iface'] for net_info in six.viewvalues(nets_info)}
flat_devs_info = {
devname: devinfo
for sub_devs in six.viewvalues(devices_info)
for devname, devinfo in six.viewitems(sub_devs)
}
dhcp_info = dhclient.dhcp_info(net_ifaces | frozenset(flat_devs_info))
for net_info in six.viewvalues(nets_info):
net_info.update(dhcp_info[net_info['iface']])
for devname, devinfo in six.viewitems(flat_devs_info):
devinfo.update(dhcp_info[devname])
开发者ID:nirs,项目名称:vdsm,代码行数:16,代码来源:cache.py
示例12: _exp4p_score
def _exp4p_score(self, context):
"""The main part of Exp4.P.
"""
advisor_ids = list(six.viewkeys(context))
w = self._modelstorage.get_model()['w']
if len(w) == 0:
for i in advisor_ids:
w[i] = 1
w_sum = sum(six.viewvalues(w))
action_probs_list = []
for action_id in self.action_ids:
weighted_exp = [w[advisor_id] * context[advisor_id][action_id]
for advisor_id in advisor_ids]
prob_vector = np.sum(weighted_exp) / w_sum
action_probs_list.append((1 - self.n_actions * self.p_min)
* prob_vector
+ self.p_min)
action_probs_list = np.asarray(action_probs_list)
action_probs_list /= action_probs_list.sum()
estimated_reward = {}
uncertainty = {}
score = {}
for action_id, action_prob in zip(self.action_ids, action_probs_list):
estimated_reward[action_id] = action_prob
uncertainty[action_id] = 0
score[action_id] = action_prob
self._modelstorage.save_model(
{'action_probs': estimated_reward, 'w': w})
return estimated_reward, uncertainty, score
开发者ID:ntucllab,项目名称:striatum,代码行数:33,代码来源:exp4p.py
示例13: validate_southbound_devices_usages
def validate_southbound_devices_usages(nets, ni):
kernel_config = KernelConfig(ni)
for requested_net, net_info in six.viewitems(nets):
if 'remove' in net_info:
kernel_config.removeNetwork(requested_net)
for requested_net, net_info in six.viewitems(nets):
if 'remove' in net_info:
continue
kernel_config.setNetwork(requested_net, net_info)
underlying_devices = []
for net_attrs in six.viewvalues(kernel_config.networks):
vlan = net_attrs.get('vlan')
if 'bonding' in net_attrs:
underlying_devices.append((net_attrs['bonding'], vlan))
elif 'nic' in net_attrs:
underlying_devices.append((net_attrs['nic'], vlan))
if len(set(underlying_devices)) < len(underlying_devices):
raise ne.ConfigNetworkError(
ne.ERR_BAD_PARAMS,
'multiple networks/similar vlans cannot be'
' defined on a single underlying device. '
'kernel networks: {}\nrequested networks: {}'.format(
kernel_config.networks,
nets))
开发者ID:EdDev,项目名称:vdsm,代码行数:28,代码来源:validator.py
示例14: mockBitbucketApiEmail
def mockBitbucketApiEmail(url, request):
try:
for account in six.viewvalues(providerInfo['accounts']):
if 'Bearer %s' % account['access_token'] == \
request.headers['Authorization']:
break
else:
self.fail()
except AssertionError as e:
return {
'status_code': 401,
'content': json.dumps({
'message': repr(e)
})
}
return json.dumps({
"page": 1,
"pagelen": 10,
"size": 1,
"values": [{
'is_primary': True,
'is_confirmed': True,
'email': account['user']['email'],
'links': {},
"type": "email"
}]
})
开发者ID:0x414A,项目名称:girder,代码行数:27,代码来源:oauth_test.py
示例15: restore_subscriptions
def restore_subscriptions(self):
subs = [sub for sub in six.viewvalues(self._subscriptions)]
self._subscriptions.clear()
for sub in subs:
self.subscribe(sub.destination,
message_handler=sub.message_handler)
开发者ID:oVirt,项目名称:vdsm,代码行数:7,代码来源:stompclient.py
示例16: mockGithubApiEmail
def mockGithubApiEmail(url, request):
try:
for account in six.viewvalues(providerInfo['accounts']):
if 'token %s' % account['access_token'] == \
request.headers['Authorization']:
break
else:
self.fail()
except AssertionError as e:
return {
'status_code': 401,
'content': json.dumps({
'message': repr(e)
})
}
return json.dumps([
{
'primary': False,
'email': '[email protected]',
'verified': True
}, {
'primary': True,
'email': account['user']['email'],
'verified': True
}
])
开发者ID:anukat2015,项目名称:girder,代码行数:26,代码来源:oauth_test.py
示例17: onJobSave
def onJobSave(event):
# Patch a bug with how girder_worker's Girder task spec's 'api_url' is set
# with Vagrant port forwarding and Nginx proxies
# This is fundamentally a problem with "rest.getApiUrl"
job = event.info
if job['handler'] == 'worker_handler':
# All girder_worker jobs have 3 absolute external URLs, which need to
# patched; these are located at (excluding other job fields):
# job = {
# 'kwargs': {
# 'inputs': {
# '<input_name1>': {
# 'mode': 'girder',
# 'api_url': '<external_url>'
# }
# },
# 'outputs': {
# '<output_name1>': {
# 'mode': 'girder',
# 'api_url': '<external_url>'
# }
# },
# 'jobInfo': {
# 'url': '<external_url>'
# }
# }
# }
# We need to do this for all job statuses, since due to the way that
# Job.save is overridden, the local model may be desynchronized from
# the database after saving; this is fine, since girder_worker
# (where it matters) will always load directly from the correct entry
# in the database
def replaceHost(url):
# TODO: get this from the server config or the request
return 'http://127.0.0.1:8080' + url[url.find('/api/v1'):]
job['kwargs'] = json_util.loads(job['kwargs'])
for specValue in itertools.chain(
six.viewvalues(job['kwargs'].get('inputs', {})),
six.viewvalues(job['kwargs'].get('outputs', {}))):
if specValue['mode'] == 'girder':
specValue['api_url'] = replaceHost(specValue['api_url'])
if job['kwargs'].get('jobInfo'):
job['kwargs']['jobInfo']['url'] = replaceHost(
job['kwargs']['jobInfo']['url'])
job['kwargs'] = json_util.dumps(job['kwargs'])
开发者ID:msmolens,项目名称:isic-archive,代码行数:47,代码来源:__init__.py
示例18: required_estimates_fields
def required_estimates_fields(columns):
"""
Compute the set of resource columns required to serve
`columns`.
"""
# We also expect any of the field names that our loadable columns
# are mapped to.
return metadata_columns.union(viewvalues(columns))
开发者ID:kongscn,项目名称:zipline,代码行数:8,代码来源:earnings_estimates.py
示例19: _get_timeout
def _get_timeout(self, map):
timeout = 30.0
for disp in list(six.viewvalues(self._map)):
if hasattr(disp, "next_check_interval"):
interval = disp.next_check_interval()
if interval is not None and interval >= 0:
timeout = min(interval, timeout)
return timeout
开发者ID:nirs,项目名称:vdsm,代码行数:8,代码来源:betterAsyncore.py
示例20: __init__
def __init__(self, savePath):
self.networksPath = os.path.join(savePath, 'nets', '')
self.bondingsPath = os.path.join(savePath, 'bonds', '')
nets = self._getConfigs(self.networksPath)
for net_attrs in six.viewvalues(nets):
_filter_out_volatile_net_attrs(net_attrs)
bonds = self._getConfigs(self.bondingsPath)
super(Config, self).__init__(nets, bonds)
开发者ID:EdDev,项目名称:vdsm,代码行数:8,代码来源:netconfpersistence.py
注:本文中的six.viewvalues函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论