本文整理汇总了Java中org.apache.beam.sdk.transforms.windowing.AfterProcessingTime类的典型用法代码示例。如果您正苦于以下问题:Java AfterProcessingTime类的具体用法?Java AfterProcessingTime怎么用?Java AfterProcessingTime使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
AfterProcessingTime类属于org.apache.beam.sdk.transforms.windowing包,在下文中一共展示了AfterProcessingTime类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(KafkaIO.<String, String>read()
.withBootstrapServers(options.getKafkaBootstrapServer())
.withTopic(options.getTopic())
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTimestampFn(new SetTimestampFn()))
.apply("Values", ParDo.of(new ValuesFn()))
.apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(TEN_MINUTES)
.accumulatingFiredPanes())
.apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));
pipeline.run();
}
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:28,代码来源:LeaderBoard.java
示例2: expand
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
return infos.apply("LeaderboardTeamFixedWindows",
Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
// We will get early (speculative) results as well as cumulative
// processing of late data.
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TEN_MINUTES)))
.withAllowedLateness(allowedLateness)
.accumulatingFiredPanes())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:LeaderBoard.java
示例3: testProcessingTimeTrigger
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testProcessingTimeTrigger() {
TestStream<Long> source = TestStream.create(VarLongCoder.of())
.addElements(TimestampedValue.of(1L, new Instant(1000L)),
TimestampedValue.of(2L, new Instant(2000L)))
.advanceProcessingTime(Duration.standardMinutes(12))
.addElements(TimestampedValue.of(3L, new Instant(3000L)))
.advanceProcessingTime(Duration.standardMinutes(6))
.advanceWatermarkToInfinity();
PCollection<Long> sum = p.apply(source)
.apply(Window.<Long>configure().triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply(Sum.longsGlobally());
PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
p.run();
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:TestStreamTest.java
示例4: testTriggeredDistinct
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testTriggeredDistinct() {
Instant base = new Instant(0);
TestStream<String> values = TestStream.create(StringUtf8Coder.of())
.advanceWatermarkTo(base)
.addElements(
TimestampedValue.of("k1", base),
TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))))
.advanceProcessingTime(Duration.standardMinutes(1))
.addElements(
TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
.advanceWatermarkToInfinity();
PCollection<String> distinctValues = triggeredDistinctPipeline
.apply(values)
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(30))))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply(Distinct.<String>create());
PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3");
triggeredDistinctPipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:30,代码来源:DistinctTest.java
示例5: testTriggeredDistinctRepresentativeValues
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testTriggeredDistinctRepresentativeValues() {
Instant base = new Instant(0);
TestStream<KV<Integer, String>> values = TestStream.create(
KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
.advanceWatermarkTo(base)
.addElements(
TimestampedValue.of(KV.of(1, "k1"), base),
TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
.advanceProcessingTime(Duration.standardMinutes(1))
.addElements(
TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
.advanceWatermarkToInfinity();
PCollection<KV<Integer, String>> distinctValues = triggeredDistinctRepresentativePipeline
.apply(values)
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(30))))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply(Distinct.withRepresentativeValueFn(new Keys<Integer>())
.withRepresentativeType(TypeDescriptor.of(Integer.class)));
PAssert.that(distinctValues).containsInAnyOrder(
KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
triggeredDistinctRepresentativePipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:35,代码来源:DistinctTest.java
示例6: testCombiningAccumulatingProcessingTime
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Tests that when a processing time timers comes in after a window is expired it does not cause a
* spurious output.
*/
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testCombiningAccumulatingProcessingTime() throws Exception {
PCollection<Integer> triggeredSums =
p.apply(
TestStream.create(VarIntCoder.of())
.advanceWatermarkTo(new Instant(0))
.addElements(
TimestampedValue.of(2, new Instant(2)),
TimestampedValue.of(5, new Instant(5)))
.advanceWatermarkTo(new Instant(100))
.advanceProcessingTime(Duration.millis(10))
.advanceWatermarkToInfinity())
.apply(
Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.millis(10)))))
.apply(Sum.integersGlobally().withoutDefaults());
PAssert.that(triggeredSums)
.containsInAnyOrder(7);
p.run();
}
开发者ID:apache,项目名称:beam,代码行数:34,代码来源:GroupByKeyTest.java
示例7: expand
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Override
public PDone expand(PCollection<PubsubMessage> input) {
input
.apply(
"PubsubUnboundedSink.Window",
Window.<PubsubMessage>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(publishBatchSize),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
.discardingFiredPanes())
.apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
.apply(
"PubsubUnboundedSink.Writer",
ParDo.of(
new WriterFn(
pubsubFactory,
topic,
timestampAttribute,
idAttribute,
publishBatchSize,
publishBatchBytes)));
return PDone.in(input.getPipeline());
}
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:PubsubUnboundedSink.java
示例8: convertSpecific
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
RunnerApi.Trigger.AfterProcessingTime.Builder builder =
RunnerApi.Trigger.AfterProcessingTime.newBuilder();
for (TimestampTransform transform : v.getTimestampTransforms()) {
builder.addTimestampTransforms(convertTimestampTransform(transform));
}
return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
}
开发者ID:apache,项目名称:beam,代码行数:11,代码来源:TriggerTranslation.java
示例9: data
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Parameters(name = "{index}: {0}")
public static Iterable<PCollection<?>> data() {
Pipeline pipeline = TestPipeline.create();
PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3));
PCollection<Long> longs = pipeline.apply("unbounded longs", GenerateSequence.from(0));
PCollection<Long> windowedLongs =
longs.apply(
"into fixed windows",
Window.<Long>into(FixedWindows.of(Duration.standardMinutes(10L))));
PCollection<KV<String, Iterable<String>>> groupedStrings =
pipeline
.apply(
"kvs", Create.of(KV.of("foo", "spam"), KV.of("bar", "ham"), KV.of("baz", "eggs")))
.apply("group", GroupByKey.<String, String>create());
PCollection<Long> coderLongs =
pipeline
.apply("counts with alternative coder", GenerateSequence.from(0).to(10))
.setCoder(BigEndianLongCoder.of());
PCollection<Integer> allCustomInts =
pipeline
.apply(
"intsWithCustomCoder",
Create.of(1, 2)
.withCoder(new AutoValue_PCollectionTranslationTest_CustomIntCoder()))
.apply(
"into custom windows",
Window.<Integer>into(new CustomWindows())
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterFirst.of(
AfterPane.elementCountAtLeast(5),
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.millis(227L)))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(12L)));
return ImmutableList.<PCollection<?>>of(ints, longs, windowedLongs, coderLongs, groupedStrings);
}
开发者ID:apache,项目名称:beam,代码行数:39,代码来源:PCollectionTranslationTest.java
示例10: testProcessingTimeTimerDoesNotGc
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Tests that a processing time timer does not cause window GC.
*/
@Test
public void testProcessingTimeTimerDoesNotGc() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.ZERO)
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceProcessingTime(new Instant(5000));
injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
injectElement(tester, 5);
tester.advanceProcessingTime(new Instant(10000));
tester.assertHasOnlyGlobalAndStateFor(
new IntervalWindow(new Instant(0), new Instant(100)));
assertThat(
tester.extractOutput(),
contains(
isSingleWindowedValue(
equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0))));
}
开发者ID:apache,项目名称:beam,代码行数:33,代码来源:ReduceFnRunnerTest.java
示例11: testLateProcessingTimeTimer
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Tests that when a processing time timer comes in after a window is expired
* it is just ignored.
*/
@Test
public void testLateProcessingTimeTimer() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.ZERO)
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceProcessingTime(new Instant(5000));
injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
injectElement(tester, 5);
// After this advancement, the window is expired and only the GC process
// should be allowed to touch it
tester.advanceInputWatermarkNoTimers(new Instant(100));
// This should not output
tester.advanceProcessingTime(new Instant(6000));
assertThat(tester.extractOutput(), emptyIterable());
}
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:ReduceFnRunnerTest.java
示例12: testCombiningAccumulatingProcessingTime
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Tests that when a processing time timer comes in after a window is expired
* but in the same bundle it does not cause a spurious output.
*/
@Test
public void testCombiningAccumulatingProcessingTime() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.ZERO)
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceProcessingTime(new Instant(5000));
injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
injectElement(tester, 5);
tester.advanceInputWatermarkNoTimers(new Instant(100));
tester.advanceProcessingTimeNoTimers(new Instant(5010));
// Fires the GC/EOW timer at the same time as the processing time timer.
tester.fireTimers(
new IntervalWindow(new Instant(0), new Instant(100)),
TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));
assertThat(
tester.extractOutput(),
contains(
isSingleWindowedValue(
equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
开发者ID:apache,项目名称:beam,代码行数:38,代码来源:ReduceFnRunnerTest.java
示例13: testCombiningAccumulatingProcessingTimeSeparateBundles
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Tests that when a processing time timers comes in after a window is expired
* and GC'd it does not cause a spurious output.
*/
@Test
public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.ZERO)
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceProcessingTime(new Instant(5000));
injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
injectElement(tester, 5);
tester.advanceInputWatermark(new Instant(100));
tester.advanceProcessingTime(new Instant(5011));
assertThat(
tester.extractOutput(),
contains(
isSingleWindowedValue(
equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:ReduceFnRunnerTest.java
示例14: fireEmptyOnDrainInGlobalWindowIfRequested
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* We should fire an empty ON_TIME pane in the GlobalWindow when the watermark moves to
* end-of-time.
*/
@Test
public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
ReduceFnTester.nonCombining(
WindowingStrategy.of(new GlobalWindows())
.withTrigger(Repeatedly.<GlobalWindow>forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
new Duration(3))))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES));
final int n = 20;
for (int i = 0; i < n; i++) {
tester.advanceProcessingTime(new Instant(i));
tester.injectElements(TimestampedValue.of(i, new Instant(i)));
}
tester.advanceProcessingTime(new Instant(n + 4));
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertEquals((n + 3) / 4, output.size());
for (int i = 0; i < output.size(); i++) {
assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
assertEquals(i, output.get(i).getPane().getIndex());
assertEquals(4, Iterables.size(output.get(i).getValue()));
}
tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
output = tester.extractOutput();
assertEquals(1, output.size());
assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
assertEquals(0, Iterables.size(output.get(0).getValue()));
}
开发者ID:apache,项目名称:beam,代码行数:37,代码来源:ReduceFnRunnerTest.java
示例15: getPipeline
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Get the beam pipeline
* @return the pipeline
*/
public Pipeline getPipeline() {
LOGGER.debug("Building Beam Pipeline");
final PipelineOptions options = PipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(options);
// Add membership triples
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_MEMBERSHIP_ADD).withoutMetadata())
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferMembership.getIRIString(), true)))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));
// Delete membership triples
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_MEMBERSHIP_DELETE).withoutMetadata())
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferMembership.getIRIString(), false)))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));
// Add containment triples
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_CONTAINMENT_ADD).withoutMetadata())
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), true)))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));
// Delete containment triples
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_CONTAINMENT_DELETE).withoutMetadata())
.apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));
if (aggregateSeconds > 0) {
// Aggregate cache writes
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE).withoutMetadata())
.apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(aggregateSeconds)))
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(aggregateSeconds)))
.discardingFiredPanes().withAllowedLateness(Duration.ZERO))
.apply(Combine.perKey(x -> x.iterator().next()))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE));
} else {
// Skip aggregation
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE).withoutMetadata())
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE));
}
// Write to cache and dispatch to the event bus
p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE).withoutMetadata())
.apply(ParDo.of(new CacheWriter(dataConfiguration)))
.apply(ParDo.of(new EventProcessor(baseUrlConfiguration)))
.apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_EVENT));
return p;
}
开发者ID:trellis-ldp-archive,项目名称:trellis-rosid-file-streaming,代码行数:54,代码来源:FileProcessingPipeline.java
示例16: createTriggerWithDelay
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
}
开发者ID:apache,项目名称:beam,代码行数:5,代码来源:BeamAggregationRule.java
示例17: data
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Parameters(name = "{index}: {0}")
public static Iterable<ToProtoAndBackSpec> data() {
return ImmutableList.of(
// Atomic triggers
toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
toProtoAndBackSpec(Never.ever()),
toProtoAndBackSpec(DefaultTrigger.of()),
toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
toProtoAndBackSpec(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
toProtoAndBackSpec(
AfterProcessingTime.pastFirstElementInPane()
.alignedTo(Duration.millis(5), new Instant(27))),
toProtoAndBackSpec(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(3))
.alignedTo(Duration.millis(5), new Instant(27))
.plusDelayOf(Duration.millis(13))),
// Composite triggers
toProtoAndBackSpec(
AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
toProtoAndBackSpec(
AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
toProtoAndBackSpec(
AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
toProtoAndBackSpec(
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
toProtoAndBackSpec(
AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
toProtoAndBackSpec(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
.withLateFirings(AfterPane.elementCountAtLeast(3))),
toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
toProtoAndBackSpec(
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
.orFinally(AfterWatermark.pastEndOfWindow())));
}
开发者ID:apache,项目名称:beam,代码行数:44,代码来源:TriggerTranslationTest.java
示例18: testNoWatermarkTriggerNoHold
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* If the trigger does not care about the watermark, the ReduceFnRunner should still emit an
* element for the ON_TIME pane.
*/
@Test
public void testNoWatermarkTriggerNoHold() throws Exception {
Duration allowedLateness = Duration.standardDays(1);
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))))
.withAllowedLateness(allowedLateness));
// First, an element comes in on time in [0, 10) but ReduceFnRunner should
// not set a hold or timer for 9. That is the trigger's job.
IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
tester.injectElements(TimestampedValue.of(1, new Instant(1)));
// Since some data arrived, the element hold will be the end of the window.
assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp()));
tester.advanceProcessingTime(new Instant(6000));
// Sanity check; we aren't trying to verify output in this test
assertThat(tester.getOutputSize(), equalTo(1));
// Since we did not request empty final panes, no hold
assertThat(tester.getWatermarkHold(), nullValue());
// So when the input watermark advanced, the output advances with it (automated by tester)
tester.advanceInputWatermark(
new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1))));
// Now late data arrives
tester.injectElements(TimestampedValue.of(3, new Instant(3)));
// The ReduceFnRunner should set a GC hold since the element was too late and its timestamp
// will be ignored for the purposes of the watermark hold
assertThat(
tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
}
开发者ID:apache,项目名称:beam,代码行数:48,代码来源:ReduceFnRunnerTest.java
示例19: testEmptyOnTimeFromOrFinally
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Test that we receive an empty on-time pane when an or-finally waiting for the watermark fires.
* Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
* when the on-time pane is empty.
*/
@Test
public void testEmptyOnTimeFromOrFinally() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(
AfterEach.<IntervalWindow>inOrder(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(new Duration(5)))
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(new Duration(25)))))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100));
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
// Processing time timer for 5
tester.injectElements(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(1, new Instant(3)),
TimestampedValue.of(1, new Instant(7)),
TimestampedValue.of(1, new Instant(5)));
// Should fire early pane
tester.advanceProcessingTime(new Instant(6));
// Should fire empty on time pane
tester.advanceInputWatermark(new Instant(11));
List<WindowedValue<Integer>> output = tester.extractOutput();
assertEquals(2, output.size());
assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
assertThat(
output.get(0),
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
assertThat(
output.get(1),
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
}
开发者ID:apache,项目名称:beam,代码行数:55,代码来源:ReduceFnRunnerTest.java
示例20: testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
* Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY.
*/
@Test
public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withTrigger(
AfterEach.<IntervalWindow>inOrder(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(new Duration(5)))
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(new Duration(25)))))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
.withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
// Processing time timer for 5
tester.injectElements(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(1, new Instant(3)),
TimestampedValue.of(1, new Instant(7)),
TimestampedValue.of(1, new Instant(5)));
// Should fire early pane
tester.advanceProcessingTime(new Instant(6));
// Should not fire empty on time pane
tester.advanceInputWatermark(new Instant(11));
// Should fire final GC pane
tester.advanceInputWatermark(new Instant(10 + 100));
List<WindowedValue<Integer>> output = tester.extractOutput();
assertEquals(2, output.size());
assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
assertThat(
output.get(0),
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
assertThat(
output.get(1),
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0)));
}
开发者ID:apache,项目名称:beam,代码行数:58,代码来源:ReduceFnRunnerTest.java
注:本文中的org.apache.beam.sdk.transforms.windowing.AfterProcessingTime类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论