• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java Combine类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.google.cloud.dataflow.sdk.transforms.Combine的典型用法代码示例。如果您正苦于以下问题:Java Combine类的具体用法?Java Combine怎么用?Java Combine使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Combine类属于com.google.cloud.dataflow.sdk.transforms包,在下文中一共展示了Combine类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: apply

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public PCollection<KV<String, DuplicationMetrics>> apply(final PCollection<GATKRead> preads) {
    return preads
        .apply(Filter.by(
            new SerializableFunction<GATKRead, Boolean>() {
              private static final long serialVersionUID = 0;

              public Boolean apply(GATKRead element) {
                return !element.isSecondaryAlignment() && !element.isSupplementaryAlignment();
              }
            }))
        .apply(generateMetricsByLibrary())
        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(DuplicationMetrics.class)))
        // Combine metrics for reads from the same library.
        .apply(Combine.<String, DuplicationMetrics>perKey(new CombineMetricsFn()))
        // For each library, finalize the metrics by computing derived metrics and dividing paired counters
        // by 2.
        .apply(finalizeMetrics());
}
 
开发者ID:broadinstitute,项目名称:gatk-dataflow,代码行数:20,代码来源:MarkDuplicatesDataflowUtils.java


示例2: createAggregator

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
/**
 * Creates and aggregator and associates it with the specified name.
 *
 * @param named     Name of aggregator.
 * @param combineFn Combine function used in aggregation.
 * @param <IN>      Type of inputs to aggregator.
 * @param <INTER>   Intermediate data type
 * @param <OUT>     Type of aggregator outputs.
 * @return Specified aggregator
 */
public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
    String named,
    Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
  @SuppressWarnings("unchecked")
  Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
  if (aggregator == null) {
    @SuppressWarnings("unchecked")
    NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
        new NamedAggregators.CombineFunctionState<>(
            (Combine.CombineFn<IN, INTER, OUT>) combineFn,
            (Coder<IN>) getCoder(combineFn),
            this);
    accum.add(new NamedAggregators(named, state));
    aggregator = new SparkAggregator<>(named, state);
    aggregators.put(named, aggregator);
  }
  return aggregator;
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:29,代码来源:SparkRuntimeContext.java


示例3: testCSVToMapLineCombineFn

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Test
public void testCSVToMapLineCombineFn() {

    final String[] csv = new String[] {
            "000,Tokyo,120",
            "001,Osaka,100",
            "002,Kyoto,140"
    };
    final List<String> csvlist = Arrays.asList(csv);

    Pipeline p = TestPipeline.create();
    PCollection<String> maplines = p.apply(Create.of(csvlist)).setCoder(StringUtf8Coder.of());
    PCollectionView<Map<String,String>> mapview = maplines.apply(Combine.globally(new CSVToMapLineCombineFn()).asSingletonView());

    final String[] dummy = new String[] {
            "000",
            "001",
            "002"
    };
    List<String> dummylist = Arrays.asList(dummy);
    DoFnTester<String,String> fnTester = DoFnTester.of(new AAA(mapview));
    fnTester.setSideInputInGlobalWindow(mapview, csvlist);

    //dummylines.apply(ParDo.of(fnTester));
    List<String> results = fnTester.processBatch(dummylist);
    System.out.println(results);

    //p.apply()
}
 
开发者ID:topgate,项目名称:retail-demo,代码行数:30,代码来源:MainPipelineTest.java


示例4: main

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("key rides by rideid",
      MapElements.via((TableRow ride) -> KV.of(ride.get("ride_id").toString(), ride))
        .withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))

   .apply("session windows on rides with early firings",
      Window.<KV<String, TableRow>>into(
        Sessions.withGapDuration(Duration.standardMinutes(60)))
          .triggering(
            AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(2000))))
          .accumulatingFiredPanes()
          .withAllowedLateness(Duration.ZERO))

   .apply("group ride points on same ride", Combine.perKey(new LatestPointCombine()))

   .apply("discard key",
      MapElements.via((KV<String, TableRow> a) -> a.getValue())
        .withOutputType(TypeDescriptor.of(TableRow.class)))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));
  p.run();
}
 
开发者ID:googlecodelabs,项目名称:cloud-dataflow-nyc-taxi-tycoon,代码行数:35,代码来源:LatestRides.java


示例5: apply

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public PCollection<T> apply(PCollection<T> input) {
  return input
      .apply(ParDo.named("BreakFusion").of(new DummyMapFn<T>()))
      .apply(Combine.<String, T>perKey(new First<T>()))
      .apply(Values.<T>create());
}
 
开发者ID:googlegenomics,项目名称:dockerflow,代码行数:8,代码来源:BreakFusion.java


示例6: apply

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public PCollection<KV<String, WorkflowArgs>> apply(
    PCollection<KV<String, WorkflowArgs>> input) {
  return input
      .apply(ParDo.named("Prepare").of(new Gather(task)))
      .apply(Combine.perKey(new SortArgs()))
      .apply(ParDo.named("CombineOutputs").of(new CombineArgs()));
}
 
开发者ID:googlegenomics,项目名称:dockerflow,代码行数:9,代码来源:DockerDo.java


示例7: apply

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public PCollection<KV<String, WorkflowArgs>> apply(
    PCollectionList<KV<String, WorkflowArgs>> input) {
  return input
      .apply(Flatten.<KV<String, WorkflowArgs>>pCollections())
      .apply(Combine.globally(new Merge()));
}
 
开发者ID:googlegenomics,项目名称:dockerflow,代码行数:8,代码来源:MergeBranches.java


示例8: generateCompleteWindowData

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public PCollection<KV<String, TSProto>> generateCompleteWindowData(Pipeline pipeline,
    List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {

  LOG.info("Check to see that time streams with missing 'ticks' have been corrected");

  PCollection<KV<String, TSProto>> tsData = setupDataInput(pipeline, data);


  PCollection<KV<String, TSProto>> windowedData =
      tsData.apply("CandleResolutionWindow", Window.<KV<String, TSProto>>into(FixedWindows
          .of(Duration.standardSeconds(((FXTimeSeriesPipelineOptions) pipeline.getOptions())
              .getCandleResolution()))));

  // Determine streams that are missing in this Window and generate values for them

  PCollection<KV<String, TSProto>> generatedValues =
      windowedData
          .apply(
              "DetectMissingTimeSeriesValues",
              Combine.globally(new DetectMissingTimeSeriesValuesCombiner(packetConfig))
                  .withoutDefaults()).apply(ParDo.of(new CreateMissingTimeSeriesValuesDoFn()))
          .setName("CreateMissingTimeSeriesValues");

  // Flatten the live streams and the generated streams together

  PCollection<KV<String, TSProto>> completeWindowData =
      PCollectionList.of(windowedData).and(generatedValues)
          .apply("MergeGeneratedLiveValues", Flatten.<KV<String, TSProto>>pCollections());


  return completeWindowData;
}
 
开发者ID:GoogleCloudPlatform,项目名称:data-timeseries-java,代码行数:33,代码来源:FXTimeSeriesPipelineSRGTests.java


示例9: grouped

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
  return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
    @Override
    public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
      Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
      @SuppressWarnings("unchecked")
      JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
          (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
      context.setOutputRDD(transform,
          inRDD.map(new KVFunction<>(keyed)));
    }
  };
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:14,代码来源:TransformTranslator.java


示例10: getCoder

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
  try {
    if (combiner.getClass() == Sum.SumIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Sum.SumLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else if (combiner.getClass() == Min.MinIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Min.MinLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Min.MinDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Max.MaxLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else {
      throw new IllegalArgumentException("unsupported combiner in Aggregator: "
          + combiner.getClass().getName());
    }
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException("Could not determine default coder for combiner", e);
  }
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:29,代码来源:SparkRuntimeContext.java


示例11: CombineFunctionState

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public CombineFunctionState(
    Combine.CombineFn<IN, INTER, OUT> combineFn,
    Coder<IN> inCoder,
    SparkRuntimeContext ctxt) {
  this.combineFn = combineFn;
  this.inCoder = inCoder;
  this.ctxt = ctxt;
  this.state = combineFn.createAccumulator();
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:10,代码来源:NamedAggregators.java


示例12: readObject

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
  ctxt = (SparkRuntimeContext) ois.readObject();
  combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
  inCoder = (Coder<IN>) ois.readObject();
  try {
    state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
        .decode(ois, Coder.Context.NESTED);
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException("Could not determine coder for accumulator", e);
  }
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:13,代码来源:NamedAggregators.java


示例13: test

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Test
public void test() throws Exception {
  SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);
  PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
  PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));

  EvaluationResult res = SparkPipelineRunner.create().run(p);
  assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
  res.close();
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:12,代码来源:CombineGloballyTest.java


示例14: test

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Test
public void test() throws Exception {
  SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);
  List<String> empty = Collections.emptyList();
  PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
  PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));

  EvaluationResult res = SparkPipelineRunner.create().run(p);
  assertEquals("", Iterables.getOnlyElement(res.get(output)));
  res.close();
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:13,代码来源:EmptyInputTest.java


示例15: main

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public static void main(String[] args) throws Exception {

    Exercise6Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference sessionsTable = new TableReference();
    sessionsTable.setDatasetId(options.getOutputDataset());
    sessionsTable.setProjectId(options.getProject());
    sessionsTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // [START EXERCISE 6]:
    // Detect user sessions-- that is, a burst of activity separated by a gap from further
    // activity. Find and record the mean session lengths.
    // This information could help the game designers track the changing user engagement
    // as their set of games changes.
    userEvents
        // Window the user events into sessions with gap options.getSessionGap() minutes. Make sure
        // to use an outputTimeFn that sets the output timestamp to the end of the window. This will
        // allow us to compute means on sessions based on their end times, rather than their start
        // times.
        .apply(
            /* TODO: YOUR CODE GOES HERE */
            new ChangeMe<PCollection<KV<String, Integer>>, KV<String, Integer>>())
        // For this use, we care only about the existence of the session, not any particular
        // information aggregated over it, so the following is an efficient way to do that.
        .apply(Combine.perKey(x -> 0))
        // Get the duration per session.
        .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
        // Re-window to process groups of session sums according to when the sessions complete.
        // In streaming we don't just ask "what is the mean value" we must ask "what is the mean
        // value for some window of time". To compute periodic means of session durations, we
        // re-window the session durations.
        .apply(
            /* TODO: YOUR CODE GOES HERE */
            new ChangeMe<PCollection<Integer>, Integer>())
        // Find the mean session duration in each window.
        .apply(Mean.<Integer>globally().withoutDefaults())
        // Write this info to a BigQuery table.
        .apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
        .apply(
            BigQueryIO.Write.to(sessionsTable)
                .withSchema(FormatSessionWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    // [END EXERCISE 6]:

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:65,代码来源:Exercise6.java


示例16: main

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public static void main(String[] args) throws Exception {

    Exercise6Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference sessionsTable = new TableReference();
    sessionsTable.setDatasetId(options.getOutputDataset());
    sessionsTable.setProjectId(options.getProject());
    sessionsTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // Detect user sessions-- that is, a burst of activity separated by a gap from further
    // activity. Find and record the mean session lengths.
    // This information could help the game designers track the changing user engagement
    // as their set of games changes.
    userEvents
        .apply(
            Window.named("WindowIntoSessions")
                .<KV<String, Integer>>into(
                    Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
                .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
        // For this use, we care only about the existence of the session, not any particular
        // information aggregated over it, so the following is an efficient way to do that.
        .apply(Combine.perKey(x -> 0))
        // Get the duration per session.
        .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
        // Re-window to process groups of session sums according to when the sessions complete.
        .apply(
            Window.named("WindowToExtractSessionMean")
                .<Integer>into(
                    FixedWindows.of(
                        Duration.standardMinutes(options.getUserActivityWindowDuration()))))
        // Find the mean session duration in each window.
        .apply(Mean.<Integer>globally().withoutDefaults())
        // Write this info to a BigQuery table.
        .apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
        .apply(
            BigQueryIO.Write.to(sessionsTable)
                .withSchema(FormatSessionWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
开发者ID:mdvorsky,项目名称:DataflowSME,代码行数:60,代码来源:Exercise6.java


示例17: createCompleteAggregates

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
public PCollection<KV<String, TSAggValueProto>> createCompleteAggregates(Pipeline pipeline,
    List<KV<String, TSProto>> data, WorkPacketConfig packetConfig) {

  PCollection<KV<String, TSProto>> completeWindowData =
      generateCompleteWindowData(pipeline, data, packetConfig);

  PCollection<KV<String, TSAggValueProto>> parital =
      completeWindowData.apply("CreatePartialAggregates",
          Combine.perKey(new PartialTimeSeriesAggCombiner()));

  PCollection<KV<String, TSAggValueProto>> paritalWithWindowBoundary =
      parital.apply(ParDo.of(new EmbedWindowTimeIntoAggregateDoFn()));

  PCollection<KV<String, TSAggValueProto>> completeAggregationStage1 =
      paritalWithWindowBoundary.apply(
          "completeAggregationStage1",
          Window.<KV<String, TSAggValueProto>>into(new GlobalWindows())
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
              .accumulatingFiredPanes());

  PCollection<KV<String, TSAggValueProto>> completeAggregationStage2 =
      completeAggregationStage1.apply("CreateCompleteCandles",
          Combine.perKey(new CompleteTimeSeriesAggCombiner())).apply("FlattenIterables",
          ParDo.of(new FlattenKVIterableDoFn()));

  PCollection<KV<String, TSAggValueProto>> completeAggregationStage3 =
      completeAggregationStage2.apply("ResetTimestampsAfterGlobalWindow",
          ParDo.of(new DoFn<KV<String, TSAggValueProto>, KV<String, TSAggValueProto>>() {

            @Override
            public void processElement(
                DoFn<KV<String, TSAggValueProto>, KV<String, TSAggValueProto>>.ProcessContext c)
                throws Exception {
              if (c.timestamp().isBefore(new Instant(32530703764000L))) {

                if (c.timestamp().isAfter(
                    new Instant(c.element().getValue().getCloseState().getTime()))) {

                  LOG.error("BUG There was a timestamp before current :: "
                      + TextFormat.shortDebugString(c.element().getValue()));

                } else {
                  c.outputWithTimestamp(c.element(), new Instant(c.element().getValue()
                      .getCloseTime()));

                }
              }

            }

          }));

  return completeAggregationStage3;

}
 
开发者ID:GoogleCloudPlatform,项目名称:data-timeseries-java,代码行数:57,代码来源:FXTimeSeriesPipelineSRGTests.java


示例18: apply

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public PCollection<FlagStatus> apply(final PCollection<GATKRead> input) {
    return input.apply(Combine.globally(new CombineCounts()));
}
 
开发者ID:broadinstitute,项目名称:gatk-dataflow,代码行数:5,代码来源:FlagStatusDataflowTransform.java


示例19: createAggregatorInternal

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
@Override
public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
    String named,
    Combine.CombineFn<AI, ?, AO> combineFn) {
  return mRuntimeContext.createAggregator(named, combineFn);
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:7,代码来源:SparkProcessContext.java


示例20: combineGlobally

import com.google.cloud.dataflow.sdk.transforms.Combine; //导入依赖的package包/类
private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
  return new TransformEvaluator<Combine.Globally<I, O>>() {

    @Override
    public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
      final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);

      @SuppressWarnings("unchecked")
      JavaRDDLike<WindowedValue<I>, ?> inRdd =
          (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);

      final Coder<I> iCoder = context.getInput(transform).getCoder();
      final Coder<A> aCoder;
      try {
        aCoder = globally.getAccumulatorCoder(
            context.getPipeline().getCoderRegistry(), iCoder);
      } catch (CannotProvideCoderException e) {
        throw new IllegalStateException("Could not determine coder for accumulator", e);
      }

      // Use coders to convert objects in the PCollection to byte arrays, so they
      // can be transferred over the network for the shuffle.
      JavaRDD<byte[]> inRddBytes = inRdd
          .map(WindowingHelpers.<I>unwindowFunction())
          .map(CoderHelpers.toByteFunction(iCoder));

      /*A*/ byte[] acc = inRddBytes.aggregate(
          CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
          new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
            @Override
            public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
              A a = CoderHelpers.fromByteArray(ab, aCoder);
              I i = CoderHelpers.fromByteArray(ib, iCoder);
              return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
            }
          },
          new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
            @Override
            public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
              A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
              A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
              // don't use Guava's ImmutableList.of as values may be null
              List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
              A merged = globally.mergeAccumulators(accumulators);
              return CoderHelpers.toByteArray(merged, aCoder);
            }
          }
      );
      O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));

      Coder<O> coder = context.getOutput(transform).getCoder();
      JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
          // don't use Guava's ImmutableList.of as output may be null
          CoderHelpers.toByteArrays(Collections.singleton(output), coder));
      context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
          .map(WindowingHelpers.<O>windowFunction()));
    }
  };
}
 
开发者ID:shakamunyi,项目名称:spark-dataflow,代码行数:60,代码来源:TransformTranslator.java



注:本文中的com.google.cloud.dataflow.sdk.transforms.Combine类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java REngineException类代码示例发布时间:2022-05-23
下一篇:
Java StreamInterface类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap