• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java AfterProcessingTime类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.beam.sdk.transforms.windowing.AfterProcessingTime的典型用法代码示例。如果您正苦于以下问题:Java AfterProcessingTime类的具体用法?Java AfterProcessingTime怎么用?Java AfterProcessingTime使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



AfterProcessingTime类属于org.apache.beam.sdk.transforms.windowing包,在下文中一共展示了AfterProcessingTime类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: main

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
public static void main(String[] args) throws Exception {

    Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
    .apply(KafkaIO.<String, String>read()
        .withBootstrapServers(options.getKafkaBootstrapServer())
        .withTopic(options.getTopic())
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withTimestampFn(new SetTimestampFn()))
    .apply("Values", ParDo.of(new ValuesFn()))

    .apply("FixedWindows", Window.<String>into(FixedWindows.of(FIVE_MINUTES))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TWO_MINUTES))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(TEN_MINUTES)
        .accumulatingFiredPanes())

    .apply("TeamScore", new CalculateTeamScores(options.getOutputPrefix()));

    pipeline.run();
  }
 
开发者ID:davorbonaci,项目名称:beam-portability-demo,代码行数:28,代码来源:LeaderBoard.java


示例2: expand

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
  return infos.apply("LeaderboardTeamFixedWindows",
      Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
          // We will get early (speculative) results as well as cumulative
          // processing of late data.
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(FIVE_MINUTES))
              .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(TEN_MINUTES)))
          .withAllowedLateness(allowedLateness)
          .accumulatingFiredPanes())
      // Extract and sum teamname/score pairs from the event data.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
 
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:LeaderBoard.java


示例3: testProcessingTimeTrigger

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({NeedsRunner.class, UsesTestStream.class})
public void testProcessingTimeTrigger() {
  TestStream<Long> source = TestStream.create(VarLongCoder.of())
      .addElements(TimestampedValue.of(1L, new Instant(1000L)),
          TimestampedValue.of(2L, new Instant(2000L)))
      .advanceProcessingTime(Duration.standardMinutes(12))
      .addElements(TimestampedValue.of(3L, new Instant(3000L)))
      .advanceProcessingTime(Duration.standardMinutes(6))
      .advanceWatermarkToInfinity();

  PCollection<Long> sum = p.apply(source)
      .apply(Window.<Long>configure().triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
          .withAllowedLateness(Duration.ZERO))
      .apply(Sum.longsGlobally());

  PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);

  p.run();
}
 
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:TestStreamTest.java


示例4: testTriggeredDistinct

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testTriggeredDistinct() {
  Instant base = new Instant(0);
  TestStream<String> values = TestStream.create(StringUtf8Coder.of())
      .advanceWatermarkTo(base)
      .addElements(
          TimestampedValue.of("k1", base),
          TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))),
          TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))))
      .advanceProcessingTime(Duration.standardMinutes(1))
      .addElements(
          TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))),
          TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))),
          TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))))
      .advanceWatermarkToInfinity();

  PCollection<String> distinctValues = triggeredDistinctPipeline
      .apply(values)
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                  Duration.standardSeconds(30))))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes())
      .apply(Distinct.<String>create());
  PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3");
  triggeredDistinctPipeline.run();
}
 
开发者ID:apache,项目名称:beam,代码行数:30,代码来源:DistinctTest.java


示例5: testTriggeredDistinctRepresentativeValues

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testTriggeredDistinctRepresentativeValues() {
  Instant base = new Instant(0);
  TestStream<KV<Integer, String>> values = TestStream.create(
      KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
      .advanceWatermarkTo(base)
      .addElements(
          TimestampedValue.of(KV.of(1, "k1"), base),
          TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))),
          TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20))))
      .advanceProcessingTime(Duration.standardMinutes(1))
      .addElements(
          TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))),
          TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))),
          TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50))))
      .advanceWatermarkToInfinity();

  PCollection<KV<Integer, String>> distinctValues = triggeredDistinctRepresentativePipeline
      .apply(values)
      .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                  Duration.standardSeconds(30))))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes())
      .apply(Distinct.withRepresentativeValueFn(new Keys<Integer>())
          .withRepresentativeType(TypeDescriptor.of(Integer.class)));


  PAssert.that(distinctValues).containsInAnyOrder(
      KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3"));
  triggeredDistinctRepresentativePipeline.run();
}
 
开发者ID:apache,项目名称:beam,代码行数:35,代码来源:DistinctTest.java


示例6: testCombiningAccumulatingProcessingTime

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Tests that when a processing time timers comes in after a window is expired it does not cause a
 * spurious output.
 */
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testCombiningAccumulatingProcessingTime() throws Exception {
  PCollection<Integer> triggeredSums =
      p.apply(
              TestStream.create(VarIntCoder.of())
                  .advanceWatermarkTo(new Instant(0))
                  .addElements(
                      TimestampedValue.of(2, new Instant(2)),
                      TimestampedValue.of(5, new Instant(5)))
                  .advanceWatermarkTo(new Instant(100))
                  .advanceProcessingTime(Duration.millis(10))
                  .advanceWatermarkToInfinity())
          .apply(
              Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
                  .withTimestampCombiner(TimestampCombiner.EARLIEST)
                  .accumulatingFiredPanes()
                  .withAllowedLateness(Duration.ZERO)
                  .triggering(
                      Repeatedly.forever(
                          AfterProcessingTime.pastFirstElementInPane()
                              .plusDelayOf(Duration.millis(10)))))
          .apply(Sum.integersGlobally().withoutDefaults());

  PAssert.that(triggeredSums)
      .containsInAnyOrder(7);

  p.run();
}
 
开发者ID:apache,项目名称:beam,代码行数:34,代码来源:GroupByKeyTest.java


示例7: expand

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Override
public PDone expand(PCollection<PubsubMessage> input) {
  input
      .apply(
          "PubsubUnboundedSink.Window",
          Window.<PubsubMessage>into(new GlobalWindows())
              .triggering(
                  Repeatedly.forever(
                      AfterFirst.of(
                          AfterPane.elementCountAtLeast(publishBatchSize),
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
              .discardingFiredPanes())
      .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
      .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
      .apply(GroupByKey.<Integer, OutgoingMessage>create())
      .apply(
          "PubsubUnboundedSink.Writer",
          ParDo.of(
              new WriterFn(
                  pubsubFactory,
                  topic,
                  timestampAttribute,
                  idAttribute,
                  publishBatchSize,
                  publishBatchBytes)));
  return PDone.in(input.getPipeline());
}
 
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:PubsubUnboundedSink.java


示例8: convertSpecific

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
  RunnerApi.Trigger.AfterProcessingTime.Builder builder =
      RunnerApi.Trigger.AfterProcessingTime.newBuilder();

  for (TimestampTransform transform : v.getTimestampTransforms()) {
    builder.addTimestampTransforms(convertTimestampTransform(transform));
  }

  return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
}
 
开发者ID:apache,项目名称:beam,代码行数:11,代码来源:TriggerTranslation.java


示例9: data

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Parameters(name = "{index}: {0}")
public static Iterable<PCollection<?>> data() {
  Pipeline pipeline = TestPipeline.create();
  PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3));
  PCollection<Long> longs = pipeline.apply("unbounded longs", GenerateSequence.from(0));
  PCollection<Long> windowedLongs =
      longs.apply(
          "into fixed windows",
          Window.<Long>into(FixedWindows.of(Duration.standardMinutes(10L))));
  PCollection<KV<String, Iterable<String>>> groupedStrings =
      pipeline
          .apply(
              "kvs", Create.of(KV.of("foo", "spam"), KV.of("bar", "ham"), KV.of("baz", "eggs")))
          .apply("group", GroupByKey.<String, String>create());
  PCollection<Long> coderLongs =
      pipeline
          .apply("counts with alternative coder", GenerateSequence.from(0).to(10))
          .setCoder(BigEndianLongCoder.of());
  PCollection<Integer> allCustomInts =
      pipeline
          .apply(
              "intsWithCustomCoder",
              Create.of(1, 2)
                  .withCoder(new AutoValue_PCollectionTranslationTest_CustomIntCoder()))
          .apply(
              "into custom windows",
              Window.<Integer>into(new CustomWindows())
                  .triggering(
                      AfterWatermark.pastEndOfWindow()
                          .withEarlyFirings(
                              AfterFirst.of(
                                  AfterPane.elementCountAtLeast(5),
                                  AfterProcessingTime.pastFirstElementInPane()
                                      .plusDelayOf(Duration.millis(227L)))))
                  .accumulatingFiredPanes()
                  .withAllowedLateness(Duration.standardMinutes(12L)));
  return ImmutableList.<PCollection<?>>of(ints, longs, windowedLongs, coderLongs, groupedStrings);
}
 
开发者ID:apache,项目名称:beam,代码行数:39,代码来源:PCollectionTranslationTest.java


示例10: testProcessingTimeTimerDoesNotGc

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Tests that a processing time timer does not cause window GC.
 */
@Test
public void testProcessingTimeTimerDoesNotGc() throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  tester.advanceProcessingTime(new Instant(10000));

  tester.assertHasOnlyGlobalAndStateFor(
      new IntervalWindow(new Instant(0), new Instant(100)));

  assertThat(
      tester.extractOutput(),
      contains(
          isSingleWindowedValue(
              equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0))));
}
 
开发者ID:apache,项目名称:beam,代码行数:33,代码来源:ReduceFnRunnerTest.java


示例11: testLateProcessingTimeTimer

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Tests that when a processing time timer comes in after a window is expired
 * it is just ignored.
 */
@Test
public void testLateProcessingTimeTimer() throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  // After this advancement, the window is expired and only the GC process
  // should be allowed to touch it
  tester.advanceInputWatermarkNoTimers(new Instant(100));

  // This should not output
  tester.advanceProcessingTime(new Instant(6000));

  assertThat(tester.extractOutput(), emptyIterable());
}
 
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:ReduceFnRunnerTest.java


示例12: testCombiningAccumulatingProcessingTime

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Tests that when a processing time timer comes in after a window is expired
 * but in the same bundle it does not cause a spurious output.
 */
@Test
public void testCombiningAccumulatingProcessingTime() throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  tester.advanceInputWatermarkNoTimers(new Instant(100));
  tester.advanceProcessingTimeNoTimers(new Instant(5010));

  // Fires the GC/EOW timer at the same time as the processing time timer.
  tester.fireTimers(
      new IntervalWindow(new Instant(0), new Instant(100)),
      TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
      TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));

  assertThat(
      tester.extractOutput(),
      contains(
          isSingleWindowedValue(
              equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
 
开发者ID:apache,项目名称:beam,代码行数:38,代码来源:ReduceFnRunnerTest.java


示例13: testCombiningAccumulatingProcessingTimeSeparateBundles

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Tests that when a processing time timers comes in after a window is expired
 * and GC'd it does not cause a spurious output.
 */
@Test
public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.ZERO)
          .withTrigger(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceProcessingTime(new Instant(5000));
  injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
  injectElement(tester, 5);

  tester.advanceInputWatermark(new Instant(100));
  tester.advanceProcessingTime(new Instant(5011));

  assertThat(
      tester.extractOutput(),
      contains(
          isSingleWindowedValue(
              equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
}
 
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:ReduceFnRunnerTest.java


示例14: fireEmptyOnDrainInGlobalWindowIfRequested

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * We should fire an empty ON_TIME pane in the GlobalWindow when the watermark moves to
 * end-of-time.
 */
@Test
public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
  ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
      ReduceFnTester.nonCombining(
          WindowingStrategy.of(new GlobalWindows())
                           .withTrigger(Repeatedly.<GlobalWindow>forever(
                               AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                                   new Duration(3))))
                           .withMode(AccumulationMode.DISCARDING_FIRED_PANES));

  final int n = 20;
  for (int i = 0; i < n; i++) {
    tester.advanceProcessingTime(new Instant(i));
    tester.injectElements(TimestampedValue.of(i, new Instant(i)));
  }
  tester.advanceProcessingTime(new Instant(n + 4));
  List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
  assertEquals((n + 3) / 4, output.size());
  for (int i = 0; i < output.size(); i++) {
    assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
    assertEquals(i, output.get(i).getPane().getIndex());
    assertEquals(4, Iterables.size(output.get(i).getValue()));
  }

  tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

  output = tester.extractOutput();
  assertEquals(1, output.size());
  assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
  assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
  assertEquals(0, Iterables.size(output.get(0).getValue()));
}
 
开发者ID:apache,项目名称:beam,代码行数:37,代码来源:ReduceFnRunnerTest.java


示例15: getPipeline

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Get the beam pipeline
 * @return the pipeline
 */
public Pipeline getPipeline() {

    LOGGER.debug("Building Beam Pipeline");
    final PipelineOptions options = PipelineOptionsFactory.create();
    final Pipeline p = Pipeline.create(options);

    // Add membership triples
    p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_MEMBERSHIP_ADD).withoutMetadata())
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferMembership.getIRIString(), true)))
        .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));

    // Delete membership triples
    p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_MEMBERSHIP_DELETE).withoutMetadata())
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferMembership.getIRIString(), false)))
        .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));

    // Add containment triples
    p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_CONTAINMENT_ADD).withoutMetadata())
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), true)))
        .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));

    // Delete containment triples
    p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_LDP_CONTAINMENT_DELETE).withoutMetadata())
        .apply(ParDo.of(new BeamProcessor(dataConfiguration, LDP.PreferContainment.getIRIString(), false)))
        .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE));

    if (aggregateSeconds > 0) {
        // Aggregate cache writes
        p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE).withoutMetadata())
            .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(aggregateSeconds)))
                    .triggering(AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(aggregateSeconds)))
                    .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
            .apply(Combine.perKey(x -> x.iterator().next()))
            .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE));
    } else {
        // Skip aggregation
        p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE_AGGREGATE).withoutMetadata())
            .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_CACHE));
    }

    // Write to cache and dispatch to the event bus
    p.apply(getKafkaReader(bootstrapServers).withTopic(TOPIC_CACHE).withoutMetadata())
        .apply(ParDo.of(new CacheWriter(dataConfiguration)))
        .apply(ParDo.of(new EventProcessor(baseUrlConfiguration)))
        .apply(getKafkaWriter(bootstrapServers).withTopic(TOPIC_EVENT));

    return p;
}
 
开发者ID:trellis-ldp-archive,项目名称:trellis-rosid-file-streaming,代码行数:54,代码来源:FileProcessingPipeline.java


示例16: createTriggerWithDelay

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
  return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
      .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
}
 
开发者ID:apache,项目名称:beam,代码行数:5,代码来源:BeamAggregationRule.java


示例17: data

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
@Parameters(name = "{index}: {0}")
public static Iterable<ToProtoAndBackSpec> data() {
  return ImmutableList.of(
      // Atomic triggers
      toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
      toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
      toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
      toProtoAndBackSpec(Never.ever()),
      toProtoAndBackSpec(DefaultTrigger.of()),
      toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
      toProtoAndBackSpec(
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
      toProtoAndBackSpec(
          AfterProcessingTime.pastFirstElementInPane()
              .alignedTo(Duration.millis(5), new Instant(27))),
      toProtoAndBackSpec(
          AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardSeconds(3))
              .alignedTo(Duration.millis(5), new Instant(27))
              .plusDelayOf(Duration.millis(13))),

      // Composite triggers

      toProtoAndBackSpec(
          AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
      toProtoAndBackSpec(
          AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
      toProtoAndBackSpec(
          AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
      toProtoAndBackSpec(
          AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
      toProtoAndBackSpec(
          AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
      toProtoAndBackSpec(
          AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
              .withLateFirings(AfterPane.elementCountAtLeast(3))),
      toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
      toProtoAndBackSpec(
          Repeatedly.forever(AfterPane.elementCountAtLeast(1))
              .orFinally(AfterWatermark.pastEndOfWindow())));
}
 
开发者ID:apache,项目名称:beam,代码行数:44,代码来源:TriggerTranslationTest.java


示例18: testNoWatermarkTriggerNoHold

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * If the trigger does not care about the watermark, the ReduceFnRunner should still emit an
 * element for the ON_TIME pane.
 */
@Test
public void testNoWatermarkTriggerNoHold() throws Exception {
  Duration allowedLateness = Duration.standardDays(1);
  ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
      ReduceFnTester.nonCombining(
          WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
              .withTrigger(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.standardSeconds(5))))
              .withAllowedLateness(allowedLateness));

  // First, an element comes in on time in [0, 10) but ReduceFnRunner should
  // not set a hold or timer for 9. That is the trigger's job.
  IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10));
  tester.advanceInputWatermark(new Instant(0));
  tester.advanceProcessingTime(new Instant(0));

  tester.injectElements(TimestampedValue.of(1, new Instant(1)));

  // Since some data arrived, the element hold will be the end of the window.
  assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp()));

  tester.advanceProcessingTime(new Instant(6000));

  // Sanity check; we aren't trying to verify output in this test
  assertThat(tester.getOutputSize(), equalTo(1));

  // Since we did not request empty final panes, no hold
  assertThat(tester.getWatermarkHold(), nullValue());

  // So when the input watermark advanced, the output advances with it (automated by tester)
  tester.advanceInputWatermark(
      new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1))));

  // Now late data arrives
  tester.injectElements(TimestampedValue.of(3, new Instant(3)));

  // The ReduceFnRunner should set a GC hold since the element was too late and its timestamp
  // will be ignored for the purposes of the watermark hold
  assertThat(
      tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness)));
}
 
开发者ID:apache,项目名称:beam,代码行数:48,代码来源:ReduceFnRunnerTest.java


示例19: testEmptyOnTimeFromOrFinally

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Test that we receive an empty on-time pane when an or-finally waiting for the watermark fires.
 * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
 * when the on-time pane is empty.
 */
@Test
public void testEmptyOnTimeFromOrFinally() throws Exception {

  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withTrigger(
              AfterEach.<IntervalWindow>inOrder(
                  Repeatedly.forever(
                          AfterProcessingTime.pastFirstElementInPane()
                              .plusDelayOf(new Duration(5)))
                      .orFinally(AfterWatermark.pastEndOfWindow()),
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(new Duration(25)))))
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.millis(100));

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceInputWatermark(new Instant(0));
  tester.advanceProcessingTime(new Instant(0));

  // Processing time timer for 5
  tester.injectElements(
      TimestampedValue.of(1, new Instant(1)),
      TimestampedValue.of(1, new Instant(3)),
      TimestampedValue.of(1, new Instant(7)),
      TimestampedValue.of(1, new Instant(5)));

  // Should fire early pane
  tester.advanceProcessingTime(new Instant(6));

  // Should fire empty on time pane
  tester.advanceInputWatermark(new Instant(11));
  List<WindowedValue<Integer>> output = tester.extractOutput();
  assertEquals(2, output.size());

  assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
  assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));

  assertThat(
      output.get(0),
      WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
  assertThat(
      output.get(1),
      WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
}
 
开发者ID:apache,项目名称:beam,代码行数:55,代码来源:ReduceFnRunnerTest.java


示例20: testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; //导入依赖的package包/类
/**
 * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY.
 */
@Test
public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {

  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
          .withTimestampCombiner(TimestampCombiner.EARLIEST)
          .withTrigger(
              AfterEach.<IntervalWindow>inOrder(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(new Duration(5)))
                      .orFinally(AfterWatermark.pastEndOfWindow()),
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(new Duration(25)))))
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.millis(100))
          .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
          .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);

  ReduceFnTester<Integer, Integer, IntervalWindow> tester =
      ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());

  tester.advanceInputWatermark(new Instant(0));
  tester.advanceProcessingTime(new Instant(0));

  // Processing time timer for 5
  tester.injectElements(
      TimestampedValue.of(1, new Instant(1)),
      TimestampedValue.of(1, new Instant(3)),
      TimestampedValue.of(1, new Instant(7)),
      TimestampedValue.of(1, new Instant(5)));

  // Should fire early pane
  tester.advanceProcessingTime(new Instant(6));

  // Should not fire empty on time pane
  tester.advanceInputWatermark(new Instant(11));

  // Should fire final GC pane
  tester.advanceInputWatermark(new Instant(10 + 100));
  List<WindowedValue<Integer>> output = tester.extractOutput();
  assertEquals(2, output.size());

  assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
  assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));

  assertThat(
      output.get(0),
      WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
  assertThat(
      output.get(1),
      WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0)));
}
 
开发者ID:apache,项目名称:beam,代码行数:58,代码来源:ReduceFnRunnerTest.java



注:本文中的org.apache.beam.sdk.transforms.windowing.AfterProcessingTime类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java RpcClient类代码示例发布时间:2022-05-22
下一篇:
Java InfoMessage类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap