
Java BigQueryIO Class Code Examples


This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.io.BigQueryIO. If you have been wondering what exactly BigQueryIO does, how to use it, or what working code looks like, the curated class examples below should help.



The BigQueryIO class belongs to the com.google.cloud.dataflow.sdk.io package. 17 code examples of the class are shown below, sorted by popularity.
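As a primer before the examples, here is a minimal, self-contained sketch of both halves of BigQueryIO in the Dataflow 1.x SDK: BigQueryIO.Read as a pipeline source and BigQueryIO.Write as a sink. The project, dataset, and table names are placeholders, not values from any of the quoted projects.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.Collections;

public class BigQueryIOMinimal {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read every row of a table identified as "project:dataset.table".
    PCollection<TableRow> rows =
        p.apply(BigQueryIO.Read.named("ReadRows").from("my-project:my_dataset.input_table"));

    // Append the rows to another table. The schema must describe the rows
    // being written and is used if the destination table has to be created.
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("destination").setType("STRING")));
    rows.apply(BigQueryIO.Write.named("WriteRows")
        .to("my-project:my_dataset.output_table")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}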

Example 1: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) {
	BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
			.as(BigQueryToDatastoreOptions.class);

	String inputTable = options.getInputTable().get();
	String projectID = options.getOutputProjectID().get();
	String kind = options.getOutputKind().get();

	LOG.info("Input_Table : " + inputTable);
	LOG.info("ProjectID : " + projectID);
	LOG.info("Kind : " + kind);

	Pipeline p = Pipeline.create(options);

	PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
			.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
	
	CreateEntities createEntities = new CreateEntities();
	createEntities.setKind(kind);
	
	PCollection<Entity> entities = keywordGroups.apply(createEntities);
	entities.apply(DatastoreIO.v1().write().withProjectId(projectID));

	p.run();
}
 
Developer: sinmetal, Project: iron-hippo, Lines: 26, Source: BigQueryToDatastore.java
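The BigQueryToDatastoreOptions interface itself is not part of the snippet. Because the getters above are unwrapped with .get(), they presumably return ValueProvider wrappers; the following is a hypothetical reconstruction, with the annotations and ValueProvider coming from com.google.cloud.dataflow.sdk.options:

public interface BigQueryToDatastoreOptions extends PipelineOptions {
  @Description("BigQuery input table, formatted as project:dataset.table")
  @Validation.Required
  ValueProvider<String> getInputTable();
  void setInputTable(ValueProvider<String> value);

  @Description("Project ID of the target Datastore")
  @Validation.Required
  ValueProvider<String> getOutputProjectID();
  void setOutputProjectID(ValueProvider<String> value);

  @Description("Datastore kind the entities are written to")
  @Validation.Required
  ValueProvider<String> getOutputKind();
  void setOutputKind(ValueProvider<String> value);
}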


Example 2: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
 * Run a batch pipeline.
 */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  TableReference tableRef = new TableReference();
  tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
  tableRef.setProjectId(options.as(GcpOptions.class).getProject());
  tableRef.setTableId(options.getOutputTableName());

  // Read events from a CSV file and parse them.
  pipeline
      .apply(TextIO.Read.from(options.getInput()))
      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
      // Extract and sum username/score pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
      // Write the results to BigQuery.
      .apply(ParDo.named("FormatUserScoreSums").of(new FormatUserScoreSumsFn()))
      .apply(
          BigQueryIO.Write.to(tableRef)
              .withSchema(FormatUserScoreSumsFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 29, Source: Exercise1.java
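FormatUserScoreSumsFn.getSchema() is referenced above but not shown. In these exercises a schema factory is typically a static method that assembles TableFieldSchema entries; a plausible sketch, with guessed field names:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;

static TableSchema getSchema() {
  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("user").setType("STRING"));        // assumed field
  fields.add(new TableFieldSchema().setName("total_score").setType("INTEGER")); // assumed field
  return new TableSchema().setFields(fields);
}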


Example 3: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/** Run a batch or streaming pipeline. */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

  Pipeline pipeline = Pipeline.create(options);

  TableReference tableRef = new TableReference();
  tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
  tableRef.setProjectId(options.as(GcpOptions.class).getProject());
  tableRef.setTableId(options.getOutputTableName());

  // Read events from either a CSV file or PubSub stream.
  pipeline
      .apply(new ReadGameEvents(options))
      .apply("WindowedTeamScore", new Exercise2.WindowedTeamScore(Duration.standardMinutes(60)))
      // Write the results to BigQuery.
      .apply(ParDo.named("FormatTeamScoreSums").of(new Exercise2.FormatTeamScoreSumsFn()))
      .apply(
          BigQueryIO.Write.to(tableRef)
              .withSchema(Exercise2.FormatTeamScoreSumsFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 26, Source: Exercise3.java


Example 4: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
 * Run a batch pipeline.
 */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  TableReference tableRef = new TableReference();
  tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
  tableRef.setProjectId(options.as(GcpOptions.class).getProject());
  tableRef.setTableId(options.getOutputTableName());

  // Read events from a CSV file, parse them and write (import) them to BigQuery.
  pipeline
      .apply(TextIO.Read.from(options.getInput()))
      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
      .apply(ParDo.named("FormatGameEvent").of(new FormatGameEventFn()))
      .apply(
          BigQueryIO.Write.to(tableRef)
              .withSchema(FormatGameEventFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 26, Source: Exercise0.java


Example 5: apply

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
@Override
public PCollection<TableRow> apply(PCollection<KV<String,Double>> input) {
    PCollection<TableRow> output = input.
      apply(ParDo.named("aggregateToTableRow").of(new DoFn<KV<String, Double>, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
              KV<String, Double> e = c.element();

              TableRow row = new TableRow()
                .set("destination", e.getKey())
                .set("aggResponseTime", e.getValue());

              c.output(row);
          }
      }));

    output.apply(BigQueryIO.Write
      .named("tableRowToBigQuery")
      .to(this.tableName)
      .withSchema(createTableSchema(this.tableSchema))
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    return output;
}
 
Developer: GoogleCloudPlatform, Project: processing-logs-using-dataflow, Lines: 26, Source: LogAnalyticsPipeline.java
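createTableSchema(String) is a project helper that turns a textual schema description into a TableSchema; its actual input format is not shown here. A minimal sketch assuming a comma-separated name:type list (Example 13 below takes a different route and deserializes a JSON schema string with Gson):

static TableSchema createTableSchema(String schema) {
  // Parse e.g. "destination:STRING,aggResponseTime:FLOAT" -- an assumed format.
  List<TableFieldSchema> fields = new ArrayList<>();
  for (String field : schema.split(",")) {
    String[] parts = field.split(":");
    fields.add(new TableFieldSchema().setName(parts[0]).setType(parts[1]));
  }
  return new TableSchema().setFields(fields);
}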


Example 6: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/** Run a batch pipeline. */
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  TableReference tableRef = new TableReference();
  tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
  tableRef.setProjectId(options.as(GcpOptions.class).getProject());
  tableRef.setTableId(options.getOutputTableName());

  // Read events from a CSV file and parse them.
  pipeline
      .apply(TextIO.Read.from(options.getInput()))
      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
      .apply(
          "AddEventTimestamps", WithTimestamps.of((GameEvent i) -> new Instant(i.getTimestamp())))
      .apply("WindowedTeamScore", new WindowedTeamScore(Duration.standardMinutes(60)))
      // Write the results to BigQuery.
      .apply(ParDo.named("FormatTeamScoreSums").of(new FormatTeamScoreSumsFn()))
      .apply(
          BigQueryIO.Write.to(tableRef)
              .withSchema(FormatTeamScoreSumsFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 28, Source: Exercise2.java


Example 7: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {

    Exercise6Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference sessionsTable = new TableReference();
    sessionsTable.setDatasetId(options.getOutputDataset());
    sessionsTable.setProjectId(options.getProject());
    sessionsTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // [START EXERCISE 6]:
    // Detect user sessions-- that is, a burst of activity separated by a gap from further
    // activity. Find and record the mean session lengths.
    // This information could help the game designers track the changing user engagement
    // as their set of games changes.
    userEvents
        // Window the user events into sessions with gap options.getSessionGap() minutes. Make sure
        // to use an outputTimeFn that sets the output timestamp to the end of the window. This will
        // allow us to compute means on sessions based on their end times, rather than their start
        // times.
        .apply(
            /* TODO: YOUR CODE GOES HERE */
            new ChangeMe<PCollection<KV<String, Integer>>, KV<String, Integer>>())
        // For this use, we care only about the existence of the session, not any particular
        // information aggregated over it, so the following is an efficient way to do that.
        .apply(Combine.perKey(x -> 0))
        // Get the duration per session.
        .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
        // Re-window to process groups of session sums according to when the sessions complete.
        // In streaming we don't just ask "what is the mean value" we must ask "what is the mean
        // value for some window of time". To compute periodic means of session durations, we
        // re-window the session durations.
        .apply(
            /* TODO: YOUR CODE GOES HERE */
            new ChangeMe<PCollection<Integer>, Integer>())
        // Find the mean session duration in each window.
        .apply(Mean.<Integer>globally().withoutDefaults())
        // Write this info to a BigQuery table.
        .apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
        .apply(
            BigQueryIO.Write.to(sessionsTable)
                .withSchema(FormatSessionWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    // [END EXERCISE 6]:

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
Developer: mdvorsky, Project: DataflowSME, Lines: 65, Source: Exercise6.java
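Example 10 below is the completed version of this exercise, so the two TODOs need not stay abstract. Its session window with an end-of-window output timestamp is

Window.named("WindowIntoSessions")
    .<KV<String, Integer>>into(
        Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())

and its fixed-window re-windowing for computing the session means is

Window.named("WindowToExtractSessionMean")
    .<Integer>into(
        FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))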


Example 8: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
  Exercise4Options options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise4Options.class);
  // Enforce that this pipeline is always run in streaming mode.
  options.setStreaming(true);
  // For example purposes, allow the pipeline to be easily cancelled instead of running
  // continuously.
  options.setRunner(DataflowPipelineRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  TableReference teamTable = new TableReference();
  teamTable.setDatasetId(options.getOutputDataset());
  teamTable.setProjectId(options.getProject());
  teamTable.setTableId(options.getOutputTableName() + "_team");

  TableReference userTable = new TableReference();
  userTable.setDatasetId(options.getOutputDataset());
  userTable.setProjectId(options.getProject());
  userTable.setTableId(options.getOutputTableName() + "_user");

  PCollection<GameEvent> gameEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

  gameEvents
      .apply(
          "CalculateTeamScores",
          new CalculateTeamScores(
              Duration.standardMinutes(options.getTeamWindowDuration()),
              Duration.standardMinutes(options.getAllowedLateness())))
      // Write the results to BigQuery.
      .apply(ParDo.named("FormatTeamScores").of(new FormatTeamScoreFn()))
      .apply(
          BigQueryIO.Write.to(teamTable)
              .withSchema(FormatTeamScoreFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  gameEvents
      .apply(
          "CalculateUserScores",
          new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
      // Write the results to BigQuery.
      .apply(ParDo.named("FormatUserScores").of(new FormatUserScoreFn()))
      .apply(
          BigQueryIO.Write.to(userTable)
              .withSchema(FormatUserScoreFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
  // command line.
  PipelineResult result = pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 53, Source: Exercise4.java


Example 9: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
  Exercise7Options options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise7Options.class);
  // Enforce that this pipeline is always run in streaming mode.
  options.setStreaming(true);
  // Allow the pipeline to be cancelled automatically.
  options.setRunner(DataflowPipelineRunner.class);
  Pipeline pipeline = Pipeline.create(options);

  TableReference badUserTable = new TableReference();
  badUserTable.setDatasetId(options.getOutputDataset());
  badUserTable.setProjectId(options.getProject());
  badUserTable.setTableId(options.getOutputTableName() + "_bad_users");

  //  1. Read game events with message id and timestamp
  //  2. Parse events
  //  3. Key by event id
  //  4. Sessionize.
  PCollection<KV<String, GameEvent>> sessionedEvents = null; /* TODO: YOUR CODE GOES HERE */

  //  1. Read play events with message id and timestamp
  //  2. Parse events
  //  3. Key by event id
  //  4. Sessionize.
  PCollection<KV<String, PlayEvent>> sessionedPlayEvents = null; /* TODO: YOUR CODE GOES HERE */

  // 1. Join events
  // 2. Compute latency using ComputeLatencyFn
  PCollection<KV<String, Long>> userLatency = null; /* TODO: YOUR CODE GOES HERE */

  // 1. Get the values of userLatencies
  // 2. Re-window into GlobalWindows with periodic repeated triggers
  // 3. Compute global approximate quantiles with fanout
  PCollectionView<List<Long>> globalQuantiles = null; /* TODO: YOUR CODE GOES HERE */

  userLatency
      // Use the computed latency distribution as a side-input to filter out likely bad users.
      .apply(
          "DetectBadUsers",
          ParDo.withSideInputs(globalQuantiles)
              .of(
                  new DoFn<KV<String, Long>, String>() {
                    public void processElement(ProcessContext c) {
                      /* TODO: YOUR CODE GOES HERE */
                      throw new RuntimeException("Not implemented");
                    }
                  }))
      // We want to emit only a single BigQuery row for every bad user. To do this, we
      // re-key by user, then window globally and trigger on the first element for each key.
      .apply(
          "KeyByUser",
          WithKeys.of((String user) -> user).withKeyType(TypeDescriptor.of(String.class)))
      .apply(
          "GlobalWindowsTriggerOnFirst",
          Window.<KV<String, String>>into(new GlobalWindows())
              .triggering(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardSeconds(10)))
              .accumulatingFiredPanes())
      .apply("GroupByUser", GroupByKey.<String, String>create())
      .apply("FormatBadUsers", ParDo.of(new FormatBadUserFn()))
      .apply(
          "WriteBadUsers",
          BigQueryIO.Write.to(badUserTable)
              .withSchema(FormatBadUserFn.getSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_APPEND));

  // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
  // command line.
  PipelineResult result = pipeline.run();
}
 
Developer: mdvorsky, Project: DataflowSME, Lines: 73, Source: Exercise7.java
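For the "Re-window into GlobalWindows with periodic repeated triggers" step, here is a sketch that mirrors the trigger pattern already used later in this snippet. The one-minute firing period and the quantile count are placeholder choices, the fanout mentioned in the comment is omitted, and the transforms come from com.google.cloud.dataflow.sdk.transforms and its windowing subpackage:

PCollectionView<List<Long>> globalQuantiles =
    userLatency
        .apply("LatencyValues", Values.<Long>create())
        .apply(
            "GlobalWindowsPeriodicTrigger",
            Window.<Long>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1))))
                .accumulatingFiredPanes())
        // Five quantiles: min, 25th percentile, median, 75th percentile, max.
        .apply("ApproxQuantiles", ApproximateQuantiles.<Long>globally(5))
        .apply("QuantilesView", View.<List<Long>>asSingleton());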


Example 10: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {

    Exercise6Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference sessionsTable = new TableReference();
    sessionsTable.setDatasetId(options.getOutputDataset());
    sessionsTable.setProjectId(options.getProject());
    sessionsTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // Detect user sessions-- that is, a burst of activity separated by a gap from further
    // activity. Find and record the mean session lengths.
    // This information could help the game designers track the changing user engagement
    // as their set of games changes.
    userEvents
        .apply(
            Window.named("WindowIntoSessions")
                .<KV<String, Integer>>into(
                    Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
                .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
        // For this use, we care only about the existence of the session, not any particular
        // information aggregated over it, so the following is an efficient way to do that.
        .apply(Combine.perKey(x -> 0))
        // Get the duration per session.
        .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
        // Re-window to process groups of session sums according to when the sessions complete.
        .apply(
            Window.named("WindowToExtractSessionMean")
                .<Integer>into(
                    FixedWindows.of(
                        Duration.standardMinutes(options.getUserActivityWindowDuration()))))
        // Find the mean session duration in each window.
        .apply(Mean.<Integer>globally().withoutDefaults())
        // Write this info to a BigQuery table.
        .apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
        .apply(
            BigQueryIO.Write.to(sessionsTable)
                .withSchema(FormatSessionWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
Developer: mdvorsky, Project: DataflowSME, Lines: 60, Source: Exercise6.java


Example 11: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {

    Exercise5Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference teamTable = new TableReference();
    teamTable.setDatasetId(options.getOutputDataset());
    teamTable.setProjectId(options.getProject());
    teamTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // Calculate the total score per user over fixed windows, and
    // cumulative updates for late data.
    final PCollectionView<Map<String, Integer>> spammersView =
        userEvents
            .apply(
                Window.named("FixedWindowsUser")
                    .<KV<String, Integer>>into(
                        FixedWindows.of(
                            Duration.standardMinutes(options.getFixedWindowDuration()))))

            // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
            // These might be robots/spammers.
            .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
            // Derive a view from the collection of spammer users. It will be used as a side input
            // in calculating the team score sums, below.
            .apply("CreateSpammersView", View.<String, Integer>asMap());

    // Calculate the total score per team over fixed windows,
    // and emit cumulative updates for late data. Uses the side input derived above-- the set of
    // suspected robots-- to filter out scores from those users from the sum.
    // Write the results to BigQuery.
    rawEvents
        .apply(
            Window.named("WindowIntoFixedWindows")
                .<GameEvent>into(
                    FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
        // Filter out the detected spammer users, using the side input derived above.
        .apply(
            ParDo.named("FilterOutSpammers")
                .withSideInputs(spammersView)
                .of(
                    new DoFn<GameEvent, GameEvent>() {
                      @Override
                      public void processElement(ProcessContext c) {
                        // If the user is not in the spammers Map, output the data element.
                        if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                          c.output(c.element());
                        }
                      }
                    }))
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
        // Write the result to BigQuery
        .apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
        .apply(
            BigQueryIO.Write.to(teamTable)
                .withSchema(FormatTeamWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
Developer: mdvorsky, Project: DataflowSME, Lines: 79, Source: Exercise5.java


Example 12: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {

    Exercise5Options options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // Allow the pipeline to be cancelled automatically.
    options.setRunner(DataflowPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    TableReference teamTable = new TableReference();
    teamTable.setDatasetId(options.getOutputDataset());
    teamTable.setProjectId(options.getProject());
    teamTable.setTableId(options.getOutputTableName());

    PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));

    // Extract username/score pairs from the event stream
    PCollection<KV<String, Integer>> userEvents =
        rawEvents.apply(
            "ExtractUserScore",
            MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));

    // Calculate the total score per user over fixed windows, and
    // cumulative updates for late data.
    final PCollectionView<Map<String, Integer>> spammersView =
        userEvents
            .apply(
                Window.named("FixedWindowsUser")
                    .<KV<String, Integer>>into(
                        FixedWindows.of(
                            Duration.standardMinutes(options.getFixedWindowDuration()))))

            // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
            // These might be robots/spammers.
            .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
            // Derive a view from the collection of spammer users. It will be used as a side input
            // in calculating the team score sums, below.
            .apply("CreateSpammersView", View.<String, Integer>asMap());

    // [START EXERCISE 5 PART b]:
    // Calculate the total score per team over fixed windows,
    // and emit cumulative updates for late data. Uses the side input derived above-- the set of
    // suspected robots-- to filter out scores from those users from the sum.
    // Write the results to BigQuery.
    rawEvents
        .apply(
            Window.named("WindowIntoFixedWindows")
                .<GameEvent>into(
                    FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
        // Filter out the detected spammer users, using the side input derived above.
        //  Use ParDo with spammersView side input to filter out spammers.
        .apply(/* TODO: YOUR CODE GOES HERE */ new ChangeMe<PCollection<GameEvent>, GameEvent>())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
        // Write the result to BigQuery
        .apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
        .apply(
            BigQueryIO.Write.to(teamTable)
                .withSchema(FormatTeamWindowFn.getSchema())
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    // [END EXERCISE 5 PART b]:

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
  }
 
Developer: mdvorsky, Project: DataflowSME, Lines: 70, Source: Exercise5.java
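The TODO in this variant is filled in by Example 11 above; the transform that belongs in its place is the ParDo that filters on the spammersView side input:

ParDo.named("FilterOutSpammers")
    .withSideInputs(spammersView)
    .of(
        new DoFn<GameEvent, GameEvent>() {
          @Override
          public void processElement(ProcessContext c) {
            // Output the element only if its user is absent from the spammers map.
            if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
              c.output(c.element());
            }
          }
        })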


Example 13: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws GeneralSecurityException, IOException, ParseException, ParserConfigurationException, SAXException {
	String params = null;
	for (int i = 0; i < args.length; i++) {
		if (args[i].startsWith("--params="))
			params = args[i].replaceFirst("--params=", "");
	}

	System.out.println(params);
	init(params);

	GoogleCredential credential = new GoogleCredential.Builder()
			.setTransport(new NetHttpTransport())
			.setJsonFactory(new JacksonFactory())
			.setServiceAccountId(accountEmail)
			.setServiceAccountScopes(Arrays.asList(new String[] {"https://www.googleapis.com/auth/cloud-platform"}))
			.setServiceAccountPrivateKeyFromP12File(new File(keyFile))
			.build();

	DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
	options.setRunner(DataflowPipelineRunner.class);
	// Your project ID is required in order to run your pipeline on the Google Cloud.
	options.setProject(projectId);
	// Your Google Cloud Storage path is required for staging local files.
	options.setStagingLocation(workingBucket);
	options.setGcpCredential(credential);
	options.setServiceAccountName(accountEmail);
	options.setServiceAccountKeyfile(keyFile);
	options.setMaxNumWorkers(maxNumWorkers);
	options.setDiskSizeGb(diskSizeGb);
	options.setWorkerMachineType(machineType);
	options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
	options.setZone(zone);
	options.setStreaming(isStreaming);
	options.setJobName(pipelineName);


	Gson gson = new Gson();
	TableSchema schema = gson.fromJson(schemaStr, TableSchema.class);
	Pipeline pipeline = Pipeline.create(options);
	PCollection<String> streamData =
			pipeline.apply(PubsubIO.Read.named("ReadFromPubsub")
					.topic(String.format("projects/%1$s/topics/%2$s",projectId,pubSubTopic)));
	PCollection<TableRow> tableRow = streamData.apply("ToTableRow", ParDo.of(new PrepData.ToTableRow()));


	tableRow.apply(BigQueryIO.Write
			.named("WriteBQTable")
			.to(String.format("%1$s:%2$s.%3$s",projectId, bqDataSet, bqTable))
			.withSchema(schema)
			.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

	System.out.println("Starting pipeline " + pipelineName);
	pipeline.run();
}
 
Developer: bomboradata, Project: pubsub-to-bigquery, Lines: 55, Source: PubSubToBQPipeline.java


Example 14: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws IOException {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  options.setBigQuerySchema(getSchema());
  // DataflowExampleUtils creates the necessary input sources to simplify execution of this
  // Pipeline.
  DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
    options.isUnbounded());

  Pipeline pipeline = Pipeline.create(options);

  /**
   * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
   * unbounded input source.
   */
  PCollection<String> input;
  if (options.isUnbounded()) {
    LOG.info("Reading from PubSub.");
    /**
     * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
     * specified as an argument. The data elements' timestamps will come from the pubsub
     * injection.
     */
    input = pipeline
        .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
  } else {
    /** Else, this is a bounded pipeline. Read from the GCS file. */
    input = pipeline
        .apply(TextIO.Read.from(options.getInputFile()))
        // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
        // See AddTimestampFn for more detail on this.
        .apply(ParDo.of(new AddTimestampFn()));
  }

  /**
   * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
   * minute (you can change this with a command-line option). See the documentation for more
   * information on how fixed windows work, and for information on the other types of windowing
   * available (e.g., sliding windows).
   */
  PCollection<String> windowedWords = input
    .apply(Window.<String>into(
      FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

  /**
   * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
   * windows over a PCollection containing windowed values.
   */
  PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

  /**
   * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
   * The BigQuery output source supports both bounded and unbounded data.
   */
  wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));

  PipelineResult result = pipeline.run();

  /**
   * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
   * runs for a limited time, and publishes to the input PubSub topic.
   *
   * With an unbounded input source, you will need to explicitly shut down this pipeline when you
   * are done with it, so that you do not continue to be charged for the instances. You can do
   * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
   * pipelines. The PubSub topic will also be deleted at this time.
   */
  exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
}
 
Developer: sinmetal, Project: iron-hippo, Lines: 70, Source: WindowedWordCount.java
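getTableReference(options) is not shown in this snippet. Following the TableReference pattern used throughout the other examples, a hypothetical sketch in which the two option getter names are guesses:

static TableReference getTableReference(Options options) {
  TableReference tableRef = new TableReference();
  tableRef.setProjectId(options.as(GcpOptions.class).getProject());
  tableRef.setDatasetId(options.getBigQueryDataset()); // assumed getter
  tableRef.setTableId(options.getBigQueryTable());     // assumed getter
  return tableRef;
}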


Example 15: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) {

    	// Setup Dataflow options
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
        options.setMaxNumWorkers(3);

        String projectId = options.getProject();

        // Create TableSchemas from their String representation
        TableSchema tweetsTableSchema;
        TableSchema annotatedTweetsTableSchema;
        try {
            tweetsTableSchema = createTableSchema(TWEETS_TABLE_SCHEMA);
            annotatedTweetsTableSchema = createTableSchema(ANNOTATED_TWEETS_TABLE_SCHEMA);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        Pipeline p = Pipeline.create(options);

        // Read tweets from Pub/Sub
        PCollection<String> tweets = null;
        tweets = p.apply(PubsubIO.Read.named("Read tweets from PubSub").topic("projects/" + projectId + "/topics/blackfridaytweets"));

        // Format tweets for BigQuery
        PCollection<TableRow> formattedTweets = tweets.apply(ParDo.named("Format tweets for BigQuery").of(new DoFormat()));

        // Create a TableReference for the destination table
        TableReference tableReference = new TableReference();
        tableReference.setProjectId(projectId);
        tableReference.setDatasetId("black_friday_analytics");
        tableReference.setTableId("tweets_raw");

        // Write tweets to BigQuery
        formattedTweets.apply(BigQueryIO.Write.named("Write tweets to BigQuery")
                .to(tableReference)
                .withSchema(tweetsTableSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withoutValidation());

        // Filter and annotate tweets with their sentiment from NL API
        // Note: if the pipeline is run as a batch pipeline, the filter condition is inverted
        PCollection<String> filteredTweets = tweets.apply(ParDo.named("Filter and annotate tweets").of(new DoFilterAndProcess()));

        // Format tweets for BigQuery 
        PCollection<TableRow> filteredFormattedTweets = filteredTweets.apply(ParDo.named("Format annotated tweets for BigQuery").of(new DoFormat()));

        // Create a TableReference for the destination table
        TableReference filteredTableReference = new TableReference();
        filteredTableReference.setProjectId(projectId);
        filteredTableReference.setDatasetId("black_friday_analytics");
        filteredTableReference.setTableId("tweets_sentiment");

        // Write tweets to BigQuery
        filteredFormattedTweets.apply(BigQueryIO.Write.named("Write annotated tweets to BigQuery")
                .to(filteredTableReference)
                .withSchema(annotatedTweetsTableSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run();
    }
 
Developer: LorenzoRidiNoovle, Project: gcp-black-friday-analytics, Lines: 58, Source: TwitterProcessor.java


Example 16: run

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void run(String[] args) throws Exception {
  System.out.println("Making Datastore->GCS pipeline");

  Options options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(Options.class);

  if (options.getIsBlocking()) {
    options.setRunner(BlockingDataflowPipelineRunner.class);
  } else {
    options.setRunner(DataflowPipelineRunner.class);
    options.setStreaming(false);
  }

  String date = (new SimpleDateFormat("yyyyMMdd")).format(new Date());

  // make string in format: "my-project:dataset.entity_date"
  String tableName = String.format("%s:%s.%s_%s",
      options.getProject(),
      options.getBQDataset(),
      options.getDatastoreEntityKind(),
      date);
  System.out.println("Destination BigQuery Table is: " + tableName);

  // Build our Datastore query.
  // Right now we are simply grabbing all Datastore records of a given kind,
  Query.Builder queryBuilder = Query.newBuilder();
  queryBuilder.addKindBuilder().setName(options.getDatastoreEntityKind());
  Query query = queryBuilder.build();

  // Generate the Datastore Read Source
  DatastoreV1.Read read = DatastoreIO.v1().read()
      .withProjectId(options.getProject())
      .withQuery(query);
  
  // Build our data pipeline
  Pipeline pipeline = Pipeline.create(options);
  pipeline.apply("IngestEntities", read)
          .apply("EntityToTableRows", ParDo.of(new DatastoreToTableRow()))
          .apply("WriteTableRows", BigQueryIO.Write
              .to(tableName)
              .withSchema(new BQSchema().getTableSchema())
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

  System.out.println("Running pipeline");
  pipeline.run();
}
 
Developer: cobookman, Project: DatastoreToGCS, Lines: 49, Source: BQBackup.java


Example 17: main

import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
 * {@link Pipeline} Entry point of this Dataflow Pipeline
 */
public static void main(String args[]) {

	try {

		ExtractContractTerms fn = new ExtractContractTerms();

		// fromArgs takes command-line options and maps them to properties within a PipelineOptions 
		SymbolTransformOptions options = PipelineOptionsFactory.fromArgs(args).as(SymbolTransformOptions.class);
		Pipeline pipeline = Pipeline.create(options);

		// Input is a PCollection of String, Output is a PCollection of OptionTick
		PCollection < OptionsTick > mainCollection = pipeline.apply(TextIO.Read.named("Reading input file")
			.from(options.getInputFilePath()))
			.apply(ParDo.named("Extracting options contract terms from symbol")
			.of(fn)).setCoder(SerializableCoder.of(OptionsTick.class));

		// Input is PCollection of OptionTick, Output are records within the BigQuery
		// If destination is BigQuery then DataflowPipelineRunner or BlockingDataflowPipelineRunner must be used
		if (!"".equals(options.getOutputTable())) {

			mainCollection.apply(ParDo.named("Creating BigQuery row from tick")
				.of(new CreateBigQueryRow()))
				.apply(BigQueryIO.Write.named("Writing records to BigQuery")
				.to(options.getOutputTable())
				.withSchema(SymbolTransformPipeline.generateSchema())
				.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
				.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

		} else {

			// To write to a local text file, please verify that 
			// DirectPipelineRunner is selected in pom.xml or the run shell script
			mainCollection.apply(ParDo.named("Creating String row from tick")
				.of(new TickToString()))
				.apply(TextIO.Write.named("Writing output file")
				.to(options.getOutputFilePath()));
		}

		pipeline.run();
		System.exit(0);

	} catch (Exception ex) {
		LOGGER.error(ex.getMessage(), ex);
		System.exit(1);
	}
}
 
Developer: SunGard-Labs, Project: dataflow-whitepaper, Lines: 50, Source:
