本文整理汇总了Python中pykafka.utils.compat.range函数的典型用法代码示例。如果您正苦于以下问题:Python range函数的具体用法?Python range怎么用?Python range使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了range函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _test_decide_partitions
def _test_decide_partitions(self, membership_protocol):
    """Exercise a membership protocol's partition assignment.

    Sweeps 100 participant/partition ratios and, for every simulated
    consumer, checks the assignment size is fair (each consumer gets
    floor(partitions/consumers), plus one for the first
    ``partitions % consumers`` consumers) and that across all consumers
    every partition is assigned exactly once.
    """
    for i in range(100):
        num_participants = i + 1
        num_partitions = 100 - i
        participants = sorted(['test-debian:{p}'.format(p=p)
                               for p in range(num_participants)])
        cns, topic = self.buildMockConsumer(num_partitions=num_partitions,
                                            num_participants=num_participants)
        cns._membership_protocol = membership_protocol
        assigned_parts = []
        for consumer_id in participants:
            partitions = cns._membership_protocol.decide_partitions(
                participants, topic.partitions, consumer_id)
            assigned_parts.extend(partitions)
            remainder_ppc = num_partitions % num_participants
            idx = participants.index(consumer_id)
            parts_per_consumer = math.floor(num_partitions / num_participants)
            # The first `remainder_ppc` consumers each take one extra partition.
            num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)
            self.assertEqual(len(partitions), int(num_parts))
        # Validate all partitions were assigned once and only once
        all_partitions = sorted(topic.partitions.values(), key=lambda x: x.id)
        assigned_parts = sorted(assigned_parts, key=lambda x: x.id)
        self.assertListEqual(assigned_parts, all_partitions)
开发者ID:Parsely,项目名称:pykafka,代码行数:26,代码来源:test_balancedconsumer.py
示例2: test_decide_partitions
def test_decide_partitions(self):
    """Test partition assignment for a number of partitions/consumers."""
    # 100 test iterations
    for i in range(100):
        # Set up partitions, cluster, etc
        num_participants = i + 1
        num_partitions = 100 - i
        participants = sorted(['test-debian:{p}'.format(p=p)
                               for p in range(num_participants)])
        cns, topic = self.buildMockConsumer(num_partitions=num_partitions,
                                            num_participants=num_participants)
        # Simulate each participant to ensure they're correct
        assigned_parts = []
        for p_id in range(num_participants):
            cns._consumer_id = participants[p_id]  # override consumer id
            # Decide partitions then validate
            partitions = cns._decide_partitions(participants)
            assigned_parts.extend(partitions)
            remainder_ppc = num_partitions % num_participants
            idx = participants.index(cns._consumer_id)
            parts_per_consumer = math.floor(num_partitions / num_participants)
            # The first `remainder_ppc` consumers each take one extra partition.
            num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)
            self.assertEqual(len(partitions), int(num_parts))
        # Validate all partitions were assigned once and only once
        all_partitions = sorted(topic.partitions.values(), key=lambda x: x.id)
        assigned_parts = sorted(assigned_parts, key=lambda x: x.id)
        self.assertListEqual(assigned_parts, all_partitions)
开发者ID:Justontheway,项目名称:pykafka,代码行数:35,代码来源:test_balancedconsumer.py
示例3: test_consume_earliest
def test_consume_earliest(self):
    """Two consumers in one group must split the topic's partitions.

    Starts both from the earliest offset, consumes a message from each,
    and checks their partition sets are disjoint and together cover the
    whole topic.
    """
    try:
        consumer_a = self.client.topics[self.topic_name].get_balanced_consumer(
            b'test_consume_earliest', zookeeper_connect=self.kafka.zookeeper,
            auto_offset_reset=OffsetType.EARLIEST
        )
        consumer_b = self.client.topics[self.topic_name].get_balanced_consumer(
            b'test_consume_earliest', zookeeper_connect=self.kafka.zookeeper,
            auto_offset_reset=OffsetType.EARLIEST
        )
        # Consume from both a few times
        messages = [consumer_a.consume() for i in range(1)]
        self.assertTrue(len(messages) == 1)
        messages = [consumer_b.consume() for i in range(1)]
        self.assertTrue(len(messages) == 1)
        # Validate they aren't sharing partitions
        self.assertSetEqual(
            consumer_a._partitions & consumer_b._partitions,
            set()
        )
        # Validate all partitions are here
        self.assertSetEqual(
            consumer_a._partitions | consumer_b._partitions,
            set(self.client.topics[self.topic_name].partitions.values())
        )
    finally:
        # Best-effort cleanup: a consumer may never have been created if
        # setup raised, so swallow (but do not mask) shutdown errors.
        try:
            consumer_a.stop()
            consumer_b.stop()
        except Exception:
            pass
开发者ID:brianbruggeman,项目名称:pykafka,代码行数:34,代码来源:test_balancedconsumer.py
示例4: test_extra_consumer
def test_extra_consumer(self):
    """Ensure proper operation of "extra" consumers in a group

    An "extra" consumer is the N+1th member of a consumer group consuming a topic
    of N partitions, and any consumer beyond the N+1th.
    """
    group = b"test_extra_consumer"
    extras = 1

    def verify_extras(consumers, extras_count):
        # Exactly `extras_count` consumers should get nothing; retry a few
        # times since a rebalance may briefly leave consumers empty-handed.
        messages = [c.consume() for c in consumers]
        successes = [a for a in messages if a is not None]
        nones = [a for a in messages if a is None]
        attempts = 0
        while len(nones) != extras_count and attempts < 5:
            messages = [c.consume() for c in consumers]
            successes = [a for a in messages if a is not None]
            nones = [a for a in messages if a is None]
            attempts += 1
        self.assertEqual(len(nones), extras_count)
        self.assertEqual(len(successes), self.n_partitions)

    try:
        consumers = [self.get_balanced_consumer(group, consumer_timeout_ms=5000)
                     for i in range(self.n_partitions + extras)]
        verify_extras(consumers, extras)
        # when one consumer stops, the extra should pick up its partitions
        removed = consumers[:extras]
        for consumer in removed:
            consumer.stop()
        consumers = [a for a in consumers if a not in removed]
        self.wait_for_rebalancing(*consumers)
        self.assertEqual(len(consumers), self.n_partitions)
        verify_extras(consumers, 0)
        # added "extra" consumers should idle
        for i in range(extras):
            consumers.append(self.get_balanced_consumer(group,
                                                        consumer_timeout_ms=5000))
        self.wait_for_rebalancing(*consumers)
        verify_extras(consumers, extras)
    finally:
        for consumer in consumers:
            # Best-effort shutdown; don't let one failure stop the rest.
            try:
                consumer.stop()
            except Exception:
                pass
开发者ID:Parsely,项目名称:pykafka,代码行数:48,代码来源:test_balancedconsumer.py
示例5: _start_process
def _start_process(self):
    """Start the instance processes.

    Boots ZooKeeper and the brokers, records their connection strings,
    then polls the ports for up to ~60s to confirm startup.

    Raises:
        ProcessNotStartingError: if the ports never become occupied.
    """
    self._init_dirs()
    self._download_kafka()
    # Start all relevant processes and save which ports they use
    zk_port = self._start_zookeeper()
    self.zookeeper = 'localhost:{port}'.format(port=zk_port)
    broker_ports, broker_ssl_ports = self._start_brokers()
    self.brokers = ','.join('localhost:{port}'.format(port=port)
                            for port in broker_ports)
    self.brokers_ssl = ','.join('localhost:{port}'.format(port=port)
                                for port in broker_ssl_ports)
    # Process is started when the port isn't free anymore
    all_ports = [zk_port] + broker_ports
    for i in range(10):
        if all(not self._is_port_free(port) for port in all_ports):
            log.info('Kafka cluster started.')
            return  # hooray! success
        log.info('Waiting for cluster to start....')
        time.sleep(6)  # Waits 60s total
    # If it got this far, it's an error
    raise ProcessNotStartingError('Unable to start Kafka cluster.')
开发者ID:BlackRider97,项目名称:pykafka,代码行数:26,代码来源:kafka_instance.py
示例6: _latest_partition_offsets_by_reading
def _latest_partition_offsets_by_reading(consumer, n_reads):
"""Obtain message offsets from consumer, return grouped by partition"""
latest_offs = {}
for _ in range(n_reads):
msg = consumer.consume()
latest_offs[msg.partition_id] = msg.offset
return latest_offs
开发者ID:BlackRider97,项目名称:pykafka,代码行数:7,代码来源:test_simple_consumer.py
示例7: _start_brokers
def _start_brokers(self):
    """Start all brokers and return used ports.

    Writes a per-instance properties file, launches each broker via the
    kafka-server-start.sh script with its output redirected to a
    per-instance log file, and returns the list of ports used.
    """
    self._broker_procs = []
    ports = self._port_generator(9092)
    used_ports = []
    for i in range(self._num_instances):
        port = next(ports)
        used_ports.append(port)
        log.info('Starting Kafka on port %i.', port)
        conf = os.path.join(self._conf_dir,
                            'kafka_{instance}.properties'.format(instance=i))
        with open(conf, 'w') as f:
            f.write(_kafka_properties.format(
                broker_id=i,
                port=port,
                zk_connstr=self.zookeeper,
                data_dir=self._data_dir + '_{instance}'.format(instance=i),
            ))
        binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh')
        logfile = os.path.join(self._log_dir, 'kafka_{instance}.log'.format(instance=i))
        self._broker_procs.append(utils.Popen(
            args=[binfile, conf],
            stderr=utils.STDOUT,
            stdout=open(logfile, 'w'),
            use_gevent=self.use_gevent
        ))
    return used_ports
开发者ID:Emor93,项目名称:pykafka,代码行数:29,代码来源:kafka_instance.py
示例8: setUpClass
@classmethod
def setUpClass(cls):
    """Create a 3-partition topic and seed it with 900 messages."""
    cls.kafka = get_cluster()
    cls.topic_name = uuid4().hex.encode()
    cls.kafka.create_topic(cls.topic_name, 3, 2)
    # It turns out that the underlying producer used by KafkaInstance will
    # write all messages in a batch to a single partition, though not the
    # same partition every time. We try to attain some spread here by
    # sending more than one batch:
    batch = 300
    cls.total_msgs = 3 * batch
    for _ in range(3):
        cls.kafka.produce_messages(
            cls.topic_name,
            ('msg {i}'.format(i=i) for i in range(batch)))
    cls.client = KafkaClient(cls.kafka.brokers)
开发者ID:raghavtan,项目名称:pykafka,代码行数:17,代码来源:test_simpleconsumer.py
示例9: test_consume_latest
def test_consume_latest(self):
    """Two latest-offset consumers in a group must split all partitions."""
    try:
        topic = self.client.topics[self.topic_name]
        consumer_a = topic.get_balanced_consumer(
            b'test_consume_latest',
            zookeeper_connect=self.kafka.zookeeper,
            auto_offset_reset=OffsetType.LATEST,
            use_rdkafka=self.USE_RDKAFKA)
        consumer_b = topic.get_balanced_consumer(
            b'test_consume_latest',
            zookeeper_connect=self.kafka.zookeeper,
            auto_offset_reset=OffsetType.LATEST,
            use_rdkafka=self.USE_RDKAFKA)
        # Make sure we're done before producing more messages:
        self.wait_for_rebalancing(consumer_a, consumer_b)
        # Since we are consuming from the latest offset,
        # produce more messages to consume.
        for i in range(10):
            self.prod.produce('msg {num}'.format(num=i).encode())
        # Consume from both a few times
        messages = [consumer_a.consume() for i in range(1)]
        self.assertTrue(len(messages) == 1)
        messages = [consumer_b.consume() for i in range(1)]
        self.assertTrue(len(messages) == 1)
        # Validate they aren't sharing partitions
        self.assertSetEqual(
            consumer_a._partitions & consumer_b._partitions,
            set()
        )
        # Validate all partitions are here
        self.assertSetEqual(
            consumer_a._partitions | consumer_b._partitions,
            set(self.client.topics[self.topic_name].partitions.values())
        )
    finally:
        # Best-effort cleanup; consumers may not exist if setup raised.
        try:
            consumer_a.stop()
            consumer_b.stop()
        except Exception:
            pass
开发者ID:Justxu,项目名称:pykafka,代码行数:45,代码来源:test_balancedconsumer.py
示例10: test_offset_commit
def test_offset_commit(self):
    """Check fetched offsets match pre-commit internal state"""
    with self._get_simple_consumer(
            consumer_group=b'test_offset_commit') as consumer:
        # Advance the consumer so there is something to commit.
        [consumer.consume() for _ in range(100)]
        offsets_committed = consumer.held_offsets
        consumer.commit_offsets()
        offsets_fetched = self._convert_offsets(consumer.fetch_offsets())
        self.assertEqual(offsets_fetched, offsets_committed)
开发者ID:kmgv,项目名称:pykafka,代码行数:10,代码来源:test_simpleconsumer.py
示例11: setUpClass
@classmethod
def setUpClass(cls):
    """Create the test-data topic and seed it with 1000 messages."""
    cls.kafka = get_cluster()
    cls.topic_name = b'test-data'
    cls.kafka.create_topic(cls.topic_name, 3, 2)
    cls.client = KafkaClient(cls.kafka.brokers)
    cls.prod = cls.client.topics[cls.topic_name].get_producer(
        min_queued_messages=1
    )
    for i in range(1000):
        cls.prod.produce('msg {num}'.format(num=i).encode())
开发者ID:brianbruggeman,项目名称:pykafka,代码行数:10,代码来源:test_balancedconsumer.py
示例12: test_offset_resume
def test_offset_resume(self):
    """Check resumed internal state matches committed offsets"""
    with self._get_simple_consumer(
            consumer_group=b'test_offset_resume') as consumer:
        # Advance and commit so there is state to resume from.
        [consumer.consume() for _ in range(100)]
        offsets_committed = consumer.held_offsets
        consumer.commit_offsets()
    # A fresh consumer in the same group should pick up where we left off.
    with self._get_simple_consumer(
            consumer_group=b'test_offset_resume') as consumer:
        self.assertEqual(consumer.held_offsets, offsets_committed)
开发者ID:raghavtan,项目名称:pykafka,代码行数:11,代码来源:test_simpleconsumer.py
示例13: setUpClass
@classmethod
def setUpClass(cls):
    """Create a randomly-named 3-partition topic and seed 1000 messages."""
    cls.kafka = get_cluster()
    cls.topic_name = uuid4().hex.encode()
    cls.n_partitions = 3
    cls.kafka.create_topic(cls.topic_name, cls.n_partitions, 2)
    cls.client = KafkaClient(cls.kafka.brokers, use_greenlets=cls.USE_GEVENT)
    cls.prod = cls.client.topics[cls.topic_name].get_producer(
        min_queued_messages=1
    )
    for i in range(1000):
        cls.prod.produce('msg {num}'.format(num=i).encode())
开发者ID:Justontheway,项目名称:pykafka,代码行数:11,代码来源:test_balancedconsumer.py
示例14: test_offset_commit_override
def test_offset_commit_override(self):
    """Check fetched offsets match committed offsets"""
    with self._get_simple_consumer(
            consumer_group=b'test_offset_commit') as consumer:
        [consumer.consume() for _ in range(100)]
        offset = 69
        # Commit an explicit offset for every partition, overriding the
        # consumer's internal position.
        offsets_committed = [(p, offset) for p in consumer.partitions.values()]
        consumer.commit_offsets(partition_offsets=offsets_committed)
        offsets_fetched = self._convert_offsets(consumer.fetch_offsets())
        # Fetched offsets are the offset of the last consumed message,
        # i.e. one less than the committed "next" offset.
        offsets_committed = {p.id: offset - 1 for p in consumer.partitions.values()}
        self.assertEqual(offsets_fetched, offsets_committed)
开发者ID:Parsely,项目名称:pykafka,代码行数:12,代码来源:test_simpleconsumer.py
示例15: setUpClass
@classmethod
def setUpClass(cls):
    """Create a randomly-named topic and seed it with `total_msgs` messages."""
    cls.kafka = get_cluster()
    cls.topic_name = uuid4().hex.encode()
    cls.kafka.create_topic(cls.topic_name, 3, 2)
    cls.total_msgs = 1000
    cls.client = KafkaClient(cls.kafka.brokers, broker_version=kafka_version)
    cls.prod = cls.client.topics[cls.topic_name].get_producer(
        min_queued_messages=1
    )
    for i in range(cls.total_msgs):
        cls.prod.produce('msg {i}'.format(i=i).encode())
开发者ID:kmgv,项目名称:pykafka,代码行数:12,代码来源:test_simpleconsumer.py
示例16: _start_brokers
def _start_brokers(self):
    """Start all brokers and return used ports.

    Returns a tuple ``(plaintext_ports, ssl_ports)``; the SSL list is
    empty unless ``self.certs`` is configured.
    """
    ports = self._port_generator(9092)
    used_ports = []
    used_ssl_ports = []
    ssl_port = None
    for i in range(self._num_instances):
        port = next(ports)
        used_ports.append(port)
        log.info('Starting Kafka on port %i.', port)
        if self.certs is not None:
            ssl_port = next(ports)
            used_ssl_ports.append(ssl_port)  # to return at end
        self._start_broker_proc(port, ssl_port)
    return used_ports, used_ssl_ports
开发者ID:Parsely,项目名称:pykafka,代码行数:17,代码来源:kafka_instance.py
示例17: wait_for_rebalancing
def wait_for_rebalancing(self, *balanced_consumers):
    """Test helper that loops while rebalancing is ongoing

    Needs to be given all consumer instances active in a consumer group.
    Waits for up to 100 seconds, which should be enough for even a very
    oversubscribed test cluster.
    """
    for _ in range(500):
        n_parts = [len(cons.partitions) for cons in balanced_consumers]
        # Balanced when no consumer holds 2+ more partitions than another
        # and every partition is accounted for.
        if (max(n_parts) - min(n_parts) <= 1
                and sum(n_parts) == self.n_partitions):
            break
        else:
            balanced_consumers[0]._cluster.handler.sleep(.2)
        # check for failed consumers (there'd be no point waiting anymore)
        [cons._raise_worker_exceptions() for cons in balanced_consumers]
    else:
        raise AssertionError("Rebalancing failed")
开发者ID:Justontheway,项目名称:pykafka,代码行数:18,代码来源:test_balancedconsumer.py
示例18: buildMockConsumer
def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000):
    """Build a non-started BalancedConsumer over a mocked topic/cluster.

    Creates `num_partitions` mock partitions whose leaders are spread
    round-robin over `num_participants` broker ids, and returns the
    consumer together with the mock topic.
    """
    consumer_group = b'testgroup'
    topic = mock.Mock()
    topic.name = 'testtopic'
    topic.partitions = {}
    for k in range(num_partitions):
        part = mock.Mock(name='part-{part}'.format(part=k))
        part.id = k
        part.topic = topic
        part.leader = mock.Mock()
        part.leader.id = k % num_participants  # round-robin leader spread
        topic.partitions[k] = part
    cluster = mock.MagicMock()
    zk = mock.MagicMock()
    return BalancedConsumer(topic, cluster, consumer_group,
                            zookeeper=zk, auto_start=False, use_rdkafka=False,
                            consumer_timeout_ms=timeout), topic
开发者ID:Justontheway,项目名称:pykafka,代码行数:18,代码来源:test_balancedconsumer.py
示例19: test_consume
def test_consume(self):
    """Every produced message should be consumable, with no timeouts."""
    with self._get_simple_consumer() as consumer:
        messages = [consumer.consume() for _ in range(self.total_msgs)]
        self.assertEqual(len(messages), self.total_msgs)
        # A None entry would indicate a consume() timeout.
        self.assertTrue(None not in messages)
开发者ID:raghavtan,项目名称:pykafka,代码行数:5,代码来源:test_simpleconsumer.py
注:本文中的pykafka.utils.compat.range函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论