
Java Combine Class Code Examples


This article collects typical usage examples of the Java class org.apache.beam.sdk.transforms.Combine. If you have been wondering what exactly the Combine class does, how to use it, or what real-world usage looks like, the hand-picked examples below should help.



The Combine class belongs to the org.apache.beam.sdk.transforms package. Twenty code examples of the class are shown below, sorted by popularity.
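
Before turning to the examples, here is a minimal sketch of the two most common entry points: Combine.globally, which reduces an entire PCollection to a single value, and Combine.perKey, which reduces the values of each key independently. The class and step names are mine, not taken from any project below, and the sketch assumes a runner such as the DirectRunner is on the classpath.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CombineQuickStart {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Combine.globally reduces the whole PCollection to a single element: 6L.
    PCollection<Long> total =
        p.apply("Numbers", Create.of(1L, 2L, 3L))
            .apply("SumAll", Combine.globally(Sum.ofLongs()));

    // Combine.perKey reduces the values of each key: ("a", 3L) and ("b", 3L).
    PCollection<KV<String, Long>> perKeyTotals =
        p.apply("Pairs", Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)))
            .apply("SumPerKey", Combine.<String, Long, Long>perKey(Sum.ofLongs()));

    p.run().waitUntilFinish();
  }
}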

Example 1: testMergingCustomWindows

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
  Instant startInstant = new Instant(0L);
  PCollection<String> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))),
              // This one falls outside bigWindow, so it will not be merged
              TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<String> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<String>()));
  PCollection<Long> count =
      windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());
  // "small1" and "big" elements merged into bigWindow "small2" not merged
  // because timestamp is not in bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}
 
Developer: apache, Project: beam, Lines: 22, Source: WindowTest.java
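
The CustomWindowFn referenced above is defined elsewhere in WindowTest and is not part of the snippet. For orientation only, here is a hypothetical sketch of what a merging WindowFn can look like, modeled on Beam's own Sessions; all names here are invented. Each element is assigned a fixed 30-second window anchored at its timestamp, and overlapping windows are then merged into one.

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.MergeOverlappingIntervalWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.Duration;

// Hypothetical merging WindowFn, for illustration only.
public class OverlapMergingWindowFn<T> extends WindowFn<T, IntervalWindow> {
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Each element starts its own 30-second window.
    return Collections.singletonList(
        new IntervalWindow(c.timestamp(), Duration.standardSeconds(30)));
  }

  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    // Beam ships a helper that merges any overlapping IntervalWindows.
    MergeOverlappingIntervalWindows.mergeWindows(c);
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof OverlapMergingWindowFn;
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Not usable as a side-input window mapping");
  }
}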


Example 2: testMergingCustomWindowsKeyedCollection

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
  Instant startInstant = new Instant(0L);
  PCollection<KV<Integer, String>> inputCollection =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of(
                  KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))),
              TimestampedValue.of(
                  KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))),
              // This element is not contained within bigWindow, so it is not merged
              TimestampedValue.of(
                  KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))));
  PCollection<KV<Integer, String>> windowedCollection =
      inputCollection.apply(Window.into(new CustomWindowFn<KV<Integer, String>>()));
  PCollection<Long> count =
      windowedCollection.apply(
          Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());
  // "small1" and "big" elements merged into bigWindow "small2" not merged
  // because it is not contained in bigWindow
  PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L);
  pipeline.run();
}
 
Developer: apache, Project: beam, Lines: 25, Source: WindowTest.java


Example 3: testHifIOWithElastic

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * Test that reads data from an embedded Elasticsearch instance and verifies that the data
 * is read successfully.
 */
@Test
public void testHifIOWithElastic() {
  // The expected hash code is computed once at insertion time and hardcoded here.
  String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
  Configuration conf = getConfiguration();
  PCollection<KV<Text, LinkedMapWritable>> esData =
      pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
  PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
  // Verify that the count of objects fetched using HIFInputFormat IO is correct.
  PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT);
  PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
  PCollection<String> textValues = values.apply(transformFunc);
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 23, Source: HIFIOWithElasticTest.java
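
HashingFn is a checksum CombineFn from Beam's io-common test utilities, not part of the core SDK. To show the contract such a combiner fulfils, here is a deliberately simplified, hypothetical stand-in: an order-insensitive checksum that XORs the hash codes of its inputs. The real HashingFn uses a proper hash function; this only sketches the CombineFn shape.

import org.apache.beam.sdk.transforms.Combine;

// Hypothetical order-insensitive checksum combiner, for illustration only.
// InputT = String, AccumT = Long (which has a default coder), OutputT = String.
public class XorHashFn extends Combine.CombineFn<String, Long, String> {
  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long addInput(Long accumulator, String input) {
    // XOR is commutative and associative, so element order does not matter.
    return accumulator ^ input.hashCode();
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accumulators) {
    long merged = 0L;
    for (long accumulator : accumulators) {
      merged ^= accumulator;
    }
    return merged;
  }

  @Override
  public String extractOutput(Long accumulator) {
    return Long.toHexString(accumulator);
  }
}

It would be applied exactly like HashingFn above: textValues.apply(Combine.globally(new XorHashFn()).withoutDefaults()).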


Example 4: testHifIOWithElastic

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * This test reads data from the Elasticsearch instance and verifies whether data is read
 * successfully.
 */
@Test
public void testHifIOWithElastic() throws SecurityException, IOException {
  // The expected hash code is computed once at insertion time and hardcoded here.
  final long expectedRowCount = 1000L;
  String expectedHashCode = "42e254c8689050ed0a617ff5e80ea392";
  Configuration conf = getConfiguration(options);
  PCollection<KV<Text, LinkedMapWritable>> esData =
      pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
  // Verify that the count of objects fetched using HIFInputFormat IO is correct.
  PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally());
  PAssert.thatSingleton(count).isEqualTo(expectedRowCount);
  PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create());
  PCollection<String> textValues = values.apply(transformFunc);
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 24, Source: HIFIOElasticIT.java


Example 5: testHIFReadForCassandra

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * This test reads data from the Cassandra instance and verifies that the data is read
 * successfully.
 */
@Test
public void testHIFReadForCassandra() {
  // The expected hash code is computed once at insertion time and hardcoded here.
  String expectedHashCode = "1a30ad400afe4ebf5fde75f5d2d95408";
  Long expectedRecordsCount = 1000L;
  Configuration conf = getConfiguration(options);
  PCollection<KV<Long, String>> cassandraData = pipeline.apply(HadoopInputFormatIO
      .<Long, String>read().withConfiguration(conf).withValueTranslation(myValueTranslate));
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedRecordsCount);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 21, Source: HIFIOCassandraIT.java


Example 6: testHIFReadForCassandraQuery

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * This test reads data from the Cassandra instance based on a query and verifies that the
 * data is read successfully.
 */
@Test
public void testHIFReadForCassandraQuery() {
  String expectedHashCode = "7bead6d6385c5f4dd0524720cd320b49";
  Long expectedNumRows = 1L;
  Configuration conf = getConfiguration(options);
  conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
      + " where token(y_id) > ? and token(y_id) <= ? "
      + "and field0 = 'user48:field0:431531'");
  PCollection<KV<Long, String>> cassandraData =
      pipeline.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedNumRows);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  pipeline.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 25, Source: HIFIOCassandraIT.java


Example 7: testHIFReadForCassandra

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * Test that reads data from an embedded Cassandra instance and verifies that the data is
 * read successfully.
 * @throws Exception
 */
@Test
public void testHIFReadForCassandra() throws Exception {
  // The expected hash code is computed once at insertion time and hardcoded here.
  String expectedHashCode = "1b9780833cce000138b9afa25ba63486";
  Configuration conf = getConfiguration();
  PCollection<KV<Long, String>> cassandraData =
      p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  // Verify the count of data retrieved from Cassandra matches expected count.
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(TEST_DATA_ROW_COUNT);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
  p.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 24, Source: HIFIOWithEmbeddedCassandraTest.java


Example 8: testHIFReadForCassandraQuery

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
/**
 * Test that reads data from an embedded Cassandra instance based on a query and verifies
 * that the data is read successfully.
 */
@Test
public void testHIFReadForCassandraQuery() throws Exception {
  Long expectedCount = 1L;
  String expectedChecksum = "f11caabc7a9fc170e22b41218749166c";
  Configuration conf = getConfiguration();
  conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
      + " where token(id) > ? and token(id) <= ? and scientist='Faraday1' allow filtering");
  PCollection<KV<Long, String>> cassandraData =
      p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
          .withValueTranslation(myValueTranslate));
  // Verify the count of data retrieved from Cassandra matches expected count.
  PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
      .isEqualTo(expectedCount);
  PCollection<String> textValues = cassandraData.apply(Values.<String>create());
  // Verify the output values using checksum comparison.
  PCollection<String> consolidatedHashcode =
      textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
  PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedChecksum);
  p.run().waitUntilFinish();
}
 
Developer: apache, Project: beam, Lines: 25, Source: HIFIOWithEmbeddedCassandraTest.java


Example 9: RawCombine

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
private RawCombine(RunnerApi.PTransform protoTransform,
    RehydratedComponents rehydratedComponents) throws IOException {
  this.protoTransform = protoTransform;
  this.rehydratedComponents = rehydratedComponents;
  this.spec = protoTransform.getSpec();
  this.payload = CombinePayload.parseFrom(spec.getPayload());

  // Eagerly extract the coder to throw a good exception here
  try {
    this.accumulatorCoder =
        (Coder<AccumT>) rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
  } catch (IOException exc) {
    throw new IllegalArgumentException(
        String.format(
            "Failure extracting accumulator coder with id '%s' for %s",
            payload.getAccumulatorCoderId(), Combine.class.getSimpleName()),
        exc);
  }
}
 
Developer: apache, Project: beam, Lines: 20, Source: CombineTranslation.java


Example 10: getAdditionalInputs

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
  Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
  for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
    try {
      additionalInputs.put(
          new TupleTag<>(sideInputEntry.getKey()),
          rehydratedComponents.getPCollection(
              protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
    } catch (IOException exc) {
      throw new IllegalStateException(
          String.format(
              "Could not find input with name %s for %s transform",
              sideInputEntry.getKey(), Combine.class.getSimpleName()),
          exc);
    }
  }
  return additionalInputs;
}
 
Developer: apache, Project: beam, Lines: 19, Source: CombineTranslation.java


Example 11: toProto

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@VisibleForTesting
static CombinePayload toProto(
    AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> combine, SdkComponents sdkComponents)
    throws IOException {
  GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
  try {
    Coder<?> accumulatorCoder = extractAccumulatorCoder(combineFn, (AppliedPTransform) combine);
    Map<String, SideInput> sideInputs = new HashMap<>();
    return RunnerApi.CombinePayload.newBuilder()
        .setAccumulatorCoderId(sdkComponents.registerCoder(accumulatorCoder))
        .putAllSideInputs(sideInputs)
        .setCombineFn(toProto(combineFn))
        .build();
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException(e);
  }
}
 
Developer: apache, Project: beam, Lines: 18, Source: CombineTranslation.java


Example 12: extractAccumulatorCoder

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
private static <K, InputT, AccumT> Coder<AccumT> extractAccumulatorCoder(
    GlobalCombineFn<InputT, AccumT, ?> combineFn,
    AppliedPTransform<PCollection<KV<K, InputT>>, ?, Combine.PerKey<K, InputT, ?>> transform)
    throws CannotProvideCoderException {
  @SuppressWarnings("unchecked")
  PCollection<KV<K, InputT>> mainInput =
      (PCollection<KV<K, InputT>>)
          Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(transform));
  KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) mainInput.getCoder();
  return AppliedCombineFn.withInputCoder(
          combineFn,
          transform.getPipeline().getCoderRegistry(),
          inputCoder,
          transform.getTransform().getSideInputs(),
          ((PCollection<?>) Iterables.getOnlyElement(transform.getOutputs().values()))
              .getWindowingStrategy())
      .getAccumulatorCoder();
}
 
Developer: apache, Project: beam, Lines: 19, Source: CombineTranslation.java


Example 13: canTranslate

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
boolean canTranslate(
    Combine.PerKey<K, InputT, OutputT> transform,
    FlinkStreamingTranslationContext context) {

  // if we have a merging window strategy and side inputs we cannot
  // translate as a proper combine. We have to group and then run the combine
  // over the final grouped values.
  PCollection<KV<K, InputT>> input = context.getInput(transform);

  @SuppressWarnings("unchecked")
  WindowingStrategy<?, BoundedWindow> windowingStrategy =
      (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();

  return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
}
 
Developer: apache, Project: beam, Lines: 17, Source: FlinkStreamingTransformTranslators.java


Example 14: translateHelper

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
private <K, InputT, OutputT> void translateHelper(
    final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
    TranslationContext context) {
  Combine.GroupedValues<K, InputT, OutputT> originalTransform =
      primitiveTransform.getOriginalCombine();
  StepTranslationContext stepContext =
      context.addStep(primitiveTransform, "CombineValues");
  translateInputs(
      stepContext,
      context.getInput(primitiveTransform),
      originalTransform.getSideInputs(),
      context);

  AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
      originalTransform.getAppliedFn(
          context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
          context.getInput(primitiveTransform).getCoder(),
          context.getInput(primitiveTransform).getWindowingStrategy());

  stepContext.addEncodingInput(fn.getAccumulatorCoder());
  stepContext.addInput(
      PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
  stepContext.addOutput(context.getOutput(primitiveTransform));
}
 
Developer: apache, Project: beam, Lines: 25, Source: DataflowPipelineTranslator.java


Example 15: getReplacementTransform

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
    AppliedPTransform<
        PCollection<InputT>,
        PValue,
        PTransform<PCollection<InputT>, PValue>> transform) {
  Combine.GloballyAsSingletonView<?, ?> combineTransform =
      (Combine.GloballyAsSingletonView) transform.getTransform();
  return PTransformReplacement.of(
      PTransformReplacements.getSingletonMainInput(transform),
      new BatchViewOverrides.BatchViewAsSingleton(
          runner,
          findCreatePCollectionView(transform),
          (CombineFn) combineTransform.getCombineFn(),
          combineTransform.getFanout()));
}
 
Developer: apache, Project: beam, Lines: 17, Source: DataflowRunner.java


Example 16: expand

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public PDone expand(PCollection<KV<KV<String, String>, Long>> similarPairs) {
  return similarPairs
      .apply(Sum.<KV<String, String>>longsPerKey())
      .apply(Combine.globally(TO_LIST))
      .apply("PCoAAnalysis", ParDo.of(new PCoAnalysis(dataIndices)))
      .apply("FormatGraphData", ParDo
          .of(new DoFn<Iterable<PCoAnalysis.GraphResult>, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
              Iterable<PCoAnalysis.GraphResult> graphResults = c.element();
              for (PCoAnalysis.GraphResult result : graphResults) {
                c.output(result.toString());
              }
            }
          }))
      .apply("WriteCounts", TextIO.write().to(outputFile));
}
 
Developer: googlegenomics, Project: dataflow-java, Lines: 19, Source: OutputPCoAFile.java


Example 17: expand

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<TableRow> expand(PCollection<KV<String, LaneInfo>> flowInfo) {
  // stationId, LaneInfo => stationId + max lane flow info
  PCollection<KV<String, LaneInfo>> flowMaxes =
      flowInfo.apply(Combine.<String, LaneInfo>perKey(
          new MaxFlow()));

  // <stationId, max lane flow info>... => row...
  PCollection<TableRow> results = flowMaxes.apply(
      ParDo.of(new FormatMaxesFn()));

  return results;
}
 
Developer: apache, Project: beam, Lines: 14, Source: TrafficMaxLaneFlow.java
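
MaxFlow above is defined elsewhere in TrafficMaxLaneFlow. The two-type-argument form Combine.<String, LaneInfo>perKey(...) is the overload that takes a plain SerializableFunction<Iterable<V>, V>, the simplest way to write a per-key reducer when no custom accumulator is needed. Here is a hypothetical minimal reducer of the same shape; the names are mine, not from the Beam example.

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical SerializableFunction combiner in the same shape as MaxFlow:
// it keeps the largest value seen for each key.
public class MaxPerKeyExample {
  static class MaxOfInts implements SerializableFunction<Iterable<Integer>, Integer> {
    @Override
    public Integer apply(Iterable<Integer> values) {
      int max = Integer.MIN_VALUE;
      for (int value : values) {
        max = Math.max(max, value);
      }
      return max;
    }
  }

  // Applied exactly like MaxFlow above.
  static PCollection<KV<String, Integer>> maxPerKey(PCollection<KV<String, Integer>> input) {
    return input.apply(Combine.<String, Integer>perKey(new MaxOfInts()));
  }
}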


Example 18: expand

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<TableRow> expand(PCollection<TableRow> rows) {

  // row... => <word, play_name> ...
  PCollection<KV<String, String>> words = rows.apply(
      ParDo.of(new ExtractLargeWordsFn()));

  // word, play_name => word, all_plays ...
  PCollection<KV<String, String>> wordAllPlays =
      words.apply(Combine.<String, String>perKey(
          new ConcatWords()));

  // <word, all_plays>... => row...
  PCollection<TableRow> results = wordAllPlays.apply(
      ParDo.of(new FormatShakespeareOutputFn()));

  return results;
}
 
Developer: apache, Project: beam, Lines: 19, Source: CombinePerKeyExamples.java


Example 19: expand

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
@Override
public PCollection<Long> expand(PCollection<InputT> input) {
  return input
      .apply(
          "Compute HyperLogLog Structure",
          Combine.globally(
              ApproximateDistinctFn.<InputT>create(input.getCoder())
                  .withPrecision(this.precision())
                  .withSparseRepresentation(this.sparsePrecision())))
      .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
}
 
Developer: apache, Project: beam, Lines: 12, Source: ApproximateDistinct.java


Example 20: MovingFunction

import org.apache.beam.sdk.transforms.Combine; // import the required package/class
public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
                      int numSignificantBuckets, int numSignificantSamples,
                      Combine.BinaryCombineLongFn function) {
  this.sampleUpdateMs = sampleUpdateMs;
  this.numSignificantBuckets = numSignificantBuckets;
  this.numSignificantSamples = numSignificantSamples;
  this.function = function;
  int n = (int) (samplePeriodMs / sampleUpdateMs);
  buckets = new long[n];
  Arrays.fill(buckets, function.identity());
  numSamples = new int[n];
  Arrays.fill(numSamples, 0);
  currentMsSinceEpoch = -1;
  currentIndex = -1;
}
 
Developer: apache, Project: beam, Lines: 16, Source: MovingFunction.java
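
MovingFunction is an internal Beam utility (assumed here to live in org.apache.beam.sdk.util), so the sketch below is illustrative rather than recommended API usage. Sum.ofLongs(), Min.ofLongs(), and Max.ofLongs() each return a Combine.BinaryCombineLongFn suitable for the last constructor argument; the add/get signatures are assumptions based on the class's public methods.

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.MovingFunction;

public class MovingSumSketch {
  public static void main(String[] args) {
    // A moving sum over the last minute, bucketed into 5-second samples.
    MovingFunction movingSum =
        new MovingFunction(
            60_000 /* samplePeriodMs */,
            5_000 /* sampleUpdateMs */,
            2 /* numSignificantBuckets */,
            10 /* numSignificantSamples */,
            Sum.ofLongs()); // any Combine.BinaryCombineLongFn works here
    long now = System.currentTimeMillis();
    movingSum.add(now, 42L); // assumed signature: add(nowMsSinceEpoch, value)
    System.out.println(movingSum.get(now)); // assumed signature: get(nowMsSinceEpoch)
  }
}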



Note: The org.apache.beam.sdk.transforms.Combine examples above were collected from open source projects hosted on GitHub, MSDocs, and similar platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before distributing or reusing the code. Do not republish without permission.

