本文整理汇总了Java中org.apache.flink.streaming.api.operators.InternalTimer类的典型用法代码示例。如果您正苦于以下问题:Java InternalTimer类的具体用法?Java InternalTimer怎么用?Java InternalTimer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
InternalTimer类属于org.apache.flink.streaming.api.operators包,在下文中一共展示了InternalTimer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
/**
* Occurs when an event-time timer fires due to watermark progression.
*
* @param timer the timer details.
*/
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
long currentWatermark = internalTimerService.currentWatermark();
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
long timestamp = sortedTimestamps.poll();
for (T event : elementQueueState.get(timestamp)) {
output.collect(new StreamRecord<>(event, timestamp));
}
elementQueueState.remove(timestamp);
}
if (sortedTimestamps.isEmpty()) {
elementQueueState.clear();
}
if (!sortedTimestamps.isEmpty()) {
saveRegisterWatermarkTimer();
}
}
开发者ID:pravega,项目名称:flink-connectors,代码行数:28,代码来源:EventTimeOrderingOperator.java
示例2: fireTimer
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void fireTimer(InternalTimer<?, TimerData> timer) {
doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<K, InputT>timersWorkItem(
(K) keyedStateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}
开发者ID:apache,项目名称:beam,代码行数:8,代码来源:WindowDoFnOperator.java
示例3: fireTimer
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
(String) keyedStateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}
开发者ID:apache,项目名称:beam,代码行数:8,代码来源:SplittableDoFnOperator.java
示例4: fireTimer
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
public void fireTimer(InternalTimer<?, TimerData> timer) {
TimerInternals.TimerData timerData = timer.getNamespace();
StateNamespace namespace = timerData.getNamespace();
// This is a user timer, so namespace must be WindowNamespace
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
timerData.getTimestamp(), timerData.getDomain());
}
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:DoFnOperator.java
示例5: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
timer.getNamespace(),
LongSerializer.INSTANCE,
stateDescriptor);
assertEquals(state.value(), timer.getNamespace());
getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
}
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:StatefulJobSavepointFrom12MigrationITCase.java
示例6: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
timer.getNamespace(),
LongSerializer.INSTANCE,
stateDescriptor);
assertEquals(state.value(), timer.getNamespace());
getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
}
开发者ID:axbaretto,项目名称:flink,代码行数:11,代码来源:StatefulJobSavepointFrom12MigrationITCase.java
示例7: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in event time order and custom comparator if exists
// by feeding them in the NFA
// 3) advance the time to the current watermark, so that expired patterns are discarded.
// 4) update the stored state for the key, by only storing the new NFA and MapState iff they
// have state to be used later.
// 5) update the last seen watermark.
// STEP 1
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
NFA<IN> nfa = getNFA();
// STEP 2
while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
long timestamp = sortedTimestamps.poll();
sort(elementQueueState.get(timestamp)).forEachOrdered(
event -> processEvent(nfa, event, timestamp)
);
elementQueueState.remove(timestamp);
}
// STEP 3
advanceTime(nfa, timerService.currentWatermark());
// STEP 4
if (sortedTimestamps.isEmpty()) {
elementQueueState.clear();
}
updateNFA(nfa);
if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
saveRegisterWatermarkTimer();
}
// STEP 5
updateLastSeenWatermark(timerService.currentWatermark());
}
开发者ID:axbaretto,项目名称:flink,代码行数:41,代码来源:AbstractKeyedCEPPatternOperator.java
示例8: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in process time order and custom comparator if exists
// by feeding them in the NFA
// 3) update the stored state for the key, by only storing the new NFA and MapState iff they
// have state to be used later.
// STEP 1
PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
NFA<IN> nfa = getNFA();
// STEP 2
while (!sortedTimestamps.isEmpty()) {
long timestamp = sortedTimestamps.poll();
sort(elementQueueState.get(timestamp)).forEachOrdered(
event -> processEvent(nfa, event, timestamp)
);
elementQueueState.remove(timestamp);
}
// STEP 3
if (sortedTimestamps.isEmpty()) {
elementQueueState.clear();
}
updateNFA(nfa);
}
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:AbstractKeyedCEPPatternOperator.java
示例9: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:KeyedCoProcessOperator.java
示例10: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:KeyedCoProcessOperator.java
示例11: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:CoStreamTimelyFlatMap.java
示例12: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
// not supported yet
}
开发者ID:phil3k3,项目名称:flink-esper,代码行数:5,代码来源:SelectEsperStreamOperator.java
示例13: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
EPServiceProvider epServiceProvider = getServiceProvider(this.hashCode() + "");
epServiceProvider.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(internalTimer.getTimestamp()));
this.engineState.update(epServiceProvider);
}
开发者ID:phil3k3,项目名称:flink-esper,代码行数:7,代码来源:SelectEsperStreamOperator.java
示例14: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
}
开发者ID:pravega,项目名称:flink-connectors,代码行数:4,代码来源:EventTimeOrderingOperator.java
示例15: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
// We don't have to cal checkInvokeStartBundle() because it's already called in
// processWatermark*().
fireTimer(timer);
}
开发者ID:apache,项目名称:beam,代码行数:7,代码来源:DoFnOperator.java
示例16: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
checkInvokeStartBundle();
fireTimer(timer);
}
开发者ID:apache,项目名称:beam,代码行数:6,代码来源:DoFnOperator.java
示例17: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:47,代码来源:EvictingWindowOperator.java
示例18: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:46,代码来源:EvictingWindowOperator.java
示例19: onEventTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:48,代码来源:WindowOperator.java
示例20: onProcessingTime
import org.apache.flink.streaming.api.operators.InternalTimer; //导入依赖的package包/类
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:48,代码来源:WindowOperator.java
注:本文中的org.apache.flink.streaming.api.operators.InternalTimer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论