
Java PubsubIO Class Code Examples


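This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.io.PubsubIO. The class provides the Cloud Pub/Sub Read and Write transforms of the Google Cloud Dataflow Java SDK. If you want to see how PubsubIO is used in real pipelines, the examples below may help.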



PubsubIO belongs to the com.google.cloud.dataflow.sdk.io package. The 11 examples below are ordered by popularity.

Example 1: apply

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
@Override
public PCollection<GameEvent> apply(PBegin begin) {
  if (options.getInput() != null && !options.getInput().isEmpty()) {
    return begin
        .getPipeline()
        .apply(TextIO.Read.from(options.getInput()))
        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
        .apply(
            "AddEventTimestamps",
            WithTimestamps.of((GameEvent i) -> new Instant(i.getTimestamp())));
  } else {
    return begin
        .getPipeline()
        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
  }
}
 
Developer ID: mdvorsky, Project: DataflowSME, Lines of code: 18, Source: Exercise3.java
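Examples 1 through 7 pass timestampLabel(...) to PubsubIO.Read. This makes each element's event time come from a Pub/Sub message attribute instead of the publish time. As we understand the SDK 1.x documentation, the attribute may hold either milliseconds since the Unix epoch or an RFC 3339 string, and the publish time is used when no label is set. The stand-alone sketch below mimics that rule with plain java.time. It is not the SDK's implementation. The class and method names are hypothetical, and rejecting a message that lacks the attribute is an assumption.

```java
import java.time.OffsetDateTime;
import java.util.Map;

/** Hypothetical illustration of timestampLabel semantics; not Dataflow SDK code. */
class PubsubTimestampSketch {

  /**
   * Event time in epoch milliseconds for one message.
   *
   * @param attributes        the message's Pub/Sub attributes
   * @param timestampLabel    attribute name passed to timestampLabel(...), or null if unset
   * @param publishTimeMillis the message's Pub/Sub publish time
   */
  static long eventTimeMillis(
      Map<String, String> attributes, String timestampLabel, long publishTimeMillis) {
    if (timestampLabel == null) {
      // No label configured: use the publish time.
      return publishTimeMillis;
    }
    String value = attributes.get(timestampLabel);
    if (value == null) {
      // Assumption: a message lacking the configured attribute is rejected.
      throw new IllegalArgumentException("missing timestamp attribute: " + timestampLabel);
    }
    try {
      // Numeric form: milliseconds since the Unix epoch.
      return Long.parseLong(value);
    } catch (NumberFormatException notNumeric) {
      // Otherwise expect an RFC 3339 string with an offset, e.g. 2016-05-01T12:00:00Z.
      return OffsetDateTime.parse(value).toInstant().toEpochMilli();
    }
  }
}
```

Because the attribute is read per message, a publisher that sets it (for example "ts" in Examples 2 to 7) controls event time. That timestamp is what the downstream windows use.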


Example 2: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))
   .apply("mark rides", MapElements.via(new MarkRides()))
   .apply("count similar", Count.perKey())
   .apply("format rides", MapElements.via(new TransformRides()))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));

  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 22, Source: CountRides.java
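CountRides assigns each element to a one-second fixed window and then counts elements per key. Conceptually, with the default zero offset, an element with timestamp t lands in the window that starts at t minus (t mod size). Count.perKey() is then evaluated independently in every window. The plain-Java sketch below reproduces that arithmetic. It is a hypothetical model, not SDK code. The MarkRides function is not shown, so the sketch assumes already-keyed (key, timestamp) pairs as input.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical plain-Java model of FixedWindows + Count.perKey(); not SDK code. */
class FixedWindowCountSketch {

  /** Start of the zero-offset fixed window of the given size that contains the timestamp. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the window start correct for timestamps before the epoch.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Counts each key separately within each window; the outer map is keyed by window start. */
  static SortedMap<Long, Map<String, Long>> countPerKeyPerWindow(
      List<Map.Entry<String, Long>> keyedTimestamps, long sizeMillis) {
    SortedMap<Long, Map<String, Long>> counts = new TreeMap<>();
    for (Map.Entry<String, Long> event : keyedTimestamps) {
      long start = windowStart(event.getValue(), sizeMillis);
      counts.computeIfAbsent(start, s -> new HashMap<>()).merge(event.getKey(), 1L, Long::sum);
    }
    return counts;
  }
}
```

The same key can therefore appear once per window in the output, each time with that window's count only.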


Example 3: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("filter lower Manhattan", ParDo.of(new FilterLowerManhattan()))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));
  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 18, Source: FilterRides.java
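Examples 2 through 7 build fully qualified topic names of the form projects/<project>/topics/<topic> with String.format, once for the source and once for the sink. A small helper that builds and splits such paths keeps the two consistent. The sketch below is hypothetical and is not part of the Dataflow SDK. It only checks that both parts are non-empty and contain no slash; it does not enforce Pub/Sub's full naming rules.

```java
/** Hypothetical helper for fully qualified Pub/Sub topic paths; not SDK code. */
class TopicPaths {

  /** Builds "projects/<project>/topics/<topic>", the format used in the examples. */
  static String topicPath(String project, String topic) {
    requireSegment("project", project);
    requireSegment("topic", topic);
    return String.format("projects/%s/topics/%s", project, topic);
  }

  /** Splits a topic path back into {project, topic}; rejects any other shape. */
  static String[] parseTopicPath(String path) {
    String[] parts = path.split("/", -1);
    if (parts.length != 4 || !parts[0].equals("projects") || !parts[2].equals("topics")) {
      throw new IllegalArgumentException("not a topic path: " + path);
    }
    requireSegment("project", parts[1]);
    requireSegment("topic", parts[3]);
    return new String[] {parts[1], parts[3]};
  }

  /** Rejects null, empty, or slash-containing path segments. */
  private static void requireSegment(String what, String value) {
    if (value == null || value.isEmpty() || value.contains("/")) {
      throw new IllegalArgumentException("invalid " + what + ": " + value);
    }
  }
}
```

In the examples, a call such as TopicPaths.topicPath(options.getSourceProject(), options.getSourceTopic()) would replace the inline String.format expression.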


Example 4: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("window 1s", Window.into(FixedWindows.of(Duration.standardSeconds(1))))

   .apply("parse timestamps",
      MapElements.via(
        (TableRow e) ->
          Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(e.get("timestamp").toString())).toEpochMilli())
      .withOutputType(TypeDescriptor.of(Long.class)))

   .apply("max timestamp in window", Max.longsGlobally().withoutDefaults())

   .apply("transform",
      MapElements.via(
        (Long t) -> {
          TableRow ride = new TableRow();
          ride.set("timestamp", Instant.ofEpochMilli(t).toString());
          return ride;
        })
      .withOutputType(TypeDescriptor.of(TableRow.class)))

   .apply(PubsubIO.Write.named("write to PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));
  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 35, Source: TimestampRides.java
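TimestampRides converts each ride's ISO-8601 "timestamp" field to epoch milliseconds and windows the values into one-second fixed windows. It then keeps the maximum per window. Because of withoutDefaults(), windows with no input produce no output. The stand-alone sketch below performs the same computation on a list of strings using java.time only. One caveat: DateTimeFormatter.ISO_DATE_TIME also parses strings without an offset, but Instant.from then throws DateTimeException, so the field must carry an offset such as Z. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical plain-Java model of the TimestampRides computation; not SDK code. */
class MaxTimestampSketch {

  /** Same conversion as the "parse timestamps" step: ISO-8601 string to epoch millis. */
  static long toEpochMillis(String isoTimestamp) {
    return Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(isoTimestamp)).toEpochMilli();
  }

  /** Maximum epoch-millis timestamp per fixed window, keyed by window start. */
  static SortedMap<Long, Long> maxPerWindow(List<String> isoTimestamps, long windowMillis) {
    SortedMap<Long, Long> maxByWindow = new TreeMap<>();
    for (String iso : isoTimestamps) {
      long millis = toEpochMillis(iso);
      long windowStart = millis - Math.floorMod(millis, windowMillis);
      // Only windows that received at least one element appear, as with withoutDefaults().
      maxByWindow.merge(windowStart, millis, Long::max);
    }
    return maxByWindow;
  }
}
```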


Example 5: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("extract dollars",
      MapElements.via((TableRow x) -> Double.parseDouble(x.get("meter_increment").toString()))
        .withOutputType(TypeDescriptor.of(Double.class)))

   .apply("fixed window", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
   .apply("trigger",
      Window.<Double>triggering(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
          .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(5)))

   .apply("sum whole window", Sum.doublesGlobally().withoutDefaults())
   .apply("format rides", ParDo.of(new TransformRides()))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));
  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 32, Source: ExactDollarRides.java
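ExactDollarRides fires three kinds of panes for each one-minute window:
- early panes, about one second (processing time) after the first element of a pane arrives;
- an on-time pane, when the watermark passes the end of the window;
- late panes, as late elements arrive within the five-minute allowed lateness.

Because it uses accumulatingFiredPanes(), each pane's sum covers everything seen so far in the window. With discardingFiredPanes(), each pane would cover only the elements since the previous firing. The sketch below illustrates just that difference on pre-grouped panes in plain Java. It does not model watermarks or triggers, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of accumulating vs. discarding fired panes; not SDK code. */
class PaneAccumulationSketch {

  /** Output of each firing when panes accumulate: running total over all panes so far. */
  static List<Double> accumulating(List<List<Double>> panes) {
    List<Double> outputs = new ArrayList<>();
    double runningTotal = 0.0;
    for (List<Double> pane : panes) {
      for (double amount : pane) {
        runningTotal += amount;
      }
      outputs.add(runningTotal);
    }
    return outputs;
  }

  /** Output of each firing when panes are discarded: only the elements of that pane. */
  static List<Double> discarding(List<List<Double>> panes) {
    List<Double> outputs = new ArrayList<>();
    for (List<Double> pane : panes) {
      double paneTotal = 0.0;
      for (double amount : pane) {
        paneTotal += amount;
      }
      outputs.add(paneTotal);
    }
    return outputs;
  }
}
```

With accumulating panes, a consumer of the Pub/Sub output should treat each new value for a window as a replacement for the previous one, not as an increment.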


Example 6: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("filter a few rides",
      Filter.byPredicate(
        (TableRow t) -> {
          String rideId = t.get("ride_id").toString();

          // You can change the filter here to allow more or fewer rides through:
          // rideIds starting with "a" are quite common
          // rideIds starting with "ab" are rarer
          // rideIds starting with "abc" are rarer still
          if (rideId.startsWith("ab")) {
            LOG.info("Accepted point on ride {} with order number {}, timestamp {}",
              t.get("ride_id"), t.get("point_idx"), t.get("timestamp"));
            return true;
          }
          return false;
        }))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));

  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 34, Source: DebugFewRides.java
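The comments in DebugFewRides say that longer ride_id prefixes let fewer rides through. Assume ride IDs start with uniformly distributed lowercase hexadecimal digits; this is an assumption, since the data format is not shown here. Then each extra prefix character keeps roughly 1/16 of the previous traffic: "a" passes about 1/16 of points, "ab" about 1/256 and "abc" about 1/4096. The hypothetical helper below packages the predicate together with that estimate.

```java
import java.util.function.Predicate;

/** Hypothetical helper mirroring the prefix filter in DebugFewRides; not SDK code. */
class RidePrefixFilterSketch {

  /** Predicate equivalent to the rideId.startsWith(prefix) check, tolerating null IDs. */
  static Predicate<String> acceptsPrefix(String prefix) {
    return rideId -> rideId != null && rideId.startsWith(prefix);
  }

  /** Expected fraction of IDs accepted, assuming uniformly random hex digits. */
  static double expectedFraction(String prefix) {
    return Math.pow(16, -prefix.length());
  }
}
```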


Example 7: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  CustomPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
  Pipeline p = Pipeline.create(options);

  p.apply(PubsubIO.Read.named("read from PubSub")
      .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
      .timestampLabel("ts")
      .withCoder(TableRowJsonCoder.of()))

   .apply("key rides by rideid",
      MapElements.via((TableRow ride) -> KV.of(ride.get("ride_id").toString(), ride))
        .withOutputType(new TypeDescriptor<KV<String, TableRow>>() {}))

   .apply("session windows on rides with early firings",
      Window.<KV<String, TableRow>>into(
        Sessions.withGapDuration(Duration.standardMinutes(60)))
          .triggering(
            AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(2000))))
          .accumulatingFiredPanes()
          .withAllowedLateness(Duration.ZERO))

   .apply("group ride points on same ride", Combine.perKey(new LatestPointCombine()))

   .apply("discard key",
      MapElements.via((KV<String, TableRow> a) -> a.getValue())
        .withOutputType(TypeDescriptor.of(TableRow.class)))

   .apply(PubsubIO.Write.named("WriteToPubsub")
      .topic(String.format("projects/%s/topics/%s", options.getSinkProject(), options.getSinkTopic()))
      .withCoder(TableRowJsonCoder.of()));
  p.run();
}
 
Developer ID: googlecodelabs, Project: cloud-dataflow-nyc-taxi-tycoon, Lines of code: 35, Source: LatestRides.java
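LatestRides groups each ride's points into session windows with a 60-minute gap and then combines the points of each session; the LatestPointCombine class is not shown. Conceptually, every element starts a window [t, t + gap), and overlapping windows of the same key are merged. A new session therefore begins only when a point arrives at least one gap after the latest point so far. The plain-Java sketch below computes those merged sessions for one key's timestamps. It ignores triggers and lateness, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical plain-Java model of session-window merging for one key; not SDK code. */
class SessionWindowSketch {

  /** Merged sessions as {start, endExclusive} pairs. */
  static List<long[]> sessions(List<Long> timestampsMillis, long gapMillis) {
    List<Long> sorted = new ArrayList<>(timestampsMillis);
    Collections.sort(sorted);
    List<long[]> result = new ArrayList<>();
    long[] current = null;
    for (long t : sorted) {
      if (current != null && t < current[1]) {
        // [t, t + gap) overlaps the open session, so extend it.
        current[1] = Math.max(current[1], t + gapMillis);
      } else {
        // Gap reached (or first element): start a new session.
        current = new long[] {t, t + gapMillis};
        result.add(current);
      }
    }
    return result;
  }
}
```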


Example 8: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) throws GeneralSecurityException, IOException, ParseException, ParserConfigurationException, SAXException {
	String params = null;
	for (int i = 0; i < args.length; i++) {
		if (args[i].startsWith("--params="))
			params = args[i].replaceFirst("--params=", "");
	}

	System.out.println(params);
	init(params);

	GoogleCredential credential = new GoogleCredential.Builder()
			.setTransport(new NetHttpTransport())
			.setJsonFactory(new JacksonFactory())
			.setServiceAccountId(accountEmail)
			.setServiceAccountScopes(Arrays.asList(new String[] {"https://www.googleapis.com/auth/cloud-platform"}))
			.setServiceAccountPrivateKeyFromP12File(new File(keyFile))
			.build();

	DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
	options.setRunner(DataflowPipelineRunner.class);
	// Your project ID is required in order to run your pipeline on the Google Cloud.
	options.setProject(projectId);
	// Your Google Cloud Storage path is required for staging local files.
	options.setStagingLocation(workingBucket);
	options.setGcpCredential(credential);
	options.setServiceAccountName(accountEmail);
	options.setServiceAccountKeyfile(keyFile);
	options.setMaxNumWorkers(maxNumWorkers);
	options.setDiskSizeGb(diskSizeGb);
	options.setWorkerMachineType(machineType);
	options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
	options.setZone(zone);
	options.setStreaming(isStreaming);
	options.setJobName(pipelineName);


	Gson gson = new Gson();
	TableSchema schema = gson.fromJson(schemaStr, TableSchema.class);
	Pipeline pipeline = Pipeline.create(options);
	PCollection<String> streamData =
			pipeline.apply(PubsubIO.Read.named("ReadFromPubsub")
					.topic(String.format("projects/%1$s/topics/%2$s",projectId,pubSubTopic)));
	PCollection<TableRow> tableRow = streamData.apply("ToTableRow", ParDo.of(new PrepData.ToTableRow()));


	tableRow.apply(BigQueryIO.Write
			.named("WriteBQTable")
			.to(String.format("%1$s:%2$s.%3$s",projectId, bqDataSet, bqTable))
			.withSchema(schema)
			.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

	System.out.println("Starting pipeline " + pipelineName);
	pipeline.run();
}
 
Developer ID: bomboradata, Project: pubsub-to-bigquery, Lines of code: 55, Source: PubSubToBQPipeline.java
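PubSubToBQPipeline scans its arguments for --params=..., passes the value to its own init method, and writes to a table named in the project:dataset.table form. The hypothetical sketch below isolates those two string operations. As in the original loop, the last --params= argument wins. The sketch uses substring instead of replaceFirst. The two behave the same here because every matching argument starts with --params=, and that text contains no regex metacharacters.

```java
/** Hypothetical helpers isolating the string handling in PubSubToBQPipeline; not SDK code. */
class PipelineArgsSketch {

  private static final String PARAMS_FLAG = "--params=";

  /** Value of the last --params= argument, or null if absent (same as the original loop). */
  static String paramsValue(String[] args) {
    String params = null;
    for (String arg : args) {
      if (arg.startsWith(PARAMS_FLAG)) {
        params = arg.substring(PARAMS_FLAG.length());
      }
    }
    return params;
  }

  /** Table spec in the "project:dataset.table" form passed to BigQueryIO.Write.to(...). */
  static String tableSpec(String projectId, String dataset, String table) {
    return String.format("%1$s:%2$s.%3$s", projectId, dataset, table);
  }
}
```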


Example 9: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) throws IOException {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  options.setBigQuerySchema(getSchema());
  // DataflowExampleUtils creates the necessary input sources to simplify execution of this
  // Pipeline.
  DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
    options.isUnbounded());

  Pipeline pipeline = Pipeline.create(options);

  /**
   * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
   * unbounded input source.
   */
  PCollection<String> input;
  if (options.isUnbounded()) {
    LOG.info("Reading from PubSub.");
    /**
     * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
     * specified as an argument. The data elements' timestamps will come from the pubsub
     * injection.
     */
    input = pipeline
        .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
  } else {
    /** Else, this is a bounded pipeline. Read from the GCS file. */
    input = pipeline
        .apply(TextIO.Read.from(options.getInputFile()))
        // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
        // See AddTimestampFn for more detail on this.
        .apply(ParDo.of(new AddTimestampFn()));
  }

  /**
   * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
   * minute (you can change this with a command-line option). See the documentation for more
   * information on how fixed windows work, and for information on the other types of windowing
   * available (e.g., sliding windows).
   */
  PCollection<String> windowedWords = input
    .apply(Window.<String>into(
      FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

  /**
   * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
   * windows over a PCollection containing windowed values.
   */
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  /**
   * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
   * The BigQuery output source supports both bounded and unbounded data.
   */
  wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));

  PipelineResult result = pipeline.run();

  /**
   * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
   * runs for a limited time, and publishes to the input PubSub topic.
   *
   * With an unbounded input source, you will need to explicitly shut down this pipeline when you
   * are done with it, so that you do not continue to be charged for the instances. You can do
   * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
   * pipelines. The PubSub topic will also be deleted at this time.
   */
  exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
}
 
Developer ID: sinmetal, Project: iron-hippo, Lines of code: 70, Source: WindowedWordCount.java


Example 10: main

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
public static void main(String[] args) {
  // Set up Dataflow options; validate them as DataflowPipelineOptions directly
  DataflowPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
  options.setRunner(DataflowPipelineRunner.class);
  options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
  options.setMaxNumWorkers(3);

  String projectId = options.getProject();

  // Create TableSchemas from their String representation
  TableSchema tweetsTableSchema;
  TableSchema annotatedTweetsTableSchema;
  try {
    tweetsTableSchema = createTableSchema(TWEETS_TABLE_SCHEMA);
    annotatedTweetsTableSchema = createTableSchema(ANNOTATED_TWEETS_TABLE_SCHEMA);
  } catch (IOException e) {
    e.printStackTrace();
    return;
  }

  Pipeline p = Pipeline.create(options);

  // Read tweets from Pub/Sub
  PCollection<String> tweets = p.apply(PubsubIO.Read
      .named("Read tweets from PubSub")
      .topic("projects/" + projectId + "/topics/blackfridaytweets"));

  // Format tweets for BigQuery
  PCollection<TableRow> formattedTweets =
      tweets.apply(ParDo.named("Format tweets for BigQuery").of(new DoFormat()));

  // Create a TableReference for the destination table
  TableReference tableReference = new TableReference();
  tableReference.setProjectId(projectId);
  tableReference.setDatasetId("black_friday_analytics");
  tableReference.setTableId("tweets_raw");

  // Write raw tweets to BigQuery
  formattedTweets.apply(BigQueryIO.Write
      .named("Write tweets to BigQuery")
      .to(tableReference)
      .withSchema(tweetsTableSchema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withoutValidation());

  // Filter and annotate tweets with their sentiment from the NL API
  // Note: if the pipeline is run as a batch pipeline, the filter condition is inverted
  PCollection<String> filteredTweets =
      tweets.apply(ParDo.named("Filter and annotate tweets").of(new DoFilterAndProcess()));

  // Format annotated tweets for BigQuery
  PCollection<TableRow> filteredFormattedTweets = filteredTweets.apply(
      ParDo.named("Format annotated tweets for BigQuery").of(new DoFormat()));

  // Create a TableReference for the destination table
  TableReference filteredTableReference = new TableReference();
  filteredTableReference.setProjectId(projectId);
  filteredTableReference.setDatasetId("black_friday_analytics");
  filteredTableReference.setTableId("tweets_sentiment");

  // Write annotated tweets to BigQuery
  filteredFormattedTweets.apply(BigQueryIO.Write
      .named("Write annotated tweets to BigQuery")
      .to(filteredTableReference)
      .withSchema(annotatedTweetsTableSchema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

  p.run();
}
 
Developer ID: LorenzoRidiNoovle, Project: gcp-black-friday-analytics, Lines of code: 58, Source: TwitterProcessor.java


Example 11: apply

import com.google.cloud.dataflow.sdk.io.PubsubIO; // import the required package/class
@Override
public PCollectionView<String> apply(PInput input) {
  // Read at most one message (a bounded read) and expose it as a singleton side input
  return input.getPipeline()
      .apply(PubsubIO.Read.topic(topic).maxNumRecords(1))
      .apply(View.<String>asSingleton());
}
 
Developer ID: SunGard-Labs, Project: dataflow-whitepaper, Lines of code: 7, Source: PubSubStarter.java
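PubSubStarter reads a single message and exposes it as a singleton side input. Per the SDK 1.x documentation as we understand it, maxNumRecords(1) makes the Pub/Sub read bounded. View.asSingleton() expects exactly one element per window. As we read the documentation, an empty input fails with NoSuchElementException and more than one element fails with IllegalArgumentException when the side input is accessed. The plain-Java sketch below mirrors that contract on an Iterator. It is not SDK code, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical illustration of maxNumRecords(1) + asSingleton(); not SDK code. */
class SingletonViewSketch {

  /** Reads at most maxRecords elements, the way maxNumRecords(n) caps the read. */
  static <T> List<T> takeAtMost(Iterator<T> source, int maxRecords) {
    List<T> taken = new ArrayList<>();
    while (taken.size() < maxRecords && source.hasNext()) {
      taken.add(source.next());
    }
    return taken;
  }

  /** Returns the only element, mirroring the exactly-one-element contract of asSingleton(). */
  static <T> T singleton(Iterable<T> values) {
    Iterator<T> it = values.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("empty input for singleton view");
    }
    T only = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("more than one element for singleton view");
    }
    return only;
  }
}
```

The cap is what makes the singleton safe: without maxNumRecords(1), a second message on the topic would violate the exactly-one contract.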



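Note: The com.google.cloud.dataflow.sdk.io.PubsubIO examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets come from open-source projects contributed by their respective developers, and copyright in the source code remains with the original authors. Consult each project's license before distributing or using the code, and do not republish it without permission.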

