Java AllWindowFunction Class Code Examples


This article collects typical usage examples of org.apache.flink.streaming.api.functions.windowing.AllWindowFunction in Java. If you are wondering how the AllWindowFunction class is used in practice, the selected examples below may help.



The AllWindowFunction class belongs to the org.apache.flink.streaming.api.functions.windowing package. The 14 code examples below are sorted by popularity by default.

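Example 1: main

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.util.Collector;
import java.util.Date;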
public static void main(String... args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

        edits
            .timeWindowAll(Time.minutes(1))
            .apply(new AllWindowFunction<WikipediaEditEvent, Tuple3<Date, Long, Long>, TimeWindow>() {
                @Override
                public void apply(TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple3<Date, Long, Long>> collector) throws Exception {
                    long count = 0;
                    long bytesChanged = 0;

                    for (WikipediaEditEvent event : iterable) {
                        count++;
                        bytesChanged += event.getByteDiff();
                    }

                    collector.collect(new Tuple3<>(new Date(timeWindow.getEnd()), count, bytesChanged));
                }
            })
            .print();


        env.execute();
    }
 
Developer: mushketyk, Project: flink-examples, Lines of code: 27, Source: NumberOfWikiEditsPerWindow.java


Example 2: aggregate

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is interpreted as a regular
 * non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
 * that the window function typically has only a single value to process when called.
 *
 * @param aggFunction The aggregate function that is used for incremental aggregation.
 * @param windowFunction The window function.
 *
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @param <ACC> The type of the AggregateFunction's accumulator
 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
 * @param <R> The type of the elements in the resulting stream, equal to the
 *            WindowFunction's result type
 */
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
		AggregateFunction<T, ACC, V> aggFunction,
		AllWindowFunction<V, R, W> windowFunction) {

	checkNotNull(aggFunction, "aggFunction");
	checkNotNull(windowFunction, "windowFunction");

	TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
			aggFunction, input.getType(), null, false);

	TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
			aggFunction, input.getType(), null, false);

	TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
 
Developer: axbaretto, Project: flink, Lines of code: 37, Source: AllWindowedStream.java
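
The Javadoc above says that arriving data is aggregated incrementally, so the window function typically receives a single value. The following is a minimal sketch of that two-stage idea in plain Java, without any Flink dependency. The class `IncrementalAggregateSketch`, the `AvgAccumulator` type, and the methods `add`, `getResult`, and `evaluateWindow` are hypothetical stand-ins chosen for illustration; they mirror the roles of an aggregate function and a window function but are not Flink's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class IncrementalAggregateSketch {

    /** Hypothetical accumulator: the only state kept while the window is open. */
    static final class AvgAccumulator {
        long sum;
        long count;
    }

    /** Folds one arriving element into the accumulator; the element itself is not buffered. */
    static AvgAccumulator add(AvgAccumulator acc, int value) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    /** Converts the accumulator into the single value handed to the window function. */
    static double getResult(AvgAccumulator acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    /** Simulates one window: incremental aggregation first, then a window function over one value. */
    public static String evaluateWindow(List<Integer> elements, long windowEnd,
                                        BiFunction<Long, Double, String> windowFunction) {
        AvgAccumulator acc = new AvgAccumulator();
        for (int element : elements) {
            acc = add(acc, element);
        }
        return windowFunction.apply(windowEnd, getResult(acc));
    }

    public static void main(String[] args) {
        String out = evaluateWindow(Arrays.asList(2, 4, 6), 60_000L,
                (end, avg) -> "windowEnd=" + end + " avg=" + avg);
        System.out.println(out); // prints: windowEnd=60000 avg=4.0
    }
}
```

The point of the split is that the accumulator stays small regardless of how many elements arrive, while the window function can still attach window metadata (here, the window end) to the final result.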


Example 3: testApplyWindowAllState

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
public void testApplyWindowAllState() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
	env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

	DataStream<File> src = env.fromElements(new File("/"));

	SingleOutputStreamOperator<?> result = src
			.timeWindowAll(Time.milliseconds(1000))
			.apply(new AllWindowFunction<File, String, TimeWindow>() {
				@Override
				public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
			});

	validateListStateDescriptorConfigured(result);
}
 
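Developer: axbaretto, Project: flink, Lines of code: 18, Source: StateDescriptorPassingTest.java


Example 4: main

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;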
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        List<Integer> list = Arrays.asList(array);
        DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
                .windowAll(GlobalWindows.create())
                .trigger(CountTrigger.of(5)).apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
                    @Override
                    public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        HashMap<Integer, Integer> map = new HashMap<>();
                        for (Integer tuple : tuples) {
                            Integer value = 0;
                            if (map.containsKey(tuple)) {
                                value = map.get(tuple);
                            }
                            map.put(tuple, value + 1);
                        }

                        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                            out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                        }
                    }
                });

        counts.print();

        env.execute("Stream WordCount");
    }
 
Developer: wangyangjun, Project: StreamBench, Lines of code: 31, Source: CountWindowTest.java
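
One detail of the example above is worth checking against the Flink version in use: CountTrigger on its own is generally understood to fire without purging the window contents, so with a global window each firing would see all elements collected so far rather than only the latest five. Wrapping the trigger in PurgingTrigger is the usual way to get disjoint batches. The sketch below simulates both behaviors in plain Java; `CountTriggerSketch` and `firingSizes` are hypothetical names, and the code models the semantics only, it does not call Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountTriggerSketch {

    /**
     * Returns how many elements the window function would see at each firing.
     * purgeOnFire=false models a trigger that only fires;
     * purgeOnFire=true models a trigger that fires and then clears the window.
     */
    public static List<Integer> firingSizes(List<Integer> input, int maxCount, boolean purgeOnFire) {
        List<Integer> windowContents = new ArrayList<>(); // elements buffered in the single global window
        List<Integer> sizes = new ArrayList<>();
        int sinceLastFire = 0;
        for (Integer element : input) {
            windowContents.add(element);
            sinceLastFire++;
            if (sinceLastFire >= maxCount) {
                sizes.add(windowContents.size()); // the window function is evaluated here
                sinceLastFire = 0;
                if (purgeOnFire) {
                    windowContents.clear();
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6);
        System.out.println(firingSizes(input, 5, false)); // [5, 10, 15, 20]
        System.out.println(firingSizes(input, 5, true));  // [5, 5, 5, 5]
    }
}
```

If per-batch counts are the goal, the purging variant is the one whose output matches that intent; without purging, the counts emitted at each firing are cumulative.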


Example 5: getAllWindowFunctionReturnType

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
		AllWindowFunction<IN, OUT, ?> function,
		TypeInformation<IN> inType) {
	return TypeExtractor.getUnaryOperatorReturnType(
		function,
		AllWindowFunction.class,
		0,
		1,
		new int[]{1, 0},
		new int[]{2, 0},
		inType,
		null,
		false);
}
 
Developer: axbaretto, Project: flink, Lines of code: 15, Source: AllWindowedStream.java


Example 6: testApplyEventTime

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyEventTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void apply(
						TimeWindow window,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
Developer: axbaretto, Project: flink, Lines of code: 35, Source: AllWindowTranslationTest.java


Example 7: testApplyProcessingTimeTime

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyProcessingTimeTime() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void apply(
						TimeWindow window,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));

}
 
Developer: axbaretto, Project: flink, Lines of code: 36, Source: AllWindowTranslationTest.java


Example 8: testApplyWithCustomTrigger

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithCustomTrigger() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.trigger(CountTrigger.of(1))
			.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void apply(
						TimeWindow window,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof WindowOperator);
	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
Developer: axbaretto, Project: flink, Lines of code: 36, Source: AllWindowTranslationTest.java


Example 9: testApplyWithEvictor

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
@Test
@SuppressWarnings("rawtypes")
public void testApplyWithEvictor() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));

	DataStream<Tuple2<String, Integer>> window1 = source
			.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
			.trigger(CountTrigger.of(1))
			.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
			.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void apply(
						TimeWindow window,
						Iterable<Tuple2<String, Integer>> values,
						Collector<Tuple2<String, Integer>> out) throws Exception {
					for (Tuple2<String, Integer> in : values) {
						out.collect(in);
					}
				}
			});

	OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
	Assert.assertTrue(operator instanceof EvictingWindowOperator);
	EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
	Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
	Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
	Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);

	processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
 
Developer: axbaretto, Project: flink, Lines of code: 38, Source: AllWindowTranslationTest.java


Example 10: InternalSingleValueAllWindowFunction

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
	super(wrappedFunction);
}
 
Developer: axbaretto, Project: flink, Lines of code: 4, Source: InternalSingleValueAllWindowFunction.java


Example 11: InternalIterableAllWindowFunction

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
	super(wrappedFunction);
}
 
Developer: axbaretto, Project: flink, Lines of code: 4, Source: InternalIterableAllWindowFunction.java


Example 12: reduce

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is interpreted as a regular
 * non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given reducer.
 *
 * @param reduceFunction The reduce function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(
		ReduceFunction<T> reduceFunction,
		AllWindowFunction<T, R, W> function) {

	TypeInformation<T> inType = input.getType();
	TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);

	return reduce(reduceFunction, function, resultType);
}
 
Developer: axbaretto, Project: flink, Lines of code: 22, Source: AllWindowedStream.java


Example 13: fold

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is interpreted as a regular
 * non-windowed stream.
 *
 * <p>Arriving data is incrementally aggregated using the given fold function.
 *
 * @param initialValue The initial value of the fold.
 * @param foldFunction The fold function that is used for incremental aggregation.
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 *
 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 */
@PublicEvolving
@Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {

	TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
		Utils.getCallLocationName(), true);

	TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType);

	return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
 
Developer: axbaretto, Project: flink, Lines of code: 26, Source: AllWindowedStream.java


Example 14: apply

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; // import the required package/class
/**
 * Applies the given window function to each window. The window function is called for each
 * evaluation of the window. The output of the window function is
 * interpreted as a regular non-windowed stream.
 *
 * <p>Note that this function requires that all data in the windows is buffered until the window
 * is evaluated, as the function provides no means of incremental aggregation.
 *
 * @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
	String callLocation = Utils.getCallLocationName();
	function = input.getExecutionEnvironment().clean(function);
	TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
	return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
 
Developer: axbaretto, Project: flink, Lines of code: 18, Source: AllWindowedStream.java
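
The Javadoc above notes that apply buffers every element until the window is evaluated, because the function has no way to aggregate incrementally. That cost is justified when the computation genuinely needs the whole window at once. A median is one such case, sketched below in plain Java. `BufferedApplySketch` and `median` are hypothetical names; the method only mirrors the shape of an apply-style function that receives an Iterable of all buffered elements, and it does not use Flink's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BufferedApplySketch {

    /** Computes the median of one window; every element must be available at evaluation time. */
    public static double median(Iterable<Integer> windowElements) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer element : windowElements) {
            sorted.add(element);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("window is empty");
        }
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        if (sorted.size() % 2 == 1) {
            return sorted.get(mid); // odd count: the middle element
        }
        return (sorted.get(mid - 1) + sorted.get(mid)) / 2.0; // even count: mean of the two middle elements
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(5, 1, 3)));    // 3.0
        System.out.println(median(Arrays.asList(4, 1, 3, 2))); // 2.5
    }
}
```

For sums, counts, minima, or averages, the reduce and aggregate variants shown in the earlier examples avoid this buffering and are usually the better fit.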



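Note: The org.apache.flink.streaming.api.functions.windowing.AllWindowFunction examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects; copyright remains with the original authors, and distribution and use are subject to each project's license. Do not reproduce without permission.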

