• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ForeachAction类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.streams.kstream.ForeachAction的典型用法代码示例。如果您正苦于以下问题:Java ForeachAction类的具体用法?Java ForeachAction怎么用?Java ForeachAction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ForeachAction类属于org.apache.kafka.streams.kstream包,在下文中一共展示了ForeachAction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: shouldCountSessionWindowsWithInternalStoreName

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, Long> results = new HashMap<>();
    KTable table = groupedStream.count(SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, Long>() {
        @Override
        public void apply(final Windowed<String> key, final Long value) {
            results.put(key, value);
        }
    });
    doCountSessionWindows(results);
    assertNull(table.queryableStoreName());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java


示例2: shouldReduceSessionWindows

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindows() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    KTable table = groupedStream.reduce(
            new Reducer<String>() {
                @Override
                public String apply(final String value1, final String value2) {
                    return value1 + ":" + value2;
                }
            }, SessionWindows.with(30),
            "session-store");
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    assertEquals(table.queryableStoreName(), "session-store");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImplTest.java


示例3: shouldReduceSessionWindowsWithInternalStoreName

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    KTable table = groupedStream.reduce(
            new Reducer<String>() {
                @Override
                public String apply(final String value1, final String value2) {
                    return value1 + ":" + value2;
                }
            }, SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    assertNull(table.queryableStoreName());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java


示例4: setUp

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
    stateDir = TestUtils.tempDirectory();
    global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
    stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
    keyValueMapper = new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(final String key, final String value) {
            return value;
        }
    };
    action = new ForeachAction<String, String>() {
        @Override
        public void apply(final String key, final String value) {
            results.put(key, value);
        }
    };
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:GlobalKTableJoinsTest.java


示例5: foreach

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public void foreach(ForeachAction<? super K, ? super V> action) {
    Objects.requireNonNull(action, "action can't be null");
    String name = topology.newName(FOREACH_NAME);

    topology.addProcessor(name, new KStreamPeek<>(action, false), this.name);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:KStreamImpl.java


示例6: peek

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
    Objects.requireNonNull(action, "action can't be null");
    final String name = topology.newName(PEEK_NAME);

    topology.addProcessor(name, new KStreamPeek<>(action, true), this.name);

    return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:KStreamImpl.java


示例7: foreach

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
    Objects.requireNonNull(action, "action can't be null");
    String name = topology.newName(FOREACH_NAME);
    KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
        @Override
        public void apply(K key, Change<V> value) {
            action.apply(key, value.newValue);
        }
    }, false);
    topology.addProcessor(name, processorSupplier, this.name);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:13,代码来源:KTableImpl.java


示例8: before

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Before
public void before() throws InterruptedException {
    testNo++;
    builder = new KStreamBuilder();
    createTopics();
    streamsConfiguration = new Properties();
    final String applicationId = "globalOne-table-test-" + testNo;
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfiguration
            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
    stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
    table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
    foreachAction = new ForeachAction<String, String>() {
        @Override
        public void apply(final String key, final String value) {
            results.put(key, value);
        }
    };
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:GlobalKTableIntegrationTest.java


示例9: testTypeVariance

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
    ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
        @Override
        public void apply(Number key, Object value) {}
    };

    new KStreamBuilder()
        .<Integer, String>table("emptyTopic", "emptyStore")
        .foreach(consume);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KTableForeachTest.java


示例10: testTypeVariance

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
    ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
        @Override
        public void apply(Number key, Object value) {}
    };

    new KStreamBuilder()
        .<Integer, String>stream("empty")
        .foreach(consume);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KStreamSelectKeyTest.java


示例11: collect

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {
    return new ForeachAction<K, V>() {
        @Override
        public void apply(final K key, final V value) {
            into.add(new KeyValue<>(key, value));
        }
    };
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:KStreamPeekTest.java


示例12: testForeach

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testForeach() {
    // Given
    List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
        new KeyValue<>(0, "zero"),
        new KeyValue<>(1, "one"),
        new KeyValue<>(2, "two"),
        new KeyValue<>(3, "three")
    );

    List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
        new KeyValue<>(0, "ZERO"),
        new KeyValue<>(2, "ONE"),
        new KeyValue<>(4, "TWO"),
        new KeyValue<>(6, "THREE")
    );

    final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
    ForeachAction<Integer, String> action =
        new ForeachAction<Integer, String>() {
            @Override
            public void apply(Integer key, String value) {
                actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
            }
        };

    // When
    KStreamBuilder builder = new KStreamBuilder();
    KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
    stream.foreach(action);

    // Then
    driver = new KStreamTestDriver(builder);
    for (KeyValue<Integer, String> record: inputRecords) {
        driver.process(topicName, record.key, record.value);
    }

    assertEquals(expectedRecords.size(), actualRecords.size());
    for (int i = 0; i < expectedRecords.size(); i++) {
        KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
        KeyValue<Integer, String> actualRecord = actualRecords.get(i);
        assertEquals(expectedRecord, actualRecord);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:45,代码来源:KStreamForeachTest.java


示例13: testTypeVariance

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
    ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
        @Override
        public void apply(Number key, Object value) {}
    };

    new KStreamBuilder()
        .<Integer, String>stream("emptyTopic")
        .foreach(consume);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KStreamForeachTest.java


示例14: shouldCountSessionWindows

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountSessionWindows() throws Exception {
    final Map<Windowed<String>, Long> results = new HashMap<>();
    KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
    table.foreach(new ForeachAction<Windowed<String>, Long>() {
        @Override
        public void apply(final Windowed<String> key, final Long value) {
            results.put(key, value);
        }
    });
    doCountSessionWindows(results);
    assertEquals(table.queryableStoreName(), "session-store");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java


示例15: shouldCountWindowed

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountWindowed() throws Exception {
    final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
    groupedStream.count(
            TimeWindows.of(500L),
            "aggregate-by-key-windowed")
            .foreach(new ForeachAction<Windowed<String>, Long>() {
                @Override
                public void apply(final Windowed<String> key, final Long value) {
                    results.add(KeyValue.pair(key, value));
                }
            });

    doCountWindowed(results);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:KGroupedStreamImplTest.java


示例16: shouldCountWindowedWithInternalStoreName

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountWindowedWithInternalStoreName() throws Exception {
    final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
    groupedStream.count(
            TimeWindows.of(500L))
            .foreach(new ForeachAction<Windowed<String>, Long>() {
                @Override
                public void apply(final Windowed<String> key, final Long value) {
                    results.add(KeyValue.pair(key, value));
                }
            });

    doCountWindowed(results);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:KGroupedStreamImplTest.java


示例17: doShouldReduce

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) throws Exception {
    final Map<String, Integer> results = new HashMap<>();
    reduced.foreach(new ForeachAction<String, Integer>() {
        @Override
        public void apply(final String key, final Integer value) {
            results.put(key, value);
        }
    });

    driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
    driver.setTime(10L);
    driver.process(topic, "A", 1.1);
    driver.process(topic, "B", 2.2);
    driver.flushState();

    assertEquals(Integer.valueOf(1), results.get("A"));
    assertEquals(Integer.valueOf(2), results.get("B"));

    driver.process(topic, "A", 2.6);
    driver.process(topic, "B", 1.3);
    driver.process(topic, "A", 5.7);
    driver.process(topic, "B", 6.2);
    driver.flushState();

    assertEquals(Integer.valueOf(5), results.get("A"));
    assertEquals(Integer.valueOf(6), results.get("B"));
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:28,代码来源:KGroupedTableImplTest.java


示例18: KStreamPeek

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) {
    this.action = action;
    this.forwardDownStream = forwardDownStream;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:5,代码来源:KStreamPeek.java


示例19: KStreamPrint

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> keySerde, final Serde<?> valueSerde) {
    this.action = action;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:6,代码来源:KStreamPrint.java


示例20: shouldReturnFalseOnCloseWhenThreadsHaventTerminated

import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
    final AtomicBoolean keepRunning = new AtomicBoolean(true);
    try {
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final KStreamBuilder builder = new KStreamBuilder();
        final CountDownLatch latch = new CountDownLatch(1);
        final String topic = "input";
        CLUSTER.createTopic(topic);

        builder.stream(Serdes.String(), Serdes.String(), topic)
                .foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(final String key, final String value) {
                        try {
                            latch.countDown();
                            while (keepRunning.get()) {
                                Thread.sleep(10);
                            }
                        } catch (InterruptedException e) {
                            // no-op
                        }
                    }
                });
        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
                                                                        Collections.singletonList(new KeyValue<>("A", "A")),
                                                                        TestUtils.producerConfig(
                                                                                CLUSTER.bootstrapServers(),
                                                                                StringSerializer.class,
                                                                                StringSerializer.class,
                                                                                new Properties()),
                                                                                System.currentTimeMillis());

        assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
        assertFalse(streams.close(10, TimeUnit.MILLISECONDS));
    } finally {
        // stop the thread so we don't interfere with other tests etc
        keepRunning.set(false);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:47,代码来源:KafkaStreamsTest.java



注:本文中的org.apache.kafka.streams.kstream.ForeachAction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java RemotingCommandException类代码示例发布时间:2022-05-22
下一篇:
Java LocalRegionCache类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap