本文整理汇总了Java中org.apache.kafka.streams.kstream.ForeachAction类的典型用法代码示例。如果您正苦于以下问题:Java ForeachAction类的具体用法?Java ForeachAction怎么用?Java ForeachAction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ForeachAction类属于org.apache.kafka.streams.kstream包,在下文中一共展示了ForeachAction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: shouldCountSessionWindowsWithInternalStoreName
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
final Map<Windowed<String>, Long> results = new HashMap<>();
KTable table = groupedStream.count(SessionWindows.with(30));
table.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.put(key, value);
}
});
doCountSessionWindows(results);
assertNull(table.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java
示例2: shouldReduceSessionWindows
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindows() throws Exception {
final Map<Windowed<String>, String> results = new HashMap<>();
KTable table = groupedStream.reduce(
new Reducer<String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + ":" + value2;
}
}, SessionWindows.with(30),
"session-store");
table.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
results.put(key, value);
}
});
doReduceSessionWindows(results);
assertEquals(table.queryableStoreName(), "session-store");
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImplTest.java
示例3: shouldReduceSessionWindowsWithInternalStoreName
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
final Map<Windowed<String>, String> results = new HashMap<>();
KTable table = groupedStream.reduce(
new Reducer<String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + ":" + value2;
}
}, SessionWindows.with(30));
table.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
results.put(key, value);
}
});
doReduceSessionWindows(results);
assertNull(table.queryableStoreName());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java
示例4: setUp
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
stateDir = TestUtils.tempDirectory();
global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
keyValueMapper = new KeyValueMapper<String, String, String>() {
@Override
public String apply(final String key, final String value) {
return value;
}
};
action = new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
results.put(key, value);
}
};
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:GlobalKTableJoinsTest.java
示例5: foreach
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public void foreach(ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
String name = topology.newName(FOREACH_NAME);
topology.addProcessor(name, new KStreamPeek<>(action, false), this.name);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:KStreamImpl.java
示例6: peek
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
final String name = topology.newName(PEEK_NAME);
topology.addProcessor(name, new KStreamPeek<>(action, true), this.name);
return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:KStreamImpl.java
示例7: foreach
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
String name = topology.newName(FOREACH_NAME);
KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
@Override
public void apply(K key, Change<V> value) {
action.apply(key, value.newValue);
}
}, false);
topology.addProcessor(name, processorSupplier, this.name);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:13,代码来源:KTableImpl.java
示例8: before
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Before
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "globalOne-table-test-" + testNo;
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
foreachAction = new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
results.put(key, value);
}
};
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:GlobalKTableIntegrationTest.java
示例9: testTypeVariance
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
@Override
public void apply(Number key, Object value) {}
};
new KStreamBuilder()
.<Integer, String>table("emptyTopic", "emptyStore")
.foreach(consume);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KTableForeachTest.java
示例10: testTypeVariance
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
@Override
public void apply(Number key, Object value) {}
};
new KStreamBuilder()
.<Integer, String>stream("empty")
.foreach(consume);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KStreamSelectKeyTest.java
示例11: collect
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {
return new ForeachAction<K, V>() {
@Override
public void apply(final K key, final V value) {
into.add(new KeyValue<>(key, value));
}
};
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:9,代码来源:KStreamPeekTest.java
示例12: testForeach
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testForeach() {
// Given
List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
new KeyValue<>(0, "zero"),
new KeyValue<>(1, "one"),
new KeyValue<>(2, "two"),
new KeyValue<>(3, "three")
);
List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
new KeyValue<>(0, "ZERO"),
new KeyValue<>(2, "ONE"),
new KeyValue<>(4, "TWO"),
new KeyValue<>(6, "THREE")
);
final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
ForeachAction<Integer, String> action =
new ForeachAction<Integer, String>() {
@Override
public void apply(Integer key, String value) {
actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
}
};
// When
KStreamBuilder builder = new KStreamBuilder();
KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
stream.foreach(action);
// Then
driver = new KStreamTestDriver(builder);
for (KeyValue<Integer, String> record: inputRecords) {
driver.process(topicName, record.key, record.value);
}
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); i++) {
KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
KeyValue<Integer, String> actualRecord = actualRecords.get(i);
assertEquals(expectedRecord, actualRecord);
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:45,代码来源:KStreamForeachTest.java
示例13: testTypeVariance
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void testTypeVariance() throws Exception {
ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
@Override
public void apply(Number key, Object value) {}
};
new KStreamBuilder()
.<Integer, String>stream("emptyTopic")
.foreach(consume);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:KStreamForeachTest.java
示例14: shouldCountSessionWindows
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountSessionWindows() throws Exception {
final Map<Windowed<String>, Long> results = new HashMap<>();
KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
table.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.put(key, value);
}
});
doCountSessionWindows(results);
assertEquals(table.queryableStoreName(), "session-store");
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java
示例15: shouldCountWindowed
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountWindowed() throws Exception {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
groupedStream.count(
TimeWindows.of(500L),
"aggregate-by-key-windowed")
.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.add(KeyValue.pair(key, value));
}
});
doCountWindowed(results);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:KGroupedStreamImplTest.java
示例16: shouldCountWindowedWithInternalStoreName
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldCountWindowedWithInternalStoreName() throws Exception {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
groupedStream.count(
TimeWindows.of(500L))
.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.add(KeyValue.pair(key, value));
}
});
doCountWindowed(results);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:KGroupedStreamImplTest.java
示例17: doShouldReduce
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) throws Exception {
final Map<String, Integer> results = new HashMap<>();
reduced.foreach(new ForeachAction<String, Integer>() {
@Override
public void apply(final String key, final Integer value) {
results.put(key, value);
}
});
driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
driver.setTime(10L);
driver.process(topic, "A", 1.1);
driver.process(topic, "B", 2.2);
driver.flushState();
assertEquals(Integer.valueOf(1), results.get("A"));
assertEquals(Integer.valueOf(2), results.get("B"));
driver.process(topic, "A", 2.6);
driver.process(topic, "B", 1.3);
driver.process(topic, "A", 5.7);
driver.process(topic, "B", 6.2);
driver.flushState();
assertEquals(Integer.valueOf(5), results.get("A"));
assertEquals(Integer.valueOf(6), results.get("B"));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:28,代码来源:KGroupedTableImplTest.java
示例18: KStreamPeek
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) {
this.action = action;
this.forwardDownStream = forwardDownStream;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:5,代码来源:KStreamPeek.java
示例19: KStreamPrint
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> keySerde, final Serde<?> valueSerde) {
this.action = action;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:6,代码来源:KStreamPrint.java
示例20: shouldReturnFalseOnCloseWhenThreadsHaventTerminated
import org.apache.kafka.streams.kstream.ForeachAction; //导入依赖的package包/类
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
final AtomicBoolean keepRunning = new AtomicBoolean(true);
try {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KStreamBuilder builder = new KStreamBuilder();
final CountDownLatch latch = new CountDownLatch(1);
final String topic = "input";
CLUSTER.createTopic(topic);
builder.stream(Serdes.String(), Serdes.String(), topic)
.foreach(new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
try {
latch.countDown();
while (keepRunning.get()) {
Thread.sleep(10);
}
} catch (InterruptedException e) {
// no-op
}
}
});
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
Collections.singletonList(new KeyValue<>("A", "A")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
System.currentTimeMillis());
assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
assertFalse(streams.close(10, TimeUnit.MILLISECONDS));
} finally {
// stop the thread so we don't interfere with other tests etc
keepRunning.set(false);
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:47,代码来源:KafkaStreamsTest.java
注:本文中的org.apache.kafka.streams.kstream.ForeachAction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论