• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java GlobalWindow类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.streaming.api.windowing.windows.GlobalWindow的典型用法代码示例。如果您正苦于以下问题:Java GlobalWindow类的具体用法?Java GlobalWindow怎么用?Java GlobalWindow使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



GlobalWindow类属于org.apache.flink.streaming.api.windowing.windows包,在下文中一共展示了GlobalWindow类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: main

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        List<Integer> list = Arrays.asList(array);
        DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
                .windowAll(GlobalWindows.create())
                .trigger(CountTrigger.of(5)).apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
                    @Override
                    public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        HashMap<Integer, Integer> map = new HashMap<>();
                        for (Integer tuple : tuples) {
                            Integer value = 0;
                            if (map.containsKey(tuple)) {
                                value = map.get(tuple);
                            }
                            map.put(tuple, value + 1);
                        }

                        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                            out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                        }
                    }
                });

        counts.print();

        env.execute("Stream WordCount");
    }
 
开发者ID:wangyangjun,项目名称:StreamBench,代码行数:31,代码来源:CountWindowTest.java


示例2: mockGlobalWindowAssigner

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
	@SuppressWarnings("unchecked")
	WindowAssigner<T, GlobalWindow> mockAssigner = mock(WindowAssigner.class);

	when(mockAssigner.getWindowSerializer(Mockito.<ExecutionConfig>any())).thenReturn(new GlobalWindow.Serializer());
	when(mockAssigner.isEventTime()).thenReturn(true);
	when(mockAssigner.assignWindows(Mockito.<T>any(), anyLong(), anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));

	return mockAssigner;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:WindowOperatorContractTest.java


示例3: testNoGarbageCollectionTimerForGlobalWindow

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {

		WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
		timeAdaptor.setIsEventTime(mockAssigner);
		Trigger<Integer, GlobalWindow> mockTrigger = mockTrigger();
		InternalWindowFunction<Iterable<Integer>, Void, Integer, GlobalWindow> mockWindowFunction = mockWindowFunction();

		// this needs to be true for the test to succeed
		assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());

		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
				createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);

		testHarness.open();

		assertEquals(0, testHarness.getOutput().size());
		assertEquals(0, testHarness.numKeyedStateEntries());

		testHarness.processElement(new StreamRecord<>(0, 0L));

		// just the window contents
		assertEquals(1, testHarness.numKeyedStateEntries());

		// verify we have no timers for either time domain
		assertEquals(0, testHarness.numEventTimeTimers());
		assertEquals(0, testHarness.numProcessingTimeTimers());
	}
 
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:WindowOperatorContractTest.java


示例4: testWindowAssignment

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Test
public void testWindowAssignment() {
	WindowAssigner.WindowAssignerContext mockContext =
			mock(WindowAssigner.WindowAssignerContext.class);

	GlobalWindows assigner = GlobalWindows.create();

	assertThat(assigner.assignWindows("String", 0L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(GlobalWindow.get()));
	assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(GlobalWindow.get()));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:GlobalWindowsTest.java


示例5: testProperties

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Test
public void testProperties() {
	GlobalWindows assigner = GlobalWindows.create();

	assertFalse(assigner.isEventTime());
	assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
	assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(GlobalWindows.NeverTrigger.class));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:GlobalWindowsTest.java


示例6: onElement

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TriggerResult onElement(ConnectedCarEvent event, long timestamp, GlobalWindow window, TriggerContext context) throws Exception {

	// if this is a stop event, set a timer
	if (event.speed == 0.0) {
		context.registerEventTimeTimer(event.timestamp);
	}

	return TriggerResult.CONTINUE;
}
 
开发者ID:dataArtisans,项目名称:flink-training-exercises,代码行数:11,代码来源:DrivingSegments.java


示例7: evictAfter

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public void evictAfter(Iterable<TimestampedValue<ConnectedCarEvent>> elements, int size, GlobalWindow window, EvictorContext ctx) {
	long firstStop = ConnectedCarEvent.earliestStopElement(elements);

	// remove all events up to (and including) the first stop event (which is the event that triggered the window)
	for (Iterator<TimestampedValue<ConnectedCarEvent>> iterator = elements.iterator(); iterator.hasNext(); ) {
		TimestampedValue<ConnectedCarEvent> element = iterator.next();
		if (element.getTimestamp() <= firstStop) {
			iterator.remove();
		}
	}
}
 
开发者ID:dataArtisans,项目名称:flink-training-exercises,代码行数:13,代码来源:DrivingSegments.java


示例8: apply

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public void apply(Tuple key, GlobalWindow window, Iterable<ConnectedCarEvent> events, Collector<StoppedSegment> out) {
	StoppedSegment seg = new StoppedSegment(events);
	if (seg.length > 0) {
		out.collect(seg);
	}
}
 
开发者ID:dataArtisans,项目名称:flink-training-exercises,代码行数:8,代码来源:DrivingSegments.java


示例9: onElement

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TriggerResult onElement(T record, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
	ValueState<WindowState> windowState = triggerContext.getPartitionedState(windowStateDescriptor);

	if (isSessionStart.apply(record)) {
		if (windowState.value() == WindowState.EMPTY) {
			windowState.update(WindowState.START_ELEMENT);

			triggerContext.registerEventTimeTimer(timestamp + timeout);

			return TriggerResult.CONTINUE;
		} else if (windowState.value() == WindowState.END_ELEMENT) {
			return TriggerResult.CONTINUE;
		} else {
			LOG.info("Received another start element for the same session.");
			return TriggerResult.CONTINUE;
		}
	} else if (isSessionEnd.apply(record)) {
		if (windowState.value() == WindowState.EMPTY) {
			windowState.update(WindowState.END_ELEMENT);

			triggerContext.registerEventTimeTimer(timestamp);
			return TriggerResult.CONTINUE;
		} else if (windowState.value() == WindowState.START_ELEMENT) {
			return TriggerResult.FIRE_AND_PURGE;
		} else {
			LOG.info("Received another end element for the same session.");
			return TriggerResult.CONTINUE;
		}
	} else {
		return TriggerResult.CONTINUE;
	}
}
 
开发者ID:dataArtisans,项目名称:timeout-monitoring,代码行数:34,代码来源:SessionTrigger.java


示例10: assignWindows

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public Collection<GlobalWindow> assignWindows(T record, long timestamp) {
	if (keepSessionElements) {
		return Collections.singletonList(GlobalWindow.get());
	} else {
		if (isSessionStart.apply(record) || isSessionEnd.apply(record)) {
			return Collections.singletonList(GlobalWindow.get());
		} else {
			return Collections.EMPTY_LIST;
		}
	}
}
 
开发者ID:dataArtisans,项目名称:timeout-monitoring,代码行数:13,代码来源:SessionWindowAssigner.java


示例11: assignWindows

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
	return Collections.singletonList(GlobalWindow.get());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例12: getDefaultTrigger

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
	return new NeverTrigger();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例13: onElement

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例14: onEventTime

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例15: onProcessingTime

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
	return TriggerResult.CONTINUE;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例16: clear

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
 
开发者ID:axbaretto,项目名称:flink,代码行数:3,代码来源:GlobalWindows.java


示例17: onMerge

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:4,代码来源:GlobalWindows.java


示例18: getWindowSerializer

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
@Override
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
	return new GlobalWindow.Serializer();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:GlobalWindows.java


示例19: testCountEvictorEvictAfter

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
/**
 * Tests CountEvictor evictAfter behavior.
 * @throws Exception
    */
@Test
public void testCountEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int windowSize = 4;
	final int triggerCount = 2;
	final boolean evictAfter = true;

	TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		CountEvictor.of(windowSize, evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 6), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:76,代码来源:EvictingWindowOperatorTest.java


示例20: testTimeEvictorEvictAfter

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; //导入依赖的package包/类
/**
 * Tests TimeEvictor evictAfter behavior.
 * @throws Exception
 */
@Test
public void testTimeEvictorEvictAfter() throws Exception {
	AtomicInteger closeCalled = new AtomicInteger(0);
	final int triggerCount = 2;
	final boolean evictAfter = true;

	TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

	@SuppressWarnings({"unchecked", "rawtypes"})
	TypeSerializer<StreamRecord<Tuple2<String, Integer>>> streamRecordSerializer =
		(TypeSerializer<StreamRecord<Tuple2<String, Integer>>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig()));

	ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc =
		new ListStateDescriptor<>("window-contents", streamRecordSerializer);

	EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
		GlobalWindows.create(),
		new GlobalWindow.Serializer(),
		new TupleKeySelector(),
		BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
		stateDesc,
		new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
		CountTrigger.of(triggerCount),
		TimeEvictor.of(Time.seconds(2), evictAfter),
		0,
		null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

	long initialTime = 0L;
	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 4000));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3500));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2001));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1001));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1002));

	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
	expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

	testHarness.close();

	Assert.assertEquals("Close was not called.", 1, closeCalled.get());

}
 
开发者ID:axbaretto,项目名称:flink,代码行数:70,代码来源:EvictingWindowOperatorTest.java



注:本文中的org.apache.flink.streaming.api.windowing.windows.GlobalWindow类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Nanopub类代码示例发布时间:2022-05-22
下一篇:
Java UnaryExpression类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap