
Java TestUtils Class Code Examples


This article collects typical usage examples of org.apache.kafka.test.TestUtils in Java. If you are wondering how the Java TestUtils class is used in practice, or are looking for working examples, the selected code samples below may help.



The TestUtils class belongs to the org.apache.kafka.test package. 20 code examples of the class are shown below, sorted by popularity by default.

Example 1: produceMessages

import org.apache.kafka.test.TestUtils; // import the required package/class
private void produceMessages(final long timestamp)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: KStreamAggregationIntegrationTest.java
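
The snippet above relies on TestUtils.producerConfig to assemble the producer settings. As a rough illustration of what such a helper amounts to, the sketch below builds a java.util.Properties object from plain string keys. ProducerConfigSketch and its producerConfig method are hypothetical stand-ins written for this page, not Kafka's implementation; the real helper may set additional defaults, and localhost:9092 is an assumed broker address.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical stand-in for a producer-config helper; not the Kafka implementation.
    public static Properties producerConfig(String bootstrapServers,
                                            String keySerializerClass,
                                            String valueSerializerClass,
                                            Properties additional) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", keySerializerClass);
        props.put("value.serializer", valueSerializerClass);
        // Caller-supplied entries are applied last so they can override the entries above.
        props.putAll(additional);
        return props;
    }

    public static void main(String[] args) {
        Properties extra = new Properties();
        extra.put("acks", "all");
        Properties props = producerConfig(
            "localhost:9092", // assumed broker address
            "org.apache.kafka.common.serialization.IntegerSerializer",
            "org.apache.kafka.common.serialization.StringSerializer",
            extra);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Passing an empty Properties object as the last argument, as the snippet above does with new Properties(), simply leaves the three base entries untouched.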


Example 2: commitInvalidOffsets

import org.apache.kafka.test.TestUtils; // import the required package/class
private void commitInvalidOffsets() {
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
        CLUSTER.bootstrapServers(),
        streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
        StringDeserializer.class,
        StringDeserializer.class));

    final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
    invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));

    consumer.commitSync(invalidOffsets);

    consumer.close();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KStreamsFineGrainedAutoResetIntegrationTest.java


Example 3: waitUntilMinKeyValueRecordsReceived

import org.apache.kafka.test.TestUtils; // import the required package/class
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed; never null, because an AssertionError is thrown first if too few records arrive
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
    final List<KeyValue<K, V>> accumData = new ArrayList<>();
    try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                    readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        // Note: this message is built before polling starts, so accumData is still empty here.
        final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 34, Source: IntegrationTestUtils.java
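
The method above hands a TestCondition to TestUtils.waitForCondition and lets it poll until the condition holds or the wait time runs out. The general shape of that polling pattern can be sketched in plain Java as follows. WaitForConditionSketch is a hypothetical re-implementation for illustration only, not Kafka's code: the 50 ms poll interval is an arbitrary assumption, and unlike the method above it wraps interruption in an unchecked exception.

```java
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {

    // Hypothetical poll interval; chosen arbitrarily for this sketch.
    private static final long POLL_INTERVAL_MS = 50L;

    // Re-checks the condition until it holds, or fails once maxWaitMs has elapsed.
    public static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String details) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details);
            }
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail fast instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + details, e);
            }
        }
    }

    public static void main(String[] args) {
        final int[] polls = {0};
        // The condition becomes true on the third check.
        waitForCondition(() -> ++polls[0] >= 3, 5_000L, "third poll should succeed");
        System.out.println("condition met after " + polls[0] + " polls"); // prints "condition met after 3 polls"
    }
}
```

Because the condition is re-evaluated on every poll, it can accumulate state between calls, which is how the method above keeps adding records to accumData until enough have arrived.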


Example 4: before

import org.apache.kafka.test.TestUtils; // import the required package/class
@Before
public void before() throws InterruptedException {
    testNo++;
    String applicationId = "kstream-repartition-join-test-" + testNo;
    builder = new KStreamBuilder();
    createTopics();
    streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
    streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
    streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);

    keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KStreamRepartitionJoinTest.java
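
The setup method above points STATE_DIR_CONFIG at TestUtils.tempDirectory().getPath(), so each test run gets its own state directory. A minimal sketch of that idea using only the JDK is shown below. TempDirectorySketch and its prefix parameter are hypothetical; Kafka's helper may name and clean up its directories differently.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public class TempDirectorySketch {

    // Creates a uniquely named directory under the JVM's default temp location.
    public static File tempDirectory(String prefix) {
        try {
            File dir = Files.createTempDirectory(prefix).toFile();
            // deleteOnExit only removes the directory if it is empty when the JVM exits.
            dir.deleteOnExit();
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create temp directory", e);
        }
    }

    public static void main(String[] args) {
        File first = tempDirectory("streams-state-");
        File second = tempDirectory("streams-state-");
        System.out.println(first.getPath());
        System.out.println(second.getPath());
        System.out.println(first.equals(second)); // prints "false": every call yields a fresh directory
    }
}
```

Using a fresh directory per test keeps local state written by one test from being picked up by the next.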


Example 5: produceStreamTwoInputTo

import org.apache.kafka.test.TestUtils; // import the required package/class
private void produceStreamTwoInputTo(final String streamTwoInput)
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamTwoInput,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: KStreamRepartitionJoinTest.java


Example 6: produceToStreamOne

import org.apache.kafka.test.TestUtils; // import the required package/class
private void produceToStreamOne()
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(10L, 1),
            new KeyValue<>(5L, 2),
            new KeyValue<>(12L, 3),
            new KeyValue<>(15L, 4),
            new KeyValue<>(20L, 5),
            new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            IntegerSerializer.class,
            new Properties()),
        mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: KStreamRepartitionJoinTest.java


Example 7: before

import org.apache.kafka.test.TestUtils; // import the required package/class
@Before
public void before() throws InterruptedException {
    testNo++;
    userClicksTopic = "user-clicks-" + testNo;
    userRegionsTopic = "user-regions-" + testNo;
    userRegionsStoreName = "user-regions-store-name-" + testNo;
    outputTopic = "output-topic-" + testNo;
    CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic);
    streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);


}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KStreamKTableJoinIntegrationTest.java


Example 8: produceTopicValues

import org.apache.kafka.test.TestUtils; // import the required package/class
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                    new KeyValue<>("a", 1L),
                    new KeyValue<>("b", 2L),
                    new KeyValue<>("c", 3L),
                    new KeyValue<>("d", 4L),
                    new KeyValue<>("e", 5L)),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    StringSerializer.class,
                    LongSerializer.class,
                    new Properties()),
            mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: GlobalKTableIntegrationTest.java


Example 9: produceGlobalTableValues

import org.apache.kafka.test.TestUtils; // import the required package/class
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "F"),
                    new KeyValue<>(2L, "G"),
                    new KeyValue<>(3L, "H"),
                    new KeyValue<>(4L, "I"),
                    new KeyValue<>(5L, "J")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: GlobalKTableIntegrationTest.java


Example 10: prepareTest

import org.apache.kafka.test.TestUtils; // import the required package/class
private Properties prepareTest() throws Exception {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

    return streamsConfiguration;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: ResetIntegrationTest.java


Example 11: prepareInputData

import org.apache.kafka.test.TestUtils; // import the required package/class
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: ResetIntegrationTest.java


Example 12: readResult

import org.apache.kafka.test.TestUtils; // import the required package/class
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
                                              final String groupId) throws Exception {
    if (groupId != null) {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords
        );
    }

    // read uncommitted
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
        SINGLE_PARTITION_OUTPUT_TOPIC,
        numberOfRecords
    );
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: EosIntegrationTest.java
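
The method above derives the isolation level setting with IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT). The explicit Locale.ROOT matters because lower-casing is locale-sensitive: under a Turkish default locale, an upper-case "I" becomes a dotless "ı", which would no longer match the expected value. The sketch below demonstrates this with a hypothetical local enum that only mirrors the constant names; it is not Kafka's IsolationLevel class.

```java
import java.util.Locale;

public class IsolationLevelNameSketch {

    // Hypothetical stand-in that mirrors the constant names used in the snippet above.
    public enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Locale.ROOT gives the same result regardless of the JVM's default locale.
    public static String configValue(IsolationLevel level) {
        return level.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String safe = configValue(IsolationLevel.READ_COMMITTED);
        System.out.println(safe); // prints "read_committed"

        // With a Turkish locale, 'I' lower-cases to the dotless U+0131 instead of 'i'.
        Locale turkish = Locale.forLanguageTag("tr");
        String turkishLower = IsolationLevel.READ_COMMITTED.name().toLowerCase(turkish);
        System.out.println(safe.equals(turkishLower)); // prints "false"
    }
}
```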


Example 13: testRead

import org.apache.kafka.test.TestUtils; // import the required package/class
/**
 * Test a simple append and read.
 */
@Test
public void testRead() throws IOException {
    FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
    TestUtils.checkEquals(fileRecords.batches(), read.batches());

    List<RecordBatch> items = batches(read);
    RecordBatch second = items.get(1);

    read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
    assertEquals("Try a read starting from the second message",
            items.subList(1, 3), batches(read));

    read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
    assertEquals("Try a read of a single message starting from the second message",
            Collections.singletonList(second), batches(read));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: FileRecordsTest.java


Example 14: doAggregateSessionWindows

import org.apache.kafka.test.TestUtils; // import the required package/class
private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
    driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
    driver.setTime(10);
    driver.process(TOPIC, "1", "1");
    driver.setTime(15);
    driver.process(TOPIC, "2", "2");
    driver.setTime(30);
    driver.process(TOPIC, "1", "1");
    driver.setTime(70);
    driver.process(TOPIC, "1", "1");
    driver.setTime(90);
    driver.process(TOPIC, "1", "1");
    driver.setTime(100);
    driver.process(TOPIC, "1", "1");
    driver.flushState();
    assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
    assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
    assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KGroupedStreamImplTest.java


Example 15: doCountSessionWindows

import org.apache.kafka.test.TestUtils; // import the required package/class
private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
    driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
    driver.setTime(10);
    driver.process(TOPIC, "1", "1");
    driver.setTime(15);
    driver.process(TOPIC, "2", "2");
    driver.setTime(30);
    driver.process(TOPIC, "1", "1");
    driver.setTime(70);
    driver.process(TOPIC, "1", "1");
    driver.setTime(90);
    driver.process(TOPIC, "1", "1");
    driver.setTime(100);
    driver.process(TOPIC, "1", "1");
    driver.flushState();
    assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
    assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
    assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KGroupedStreamImplTest.java


Example 16: doReduceSessionWindows

import org.apache.kafka.test.TestUtils; // import the required package/class
private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
    driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
    driver.setTime(10);
    driver.process(TOPIC, "1", "A");
    driver.setTime(15);
    driver.process(TOPIC, "2", "Z");
    driver.setTime(30);
    driver.process(TOPIC, "1", "B");
    driver.setTime(70);
    driver.process(TOPIC, "1", "A");
    driver.setTime(90);
    driver.process(TOPIC, "1", "B");
    driver.setTime(100);
    driver.process(TOPIC, "1", "C");
    driver.flushState();
    assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
    assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
    assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KGroupedStreamImplTest.java
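
The three session-window snippets above (examples 14 to 16) assert the same boundaries for key "1": one session covering timestamps 10 to 30 and another covering 70 to 100. The sketch below shows one way such boundaries can arise, by merging a record into the previous session when it arrives within an inactivity gap of that session's end. The gap of 30 ms is an assumption (the snippets do not show how the windows were configured), and SessionWindowSketch is a hypothetical stand-alone class, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionWindowSketch {

    // Groups one key's timestamps into sessions; each entry is {start, end, count}.
    public static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] <= gapMs) {
                // Within the gap: extend the current session and count the record.
                last[1] = ts;
                last[2]++;
            } else {
                // Too far from the previous record: start a new session.
                result.add(new long[] {ts, ts, 1});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps of the records with key "1" in the snippets above; gap of 30 is assumed.
        for (long[] s : sessions(new long[] {10, 30, 70, 90, 100}, 30)) {
            System.out.println("[" + s[0] + ", " + s[1] + "] count=" + s[2]);
        }
        // prints "[10, 30] count=2" and then "[70, 100] count=3"
    }
}
```

Under the same assumption, the single record for key "2" at time 15 forms its own session from 15 to 15, which matches SessionWindow(15, 15) in the assertions.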


Example 17: doCountWindowed

import org.apache.kafka.test.TestUtils; // import the required package/class
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
    driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0);
    driver.setTime(0);
    driver.process(TOPIC, "1", "A");
    driver.process(TOPIC, "2", "B");
    driver.process(TOPIC, "3", "C");
    driver.setTime(500);
    driver.process(TOPIC, "1", "A");
    driver.process(TOPIC, "1", "A");
    driver.process(TOPIC, "2", "B");
    driver.process(TOPIC, "2", "B");
    assertThat(results, equalTo(Arrays.asList(
            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
            KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
    )));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: KGroupedStreamImplTest.java
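
The snippet above expects records at time 0 to land in TimeWindow(0, 500) and records at time 500 in TimeWindow(500, 1000). A minimal sketch of that bucketing follows, assuming fixed, non-overlapping windows of 500 ms aligned to zero and non-negative timestamps (the window definition itself is not shown in the snippet). TumblingWindowSketch is a hypothetical helper, not part of Kafka Streams.

```java
public class TumblingWindowSketch {

    // Start of the fixed-size window that contains the timestamp (non-negative inputs assumed).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 500L; // assumed window size, taken from the TimeWindow bounds in the assertions
        for (long ts : new long[] {0L, 499L, 500L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
        // prints "0 -> [0, 500)", "499 -> [0, 500)", "500 -> [500, 1000)"
    }
}
```

The window end is exclusive in this sketch, which is why the record at time 500 starts a new window instead of joining the first one. The repeated entries for the same window in the assertions (1L, then 2L) are successive count updates for that window.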


Example 18: testReadWrite

import org.apache.kafka.test.TestUtils; // import the required package/class
@Test
public void testReadWrite() throws IOException {
    File f = TestUtils.tempFile();
    OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);

    try {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), 0L);
        offsets.put(new TopicPartition(topic, 1), 1L);
        offsets.put(new TopicPartition(topic, 2), 2L);

        checkpoint.write(offsets);
        assertEquals(offsets, checkpoint.read());

        checkpoint.delete();
        assertFalse(f.exists());

        offsets.put(new TopicPartition(topic, 3), 3L);
        checkpoint.write(offsets);
        assertEquals(offsets, checkpoint.read());
    } finally {
        checkpoint.delete();
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: OffsetCheckpointTest.java


Example 19: shouldCreateLoggingEnabledStoreWhenWindowStoreLogged

import org.apache.kafka.test.TestUtils; // import the required package/class
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
    store = createStore(true, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertFalse(logged.isEmpty());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: RocksDBWindowStoreSupplierTest.java


Example 20: shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled

import org.apache.kafka.test.TestUtils; // import the required package/class
@Test
public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
    store = createStore(false, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertTrue(logged.isEmpty());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: RocksDBWindowStoreSupplierTest.java



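Note: The org.apache.kafka.test.TestUtils examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the license of the corresponding project; do not reproduce without permission.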

