本文整理汇总了Java中org.apache.flink.api.common.functions.FoldFunction类的典型用法代码示例。如果您正苦于以下问题:Java FoldFunction类的具体用法?Java FoldFunction怎么用?Java FoldFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
FoldFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了FoldFunction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testFoldWindowState
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Builds a keyed, time-windowed fold whose accumulator type is {@link File}
 * (registered with {@code JavaSerializer} through Kryo) and hands the resulting
 * operator to {@code validateStateDescriptorConfigured}.
 */
@Test
public void testFoldWindowState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    // Placeholder user functions: both simply return null.
    final KeySelector<String, String> nullKeySelector = new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            return null;
        }
    };
    final FoldFunction<String, File> nullFolder = new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
            return null;
        }
    };

    final DataStream<String> src = env.fromElements("abc");
    final SingleOutputStreamOperator<?> result = src
            .keyBy(nullKeySelector)
            .timeWindow(Time.milliseconds(1000))
            .fold(new File("/"), nullFolder);

    validateStateDescriptorConfigured(result);
}
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:StateDescriptorPassingTest.java
示例2: getFoldReturnTypes
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Determines the result type of a {@link FoldFunction} by delegating to
 * {@code getUnaryOperatorReturnType} with {@code FoldFunction.class} as the base class.
 *
 * @param foldInterface the fold function whose result type is requested
 * @param inType type information of the function's input
 * @param functionName forwarded unchanged to {@code getUnaryOperatorReturnType}
 * @param allowMissing forwarded unchanged to {@code getUnaryOperatorReturnType}
 * @return the type information produced by {@code getUnaryOperatorReturnType}
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(
        FoldFunction<IN, OUT> foldInterface,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {
    final TypeInformation<OUT> foldResultType = getUnaryOperatorReturnType(
            (Function) foldInterface,
            FoldFunction.class,
            0,
            1,
            new int[]{1},
            NO_INDEX,
            inType,
            functionName,
            allowMissing);
    return foldResultType;
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:TypeExtractor.java
示例3: testFoldWindowAllState
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Builds a non-keyed, time-windowed fold whose accumulator type is {@link File}
 * (registered with {@code JavaSerializer} through Kryo) and hands the resulting
 * operator to {@code validateStateDescriptorConfigured}.
 */
@Test
public void testFoldWindowAllState() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    // Placeholder fold function: always returns null.
    final FoldFunction<String, File> nullFolder = new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
            return null;
        }
    };

    final DataStream<String> src = env.fromElements("abc");
    final SingleOutputStreamOperator<?> result = src
            .timeWindowAll(Time.milliseconds(1000))
            .fold(new File("/"), nullFolder);

    validateStateDescriptorConfigured(result);
}
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:StateDescriptorPassingTest.java
示例4: testSessionWithFoldFails
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Checks that calling {@code fold} on an all-windowed stream with event-time
 * session (merging) windows is rejected with an {@link UnsupportedOperationException}.
 */
@Test
public void testSessionWithFoldFails() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    AllWindowedStream<String, TimeWindow> sessionWindows = env
            .fromElements("Hello", "Ciao")
            .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));

    // Trivial fold that just hands back the accumulator.
    FoldFunction<String, String> keepAccumulator = new FoldFunction<String, String>() {
        private static final long serialVersionUID = -4567902917104921706L;

        @Override
        public String fold(String accumulator, String value) throws Exception {
            return accumulator;
        }
    };

    // Only the fold call sits in the try block, so a caught exception can
    // only have been thrown by the fold itself.
    boolean foldRejected = false;
    try {
        sessionWindows.fold("", keepAccumulator);
    } catch (UnsupportedOperationException e) {
        // expected
        foldRejected = true;
    }

    if (!foldRejected) {
        fail("The fold call should fail.");
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:AllWindowTranslationTest.java
示例5: testProgram
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Assembles a stream keyed on tuple field 0 from a {@code StatefulMultipleSequence}
 * source and attaches three pipelines to it: a built-in {@code min} aggregate
 * (followed by a once-failing identity map), a user-defined reduce, and a
 * user-defined fold. Each pipeline ends in its own evicting queue sink.
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    // base stream
    final KeyedStream<Tuple2<Integer, Long>, Tuple> keyed =
            env.addSource(new StatefulMultipleSequence()).keyBy(0);

    // pipeline 1: built-in aggregate, then failure generation
    keyed
            .min(1)
            .map(new OnceFailingIdentityMapFunction(NUM_INPUT))
            .keyBy(0)
            .addSink(new MinEvictingQueueSink());

    // pipeline 2: UDF reducer that keeps the first key and sums the long field
    final ReduceFunction<Tuple2<Integer, Long>> sumReducer =
            new ReduceFunction<Tuple2<Integer, Long>>() {
                @Override
                public Tuple2<Integer, Long> reduce(
                        Tuple2<Integer, Long> left, Tuple2<Integer, Long> right) throws Exception {
                    return Tuple2.of(left.f0, left.f1 + right.f1);
                }
            };
    keyed
            .reduce(sumReducer)
            .keyBy(0)
            .addSink(new SumEvictingQueueSink());

    // pipeline 3: UDF folder that takes the key of the incoming value and sums the long field
    final FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> sumFolder =
            new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
                @Override
                public Tuple2<Integer, Long> fold(
                        Tuple2<Integer, Long> acc, Tuple2<Integer, Long> element) throws Exception {
                    return Tuple2.of(element.f0, acc.f1 + element.f1);
                }
            };
    keyed
            .fold(Tuple2.of(0, 0L), sumFolder)
            .keyBy(0)
            .addSink(new FoldEvictingQueueSink());
}
开发者ID:axbaretto,项目名称:flink,代码行数:44,代码来源:UdfStreamOperatorCheckpointingITCase.java
示例6: FoldingStateDescriptor
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
* Creates a new {@code FoldingStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param initialValue The initial value of the fold.
* @param foldFunction The {@code FoldFunction} used to aggregate the state.
* @param typeInfo The type of the values in the state.
*/
public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) {
super(name, typeInfo, initialValue);
this.foldFunction = requireNonNull(foldFunction);
if (foldFunction instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction.");
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:FoldingStateDescriptor.java
示例7: GenericFoldingState
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
* Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The
* {@code ValueState} must have the initial value of the fold as default value.
*
* @param wrappedState The wrapped {@code ValueState}
* @param foldFunction The {@code FoldFunction} to use for folding values into the state
*/
@SuppressWarnings("unchecked")
public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
if (!(wrappedState instanceof KvState)) {
throw new IllegalArgumentException("Wrapped state must be a KvState.");
}
this.wrappedState = (W) wrappedState;
this.foldFunction = foldFunction;
}
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:GenericFoldingState.java
示例8: fold
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the fold function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>The result type is derived through {@code TypeExtractor.getFoldReturnTypes} and the
 * call is then forwarded to the overload that additionally takes the result type.
 *
 * @param initialValue The initial value of the fold.
 * @param function The fold function; must not be a {@code RichFunction}.
 * @return The data stream that is the result of applying the fold function to the window.
 * @throws UnsupportedOperationException if {@code function} is a {@code RichFunction}
 *
 * @deprecated use {@link #aggregate(AggregationFunction)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
    final boolean isRichFunction = function instanceof RichFunction;
    if (isRichFunction) {
        throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
            "Please use fold(FoldFunction, WindowFunction) instead.");
    }

    final TypeInformation<T> inputType = input.getType();
    final String callLocation = Utils.getCallLocationName();
    final TypeInformation<R> resultType =
            TypeExtractor.getFoldReturnTypes(function, inputType, callLocation, true);

    return fold(initialValue, function, resultType);
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:WindowedStream.java
示例9: fold
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the fold function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>The result type is derived through {@code TypeExtractor.getFoldReturnTypes} and the
 * call is then forwarded to the overload that additionally takes the result type.
 *
 * @param initialValue The initial value of the fold.
 * @param function The fold function; must not be a {@code RichFunction}.
 * @return The data stream that is the result of applying the fold function to the window.
 * @throws UnsupportedOperationException if {@code function} is a {@code RichFunction}
 *
 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
    final boolean isRichFunction = function instanceof RichFunction;
    if (isRichFunction) {
        throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
            "Please use fold(FoldFunction, WindowFunction) instead.");
    }

    final TypeInformation<T> inputType = input.getType();
    final String callLocation = Utils.getCallLocationName();
    final TypeInformation<R> resultType =
            TypeExtractor.getFoldReturnTypes(function, inputType, callLocation, true);

    return fold(initialValue, function, resultType);
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:AllWindowedStream.java
示例10: FoldApplyAllWindowFunction
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates the function from an initial value, a fold function, the all-window
 * function handed to the superclass, and the accumulator's type information.
 *
 * @param initialValue value stored as the initial value of this function
 * @param foldFunction fold function stored by this function
 * @param windowFunction all-window function passed to the superclass constructor
 * @param accTypeInformation type information for the accumulator type {@code ACC}
 */
public FoldApplyAllWindowFunction(
        ACC initialValue,
        FoldFunction<T, ACC> foldFunction,
        AllWindowFunction<ACC, R, W> windowFunction,
        TypeInformation<ACC> accTypeInformation) {
    super(windowFunction);
    this.initialValue = initialValue;
    this.foldFunction = foldFunction;
    this.accTypeInformation = accTypeInformation;
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:FoldApplyAllWindowFunction.java
示例11: testFoldingStateInstantiation
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Requests folding state through a {@code StreamingRuntimeContext} and checks that the
 * captured descriptor's serializer is a {@code KryoSerializer} in which {@code Path}
 * is registered, i.e. that the execution config was applied.
 */
@Test
public void testFoldingStateInstantiation() throws Exception {
    final ExecutionConfig config = new ExecutionConfig();
    config.registerKryoType(Path.class);

    final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

    StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
            createDescriptorCapturingMockOp(descriptorCapture, config),
            createMockEnvironment(),
            Collections.<String, Accumulator<?, ?>>emptyMap());

    @SuppressWarnings("unchecked")
    FoldFunction<String, TaskInfo> mockedFolder =
            (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);

    FoldingStateDescriptor<String, TaskInfo> requestedDescriptor =
            new FoldingStateDescriptor<>("name", null, mockedFolder, TaskInfo.class);
    runtimeContext.getFoldingState(requestedDescriptor);

    FoldingStateDescriptor<?, ?> capturedDescriptor =
            (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
    TypeSerializer<?> serializer = capturedDescriptor.getSerializer();

    // check that the Path class is really registered, i.e., the execution config was applied
    assertTrue(serializer instanceof KryoSerializer);
    KryoSerializer<?> kryoSerializer = (KryoSerializer<?>) serializer;
    int pathRegistrationId = kryoSerializer.getKryo().getRegistration(Path.class).getId();
    assertTrue(pathRegistrationId > 0);
}
开发者ID:axbaretto,项目名称:flink,代码行数:29,代码来源:StreamingRuntimeContextTest.java
示例12: testPreAggregatedFoldingTumblingTimeWindow
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Runs a checkpointed event-time job that feeds a {@code FailingSource} into a
 * tumbling all-window, pre-aggregates each window with a fold function, post-processes
 * the folded value in a rich all-window function and validates the result in a
 * {@code ValidatingSink}. Any exception raised while building or executing the job
 * fails the test.
 */
@Test
public void testPreAggregatedFoldingTumblingTimeWindow() {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
FailingSource.reset();
try {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env
// NOTE(review): the third argument is presumably the element count after which the
// source fails - confirm against FailingSource
.addSource(new FailingSource(numKeys,
numElementsPerKey,
numElementsPerKey / 3))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
// accumulator layout: (f0, f1, f2, f3); the fold only touches f0 and f3
.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
@Override
public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator,
Tuple2<Long, IntType> value) throws Exception {
// f0 takes the f0 of the latest value; f3 accumulates the sum of value.f1
accumulator.f0 = value.f0;
accumulator.f3 = new IntType(accumulator.f3.value + value.f1.value);
return accumulator;
}
},
new RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
private boolean open = false;
@Override
public void open(Configuration parameters) {
// the all-window function is expected to run with a single parallel subtask
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
open = true;
}
@Override
public void apply(
TimeWindow window,
Iterable<Tuple4<Long, Long, Long, IntType>> input,
Collector<Tuple4<Long, Long, Long, IntType>> out) {
// validate that the function has been opened properly
assertTrue(open);
// emit each folded value with f1/f2 replaced by the window start and end
for (Tuple4<Long, Long, Long, IntType> in: input) {
out.collect(new Tuple4<>(in.f0,
window.getStart(),
window.getEnd(),
in.f3));
}
}
})
.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
catch (Exception e) {
// any failure during job construction or execution fails the test
e.printStackTrace();
fail(e.getMessage());
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:67,代码来源:EventTimeAllWindowCheckpointingITCase.java
示例13: getFoldFunction
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Gets the {@link FoldFunction} held by this descriptor, i.e. the function to be
 * used for the folding state.
 *
 * @return the fold function stored in this descriptor
 */
public FoldFunction<T, ACC> getFoldFunction() {
    return this.foldFunction;
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldingStateDescriptor.java
示例14: StreamGroupedFold
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates the operator from a fold function and an initial value.
 *
 * @param folder fold function, passed to the superclass constructor
 * @param initialValue value stored in this operator's {@code initialValue} field
 */
public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
super(folder);
this.initialValue = initialValue;
}
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:StreamGroupedFold.java
示例15: FoldApplyProcessWindowFunction
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates the function from an initial value, a fold function, a process window
 * function and the accumulator's type information; all four are stored as fields.
 *
 * @param initialValue value stored as the initial value of this function
 * @param foldFunction fold function stored by this function
 * @param windowFunction process window function stored by this function
 * @param accTypeInformation type information for the accumulator type {@code ACC}
 */
public FoldApplyProcessWindowFunction(
        ACC initialValue,
        FoldFunction<T, ACC> foldFunction,
        ProcessWindowFunction<ACC, R, K, W> windowFunction,
        TypeInformation<ACC> accTypeInformation) {
    this.initialValue = initialValue;
    this.foldFunction = foldFunction;
    this.windowFunction = windowFunction;
    this.accTypeInformation = accTypeInformation;
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyProcessWindowFunction.java
示例16: FoldApplyProcessAllWindowFunction
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates the function from an initial value, a fold function, a process all-window
 * function and the accumulator's type information; all four are stored as fields.
 *
 * @param initialValue value stored as the initial value of this function
 * @param foldFunction fold function stored by this function
 * @param windowFunction process all-window function stored by this function
 * @param accTypeInformation type information for the accumulator type {@code ACC}
 */
public FoldApplyProcessAllWindowFunction(
        ACC initialValue,
        FoldFunction<T, ACC> foldFunction,
        ProcessAllWindowFunction<ACC, R, W> windowFunction,
        TypeInformation<ACC> accTypeInformation) {
    this.initialValue = initialValue;
    this.foldFunction = foldFunction;
    this.windowFunction = windowFunction;
    this.accTypeInformation = accTypeInformation;
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyProcessAllWindowFunction.java
示例17: FoldApplyWindowFunction
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates the function from an initial value, a fold function, the window
 * function handed to the superclass, and the accumulator's type information.
 *
 * @param initialValue value stored as the initial value of this function
 * @param foldFunction fold function stored by this function
 * @param windowFunction window function passed to the superclass constructor
 * @param accTypeInformation type information for the accumulator type {@code ACC}
 */
public FoldApplyWindowFunction(
        ACC initialValue,
        FoldFunction<T, ACC> foldFunction,
        WindowFunction<ACC, R, K, W> windowFunction,
        TypeInformation<ACC> accTypeInformation) {
    super(windowFunction);
    this.initialValue = initialValue;
    this.foldFunction = foldFunction;
    this.accTypeInformation = accTypeInformation;
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyWindowFunction.java
示例18: testFoldingWindow
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Drives a window operator backed by an integer-summing folding state with a mocked
 * assigner, trigger and window function. Two elements are inserted before the trigger
 * is stubbed to fire; a third element is inserted afterwards, and the test then verifies
 * the window function invocations, that the trigger was never cleared, and the number
 * of keyed state entries and event-time timers.
 */
@Test
public void testFoldingWindow() throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
// folding state that starts at 0 and adds every incoming integer to the accumulator
FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
new FoldingStateDescriptor<>(
"int-fold",
0,
new FoldFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer fold(Integer accumulator, Integer value) throws Exception {
return accumulator + value;
}
},
IntSerializer.INSTANCE);
// state descriptor that the stubbed trigger below writes to
final ValueStateDescriptor<String> valueStateDescriptor =
new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);
testHarness.open();
// every element is assigned to the two windows [2, 4) and [0, 2)
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
// insert two elements without firing
testHarness.processElement(new StreamRecord<>(1, 0L));
testHarness.processElement(new StreamRecord<>(1, 0L));
// from here on, onElement registers an event-time timer at the window end,
// writes "hello" into the trigger's value state and returns FIRE
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws Exception {
TimeWindow window = (TimeWindow) invocation.getArguments()[2];
Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
context.registerEventTimeTimer(window.getEnd());
context.getPartitionedState(valueStateDescriptor).update("hello");
return TriggerResult.FIRE;
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
// third element: the stubbed trigger now fires
testHarness.processElement(new StreamRecord<>(1, 0L));
// the window function is invoked twice in total, once per window, each time with the folded value 3
verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
// FIRE should not purge contents
assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
}
开发者ID:axbaretto,项目名称:flink,代码行数:64,代码来源:RegularWindowOperatorContractTest.java
示例19: testCleanupTimerWithEmptyFoldingStateForTumblingWindows
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Runs a tumbling event-time {@code WindowOperator} that keeps its window contents in
 * folding state, feeds it one element followed by several watermarks, and compares the
 * harness output with the expected records and watermarks.
 */
@Test
public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception {
    final int windowSize = 2;
    final long lateness = 1;

    TypeInformation<Tuple2<String, Integer>> tupleType = TypeInfoParser.parse("Tuple2<String, Integer>");

    // takes the string field of the incoming element and sums the integer fields
    FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> summingFold =
            new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> fold(
                        Tuple2<String, Integer> acc, Tuple2<String, Integer> element) throws Exception {
                    return new Tuple2<>(element.f0, acc.f1 + element.f1);
                }
            };

    FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
            new FoldingStateDescriptor<>(
                    "window-contents",
                    new Tuple2<>((String) null, 0),
                    summingFold,
                    tupleType);
    windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
            new WindowOperator<>(
                    TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    windowStateDesc,
                    new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
                    EventTimeTrigger.create(),
                    lateness,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
            createTestHarness(operator);
    testHarness.open();

    // expected output, in emission order
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new Watermark(1599));
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
    expected.add(new Watermark(1999)); // here it fires and purges
    expected.add(new Watermark(2000)); // here is the cleanup timer
    expected.add(new Watermark(5000));

    // normal element, then advance event time step by step
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    for (long watermarkTime : new long[] {1599, 1999, 2000, 5000}) {
        testHarness.processWatermark(new Watermark(watermarkTime));
    }

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
    testHarness.close();
}
开发者ID:axbaretto,项目名称:flink,代码行数:58,代码来源:WindowOperatorTest.java
示例20: testCleanupTimerWithEmptyFoldingStateForSessionWindows
import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Runs an event-time session {@code WindowOperator} that keeps its window contents in
 * folding state, feeds it one element followed by two watermarks, and compares the
 * harness output with the expected record and watermarks.
 */
@Test
public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
    final int gapSize = 3;
    final long lateness = 10;

    TypeInformation<Tuple2<String, Integer>> tupleType = TypeInfoParser.parse("Tuple2<String, Integer>");

    // takes the string field of the incoming element and sums the integer fields
    FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> summingFold =
            new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> fold(
                        Tuple2<String, Integer> acc, Tuple2<String, Integer> element) throws Exception {
                    return new Tuple2<>(element.f0, acc.f1 + element.f1);
                }
            };

    FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
            new FoldingStateDescriptor<>(
                    "window-contents",
                    new Tuple2<>((String) null, 0),
                    summingFold,
                    tupleType);
    windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
            new WindowOperator<>(
                    EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    windowStateDesc,
                    new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
                    EventTimeTrigger.create(),
                    lateness,
                    null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
            createTestHarness(operator);
    testHarness.open();

    // expected output, in emission order
    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
    expected.add(new Watermark(4998));
    expected.add(new Watermark(14600));

    // one element, then two watermarks
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    testHarness.processWatermark(new Watermark(4998));
    testHarness.processWatermark(new Watermark(14600));

    ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
    testHarness.close();
}
开发者ID:axbaretto,项目名称:flink,代码行数:55,代码来源:WindowOperatorTest.java
注:本文中的org.apache.flink.api.common.functions.FoldFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论