
Java DataflowRunner Class Code Examples


This article collects typical usage examples of org.apache.beam.runners.dataflow.DataflowRunner in Java. If you want to know how the DataflowRunner class is used in practice, or are looking for examples of it, the selected code examples below may help.



DataflowRunner belongs to the org.apache.beam.runners.dataflow package. Eight code examples of the DataflowRunner class are shown below, ordered by popularity.

Example 1: main

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the DatastoreToGcs dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToJson", ParDo.of(EntityToJson.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply("JsonToGcs", TextIO.write().to(options.getSavePath())
          .withSuffix(".json"));

  pipeline.run();
}
 
Developer ID: cobookman, Project: teleport, Lines of code: 28, Source: DatastoreToGcs.java
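Example 1 ends with `TextIO.write().to(options.getSavePath()).withSuffix(".json")`, which writes one file per shard rather than a single file. For unwindowed output, Beam's default shard name template is `-SSSSS-of-NNNNN`, so the files look like `<savePath>-00000-of-00003.json`. The JDK-only sketch below reproduces that naming so the output layout is easy to predict; `ShardNames` is a hypothetical helper, and the names change if you set `withShardNameTemplate(...)` or a custom filename policy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that predicts TextIO-style shard file names (default unwindowed template). */
final class ShardNames {
  private ShardNames() {}

  /** Returns prefix + "-SSSSS-of-NNNNN" + suffix, zero-padding both numbers to five digits. */
  static String name(String prefix, int shardIndex, int numShards, String suffix) {
    if (numShards <= 0 || shardIndex < 0 || shardIndex >= numShards) {
      throw new IllegalArgumentException("invalid shard " + shardIndex + " of " + numShards);
    }
    return String.format("%s-%05d-of-%05d%s", prefix, shardIndex, numShards, suffix);
  }

  /** Lists every file name a write with the given shard count would produce. */
  static List<String> all(String prefix, int numShards, String suffix) {
    List<String> names = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      names.add(name(prefix, i, numShards, suffix));
    }
    return names;
  }

  public static void main(String[] args) {
    // Hypothetical save path; prints ...-00000-of-00003.json through ...-00002-of-00003.json
    all("gs://my-bucket/export/entities", 3, ".json").forEach(System.out::println);
  }
}
```

On Dataflow the number of shards is normally chosen by the service; `withNumShards(n)` fixes it if downstream consumers expect a known file count.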


Example 2: main

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the GcsToDatastore dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  options.setRunner(DataflowRunner.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("IngestJson", TextIO.read()
          .from(options.getJsonPathPrefix()))
      .apply("GcsToEntity", ParDo.of(JsonToEntity.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .build()))
      .apply(DatastoreIO.v1().write()
          .withProjectId(options.getDatastoreProjectId()));

  pipeline.run();
}
 
Developer ID: cobookman, Project: teleport, Lines of code: 25, Source: GcsToDatastore.java
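Example 2 reads the data back with `TextIO.read().from(options.getJsonPathPrefix())`, and `from(...)` accepts a file pattern. If the option holds a glob such as `gs://my-bucket/export/entities-*.json` (a hypothetical path), every shard written in Example 1 is picked up. The sketch below approximates that selection locally with `java.nio.file.PathMatcher`; it is only a mental model, since Beam resolves GCS patterns through its own `FileSystems` layer, which may treat edge cases differently. `GlobFilter` is a hypothetical name.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: filters bare file names with a glob, as a local stand-in for a TextIO pattern. */
final class GlobFilter {
  private GlobFilter() {}

  /** Keeps only the names that match the glob (no directory components expected). */
  static List<String> select(String glob, List<String> fileNames) {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
    return fileNames.stream()
        .filter(name -> matcher.matches(Paths.get(name)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> listing = Arrays.asList(
        "entities-00000-of-00002.json", "entities-00001-of-00002.json", "README.txt");
    // Only the two shard files match; README.txt is skipped.
    System.out.println(select("entities-*.json", listing));
  }
}
```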


Example 3: deploy

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/** Deploys the invoicing pipeline as a template on GCS, for a given projectID and GCS bucket. */
public void deploy() {
  // We can't store options as a member variable due to serialization concerns.
  InvoicingPipelineOptions options = PipelineOptionsFactory.as(InvoicingPipelineOptions.class);
  options.setProject(projectId);
  options.setRunner(DataflowRunner.class);
  options.setStagingLocation(beamBucket + "/staging");
  options.setTemplateLocation(beamBucket + "/templates/invoicing");
  Pipeline p = Pipeline.create(options);

  PCollection<BillingEvent> billingEvents =
      p.apply(
          "Read BillingEvents from Bigquery",
          BigQueryIO.read(BillingEvent::parseFromRecord)
              .fromQuery(InvoicingUtils.makeQueryProvider(options.getYearMonth(), projectId))
              .withCoder(SerializableCoder.of(BillingEvent.class))
              .usingStandardSql()
              .withoutValidation()
              .withTemplateCompatibility());
  applyTerminalTransforms(billingEvents, options.getYearMonth());
  p.run();
}
 
Developer ID: google, Project: nomulus, Lines of code: 23, Source: InvoicingPipeline.java
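`deploy()` builds the staging and template locations by plain string concatenation (`beamBucket + "/staging"`), so a `beamBucket` value that already ends in `/` produces `gs://bucket//staging`, which GCS treats as a different object path. A minimal JDK-only helper that joins the parts with exactly one separator is sketched below; `GcsPaths.join` is a hypothetical name, not part of nomulus or Beam.

```java
/** Hypothetical helper: joins a gs:// base path and a relative path with exactly one '/'. */
final class GcsPaths {
  private static final String SCHEME = "gs://";

  private GcsPaths() {}

  static String join(String base, String relative) {
    if (!base.startsWith(SCHEME)) {
      throw new IllegalArgumentException("expected a gs:// path, got: " + base);
    }
    String trimmedBase = base.replaceAll("/+$", ""); // drop trailing slashes
    if (trimmedBase.length() <= SCHEME.length()) {
      throw new IllegalArgumentException("missing bucket name in: " + base);
    }
    String trimmedRelative = relative.replaceAll("^/+", ""); // drop leading slashes
    return trimmedBase + "/" + trimmedRelative;
  }

  public static void main(String[] args) {
    String beamBucket = "gs://my-beam-bucket/"; // hypothetical value with a trailing slash
    System.out.println(join(beamBucket, "staging"));             // gs://my-beam-bucket/staging
    System.out.println(join(beamBucket, "templates/invoicing")); // gs://my-beam-bucket/templates/invoicing
  }
}
```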


Example 4: main

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Runs the DatastoreToBigQuery dataflow pipeline
 */
public static void main(String[] args) throws IOException, ScriptException {
  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  NestedValueProvider<String, String> bqJsonSchema = NestedValueProvider
      .of(options.getBqJsonSchema(), new ValueProviderHelpers.GcsLoad());

  options.setRunner(DataflowRunner.class);
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply("IngestEntities",
          DatastoreIO.v1().read()
              .withProjectId(options.getDatastoreProjectId())
              .withLiteralGqlQuery(options.getGqlQuery())
              .withNamespace(options.getNamespace()))
      .apply("EntityToTableRow", ParDo.of(EntityToTableRow.newBuilder()
          .setJsTransformPath(options.getJsTransformPath())
          .setJsTransformFunctionName(options.getJsTransformFunctionName())
          .setStrictCast(options.getStrictCast())
          .setTableSchemaJson(bqJsonSchema)
          .build()))
      .apply("TableRowToBigQuery", BigQueryIO.writeTableRows()
          .to(options.getBqTableSpec())
          .withJsonSchema(bqJsonSchema)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND));
  pipeline.run();
}
 
Developer ID: cobookman, Project: teleport, Lines of code: 33, Source: DatastoreToBq.java
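`NestedValueProvider.of(options.getBqJsonSchema(), new ValueProviderHelpers.GcsLoad())` pairs a possibly runtime-only template parameter with a function; the function (here, loading the schema file from GCS) runs when `get()` is called, not while the pipeline graph is being built. The JDK-only analogy below models that deferral with `Supplier` and `Function` and caches the first result. `LazyNested` is a hypothetical class for illustration, not Beam's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical analogy for NestedValueProvider: apply a function to a deferred value on first get(). */
final class LazyNested<X, T> implements Supplier<T> {
  private final Supplier<X> source;
  private final Function<X, T> translator;
  private T cached; // filled on the first get()

  LazyNested(Supplier<X> source, Function<X, T> translator) {
    this.source = source;
    this.translator = translator;
  }

  @Override
  public synchronized T get() {
    if (cached == null) {
      cached = translator.apply(source.get());
    }
    return cached;
  }

  public static void main(String[] args) {
    AtomicInteger loads = new AtomicInteger();
    // Stand-in for GcsLoad: pretend to read a schema file from the given (hypothetical) path.
    LazyNested<String, String> schema = new LazyNested<>(
        () -> "gs://my-bucket/schema.json",
        path -> {
          loads.incrementAndGet();
          return "{\"fields\": []} from " + path;
        });
    System.out.println("loads before get(): " + loads.get()); // 0
    schema.get();
    schema.get();
    System.out.println("loads after two get() calls: " + loads.get()); // 1
  }
}
```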


Example 5: createTestServiceRunner

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
 * is not expanded. This is used for verifying that even without expansion the proper errors show
 * up.
 */
private Pipeline createTestServiceRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
 
Developer ID: apache, Project: beam, Lines of code: 15, Source: DataflowGroupByKeyTest.java


Example 6: createTestBatchRunner

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
private Pipeline createTestBatchRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
 
Developer ID: apache, Project: beam, Lines of code: 10, Source: DataflowViewTest.java


Example 7: createTestStreamingRunner

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
private Pipeline createTestStreamingRunner() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setStreaming(true);
  options.setProject("someproject");
  options.setGcpTempLocation("gs://staging");
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setDataflowClient(dataflow);
  return Pipeline.create(options);
}
 
Developer ID: apache, Project: beam, Lines of code: 11, Source: DataflowViewTest.java


Example 8: main

import org.apache.beam.runners.dataflow.DataflowRunner; // import the required package/class
/**
 * <p>Creates a dataflow pipeline that creates the following chain:</p>
 * <ol>
 *   <li> Reads from a Cloud Pubsub topic
 *   <li> Window into fixed windows of 1 minute
 *   <li> Applies word count transform
 *   <li> Creates Puts from each of the word counts in the array
 *   <li> Performs a Bigtable Put on the items
 * </ol>
 *
 * @param args Arguments used to configure the Dataflow pipeline. The first three are required
 *   when running on managed resources in Google Cloud Platform; those options should be omitted
 *   for LOCAL runs. The next three configure the Bigtable connection, and the last two are for
 *   Cloud Pubsub.
 *        --runner=DataflowRunner \
 *        --project=[dataflow project] \
 *        --stagingLocation=gs://[your google storage bucket] \
 *        --bigtableProjectId=[bigtable project] \
 *        --bigtableInstanceId=[bigtable instance id] \
 *        --bigtableTableId=[bigtable tableName] \
 *        --inputFile=[file path on GCS] \
 *        --pubsubTopic=projects/[project name]/topics/[topic name]
 */

public static void main(String[] args) throws Exception {
  // CloudBigtableOptions is one way to retrieve the options.  It's not required.
  BigtablePubsubOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtablePubsubOptions.class);

  // CloudBigtableTableConfiguration contains the project, instance and table to connect to.
  CloudBigtableTableConfiguration config =
      new CloudBigtableTableConfiguration.Builder()
      .withProjectId(options.getBigtableProjectId())
      .withInstanceId(options.getBigtableInstanceId())
      .withTableId(options.getBigtableTableId())
      .build();

  // In order to cancel the pipelines automatically,
  // DataflowRunner is forced to be used.
  // Also enables the 2 jobs to run at the same time.
  options.setRunner(DataflowRunner.class);

  options.as(DataflowPipelineOptions.class).setStreaming(true);
  Pipeline p = Pipeline.create(options);

  FixedWindows window = FixedWindows.of(Duration.standardMinutes(options.getWindowSize()));

  p
      .apply(PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
      .apply(Window.<String> into(window))
      .apply(ParDo.of(new ExtractWordsFn()))
      .apply(Count.<String> perElement())
      .apply(ParDo.of(MUTATION_TRANSFORM))
      .apply(CloudBigtableIO.writeToTable(config));

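  // Submit the streaming job without blocking: for a streaming pipeline,
  // waitUntilFinish() would not return until the job is cancelled or drained,
  // so the injector below would never start.
  p.run();
  // Start a second job to inject messages into a Cloud Pubsub topic
  injectMessages(options);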
}
 
Developer ID: GoogleCloudPlatform, Project: cloud-bigtable-examples, Lines of code: 60, Source: PubsubWordCount.java
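Example 8 groups Pub/Sub messages into fixed windows of `getWindowSize()` minutes and then counts each word per window. The JDK-only sketch below reproduces those semantics on an in-memory list of timestamped lines so the windowing arithmetic is easy to check: a timestamp `t` falls in the window that starts at `t - (t mod size)`, i.e. windows aligned to the epoch, as with `FixedWindows` and its default zero offset. The word-splitting rule and the `Event` class are assumptions, since `ExtractWordsFn` is not shown, and the sketch ignores Beam's watermarks, triggers, and late data.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of Window.into(FixedWindows.of(size)) followed by Count.perElement(). */
final class FixedWindowWordCount {
  private FixedWindowWordCount() {}

  /** A message payload with its event timestamp (stand-in for a Pub/Sub element). */
  static final class Event {
    final Instant timestamp;
    final String text;

    Event(Instant timestamp, String text) {
      this.timestamp = timestamp;
      this.text = text;
    }
  }

  /** Start of the epoch-aligned fixed window that contains the timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMs = size.toMillis();
    if (sizeMs <= 0) {
      throw new IllegalArgumentException("window size must be positive");
    }
    long ts = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
  }

  /** Counts words per window: window start -> (word -> count). */
  static Map<Instant, Map<String, Long>> count(List<Event> events, Duration size) {
    Map<Instant, Map<String, Long>> result = new TreeMap<>();
    for (Event e : events) {
      Map<String, Long> counts = result.computeIfAbsent(windowStart(e.timestamp, size), k -> new TreeMap<>());
      // Assumed splitting rule: lowercase, split on anything that is not a letter or apostrophe.
      for (String word : e.text.toLowerCase().split("[^a-z']+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1L, Long::sum);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2024-01-01T00:00:10Z");
    List<Event> events = Arrays.asList(
        new Event(t0, "to be"),
        new Event(t0.plusSeconds(20), "or not to be"),  // same one-minute window as t0
        new Event(t0.plusSeconds(70), "be quick"));     // next window
    // {2024-01-01T00:00:00Z={be=2, not=1, or=1, to=2}, 2024-01-01T00:01:00Z={be=1, quick=1}}
    System.out.println(count(events, Duration.ofMinutes(1)));
  }
}
```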



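Note: The org.apache.beam.runners.dataflow.DataflowRunner examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs, and the snippets were selected from open-source projects contributed by various developers. Copyright of the source code remains with the original authors; consult each project's license before distributing or using it. Do not republish without permission.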

