
Java InternalTimer Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.streaming.api.operators.InternalTimer. If you are wondering how the InternalTimer class is used in practice, the selected examples here may help.



The InternalTimer class belongs to the org.apache.flink.streaming.api.operators package. This article presents 20 code examples of the class, sorted by popularity by default.
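
The examples in this article call only three accessors on a timer: getTimestamp(), getKey(), and getNamespace(). The following is a minimal, Flink-free sketch of a value type with those accessors, meant only to make the shape of a timer concrete. SimpleTimer and SimpleTimerDemo are hypothetical names introduced here, not Flink classes, and ordering timers by timestamp in a PriorityQueue is an assumption of this sketch.

```java
import java.util.Objects;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer: a (timestamp, key, namespace) triple. Not a Flink class. */
final class SimpleTimer<K, N> implements Comparable<SimpleTimer<K, N>> {
    private final long timestamp;
    private final K key;
    private final N namespace;

    SimpleTimer(long timestamp, K key, N namespace) {
        this.timestamp = timestamp;
        this.key = Objects.requireNonNull(key);
        this.namespace = Objects.requireNonNull(namespace);
    }

    long getTimestamp() { return timestamp; }

    K getKey() { return key; }

    N getNamespace() { return namespace; }

    /** Orders timers by timestamp so that the earliest one is polled first. */
    @Override
    public int compareTo(SimpleTimer<K, N> other) {
        return Long.compare(timestamp, other.timestamp);
    }
}

final class SimpleTimerDemo {
    /** Returns the key of the earliest of three sample timers. */
    static String earliestKey() {
        PriorityQueue<SimpleTimer<String, String>> queue = new PriorityQueue<>();
        queue.add(new SimpleTimer<>(30L, "c", "window-1"));
        queue.add(new SimpleTimer<>(10L, "a", "window-1"));
        queue.add(new SimpleTimer<>(20L, "b", "window-2"));
        return queue.peek().getKey();
    }

    public static void main(String[] args) {
        System.out.println(earliestKey());
    }
}
```

In the examples, the key type K identifies the keyed-stream key and the namespace type N carries extra scoping such as a window or VoidNamespace.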

Example 1: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
/**
 * Occurs when an event-time timer fires due to watermark progression.
 *
 * @param timer the timer details.
 */
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {

    long currentWatermark = internalTimerService.currentWatermark();

    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
        long timestamp = sortedTimestamps.poll();
        for (T event : elementQueueState.get(timestamp)) {
            output.collect(new StreamRecord<>(event, timestamp));
        }
        elementQueueState.remove(timestamp);
    }

    if (sortedTimestamps.isEmpty()) {
        elementQueueState.clear();
    }

    if (!sortedTimestamps.isEmpty()) {
        saveRegisterWatermarkTimer();
    }
}
 
Developer: pravega, Project: flink-connectors, Lines of code: 28, Source: EventTimeOrderingOperator.java
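
The buffering logic in Example 1 (hold events per timestamp, then emit everything at or below the current watermark in timestamp order) can be sketched without any Flink dependency. This is a simplified illustration under stated assumptions: ReorderBuffer and ReorderBufferDemo are hypothetical names, a TreeMap stands in for the keyed state plus the sorted timestamp queue, and the watermark is passed in as a plain long.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, Flink-free sketch of an event-time reordering buffer. */
final class ReorderBuffer<T> {
    // Buffered events grouped by timestamp; TreeMap keeps the timestamps sorted.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    /** Buffers an event until the watermark reaches its timestamp. */
    void add(long timestamp, T event) {
        pending.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
    }

    /** Emits, in timestamp order, every buffered event whose timestamp is <= watermark. */
    List<T> onWatermark(long watermark) {
        List<T> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            out.addAll(pending.pollFirstEntry().getValue());
        }
        return out;
    }

    /** True while events remain buffered, i.e. a further timer would still be needed. */
    boolean hasPending() {
        return !pending.isEmpty();
    }
}

final class ReorderBufferDemo {
    /** Adds three out-of-order events and releases those at or below watermark 20. */
    static List<String> run() {
        ReorderBuffer<String> buffer = new ReorderBuffer<>();
        buffer.add(30L, "c");
        buffer.add(10L, "a");
        buffer.add(20L, "b");
        return buffer.onWatermark(20L);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The hasPending() check plays the role of the final condition in Example 1: a new watermark timer is only worth registering while something is still buffered.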


Example 2: fireTimer

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void fireTimer(InternalTimer<?, TimerData> timer) {
  doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
      KeyedWorkItems.<K, InputT>timersWorkItem(
          (K) keyedStateInternals.getKey(),
          Collections.singletonList(timer.getNamespace()))));
}
 
Developer: apache, Project: beam, Lines of code: 8, Source: WindowDoFnOperator.java


Example 3: fireTimer

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
  doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
      KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
          (String) keyedStateInternals.getKey(),
          Collections.singletonList(timer.getNamespace()))));
}
 
Developer: apache, Project: beam, Lines of code: 8, Source: SplittableDoFnOperator.java


Example 4: fireTimer

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
public void fireTimer(InternalTimer<?, TimerData> timer) {
  TimerInternals.TimerData timerData = timer.getNamespace();
  StateNamespace namespace = timerData.getNamespace();
  // This is a user timer, so namespace must be WindowNamespace
  checkArgument(namespace instanceof WindowNamespace);
  BoundedWindow window = ((WindowNamespace) namespace).getWindow();
  pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
      timerData.getTimestamp(), timerData.getDomain());
}
 
Developer: apache, Project: beam, Lines of code: 10, Source: DoFnOperator.java


Example 5: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
	ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
			timer.getNamespace(),
			LongSerializer.INSTANCE,
			stateDescriptor);

	assertEquals(state.value(), timer.getNamespace());
	getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
}
 
Developer: axbaretto, Project: flink, Lines of code: 11, Source: StatefulJobSavepointFrom12MigrationITCase.java


Example 6: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
	ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
			timer.getNamespace(),
			LongSerializer.INSTANCE,
			stateDescriptor);

	assertEquals(state.value(), timer.getNamespace());
	getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
}
 
Developer: axbaretto, Project: flink, Lines of code: 11, Source: StatefulJobSavepointFrom12MigrationITCase.java


Example 7: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

	// 1) get the queue of pending elements for the key and the corresponding NFA,
	// 2) process the pending elements in event time order and custom comparator if exists
	//		by feeding them in the NFA
	// 3) advance the time to the current watermark, so that expired patterns are discarded.
	// 4) update the stored state for the key, by only storing the new NFA and MapState iff they
	//		have state to be used later.
	// 5) update the last seen watermark.

	// STEP 1
	PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
	NFA<IN> nfa = getNFA();

	// STEP 2
	while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
		long timestamp = sortedTimestamps.poll();
		sort(elementQueueState.get(timestamp)).forEachOrdered(
			event -> processEvent(nfa, event, timestamp)
		);
		elementQueueState.remove(timestamp);
	}

	// STEP 3
	advanceTime(nfa, timerService.currentWatermark());

	// STEP 4
	if (sortedTimestamps.isEmpty()) {
		elementQueueState.clear();
	}
	updateNFA(nfa);

	if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
		saveRegisterWatermarkTimer();
	}

	// STEP 5
	updateLastSeenWatermark(timerService.currentWatermark());
}
 
Developer: axbaretto, Project: flink, Lines of code: 41, Source: AbstractKeyedCEPPatternOperator.java


Example 8: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
	// 1) get the queue of pending elements for the key and the corresponding NFA,
	// 2) process the pending elements in process time order and custom comparator if exists
	//		by feeding them in the NFA
	// 3) update the stored state for the key, by only storing the new NFA and MapState iff they
	//		have state to be used later.

	// STEP 1
	PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
	NFA<IN> nfa = getNFA();

	// STEP 2
	while (!sortedTimestamps.isEmpty()) {
		long timestamp = sortedTimestamps.poll();
		sort(elementQueueState.get(timestamp)).forEachOrdered(
			event -> processEvent(nfa, event, timestamp)
		);
		elementQueueState.remove(timestamp);
	}

	// STEP 3
	if (sortedTimestamps.isEmpty()) {
		elementQueueState.clear();
	}
	updateNFA(nfa);
}
 
Developer: axbaretto, Project: flink, Lines of code: 28, Source: AbstractKeyedCEPPatternOperator.java


Example 9: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
	collector.setAbsoluteTimestamp(timer.getTimestamp());
	onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
	onTimerContext.timer = timer;
	userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
	onTimerContext.timeDomain = null;
	onTimerContext.timer = null;
}
 
Developer: axbaretto, Project: flink, Lines of code: 10, Source: KeyedCoProcessOperator.java


Example 10: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
	collector.eraseTimestamp();
	onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
	onTimerContext.timer = timer;
	userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
	onTimerContext.timeDomain = null;
	onTimerContext.timer = null;
}
 
Developer: axbaretto, Project: flink, Lines of code: 10, Source: KeyedCoProcessOperator.java
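
Examples 9 and 10 share one pattern: set the time domain on a reusable context, invoke the user callback, then reset the context so it cannot be used outside the callback. The following is a rough, Flink-free sketch of that pattern. TimerDispatchSketch, Domain, Context, and Callback are hypothetical names. Unlike the original code, this sketch resets the context in a finally block; that is a variation chosen here so the reset also happens when the callback throws, not a description of what Flink does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of dispatching a timer to a user callback with a scoped context. */
final class TimerDispatchSketch {
    enum Domain { EVENT_TIME, PROCESSING_TIME }

    /** Mutable context that is only meaningful while a callback is running. */
    static final class Context {
        Domain timeDomain; // null outside of a callback
    }

    interface Callback {
        void onTimer(long timestamp, Context ctx);
    }

    private final Context context = new Context();
    private final Callback callback;

    TimerDispatchSketch(Callback callback) {
        this.callback = callback;
    }

    /** Sets the domain, runs the callback, and always resets the context afterwards. */
    void fire(long timestamp, Domain domain) {
        context.timeDomain = domain;
        try {
            callback.onTimer(timestamp, context);
        } finally {
            context.timeDomain = null;
        }
    }

    Domain currentDomain() {
        return context.timeDomain;
    }

    /** Fires one timer per domain and records what the callback observed. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        TimerDispatchSketch op =
            new TimerDispatchSketch((ts, ctx) -> seen.add(ctx.timeDomain + "@" + ts));
        op.fire(5L, Domain.EVENT_TIME);
        op.fire(7L, Domain.PROCESSING_TIME);
        seen.add("after=" + op.currentDomain());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```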


Example 11: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
	collector.setAbsoluteTimestamp(timer.getTimestamp());
	onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
	onTimerContext.timer = timer;
	userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
	onTimerContext.timeDomain = null;
	onTimerContext.timer = null;
}
 
Developer: axbaretto, Project: flink, Lines of code: 10, Source: CoStreamTimelyFlatMap.java


Example 12: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
    // not supported yet
}
 
Developer: phil3k3, Project: flink-esper, Lines of code: 5, Source: SelectEsperStreamOperator.java


Example 13: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
    EPServiceProvider epServiceProvider = getServiceProvider(this.hashCode() + "");
    epServiceProvider.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(internalTimer.getTimestamp()));
    this.engineState.update(epServiceProvider);
}
 
Developer: phil3k3, Project: flink-esper, Lines of code: 7, Source: SelectEsperStreamOperator.java


Example 14: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
}
 
Developer: pravega, Project: flink-connectors, Lines of code: 4, Source: EventTimeOrderingOperator.java


Example 15: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
  // We don't have to call checkInvokeStartBundle() because it's already called in
  // processWatermark*().
  fireTimer(timer);
}
 
Developer: apache, Project: beam, Lines of code: 7, Source: DoFnOperator.java


Example 16: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
  checkInvokeStartBundle();
  fireTimer(timer);
}
 
Developer: apache, Project: beam, Lines of code: 6, Source: DoFnOperator.java


Example 17: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {

	triggerContext.key = timer.getKey();
	triggerContext.window = timer.getNamespace();
	evictorContext.key = timer.getKey();
	evictorContext.window = timer.getNamespace();

	MergingWindowSet<W> mergingWindows = null;

	if (windowAssigner instanceof MergingWindowAssigner) {
		mergingWindows = getMergingWindowSet();
		W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
		if (stateWindow == null) {
			// Timer firing for non-existent window, this can only happen if a
			// trigger did not clean up timers. We have already cleared the merging
			// window and therefore the Trigger state, however, so nothing to do.
			return;
		} else {
			evictingWindowState.setCurrentNamespace(stateWindow);
		}
	} else {
		evictingWindowState.setCurrentNamespace(triggerContext.window);
	}

	Iterable<StreamRecord<IN>> contents = evictingWindowState.get();

	if (contents != null) {
		TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
		if (triggerResult.isFire()) {
			emitWindowContents(triggerContext.window, contents, evictingWindowState);
		}
		if (triggerResult.isPurge()) {
			evictingWindowState.clear();
		}
	}

	if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
		clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
	}

	if (mergingWindows != null) {
		// need to make sure to update the merging state in state
		mergingWindows.persist();
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 47, Source: EvictingWindowOperator.java


Example 18: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
	triggerContext.key = timer.getKey();
	triggerContext.window = timer.getNamespace();
	evictorContext.key = timer.getKey();
	evictorContext.window = timer.getNamespace();

	MergingWindowSet<W> mergingWindows = null;

	if (windowAssigner instanceof MergingWindowAssigner) {
		mergingWindows = getMergingWindowSet();
		W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
		if (stateWindow == null) {
			// Timer firing for non-existent window, this can only happen if a
			// trigger did not clean up timers. We have already cleared the merging
			// window and therefore the Trigger state, however, so nothing to do.
			return;
		} else {
			evictingWindowState.setCurrentNamespace(stateWindow);
		}
	} else {
		evictingWindowState.setCurrentNamespace(triggerContext.window);
	}

	Iterable<StreamRecord<IN>> contents = evictingWindowState.get();

	if (contents != null) {
		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
		if (triggerResult.isFire()) {
			emitWindowContents(triggerContext.window, contents, evictingWindowState);
		}
		if (triggerResult.isPurge()) {
			evictingWindowState.clear();
		}
	}

	if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
		clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
	}

	if (mergingWindows != null) {
		// need to make sure to update the merging state in state
		mergingWindows.persist();
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 46, Source: EvictingWindowOperator.java


Example 19: onEventTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
	triggerContext.key = timer.getKey();
	triggerContext.window = timer.getNamespace();

	MergingWindowSet<W> mergingWindows;

	if (windowAssigner instanceof MergingWindowAssigner) {
		mergingWindows = getMergingWindowSet();
		W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
		if (stateWindow == null) {
			// Timer firing for non-existent window, this can only happen if a
			// trigger did not clean up timers. We have already cleared the merging
			// window and therefore the Trigger state, however, so nothing to do.
			return;
		} else {
			windowState.setCurrentNamespace(stateWindow);
		}
	} else {
		windowState.setCurrentNamespace(triggerContext.window);
		mergingWindows = null;
	}

	ACC contents = null;
	if (windowState != null) {
		contents = windowState.get();
	}

	if (contents != null) {
		TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
		if (triggerResult.isFire()) {
			emitWindowContents(triggerContext.window, contents);
		}
		if (triggerResult.isPurge()) {
			windowState.clear();
		}
	}

	if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
		clearAllState(triggerContext.window, windowState, mergingWindows);
	}

	if (mergingWindows != null) {
		// need to make sure to update the merging state in state
		mergingWindows.persist();
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 48, Source: WindowOperator.java


Example 20: onProcessingTime

import org.apache.flink.streaming.api.operators.InternalTimer; // import the required package/class
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
	triggerContext.key = timer.getKey();
	triggerContext.window = timer.getNamespace();

	MergingWindowSet<W> mergingWindows;

	if (windowAssigner instanceof MergingWindowAssigner) {
		mergingWindows = getMergingWindowSet();
		W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
		if (stateWindow == null) {
			// Timer firing for non-existent window, this can only happen if a
			// trigger did not clean up timers. We have already cleared the merging
			// window and therefore the Trigger state, however, so nothing to do.
			return;
		} else {
			windowState.setCurrentNamespace(stateWindow);
		}
	} else {
		windowState.setCurrentNamespace(triggerContext.window);
		mergingWindows = null;
	}

	ACC contents = null;
	if (windowState != null) {
		contents = windowState.get();
	}

	if (contents != null) {
		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
		if (triggerResult.isFire()) {
			emitWindowContents(triggerContext.window, contents);
		}
		if (triggerResult.isPurge()) {
			windowState.clear();
		}
	}

	if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
		clearAllState(triggerContext.window, windowState, mergingWindows);
	}

	if (mergingWindows != null) {
		// need to make sure to update the merging state in state
		mergingWindows.persist();
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 48, Source: WindowOperator.java
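
The window operators in Examples 17 to 20 all branch on two flags of the trigger result: isFire() decides whether the window contents are emitted, and isPurge() decides whether the window state is cleared. The sketch below isolates that decision. TriggerHandlingSketch and its Result enum are hypothetical local stand-ins modeled on the isFire()/isPurge() calls seen above; they are assumptions of this sketch, not a copy of the Flink source.

```java
/** Hypothetical sketch of how a fire/purge result maps to operator actions. */
final class TriggerHandlingSketch {

    /** Local stand-in for a trigger result with independent fire and purge flags. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }

        boolean isPurge() { return purge; }
    }

    /** Describes the actions taken for a result, in the same order as the examples. */
    static String actions(Result result) {
        StringBuilder sb = new StringBuilder();
        if (result.isFire()) {
            sb.append("emit"); // corresponds to emitWindowContents(...)
        }
        if (result.isPurge()) {
            if (sb.length() > 0) {
                sb.append('+');
            }
            sb.append("clear"); // corresponds to windowState.clear()
        }
        return sb.length() == 0 ? "none" : sb.toString();
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + actions(r));
        }
    }
}
```

Because the two flags are checked independently and emission comes first, a result that both fires and purges emits the contents before the state is cleared.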



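Note: The org.apache.flink.streaming.api.operators.InternalTimer examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.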

