• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java FoldFunction类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.api.common.functions.FoldFunction的典型用法代码示例。如果您正苦于以下问题:Java FoldFunction类的具体用法?Java FoldFunction怎么用?Java FoldFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



FoldFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了FoldFunction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testFoldWindowState

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testFoldWindowState() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
	env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

	DataStream<String> src = env.fromElements("abc");

	SingleOutputStreamOperator<?> result = src
			.keyBy(new KeySelector<String, String>() {
				@Override
				public String getKey(String value) {
					return null;
				}
			})
			.timeWindow(Time.milliseconds(1000))
			.fold(new File("/"), new FoldFunction<String, File>() {

				@Override
				public File fold(File a, String e) {
					return null;
				}
			});

	validateStateDescriptorConfigured(result);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:StateDescriptorPassingTest.java


示例2: getFoldReturnTypes

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
	return getUnaryOperatorReturnType(
		(Function) foldInterface,
		FoldFunction.class,
		0,
		1,
		new int[]{1},
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:TypeExtractor.java


示例3: testFoldWindowAllState

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testFoldWindowAllState() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
	env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

	DataStream<String> src = env.fromElements("abc");

	SingleOutputStreamOperator<?> result = src
			.timeWindowAll(Time.milliseconds(1000))
			.fold(new File("/"), new FoldFunction<String, File>() {

				@Override
				public File fold(File a, String e) {
					return null;
				}
			});

	validateStateDescriptorConfigured(result);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:StateDescriptorPassingTest.java


示例4: testSessionWithFoldFails

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testSessionWithFoldFails() throws Exception {
	// verify that fold does not work with merging windows

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
			.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));

	try {
		windowedStream.fold("", new FoldFunction<String, String>() {
			private static final long serialVersionUID = -4567902917104921706L;

			@Override
			public String fold(String accumulator, String value) throws Exception {
				return accumulator;
			}
		});
	} catch (UnsupportedOperationException e) {
		// expected
		// use a catch to ensure that the exception is thrown by the fold
		return;
	}

	fail("The fold call should fail.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:AllWindowTranslationTest.java


示例5: testProgram

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Assembles a stream of a grouping field and some long data. Applies reduce functions
 * on this stream.
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {

	// base stream
	KeyedStream<Tuple2<Integer, Long>, Tuple> stream = env.addSource(new StatefulMultipleSequence())
			.keyBy(0);

	stream
			// testing built-in aggregate
			.min(1)
			// failure generation
			.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
			.keyBy(0)
			.addSink(new MinEvictingQueueSink());

	stream
			// testing UDF reducer
			.reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
				@Override
				public Tuple2<Integer, Long> reduce(
						Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
					return Tuple2.of(value1.f0, value1.f1 + value2.f1);
				}
			})
			.keyBy(0)
			.addSink(new SumEvictingQueueSink());

	stream
			// testing UDF folder
			.fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
				@Override
				public Tuple2<Integer, Long> fold(
						Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
					return Tuple2.of(value.f0, accumulator.f1 + value.f1);
				}
			})
			.keyBy(0)
			.addSink(new FoldEvictingQueueSink());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:44,代码来源:UdfStreamOperatorCheckpointingITCase.java


示例6: FoldingStateDescriptor

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates a new {@code FoldingStateDescriptor} with the given name and default value.
 *
 * @param name The (unique) name for the state.
 * @param initialValue The initial value of the fold.
 * @param foldFunction The {@code FoldFunction} used to aggregate the state.
 * @param typeInfo The type of the values in the state.
 */
public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) {
	super(name, typeInfo, initialValue);
	this.foldFunction = requireNonNull(foldFunction);

	if (foldFunction instanceof RichFunction) {
		throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction.");
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:FoldingStateDescriptor.java


示例7: GenericFoldingState

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The
 * {@code ValueState} must have the initial value of the fold as default value.
 *
 * @param wrappedState The wrapped {@code ValueState}
 * @param foldFunction The {@code FoldFunction} to use for folding values into the state
 */
@SuppressWarnings("unchecked")
public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
	if (!(wrappedState instanceof KvState)) {
		throw new IllegalArgumentException("Wrapped state must be a KvState.");
	}
	this.wrappedState = (W) wrappedState;
	this.foldFunction = foldFunction;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:GenericFoldingState.java


示例8: fold

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the reduce function is
 * interpreted as a regular non-windowed stream.
 *
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 *
 * @deprecated use {@link #aggregate(AggregationFunction)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
	if (function instanceof RichFunction) {
		throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
			"Please use fold(FoldFunction, WindowFunction) instead.");
	}

	TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
			Utils.getCallLocationName(), true);

	return fold(initialValue, function, resultType);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:WindowedStream.java


示例9: fold

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Applies the given fold function to each window. The window function is called for each
 * evaluation of the window for each key individually. The output of the reduce function is
 * interpreted as a regular non-windowed stream.
 *
 * @param function The fold function.
 * @return The data stream that is the result of applying the fold function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 */
@Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
	if (function instanceof RichFunction) {
		throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
				"Please use fold(FoldFunction, WindowFunction) instead.");
	}

	TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
			Utils.getCallLocationName(), true);

	return fold(initialValue, function, resultType);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:AllWindowedStream.java


示例10: FoldApplyAllWindowFunction

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
public FoldApplyAllWindowFunction(ACC initialValue,
		FoldFunction<T, ACC> foldFunction,
		AllWindowFunction<ACC, R, W> windowFunction,
		TypeInformation<ACC> accTypeInformation) {
	super(windowFunction);
	this.accTypeInformation = accTypeInformation;
	this.foldFunction = foldFunction;
	this.initialValue = initialValue;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:FoldApplyAllWindowFunction.java


示例11: testFoldingStateInstantiation

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testFoldingStateInstantiation() throws Exception {

	final ExecutionConfig config = new ExecutionConfig();
	config.registerKryoType(Path.class);

	final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

	StreamingRuntimeContext context = new StreamingRuntimeContext(
			createDescriptorCapturingMockOp(descriptorCapture, config),
			createMockEnvironment(),
			Collections.<String, Accumulator<?, ?>>emptyMap());

	@SuppressWarnings("unchecked")
	FoldFunction<String, TaskInfo> folder = (FoldFunction<String, TaskInfo>) mock(FoldFunction.class);

	FoldingStateDescriptor<String, TaskInfo> descr =
			new FoldingStateDescriptor<>("name", null, folder, TaskInfo.class);

	context.getFoldingState(descr);

	FoldingStateDescriptor<?, ?> descrIntercepted = (FoldingStateDescriptor<?, ?>) descriptorCapture.get();
	TypeSerializer<?> serializer = descrIntercepted.getSerializer();

	// check that the Path class is really registered, i.e., the execution config was applied
	assertTrue(serializer instanceof KryoSerializer);
	assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:29,代码来源:StreamingRuntimeContextTest.java


示例12: testPreAggregatedFoldingTumblingTimeWindow

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testPreAggregatedFoldingTumblingTimeWindow() {
	final int numElementsPerKey = 3000;
	final int windowSize = 100;
	final int numKeys = 1;
	FailingSource.reset();

	try {
		env.setParallelism(PARALLELISM);
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.enableCheckpointing(100);
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
		env.getConfig().disableSysoutLogging();

		env
				.addSource(new FailingSource(numKeys,
						numElementsPerKey,
						numElementsPerKey / 3))
				.rebalance()
				.timeWindowAll(Time.of(windowSize, MILLISECONDS))
				.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
						new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
							@Override
							public Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> accumulator,
									Tuple2<Long, IntType> value) throws Exception {
								accumulator.f0 = value.f0;
								accumulator.f3 = new IntType(accumulator.f3.value + value.f1.value);
								return accumulator;
							}
						},
						new RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {

							private boolean open = false;

							@Override
							public void open(Configuration parameters) {
								assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
								open = true;
							}

							@Override
							public void apply(
									TimeWindow window,
									Iterable<Tuple4<Long, Long, Long, IntType>> input,
									Collector<Tuple4<Long, Long, Long, IntType>> out) {

								// validate that the function has been opened properly
								assertTrue(open);

								for (Tuple4<Long, Long, Long, IntType> in: input) {
									out.collect(new Tuple4<>(in.f0,
											window.getStart(),
											window.getEnd(),
											in.f3));
								}
							}
						})
				.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);

		tryExecute(env, "Tumbling Window Test");
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:67,代码来源:EventTimeAllWindowCheckpointingITCase.java


示例13: getFoldFunction

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
/**
 * Returns the fold function to be used for the folding state.
 */
public FoldFunction<T, ACC> getFoldFunction() {
	return foldFunction;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldingStateDescriptor.java


示例14: StreamGroupedFold

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
	super(folder);
	this.initialValue = initialValue;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:StreamGroupedFold.java


示例15: FoldApplyProcessWindowFunction

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
	this.windowFunction = windowFunction;
	this.foldFunction = foldFunction;
	this.initialValue = initialValue;
	this.accTypeInformation = accTypeInformation;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyProcessWindowFunction.java


示例16: FoldApplyProcessAllWindowFunction

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
	this.windowFunction = windowFunction;
	this.foldFunction = foldFunction;
	this.initialValue = initialValue;
	this.accTypeInformation = accTypeInformation;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyProcessAllWindowFunction.java


示例17: FoldApplyWindowFunction

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
	super(windowFunction);
	this.accTypeInformation = accTypeInformation;
	this.foldFunction = foldFunction;
	this.initialValue = initialValue;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:FoldApplyWindowFunction.java


示例18: testFoldingWindow

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testFoldingWindow() throws Exception {

	WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
	Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
	InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();

	FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
		new FoldingStateDescriptor<>(
				"int-fold",
				0,
				new FoldFunction<Integer, Integer>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Integer fold(Integer accumulator, Integer value) throws Exception {
						return accumulator + value;
					}
				},
				IntSerializer.INSTANCE);

	final ValueStateDescriptor<String> valueStateDescriptor =
			new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);

	KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
			createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction);

	testHarness.open();

	when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
			.thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));

	assertEquals(0, testHarness.getOutput().size());
	assertEquals(0, testHarness.numKeyedStateEntries());

	// insert two elements without firing
	testHarness.processElement(new StreamRecord<>(1, 0L));
	testHarness.processElement(new StreamRecord<>(1, 0L));

	doAnswer(new Answer<TriggerResult>() {
		@Override
		public TriggerResult answer(InvocationOnMock invocation) throws Exception {
			TimeWindow window = (TimeWindow) invocation.getArguments()[2];
			Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
			context.registerEventTimeTimer(window.getEnd());
			context.getPartitionedState(valueStateDescriptor).update("hello");
			return TriggerResult.FIRE;
		}
	}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());

	testHarness.processElement(new StreamRecord<>(1, 0L));

	verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
	verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
	verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());

	// clear is only called at cleanup time/GC time
	verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());

	// FIRE should not purge contents
	assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state
	assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:64,代码来源:RegularWindowOperatorContractTest.java


示例19: testCleanupTimerWithEmptyFoldingStateForTumblingWindows

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception {
	final int windowSize = 2;
	final long lateness = 1;

	TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

	FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
		new FoldingStateDescriptor<>(
			"window-contents",
			new Tuple2<>((String) null, 0),
			new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
					return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
				}
			},
			inputType);
	windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
		new WindowOperator<>(
			TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			windowStateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
			EventTimeTrigger.create(),
			lateness,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		createTestHarness(operator);

	testHarness.open();

	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

	// normal element
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processWatermark(new Watermark(1599));
	testHarness.processWatermark(new Watermark(1999));
	testHarness.processWatermark(new Watermark(2000));
	testHarness.processWatermark(new Watermark(5000));

	expected.add(new Watermark(1599));
	expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
	expected.add(new Watermark(1999)); // here it fires and purges
	expected.add(new Watermark(2000)); // here is the cleanup timer
	expected.add(new Watermark(5000));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
	testHarness.close();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:58,代码来源:WindowOperatorTest.java


示例20: testCleanupTimerWithEmptyFoldingStateForSessionWindows

import org.apache.flink.api.common.functions.FoldFunction; //导入依赖的package包/类
@Test
public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
	final int gapSize = 3;
	final long lateness = 10;

	TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

	FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc =
		new FoldingStateDescriptor<>(
			"window-contents",
			new Tuple2<>((String) null, 0),
			new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
					return new Tuple2<>(value.f0, accumulator.f1 + value.f1);
				}
			},
			inputType);
	windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());

	WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
		new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(gapSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			windowStateDesc,
			new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
			EventTimeTrigger.create(),
			lateness,
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		createTestHarness(operator);

	testHarness.open();

	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
	testHarness.processWatermark(new Watermark(4998));

	expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
	expected.add(new Watermark(4998));

	testHarness.processWatermark(new Watermark(14600));
	expected.add(new Watermark(14600));

	ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
	testHarness.close();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:55,代码来源:WindowOperatorTest.java



注:本文中的org.apache.flink.api.common.functions.FoldFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java TypeToken类代码示例发布时间:2022-05-22
下一篇:
Java NNHAStatusHeartbeat类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap