Java DefaultPartitioner Class Code Examples

This article collects typical usage examples of org.apache.kafka.clients.producer.internals.DefaultPartitioner in Java. If you are wondering how the DefaultPartitioner class is used in practice, the selected examples below may help.

The DefaultPartitioner class belongs to the org.apache.kafka.clients.producer.internals package. A total of 17 code examples are shown below, sorted by popularity by default.

Example 1: testPartitionSpread

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitionSpread() throws Exception {
    Multiset<Integer> results = TreeMultiset.create();
    Cluster c = Cluster.empty();
    try (Partitioner p = new DefaultPartitioner()) {
        PartitionKeyGenerator pkg = new PartitionKeyGenerator();

        mockPartitions(c);

        for (int i = 0; i < messages; i++) {
            results.add(p.partition("test", null, pkg.next(), null, null, c));
        }

        int expected = messages / partitions;
        double threshold = expected * 0.05;

        for (Multiset.Entry<Integer> e : results.entrySet()) {
            int offBy = Math.abs(e.getCount() - expected);
            assertTrue("Partition " + e.getElement() + " had " + e.getCount() + " elements, expected " + expected + ", threshold is " + threshold,
                    offBy < threshold);
        }
    }
}
 
Developer ID: opentable, Project: otj-logging, Lines of code: 24, Source: PartitionSpreadTest.java
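The testPartitionSpread example above checks how keyed records spread across partitions. The core idea for a record that carries a key is to hash the serialized key bytes and take the result modulo the partition count. The following is a minimal, dependency-free sketch of that idea, not Kafka's implementation: the class KeyHashPartitionSketch and its methods are hypothetical names, and Arrays.hashCode is used as a stand-in for the murmur2 hash that Kafka applies.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical helper, not part of Kafka: maps a serialized key to a partition
// by hashing the bytes and taking the result modulo the partition count.
final class KeyHashPartitionSketch {

    // Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash to keep the sketch short.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Counts how many of `messages` random 16-byte keys land on each partition.
    static int[] spread(int messages, int numPartitions, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            byte[] key = new byte[16];
            random.nextBytes(key);
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print the per-partition counts for 10,000 random keys over 4 partitions.
        int[] counts = spread(10_000, 4, 42L);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + ": " + counts[p]);
        }
    }
}
```

Because the mapping depends only on the key bytes and the partition count, the same key always lands on the same partition as long as the partition count does not change. How even the spread is depends on the hash function, which is why a test such as the one above compares the per-partition counts against a tolerance.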


Example 2: testStreamPartitioner

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testStreamPartitioner() {

    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestStreamPartitioner");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);

    final Map<TopicPartition, Long> offsets = collector.offsets();

    assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: RecordCollectorTest.java


Example 3: visible

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Override
public boolean visible(String name, Map<String, Object> connectorConfigs) {
  String partitionerName = (String) connectorConfigs.get(PARTITIONER_CLASS_CONFIG);
  try {
    @SuppressWarnings("unchecked")
    Class<? extends Partitioner> partitioner = (Class<? extends Partitioner>) Class.forName(partitionerName);
    if (classNameEquals(partitionerName, DefaultPartitioner.class)) {
      return false;
    } else if (FieldPartitioner.class.isAssignableFrom(partitioner)) {
      // subclass of FieldPartitioner
      return name.equals(PARTITION_FIELD_NAME_CONFIG);
    } else if (TimeBasedPartitioner.class.isAssignableFrom(partitioner)) {
      // subclass of TimeBasedPartitioner
      if (classNameEquals(partitionerName, DailyPartitioner.class) || classNameEquals(partitionerName, HourlyPartitioner.class)) {
        return name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
      } else {
        return name.equals(PARTITION_DURATION_MS_CONFIG) || name.equals(PATH_FORMAT_CONFIG) || name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
      }
    } else {
      throw new ConfigException("Not a valid partitioner class: " + partitionerName);
    }
  } catch (ClassNotFoundException e) {
    throw new ConfigException("Partitioner class not found: " + partitionerName);
  }
}
 
Developer ID: qubole, Project: streamx, Lines of code: 26, Source: HdfsSinkConnectorConfig.java


Example 4: testCopartitioning

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testCopartitioning() {

    Random rand = new Random();

    DefaultPartitioner defaultPartitioner = new DefaultPartitioner();

    WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
    WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer);

    for (int k = 0; k < 10; k++) {
        Integer key = rand.nextInt();
        byte[] keyBytes = intSerializer.serialize(topicName, key);

        String value = key.toString();
        byte[] valueBytes = stringSerializer.serialize(topicName, value);

        Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);

        for (int w = 1; w < 10; w++) {
            TimeWindow window = new TimeWindow(10 * w, 20 * w);

            Windowed<Integer> windowedKey = new Windowed<>(key, window);
            Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());

            assertEquals(expected, actual);
        }
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 30, Source: WindowedStreamPartitionerTest.java


Example 5: testSpecificPartition

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testSpecificPartition() {

    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
            "RecordCollectorTest-TestSpecificPartition");

    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);

    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);

    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);

    final Map<TopicPartition, Long> offsets = collector.offsets();

    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));

    // ignore StreamPartitioner
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);

    assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2)));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 32, Source: RecordCollectorTest.java


Example 6: shouldRetryWhenTimeoutExceptionOccursOnSend

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger attempt = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    if (attempt.getAndIncrement() == 0) {
                        throw new TimeoutException();
                    }
                    return super.send(record, callback);
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
    assertEquals(Long.valueOf(0L), offset);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: RecordCollectorTest.java


Example 7: shouldThrowStreamsExceptionAfterMaxAttempts

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    throw new TimeoutException();
                }
            },
            "test");

    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);

}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java


Example 8: shouldThrowStreamsExceptionOnSubsequentCallIfASendFails

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java


Example 9: shouldThrowStreamsExceptionOnFlushIfASendFailed

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.flush();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java


Example 10: shouldThrowStreamsExceptionOnCloseIfASendFailed

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                @Override
                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                    callback.onCompletion(null, new Exception());
                    return null;
                }
            },
            "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.close();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java


Example 11: shouldThrowIfTopicIsUnknown

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowIfTopicIsUnknown() {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                return Collections.EMPTY_LIST;
            }

        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: RecordCollectorTest.java


Example 12: testPartitioner

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitioner() throws Exception {
    PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
    Cluster cluster = new Cluster(null, new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1),
            Collections.<String>emptySet(), Collections.<String>emptySet());
    MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
    Future<RecordMetadata> metadata = producer.send(record);
    assertEquals("Partition should be correct", 1, metadata.get().partition());
    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 14, Source: MockProducerTest.java


Example 13: testPartitioner

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitioner() throws Exception {
    PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
    Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
    MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
    Future<RecordMetadata> metadata = producer.send(record);
    assertEquals("Partition should be correct", 1, metadata.get().partition());
    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
 
Developer ID: txazo, Project: kafka, Lines of code: 13, Source: MockProducerTest.java


Example 14: producerConfigs

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Bean
public Map<String, Object> producerConfigs() {
    //FIXME: 12factorize
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}
 
Developer ID: Eventasia, Project: eventasia, Lines of code: 16, Source: EventasiaKafkaConfig.java
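The producerConfigs example above sets the partitioner through the ProducerConfig constants. The same settings can also be written with plain string keys, for instance when they come from a properties file. The following is a minimal sketch under stated assumptions: the class ProducerPropsSketch is a hypothetical name, and the broker address and the acks and retries values are placeholders rather than values taken from the original project.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings with plain string keys
// instead of the ProducerConfig constants.
final class ProducerPropsSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Placeholder values (assumptions); tune them for the actual deployment.
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");
        // Fully qualified class names, given as strings.
        props.setProperty("partitioner.class",
                "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = producerProps("localhost:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting Properties object only describes the configuration; it would still have to be passed to a producer from the Kafka client library, which this sketch deliberately leaves out so that it runs without extra dependencies.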


Example 15: DefaultStreamPartitioner

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
    this.keySerializer = keySerializer;
    this.cluster = cluster;
    this.topic = topic;
    this.defaultPartitioner = new DefaultPartitioner();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 7, Source: DefaultStreamPartitioner.java


Example 16: MockProducer

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
/**
 * Create a new mock producer with invented metadata, the given autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 */
public MockProducer(final boolean autoComplete,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: MockProducer.java


Example 17: MockProducer

import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
/**
 * Create a new mock producer with invented metadata, the given autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 */
public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
 
Developer ID: txazo, Project: kafka, Lines of code: 9, Source: MockProducer.java



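Note: The org.apache.kafka.clients.producer.internals.DefaultPartitioner examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. Refer to each project's license before distributing or using the code. Do not reproduce without permission.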

