本文整理汇总了Python中six.next函数的典型用法代码示例。如果您正苦于以下问题：Python next函数的具体用法？Python next怎么用？Python next使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了next函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: parse_localnamespacepath
def parse_localnamespacepath(parser, event, node):
    # pylint: disable=unused-argument
    """Parse the children of a LOCALNAMESPACEPATH element.

    <!ELEMENT LOCALNAMESPACEPATH (NAMESPACE+)>

    Items are pulled from ``parser`` as ``(event, node)`` pairs. The first
    one must start a NAMESPACE element; further NAMESPACE elements are
    accepted until the end of LOCALNAMESPACEPATH is seen.

    Returns the values produced by ``parse_namespace`` joined with '/'.
    Raises ParseError when anything other than a NAMESPACE start (or the
    closing LOCALNAMESPACEPATH) is encountered.
    """
    child_event, child_node = six.next(parser)
    if not _is_start(child_event, child_node, 'NAMESPACE'):
        # Diagnostic output of the offending item, emitted before failing.
        print(child_event, child_node)
        raise ParseError('Expecting NAMESPACE')

    parts = [parse_namespace(parser, child_event, child_node)]

    while True:
        child_event, child_node = six.next(parser)
        if _is_end(child_event, child_node, 'LOCALNAMESPACEPATH'):
            return '/'.join(parts)
        if not _is_start(child_event, child_node, 'NAMESPACE'):
            raise ParseError('Expecting NAMESPACE')
        parts.append(parse_namespace(parser, child_event, child_node))
开发者ID:Napsty,项目名称:pywbem,代码行数:29,代码来源:cimxml_parse.py
示例2: test_closed_cursor_raises_when_used
def test_closed_cursor_raises_when_used(dsn, configuration):
    """Every operation attempted on a closed cursor raises InterfaceError."""
    connection = connect(dsn, **get_credentials(configuration))
    cursor = connection.cursor()
    cursor.close()

    # Checked in this order, each under its own pytest.raises context.
    operations = (
        lambda: cursor.execute("SELECT 42"),
        lambda: cursor.executemany("SELECT 42"),
        lambda: cursor.executemanycolumns("SELECT 42", []),
        lambda: cursor.fetchone(),
        lambda: cursor.fetchmany(),
        lambda: cursor.fetchall(),
        lambda: six.next(cursor),
    )
    for operation in operations:
        with pytest.raises(InterfaceError):
            operation()
开发者ID:blue-yonder,项目名称:turbodbc,代码行数:26,代码来源:test_cursor_basics.py
示例3: lldp_parse
def lldp_parse(data):
    """Extract the source datapath id and port number from an LLDP frame.

    ``data`` is wrapped in ``packet.Packet``; the first two protocols
    obtained by iterating the packet are treated as Ethernet and LLDP.

    Returns a ``(src_dpid, src_port_no)`` tuple.

    Raises ``LLDPPacket.LLDPUnknownFormat`` when the second protocol is
    not LLDP, or when the chassis-id / port-id TLVs do not carry the
    expected subtype, prefix or size. The Ethernet check is an ``assert``,
    so it raises AssertionError (and is skipped under ``python -O``).
    """
    pkt = packet.Packet(data)
    i = iter(pkt)
    eth_pkt = six.next(i)
    assert type(eth_pkt) == ethernet.ethernet

    lldp_pkt = six.next(i)
    if type(lldp_pkt) != lldp.lldp:
        raise LLDPPacket.LLDPUnknownFormat()

    # The first TLV is read as the chassis id, which encodes the datapath id.
    tlv_chassis_id = lldp_pkt.tlvs[0]
    if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
        raise LLDPPacket.LLDPUnknownFormat(
            msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
    chassis_id = tlv_chassis_id.chassis_id.decode('utf-8')
    if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
        raise LLDPPacket.LLDPUnknownFormat(
            msg='unknown chassis id format %s' % chassis_id)
    src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])

    # The second TLV is read as the port id, a packed port number.
    tlv_port_id = lldp_pkt.tlvs[1]
    if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
        raise LLDPPacket.LLDPUnknownFormat(
            msg='unknown port id subtype %d' % tlv_port_id.subtype)
    port_id = tlv_port_id.port_id
    if len(port_id) != LLDPPacket.PORT_ID_SIZE:
        # port_id is a sized buffer (it is passed to len() and
        # struct.unpack), not an integer: formatting it with '%d' would
        # raise TypeError and hide the intended LLDPUnknownFormat.
        raise LLDPPacket.LLDPUnknownFormat(
            msg='unknown port id %r' % (port_id,))
    (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)

    return src_dpid, src_port_no
开发者ID:AsmaSwapna,项目名称:ryu,代码行数:31,代码来源:switches.py
示例4: has_primitive_root
def has_primitive_root(n):
    """Return True if ``primitive_root_gen(n)`` yields at least one value.

    ``n == 1`` is special-cased to True without consulting the generator.
    """
    if n == 1:
        return True  # to match A033948, but why ?
    try:
        six.next(primitive_root_gen(n))
    except StopIteration:
        # The generator produced nothing.
        return False
    else:
        return True
开发者ID:goulu,项目名称:Goulib,代码行数:7,代码来源:oeis.py
示例5: _collect_linear_sum
def _collect_linear_sum(exp, idMap, multiplier, coef, varmap, compute_values):
    """Accumulate the terms of a linear sum expression into ``coef``.

    The constant ``exp._const`` (scaled by ``multiplier``) is added to
    ``coef[None]``. Each entry of ``exp._args`` is then paired, in order,
    with the next value from ``exp._coef``:

    * an unfixed variable is given an integer key (registered in ``idMap``
      and ``varmap`` on first sight) and its scaled coefficient is added
      to ``coef[key]``;
    * anything else is handed to the collector registered for its class in
      ``_linear_collectors``, with the scaled coefficient as the new
      multiplier.
    """
    # None is the constant term in the coefficient map.
    coef[None] += multiplier * exp._const

    coefficients = exp._coef.__iter__()
    for term in exp._args:
        # A term can be anything - a product, a variable, whatever.
        is_variable = (term.__class__ is _GeneralVarData
                       or isinstance(term, _VarData))
        if not (is_variable and not term.fixed):
            # General case: dispatch on the term's class. The collector is
            # looked up before the coefficient is consumed.
            collector = _linear_collectors[term.__class__]
            collector(term, idMap, multiplier * six.next(coefficients),
                      coef, varmap, compute_values)
            continue

        # Special case: an unfixed variable. Handling it inline saves an
        # expensive recursion - this is by far the most common case.
        known_ids = idMap[None]
        term_id = id(term)
        if term_id in known_ids:
            key = known_ids[term_id]
        else:
            key = len(idMap) - 1
            known_ids[term_id] = key
            idMap[key] = term

        varmap[key] = term
        if key in coef:
            coef[key] += multiplier * six.next(coefficients)
        else:
            coef[key] = multiplier * six.next(coefficients)
开发者ID:SemanticBeeng,项目名称:pyomo,代码行数:26,代码来源:canonical_repn.py
示例6: build_from_obj
def build_from_obj(obj):
    """Recursively convert a decoded structure into node objects.

    * A list is converted element by element.
    * A non-empty dict is looked up by its first key via
      ``get_node_class``; when a class is found it is instantiated with
      the dict's first value, otherwise the dict is returned unchanged.
    * Anything else, including an empty dict, is returned as is.
    """
    if isinstance(obj, list):
        return [build_from_obj(item) for item in obj]
    # An empty dict has no first key: return it untouched instead of
    # letting next() leak a StopIteration to the caller.
    if not isinstance(obj, dict) or not obj:
        return obj
    _class = get_node_class(next(iterkeys(obj)))
    return _class(next(itervalues(obj))) if _class else obj
开发者ID:robin900,项目名称:psqlparse,代码行数:7,代码来源:utils.py
示例7: test_load_directory_caching_with_files_updated
def test_load_directory_caching_with_files_updated(self):
    """Reload rules after bumping a policy file's timestamps.

    Checks that a single entry stays in ``_policy_dir_mtimes`` with the
    same value as before the reload, that the rules still load, and which
    files were read.
    """
    def first_cached_mtime():
        return six.next(six.itervalues(self.enforcer._policy_dir_mtimes))

    self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
    self.enforcer.load_rules(False)
    self.assertIsNotNone(self.enforcer.rules)

    old = first_cached_mtime()
    self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))

    # Touch the file: push atime and mtime ten seconds forward.
    conf_path = os.path.join(self.config_dir, 'policy.d/a.conf')
    stinfo = os.stat(conf_path)
    touched = (stinfo.st_atime + 10, stinfo.st_mtime + 10)
    os.utime(conf_path, touched)

    self.enforcer.load_rules(False)
    self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))
    self.assertEqual(old, first_cached_mtime())

    loaded_rules = jsonutils.loads(str(self.enforcer.rules))
    self.assertEqual('is_admin:True', loaded_rules['admin'])

    expected_files = [
        'policy.json',
        'policy.d/a.conf',
        'policy.d/a.conf',
    ]
    self.check_loaded_files(expected_files)
开发者ID:bdrich,项目名称:neutron-lbaas,代码行数:27,代码来源:test_policy.py
示例8: test_region_mapping
def test_region_mapping(self, service_credentials_conf,
                        resource_plugin_conf, mock_bulk):
    """Check region_name in the mapping and in saved documents.

    With ``include_region_name`` enabled the mapping and the bulk action
    source contain ``region_name``; with it disabled neither does.
    """
    es_engine = mock.Mock()
    plugin = fake_plugins.FakeSimplePlugin(es_engine=es_engine)

    resource_plugin_conf.include_region_name = True
    service_credentials_conf.os_region_name = 'test-region'

    indexing_helper = helper.IndexingHelper(plugin)

    _, mapping = six.next(plugin.get_full_mapping())
    self.assertIn('region_name', mapping['properties'])

    object_count = len(plugin.get_objects())
    fake_versions = range(1, object_count + 1)
    indexing_helper.save_documents(plugin.get_objects(), fake_versions)

    self.assertEqual(1, len(mock_bulk.call_args_list))
    bulk_kwargs = mock_bulk.call_args_list[0][1]
    actions = list(bulk_kwargs['actions'])
    self.assertEqual(['test-region'],
                     actions[0]['_source']['region_name'])

    # Test without a region
    resource_plugin_conf.include_region_name = False
    mock_bulk.reset_mock()

    _, mapping = six.next(plugin.get_full_mapping())
    self.assertNotIn('region_name', mapping['properties'])

    indexing_helper.save_documents(plugin.get_objects(), fake_versions)
    bulk_kwargs = mock_bulk.call_args_list[0][1]
    actions = list(bulk_kwargs['actions'])
    self.assertNotIn('region_name', actions[0]['_source'])
开发者ID:openstack,项目名称:searchlight,代码行数:33,代码来源:test_indexing_helper.py
示例9: read_header
def read_header(ofile):
    """Read the header of the iterable ofile.

    Leading comment lines are skipped, then lines are consumed until one
    matches ``r_datameta``. Returns ``(relation, attributes)`` where
    ``relation`` is the captured relation name (None if none was seen)
    and ``attributes`` is a list of ``(name, type)`` pairs.

    Raises ValueError for a header line that is neither an attribute nor
    a relation declaration.
    """
    line = next(ofile)

    # Pass first comments
    while r_comment.match(line):
        line = next(ofile)

    # Header is everything up to DATA attribute ?
    relation = None
    attributes = []
    while not r_datameta.match(line):
        if not r_headerline.match(line):
            # Not a header line: skip it.
            line = next(ofile)
            continue

        if r_attribute.match(line):
            # tokenize_attribute also hands back the next line to examine.
            name, attr_type, line = tokenize_attribute(ofile, line)
            attributes.append((name, attr_type))
            continue

        relation_match = r_relation.match(line)
        if not relation_match:
            raise ValueError("Error parsing line %s" % line)
        relation = relation_match.group(1)
        line = next(ofile)

    return relation, attributes
开发者ID:jsren,项目名称:sdp-vision-env,代码行数:29,代码来源:arffread.py
示例10: __init__
def __init__(self, filename, sample=None):
    """An iterator over the VCF file format

    This reads VCF files and has been tested on VCF 4.0.
    The returned items are VCFEntry

    The constructor consumes the leading '##' meta-information rows and
    the header row that follows them, registering each with a
    ``structure.VCFFile``.

    Raises AssertionError when the VCFFile rejects a meta row or the
    header row (same exception type as the former ``assert`` statements).
    """
    super(VCFIterator, self).__init__(filename)
    # process our meta information
    row = six.next(self.filename).strip()
    self.vcf_file = structure.VCFFile(filename)
    self.inum = 0

    # Ordered (prefix, handler) pairs; the bare '##' entry is the
    # catch-all and therefore must stay last.
    meta_handlers = (
        ('##INFO=', self.vcf_file.add_info),
        ('##FILTER=', self.vcf_file.add_filter),
        ('##FORMAT=', self.vcf_file.add_format),
        ('##CONTIG=', self.vcf_file.add_contig),
        ('##ALT=', self.vcf_file.add_alt),
        ('##', self.vcf_file.add_extra),
    )
    while row[:2] == '##':
        for prefix, handler in meta_handlers:
            if row.startswith(prefix):
                # The handler call must not live inside an ``assert``:
                # ``python -O`` strips asserts, which would silently skip
                # all metadata parsing.
                if not handler(row):
                    raise AssertionError(
                        'could not parse VCF meta line: %r' % (row,))
                break
        row = six.next(self.filename).strip()

    # got to the end of meta information, we now have the header
    if not self.vcf_file.add_header(row):
        raise AssertionError('could not parse VCF header: %r' % (row,))
    self.sample = sample
开发者ID:pandeylab,项目名称:pythomics,代码行数:29,代码来源:parsers.py
示例11: _next
def _next(self):
    """Return a BedObject parsed from the next non-empty row.

    Rows are pulled from ``self.filename``; falsy rows are skipped.
    StopIteration from the underlying iterator propagates to the caller.
    """
    while True:
        line = six.next(self.filename)
        if line:  # skip blanks
            break
    bed_entry = structure.BedObject()
    bed_entry.parse(line)
    return bed_entry
开发者ID:pandeylab,项目名称:pythomics,代码行数:7,代码来源:parsers.py
示例12: does_event
def does_event(self, event):
    """Tell whether ``event`` matches this object's ``args`` pattern.

    In simple mode the two are compared with ``==``. Otherwise they are
    walked in lockstep: a pattern element starting with '*' matches any
    event element, every other element must have the same ``str()`` as
    its counterpart, and both sequences must end together.
    """
    if self._simple:
        return self.args == event

    event_items = iter(event)
    pattern_items = iter(self.args)
    captured = {}
    wildcard_no = 0
    while True:
        # The StopIteration class doubles as the end-of-sequence marker.
        got = next(event_items, StopIteration)
        want = next(pattern_items, StopIteration)
        got_done = got is StopIteration
        want_done = want is StopIteration
        if got_done or want_done:
            # Match only when both sequences are exhausted together.
            return got_done and want_done

        if hasattr(want, "startswith") and want.startswith('*'):
            if want == '*':
                # Anonymous wildcard: named by its running position.
                wildcard_no += 1
                name = str(wildcard_no)
            else:
                name = want[1:]
            captured[name] = got
        elif str(want) != str(got):
            return False
开发者ID:M-o-a-T,项目名称:moat,代码行数:25,代码来源:event_hook.py
示例13: _partition_patches
def _partition_patches(patches, regex):
if regex is None:
return [patches]
def take(_patch):
return not bool(regex.search(_patch[1]))
def _stacker(buckets):
while True:
item, new_bucket = yield
if new_bucket:
buckets.append([item])
else:
buckets[-1].append(item)
def _filter(check, stacker):
start_bucket = True
while True:
item = yield
if check(item):
stacker.send((item, start_bucket))
start_bucket = False
else:
start_bucket = True
buckets = []
stacker = _stacker(buckets)
six.next(stacker)
filter = _filter(take, stacker)
six.next(filter)
for patch in patches:
filter.send(patch)
return buckets
开发者ID:openstack-packages,项目名称:rdopkg,代码行数:34,代码来源:actions.py
示例14: parse_instancepath
def parse_instancepath(parser, event, node):
    # pylint: disable=unused-argument
    """Parse the CIM/XML INSTANCEPATH element and return an instance name.

    <!ELEMENT INSTANCEPATH (NAMESPACEPATH, INSTANCENAME)>

    The next two items pulled from ``parser`` must start a NAMESPACEPATH
    and an INSTANCENAME element, in that order. The object returned by
    ``parse_instancename`` has its ``host`` and ``namespace`` attributes
    set from the parsed NAMESPACEPATH.

    Raises ParseError when either expected element is missing.
    """
    path_event, path_node = six.next(parser)
    if not _is_start(path_event, path_node, 'NAMESPACEPATH'):
        raise ParseError('Expecting NAMESPACEPATH')
    host, namespacepath = parse_namespacepath(parser, path_event, path_node)

    name_event, name_node = six.next(parser)
    if not _is_start(name_event, name_node, 'INSTANCENAME'):
        # Diagnostic output of the offending item, emitted before failing.
        print(name_event, name_node)
        raise ParseError('Expecting INSTANCENAME')

    result = parse_instancename(parser, name_event, name_node)
    result.host = host
    result.namespace = namespacepath
    return result
开发者ID:Napsty,项目名称:pywbem,代码行数:27,代码来源:cimxml_parse.py
示例15: test_iterator_calls_parent_item_to_value
def test_iterator_calls_parent_item_to_value(self):
    """Each step of a Page calls item_to_value once with (parent, item).

    Also checks that ``remaining`` drops by one per consumed item.
    """
    parent = mock.sentinel.parent
    item_to_value = mock.Mock(
        side_effect=lambda iterator, value: value, spec=["__call__"]
    )
    items = (10, 11, 12)
    page = page_iterator.Page(parent, items, item_to_value)
    page._remaining = 100

    assert item_to_value.call_count == 0
    assert page.remaining == 100

    for calls_so_far, expected in enumerate(items, start=1):
        assert six.next(page) == expected
        assert item_to_value.call_count == calls_so_far
        item_to_value.assert_called_with(parent, expected)
        assert page.remaining == 100 - calls_so_far
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:27,代码来源:test_page_iterator.py
示例16: generator
def generator(row_iter, delim=","):
    """Yield one tuple of converted values per data line of ``row_iter``.

    Lines matching ``r_comment`` or ``r_empty`` are skipped wherever they
    occur. Every other line is split on ``delim`` and its first ``ni``
    fields are passed through the matching entries of ``convertors``.

    The generator ends cleanly when ``row_iter`` is exhausted, including
    when the input is empty or ends with comment/blank lines.
    """
    # TODO: this is where we are spending times (~80%). I think things
    # could be made more efficiently:
    #   - We could for example "compile" the function, because some values
    #     do not change here.
    #   - The function to convert a line to dtyped values could also be
    #     generated on the fly from a string and be executed instead of
    #     looping.
    #   - The regex are overkill: for comments, checking that a line starts
    #     by % should be enough and faster, and for empty lines, same thing
    #     --> this does not seem to change anything.

    # 'compiling' the range since it does not change
    # Note, I have already tried zipping the converters and
    # row elements and got slightly worse performance.
    elems = list(range(ni))

    # A single loop with ``continue`` replaces the explicit next() calls:
    # a StopIteration raised by next() inside a generator body becomes a
    # RuntimeError under PEP 479, and the former back-to-back skip loops
    # let interleaved comment/blank lines through as data.
    # We do not abstract skipping comments and empty lines for performances
    # reason.
    for raw in row_iter:
        if r_comment.match(raw) or r_empty.match(raw):
            continue
        row = raw.split(delim)
        yield tuple([convertors[i](row[i]) for i in elems])
开发者ID:jsren,项目名称:sdp-vision-env,代码行数:34,代码来源:arffread.py
示例17: take_nth
def take_nth(rank, size, seq):
    """
    Iterate returning every nth value.

    Return an iterator over the sequence that returns every
    nth element of seq based on the given rank within a group of
    the given size.  For example, if size = 2, a rank of 0 returns
    even indexed elements and a rank of 1 returns odd indexed elements.

    Iteration ends when seq is exhausted.

    Parameters
    ----------
    rank : int
        Position within the group whose elements are returned; must be
        smaller than size.
    size : int
        Number of positions in the group, i.e. the stride between
        returned elements.
    seq : iter
        Iterable containing the values being returned.
    """
    assert(rank < size)
    # Looping with enumerate() lets the generator finish normally at the
    # end of seq. Calling next() by hand here leaked StopIteration out of
    # the generator body, which is a RuntimeError under PEP 479.
    for index, value in enumerate(seq):
        if index % size == rank:
            yield value
开发者ID:samtx,项目名称:OpenMDAO,代码行数:26,代码来源:array_utils.py
示例18: test_page_non_empty_response
def test_page_non_empty_response(self):
    """The first page of list_blobs() exposes its prefixes and one Blob.

    The iterator's page fetch is replaced by a canned response holding a
    single item and a single prefix.
    """
    import six
    from google.cloud.storage.blob import Blob

    blob_name = 'blob-name'
    response = {'items': [{'name': blob_name}], 'prefixes': ['foo']}
    client = _Client(_Connection())
    bucket = self._make_one(client=client, name='name')

    iterator = bucket.list_blobs()
    # Serve the canned response instead of fetching a page.
    iterator._get_next_page_response = lambda: response

    page = six.next(iterator.pages)
    self.assertEqual(page.prefixes, ('foo',))
    self.assertEqual(page.num_items, 1)

    blob = six.next(page)
    self.assertEqual(page.remaining, 0)
    self.assertIsInstance(blob, Blob)
    self.assertEqual(blob.name, blob_name)
    self.assertEqual(iterator.prefixes, {'foo'})
开发者ID:Fkawala,项目名称:gcloud-python,代码行数:25,代码来源:test_bucket.py
示例19: parse_title_page
def parse_title_page(lines):
    """Parse the title page.

    Spec: http://fountain.io/syntax#section-titlepage

    Returns None as soon as a line that should hold a key does not match
    ``title_page_key_re``; otherwise returns a dictionary mapping each
    key to the list of its values. A key with an inline value takes that
    single value; a key without one collects the following lines that
    match ``title_page_value_re``.
    """
    fields = {}
    remaining = iter(lines)
    try:
        current = next(remaining)
        while True:
            key_match = title_page_key_re.match(current)
            if not key_match:
                return None
            key, inline_value = key_match.groups()

            if inline_value:
                # Single line key/value
                fields.setdefault(key, []).append(inline_value)
                current = next(remaining)
                continue

            # Multi-line value: gather continuation lines until one does
            # not match; that line is then examined as the next key.
            ran_out = True
            for current in remaining:
                value_match = title_page_value_re.match(current)
                if not value_match:
                    ran_out = False
                    break
                fields.setdefault(key, []).append(value_match.group(1))
            if ran_out:
                # Last line has been processed
                break
    except StopIteration:
        # Input ended right after a single-line entry (or was empty).
        pass
    return fields
开发者ID:vilcans,项目名称:screenplain,代码行数:33,代码来源:fountain.py
示例20: _Net_forward_all
def _Net_forward_all(self, blobs=None, **kwargs):
    """
    Run net forward in batches.

    Take
    blobs: list of blobs to extract as in forward()
    kwargs: Keys are input blob names and values are blob ndarrays.
            Refer to forward().

    Give
    all_outs: {blob name: list of blobs} dict.
    """
    # Collect outputs from batches
    wanted = set(self.outputs + (blobs or []))
    all_outs = {name: [] for name in wanted}
    for batch in self._batch(kwargs):
        batch_outs = self.forward(blobs=blobs, **batch)
        for name, blob in six.iteritems(batch_outs):
            all_outs[name].extend(blob.copy())

    # Package in ndarray.
    for name in all_outs:
        all_outs[name] = np.asarray(all_outs[name])

    # Discard padding: the difference in length between the first
    # collected output and the first input.
    n_collected = len(six.next(six.itervalues(all_outs)))
    n_inputs = len(six.next(six.itervalues(kwargs)))
    pad = n_collected - n_inputs
    if pad:
        for name in all_outs:
            all_outs[name] = all_outs[name][:-pad]
    return all_outs
开发者ID:mickaelmaillard,项目名称:caffe,代码行数:27,代码来源:pycaffe.py
注：本文中的six.next函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论