This article collects typical usage examples of the Java class com.google.cloud.dataflow.sdk.io.BigQueryIO. If you have been wondering what exactly the BigQueryIO class does, how to use it, or where to find examples of it in use, the curated class examples below may help.
The BigQueryIO class belongs to the com.google.cloud.dataflow.sdk.io package. Seventeen code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Java code examples.
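Before the examples, here is a minimal, self-contained sketch of the typical read/write pattern for this class. It is not one of the 17 examples below; the project, dataset, and table names and the single-field schema are placeholders chosen for illustration only.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.Arrays;
public class BigQueryIOSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
    // Read rows from an existing table (placeholder table name).
    PCollection<TableRow> rows =
        p.apply(BigQueryIO.Read.named("ReadRows").from("my-project:my_dataset.source_table"));
    // Write the rows to another table, creating it if needed (placeholder single-field schema).
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("field").setType("STRING")));
    rows.apply(BigQueryIO.Write.named("WriteRows")
        .to("my-project:my_dataset.dest_table")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
  }
}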
Example 1: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) {
BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(BigQueryToDatastoreOptions.class);
String inputTable = options.getInputTable().get();
String projectID = options.getOutputProjectID().get();
String kind = options.getOutputKind().get();
LOG.info("Input_Table : " + inputTable);
LOG.info("ProjectID : " + projectID);
LOG.info("Kind : " + kind);
Pipeline p = Pipeline.create(options);
PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
CreateEntities createEntities = new CreateEntities();
createEntities.setKind(kind);
PCollection<Entity> entities = keywordGroups.apply(createEntities);
entities.apply(DatastoreIO.v1().write().withProjectId(projectID));
p.run();
}
Author: sinmetal, Project: iron-hippo, Lines: 26, Source: BigQueryToDatastore.java
Example 2: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
* Run a batch pipeline.
*/
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
tableRef.setProjectId(options.as(GcpOptions.class).getProject());
tableRef.setTableId(options.getOutputTableName());
// Read events from a CSV file and parse them.
pipeline
.apply(TextIO.Read.from(options.getInput()))
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
// Write the results to BigQuery.
.apply(ParDo.named("FormatUserScoreSums").of(new FormatUserScoreSumsFn()))
.apply(
BigQueryIO.Write.to(tableRef)
.withSchema(FormatUserScoreSumsFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 29, Source: Exercise1.java
Example 3: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/** Run a batch or streaming pipeline. */
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
tableRef.setProjectId(options.as(GcpOptions.class).getProject());
tableRef.setTableId(options.getOutputTableName());
// Read events from either a CSV file or PubSub stream.
pipeline
.apply(new ReadGameEvents(options))
.apply("WindowedTeamScore", new Exercise2.WindowedTeamScore(Duration.standardMinutes(60)))
// Write the results to BigQuery.
.apply(ParDo.named("FormatTeamScoreSums").of(new Exercise2.FormatTeamScoreSumsFn()))
.apply(
BigQueryIO.Write.to(tableRef)
.withSchema(Exercise2.FormatTeamScoreSumsFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 26, Source: Exercise3.java
Example 4: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
* Run a batch pipeline.
*/
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
tableRef.setProjectId(options.as(GcpOptions.class).getProject());
tableRef.setTableId(options.getOutputTableName());
// Read events from a CSV file, parse them and write (import) them to BigQuery.
pipeline
.apply(TextIO.Read.from(options.getInput()))
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
.apply(ParDo.named("FormatGameEvent").of(new FormatGameEventFn()))
.apply(
BigQueryIO.Write.to(tableRef)
.withSchema(FormatGameEventFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 26, Source: Exercise0.java
Example 5: apply
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
@Override
public PCollection<TableRow> apply(PCollection<KV<String,Double>> input) {
PCollection<TableRow> output = input.
apply(ParDo.named("aggregateToTableRow").of(new DoFn<KV<String, Double>, TableRow>() {
@Override
public void processElement(ProcessContext c) {
KV<String, Double> e = c.element();
TableRow row = new TableRow()
.set("destination", e.getKey())
.set("aggResponseTime", e.getValue());
c.output(row);
}
}));
output.apply(BigQueryIO.Write
.named("tableRowToBigQuery")
.to(this.tableName)
.withSchema(createTableSchema(this.tableSchema))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
return output;
}
Author: GoogleCloudPlatform, Project: processing-logs-using-dataflow, Lines: 26, Source: LogAnalyticsPipeline.java
Example 6: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/** Run a batch pipeline. */
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setDatasetId(options.as(Options.class).getOutputDataset());
tableRef.setProjectId(options.as(GcpOptions.class).getProject());
tableRef.setTableId(options.getOutputTableName());
// Read events from a CSV file and parse them.
pipeline
.apply(TextIO.Read.from(options.getInput()))
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
.apply(
"AddEventTimestamps", WithTimestamps.of((GameEvent i) -> new Instant(i.getTimestamp())))
.apply("WindowedTeamScore", new WindowedTeamScore(Duration.standardMinutes(60)))
// Write the results to BigQuery.
.apply(ParDo.named("FormatTeamScoreSums").of(new FormatTeamScoreSumsFn()))
.apply(
BigQueryIO.Write.to(tableRef)
.withSchema(FormatTeamScoreSumsFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 28, Source: Exercise2.java
Example 7: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// [START EXERCISE 6]:
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
// Window the user events into sessions with gap options.getSessionGap() minutes. Make sure
// to use an outputTimeFn that sets the output timestamp to the end of the window. This will
// allow us to compute means on sessions based on their end times, rather than their start
// times.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<KV<String, Integer>>, KV<String, Integer>>())
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
// In streaming we don't just ask "what is the mean value" we must ask "what is the mean
// value for some window of time". To compute periodic means of session durations, we
// re-window the session durations.
.apply(
/* TODO: YOUR CODE GOES HERE */
new ChangeMe<PCollection<Integer>, Integer>())
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [END EXERCISE 6]:
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 65, Source: Exercise6.java
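One way to fill in the two TODO windowing steps in the exercise above: the two transforms below are copied from the solved version of this exercise shown in Example 10 further down the page, so the transform names and window parameters match that solution.
// Replaces the first ChangeMe placeholder: sessionize user events, stamping outputs at the end of each window.
.apply(
    Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
            Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// Replaces the second ChangeMe placeholder: re-window session durations into fixed windows before taking the mean.
.apply(
    Window.named("WindowToExtractSessionMean")
        .<Integer>into(
            FixedWindows.of(
                Duration.standardMinutes(options.getUserActivityWindowDuration()))))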
Example 8: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise4Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise4Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// For example purposes, allow the pipeline to be easily cancelled instead of running
// continuously.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName() + "_team");
TableReference userTable = new TableReference();
userTable.setDatasetId(options.getOutputDataset());
userTable.setProjectId(options.getProject());
userTable.setTableId(options.getOutputTableName() + "_user");
PCollection<GameEvent> gameEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
gameEvents
.apply(
"CalculateTeamScores",
new CalculateTeamScores(
Duration.standardMinutes(options.getTeamWindowDuration()),
Duration.standardMinutes(options.getAllowedLateness())))
// Write the results to BigQuery.
.apply(ParDo.named("FormatTeamScores").of(new FormatTeamScoreFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamScoreFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
gameEvents
.apply(
"CalculateUserScores",
new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
// Write the results to BigQuery.
.apply(ParDo.named("FormatUserScores").of(new FormatUserScoreFn()))
.apply(
BigQueryIO.Write.to(userTable)
.withSchema(FormatUserScoreFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 53, Source: Exercise4.java
Example 9: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise7Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise7Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference badUserTable = new TableReference();
badUserTable.setDatasetId(options.getOutputDataset());
badUserTable.setProjectId(options.getProject());
badUserTable.setTableId(options.getOutputTableName() + "_bad_users");
// 1. Read game events with message id and timestamp
// 2. Parse events
// 3. Key by event id
// 4. Sessionize.
PCollection<KV<String, GameEvent>> sessionedEvents = null; /* TODO: YOUR CODE GOES HERE */
// 1. Read play events with message id and timestamp
// 2. Parse events
// 3. Key by event id
// 4. Sessionize.
PCollection<KV<String, PlayEvent>> sessionedPlayEvents = null; /* TODO: YOUR CODE GOES HERE */
// 1. Join events
// 2. Compute latency using ComputeLatencyFn
PCollection<KV<String, Long>> userLatency = null; /* TODO: YOUR CODE GOES HERE */
// 1. Get the values of userLatencies
// 2. Re-window into GlobalWindows with periodic repeated triggers
// 3. Compute global approximate quantiles with fanout
PCollectionView<List<Long>> globalQuantiles = null; /* TODO: YOUR CODE GOES HERE */
userLatency
// Use the computed latency distribution as a side-input to filter out likely bad users.
.apply(
"DetectBadUsers",
ParDo.withSideInputs(globalQuantiles)
.of(
new DoFn<KV<String, Long>, String>() {
public void processElement(ProcessContext c) {
/* TODO: YOUR CODE GOES HERE */
throw new RuntimeException("Not implemented");
}
}))
// We want to emit only a single BigQuery row for every bad user. To do this, we
// re-key by user, then window globally and trigger on the first element for each key.
.apply(
"KeyByUser",
WithKeys.of((String user) -> user).withKeyType(TypeDescriptor.of(String.class)))
.apply(
"GlobalWindowsTriggerOnFirst",
Window.<KV<String, String>>into(new GlobalWindows())
.triggering(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10)))
.accumulatingFiredPanes())
.apply("GroupByUser", GroupByKey.<String, String>create())
.apply("FormatBadUsers", ParDo.of(new FormatBadUserFn()))
.apply(
"WriteBadUsers",
BigQueryIO.Write.to(badUserTable)
.withSchema(FormatBadUserFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 73, Source: Exercise7.java
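The first TODO in the exercise above (building sessionedEvents) could be sketched roughly as follows. This is only an illustration: the getEventId() accessor, the reuse of Exercise3.ReadGameEvents as the event source, and the five-minute session gap are all assumptions, not part of the exercise code, which actually expects you to read events together with their Pub/Sub message ids and timestamps.
// Hypothetical sketch only -- getEventId() and the 5-minute gap are assumed for illustration.
PCollection<KV<String, GameEvent>> sessionedEvents =
    pipeline
        .apply("ReadGameEvents", new Exercise3.ReadGameEvents(options))
        .apply("KeyByEventId",
            WithKeys.of((GameEvent e) -> e.getEventId())
                .withKeyType(TypeDescriptor.of(String.class)))
        .apply("SessionizeEvents",
            Window.<KV<String, GameEvent>>into(
                Sessions.withGapDuration(Duration.standardMinutes(5))));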
Example 10: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise6Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise6Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference sessionsTable = new TableReference();
sessionsTable.setDatasetId(options.getOutputDataset());
sessionsTable.setProjectId(options.getProject());
sessionsTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
.apply(
Window.named("WindowIntoSessions")
.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
// Re-window to process groups of session sums according to when the sessions complete.
.apply(
Window.named("WindowToExtractSessionMean")
.<Integer>into(
FixedWindows.of(
Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(ParDo.named("FormatSessions").of(new FormatSessionWindowFn()))
.apply(
BigQueryIO.Write.to(sessionsTable)
.withSchema(FormatSessionWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 60, Source: Exercise6.java
Example 11: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
.apply(
ParDo.named("FilterOutSpammers")
.withSideInputs(spammersView)
.of(
new DoFn<GameEvent, GameEvent>() {
@Override
public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
c.output(c.element());
}
}
}))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 79, Source: Exercise5.java
Example 12: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws Exception {
Exercise5Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Exercise5Options.class);
// Enforce that this pipeline is always run in streaming mode.
options.setStreaming(true);
// Allow the pipeline to be cancelled automatically.
options.setRunner(DataflowPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);
TableReference teamTable = new TableReference();
teamTable.setDatasetId(options.getOutputDataset());
teamTable.setProjectId(options.getProject());
teamTable.setTableId(options.getOutputTableName());
PCollection<GameEvent> rawEvents = pipeline.apply(new Exercise3.ReadGameEvents(options));
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply(
"ExtractUserScore",
MapElements.via((GameEvent gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
final PCollectionView<Map<String, Integer>> spammersView =
userEvents
.apply(
Window.named("FixedWindowsUser")
.<KV<String, Integer>>into(
FixedWindows.of(
Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
// These might be robots/spammers.
.apply("CalculateSpammyUsers", new CalculateSpammyUsers())
// Derive a view from the collection of spammer users. It will be used as a side input
// in calculating the team score sums, below.
.apply("CreateSpammersView", View.<String, Integer>asMap());
// [START EXERCISE 5 PART b]:
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
Window.named("WindowIntoFixedWindows")
.<GameEvent>into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
// Use ParDo with spammersView side input to filter out spammers.
.apply(/* TODO: YOUR CODE GOES HERE */ new ChangeMe<PCollection<GameEvent>, GameEvent>())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new Exercise1.ExtractAndSumScore("team"))
// Write the result to BigQuery
.apply(ParDo.named("FormatTeamWindows").of(new FormatTeamWindowFn()))
.apply(
BigQueryIO.Write.to(teamTable)
.withSchema(FormatTeamWindowFn.getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
// [END EXERCISE 5 PART b]
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
}
Author: mdvorsky, Project: DataflowSME, Lines: 70, Source: Exercise5.java
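One way to fill in the spam-filtering TODO in the exercise above: the ParDo below is taken from the solved variant of the same pipeline shown in Example 11 earlier on this page.
// Replaces the ChangeMe placeholder: drop events whose user appears in the spammers side input.
.apply(
    ParDo.named("FilterOutSpammers")
        .withSideInputs(spammersView)
        .of(
            new DoFn<GameEvent, GameEvent>() {
              @Override
              public void processElement(ProcessContext c) {
                // Keep the element only if its user is not in the spammers map.
                if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                  c.output(c.element());
                }
              }
            }))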
Example 13: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws GeneralSecurityException, IOException, ParseException, ParserConfigurationException, SAXException {
String params = null;
for (int i = 0; i < args.length; i++) {
if (args[i].startsWith("--params="))
params = args[i].replaceFirst("--params=", "");
}
System.out.println(params);
init(params);
GoogleCredential credential = new GoogleCredential.Builder()
.setTransport(new NetHttpTransport())
.setJsonFactory(new JacksonFactory())
.setServiceAccountId(accountEmail)
.setServiceAccountScopes(Arrays.asList(new String[] {"https://www.googleapis.com/auth/cloud-platform"}))
.setServiceAccountPrivateKeyFromP12File(new File(keyFile))
.build();
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
// Your project ID is required in order to run your pipeline on the Google Cloud.
options.setProject(projectId);
// Your Google Cloud Storage path is required for staging local files.
options.setStagingLocation(workingBucket);
options.setGcpCredential(credential);
options.setServiceAccountName(accountEmail);
options.setServiceAccountKeyfile(keyFile);
options.setMaxNumWorkers(maxNumWorkers);
options.setDiskSizeGb(diskSizeGb);
options.setWorkerMachineType(machineType);
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setZone(zone);
options.setStreaming(isStreaming);
options.setJobName(pipelineName);
Gson gson = new Gson();
TableSchema schema = gson.fromJson(schemaStr, TableSchema.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> streamData =
pipeline.apply(PubsubIO.Read.named("ReadFromPubsub")
.topic(String.format("projects/%1$s/topics/%2$s",projectId,pubSubTopic)));
PCollection<TableRow> tableRow = streamData.apply("ToTableRow", ParDo.of(new PrepData.ToTableRow()));
tableRow.apply(BigQueryIO.Write
.named("WriteBQTable")
.to(String.format("%1$s:%2$s.%3$s",projectId, bqDataSet, bqTable))
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
System.out.println("Starting pipeline " + pipelineName);
pipeline.run();
}
Author: bomboradata, Project: pubsub-to-bigquery, Lines: 55, Source: PubSubToBQPipeline.java
Example 14: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(getSchema());
// DataflowExampleUtils creates the necessary input sources to simplify execution of this
// Pipeline.
DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options,
options.isUnbounded());
Pipeline pipeline = Pipeline.create(options);
/**
* Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or
* unbounded input source.
*/
PCollection<String> input;
if (options.isUnbounded()) {
LOG.info("Reading from PubSub.");
/**
* Concept #3: Read from the PubSub topic. A topic will be created if it wasn't
* specified as an argument. The data elements' timestamps will come from the pubsub
* injection.
*/
input = pipeline
.apply(PubsubIO.Read.topic(options.getPubsubTopic()));
} else {
/** Else, this is a bounded pipeline. Read from the GCS file. */
input = pipeline
.apply(TextIO.Read.from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn()));
}
/**
* Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1
* minute (you can change this with a command-line option). See the documentation for more
* information on how fixed windows work, and for information on the other types of windowing
* available (e.g., sliding windows).
*/
PCollection<String> windowedWords = input
.apply(Window.<String>into(
FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
/**
* Concept #5: Re-use our existing CountWords transform that does not have knowledge of
* windows over a PCollection containing windowed values.
*/
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
* Concept #6: Format the results for a BigQuery table, then write to BigQuery.
* The BigQuery output source supports both bounded and unbounded data.
*/
wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
.apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
PipelineResult result = pipeline.run();
/**
* To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
* runs for a limited time, and publishes to the input PubSub topic.
*
* With an unbounded input source, you will need to explicitly shut down this pipeline when you
* are done with it, so that you do not continue to be charged for the instances. You can do
* this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
* pipelines. The PubSub topic will also be deleted at this time.
*/
exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
}
Author: sinmetal, Project: iron-hippo, Lines: 70, Source: WindowedWordCount.java
Example 15: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void main(String[] args) {
// Setup Dataflow options
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setMaxNumWorkers(3);
String projectId = options.getProject();
// Create TableSchemas from their String representation
TableSchema tweetsTableSchema;
TableSchema annotatedTweetsTableSchema;
try {
tweetsTableSchema = createTableSchema(TWEETS_TABLE_SCHEMA);
annotatedTweetsTableSchema = createTableSchema(ANNOTATED_TWEETS_TABLE_SCHEMA);
} catch (IOException e) {
e.printStackTrace();
return;
}
Pipeline p = Pipeline.create(options);
// Read tweets from Pub/Sub
PCollection<String> tweets = null;
tweets = p.apply(PubsubIO.Read.named("Read tweets from PubSub").topic("projects/" + projectId + "/topics/blackfridaytweets"));
// Format tweets for BigQuery
PCollection<TableRow> formattedTweets = tweets.apply(ParDo.named("Format tweets for BigQuery").of(new DoFormat()));
// Create a TableReference for the destination table
TableReference tableReference = new TableReference();
tableReference.setProjectId(projectId);
tableReference.setDatasetId("black_friday_analytics");
tableReference.setTableId("tweets_raw");
// Write tweets to BigQuery
formattedTweets.apply(BigQueryIO.Write.named("Write tweets to BigQuery").to(tableReference).withSchema(tweetsTableSchema).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).withoutValidation());
// Filter and annotate tweets with their sentiment from NL API
// Note: if the pipeline is run as a batch pipeline, the filter condition is inverted
PCollection<String> filteredTweets = tweets.apply(ParDo.named("Filter and annotate tweets").of(new DoFilterAndProcess()));
// Format tweets for BigQuery
PCollection<TableRow> filteredFormattedTweets = filteredTweets.apply(ParDo.named("Format annotated tweets for BigQuery").of(new DoFormat()));
// Create a TableReference for the destination table
TableReference filteredTableReference = new TableReference();
filteredTableReference.setProjectId(projectId);
filteredTableReference.setDatasetId("black_friday_analytics");
filteredTableReference.setTableId("tweets_sentiment");
// Write tweets to BigQuery
filteredFormattedTweets.apply(BigQueryIO.Write.named("Write annotated tweets to BigQuery").to(filteredTableReference).withSchema(annotatedTweetsTableSchema).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
Author: LorenzoRidiNoovle, Project: gcp-black-friday-analytics, Lines: 58, Source: TwitterProcessor.java
Example 16: run
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
public static void run(String[] args) throws Exception {
System.out.println("Making Datastore->GCS pipeline");
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
if (options.getIsBlocking()) {
options.setRunner(BlockingDataflowPipelineRunner.class);
} else {
options.setRunner(DataflowPipelineRunner.class);
options.setStreaming(false);
}
String date = (new SimpleDateFormat("yyyyMMdd")).format(new Date());
// make string in format: "my-project:dataset.entity_date"
String tableName = String.format("%s:%s.%s_%s",
options.getProject(),
options.getBQDataset(),
options.getDatastoreEntityKind(),
date);
System.out.println("Destination BigQuery Table is: " + tableName);
// Build our Datastore query.
// Right now we are simply grabbing all Datastore records of a given kind,
Query.Builder queryBuilder = Query.newBuilder();
queryBuilder.addKindBuilder().setName(options.getDatastoreEntityKind());
Query query = queryBuilder.build();
// Generate the Datastore Read Source
DatastoreV1.Read read = DatastoreIO.v1().read()
.withProjectId(options.getProject())
.withQuery(query);
// Build our data pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("IngestEntities", read)
.apply("EntityToTableRows", ParDo.of(new DatastoreToTableRow()))
.apply("WriteTableRows", BigQueryIO.Write
.to(tableName)
.withSchema(new BQSchema().getTableSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
System.out.println("Running pipeline");
pipeline.run();
}
Author: cobookman, Project: DatastoreToGCS, Lines: 49, Source: BQBackup.java
Example 17: main
import com.google.cloud.dataflow.sdk.io.BigQueryIO; // import the required package/class
/**
* {@link Pipeline} Entry point of this Dataflow Pipeline
*/
public static void main(String args[]) {
try {
ExtractContractTerms fn = new ExtractContractTerms();
// fromArgs takes command-line options and maps them to properties within a PipelineOptions
SymbolTransformOptions options = PipelineOptionsFactory.fromArgs(args).as(SymbolTransformOptions.class);
Pipeline pipeline = Pipeline.create(options);
// Input is a PCollection of String, Output is a PCollection of OptionTick
PCollection < OptionsTick > mainCollection = pipeline.apply(TextIO.Read.named("Reading input file")
.from(options.getInputFilePath()))
.apply(ParDo.named("Extracting options contract terms from symbol")
.of(fn)).setCoder(SerializableCoder.of(OptionsTick.class));
// Input is PCollection of OptionTick, Output are records within the BigQuery
// If destination is BigQuery then DataflowPipelineRunner or BlockingDataflowPipelineRunner must be used
if (!"".equals(options.getOutputTable())) {
mainCollection.apply(ParDo.named("Creating BigQuery row from tick")
.of(new CreateBigQueryRow()))
.apply(BigQueryIO.Write.named("Writing records to BigQuery")
.to(options.getOutputTable())
.withSchema(SymbolTransformPipeline.generateSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
} else {
// To write to a local text file, please verify that
// DirectPipelineRunner is selected in pom.xml or the run shell script
mainCollection.apply(ParDo.named("Creating String row from tick")
.of(new TickToString()))
.apply(TextIO.Write.named("Writing output file")
.to(options.getOutputFilePath()));
}
pipeline.run();
System.exit(0);
} catch (Exception ex) {
LOGGER.error(ex.getMessage(), ex);
System.exit(1);
}
}
Author: SunGard-Labs, Project: dataflow-whitepaper, Lines: 50, Source: