本文整理汇总了Java中org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition类的典型用法代码示例。如果您正苦于以下问题:Java WriteDisposition类的具体用法?Java WriteDisposition怎么用?Java WriteDisposition使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
WriteDisposition类属于org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write包,在下文中一共展示了WriteDisposition类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: expand
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(
BigQueryIO.writeTableRows()
.to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
return PDone.in(teamAndScore.getPipeline());
}
开发者ID:apache,项目名称:beam,代码行数:13,代码来源:WriteToBigQuery.java
示例2: BatchLoads
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
boolean singletonTable,
DynamicDestinations<?, DestinationT> dynamicDestinations,
Coder<DestinationT> destinationCoder,
ValueProvider<String> customGcsTempLocation) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
this.destinationCoder = destinationCoder;
this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
this.triggeringFrequency = null;
this.customGcsTempLocation = customGcsTempLocation;
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:BatchLoads.java
示例3: WriteTables
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
public WriteTables(
boolean singlePartition,
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
List<PCollectionView<?>> sideInputs,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.singlePartition = singlePartition;
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.firstPaneWriteDisposition = writeDisposition;
this.firstPaneCreateDisposition = createDisposition;
this.sideInputs = sideInputs;
this.dynamicDestinations = dynamicDestinations;
this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput");
this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:WriteTables.java
示例4: validateDispositions
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private boolean validateDispositions(Table table, CreateDisposition createDisposition,
WriteDisposition writeDisposition)
throws InterruptedException, IOException {
if (table == null) {
if (createDisposition == CreateDisposition.CREATE_NEVER) {
return false;
}
} else if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
datasetService.deleteTable(table.getTableReference());
} else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
List<TableRow> allRows = datasetService.getAllRows(table.getTableReference().getProjectId(),
table.getTableReference().getDatasetId(), table.getTableReference().getTableId());
if (!allRows.isEmpty()) {
return false;
}
}
return true;
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:FakeJobService.java
示例5: main
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
/**
* Runs the DatastoreToBigQuery dataflow pipeline
*/
public static void main(String[] args) throws IOException, ScriptException {
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
NestedValueProvider<String, String> bqJsonSchema = NestedValueProvider
.of(options.getBqJsonSchema(), new ValueProviderHelpers.GcsLoad());
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestEntities",
DatastoreIO.v1().read()
.withProjectId(options.getDatastoreProjectId())
.withLiteralGqlQuery(options.getGqlQuery())
.withNamespace(options.getNamespace()))
.apply("EntityToTableRow", ParDo.of(EntityToTableRow.newBuilder()
.setJsTransformPath(options.getJsTransformPath())
.setJsTransformFunctionName(options.getJsTransformFunctionName())
.setStrictCast(options.getStrictCast())
.setTableSchemaJson(bqJsonSchema)
.build()))
.apply("TableRowToBigQuery", BigQueryIO.writeTableRows()
.to(options.getBqTableSpec())
.withJsonSchema(bqJsonSchema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run();
}
开发者ID:cobookman,项目名称:teleport,代码行数:33,代码来源:DatastoreToBq.java
示例6: expand
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
@Override
public PDone expand(PCollection<T> teamAndScore) {
teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.writeTableRows()
.to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
return PDone.in(teamAndScore.getPipeline());
}
开发者ID:apache,项目名称:beam,代码行数:12,代码来源:WriteWindowedToBigQuery.java
示例7: WriteRename
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
public WriteRename(
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) {
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.firstPaneWriteDisposition = writeDisposition;
this.firstPaneCreateDisposition = createDisposition;
}
开发者ID:apache,项目名称:beam,代码行数:11,代码来源:WriteRename.java
示例8: writeRename
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private void writeRename(
TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
throws Exception {
WriteDisposition writeDisposition =
(c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
CreateDisposition createDisposition =
(c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
List<String> tempTablesJson = Lists.newArrayList(tempTableNames);
// Do not copy if no temp tables are provided
if (tempTablesJson.size() == 0) {
return;
}
List<TableReference> tempTables = Lists.newArrayList();
for (String table : tempTablesJson) {
tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
}
// Make sure each destination table gets a unique job id.
String jobIdPrefix =
BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1,
c.pane().getIndex());
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
finalTableDestination.getTableReference(),
tempTables,
writeDisposition,
createDisposition,
finalTableDestination.getTableDescription());
DatasetService tableService =
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
removeTemporaryTables(tableService, tempTables);
}
开发者ID:apache,项目名称:beam,代码行数:38,代码来源:WriteRename.java
示例9: writeTempTables
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private PCollection<KV<TableDestination, String>> writeTempTables(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
PCollectionView<String> jobIdTokenView) {
List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView);
sideInputs.addAll(dynamicDestinations.getSideInputs());
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));
// If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
// specified in multiPartitionsTag.
return input
.setCoder(partitionsCoder)
// Reshuffle will distribute this among multiple workers, and also guard against
// reexecution of the WritePartitions step once WriteTables has begun.
.apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
.apply(
"MultiPartitionsWriteTables",
new WriteTables<>(
false,
bigQueryServices,
jobIdTokenView,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
dynamicDestinations));
}
开发者ID:apache,项目名称:beam,代码行数:31,代码来源:BatchLoads.java
示例10: runLoadJob
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load)
throws InterruptedException, IOException {
TableReference destination = load.getDestinationTable();
TableSchema schema = load.getSchema();
checkArgument(schema != null, "No schema specified");
List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
Table existingTable = datasetService.getTable(destination);
if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
if (existingTable == null) {
TableReference strippedDestination =
destination
.clone()
.setTableId(BigQueryHelpers.stripPartitionDecorator(destination.getTableId()));
existingTable =
new Table()
.setTableReference(strippedDestination)
.setSchema(schema);
if (load.getTimePartitioning() != null) {
existingTable = existingTable.setTimePartitioning(load.getTimePartitioning());
}
datasetService.createTable(existingTable);
}
List<TableRow> rows = Lists.newArrayList();
for (ResourceId filename : sourceFiles) {
rows.addAll(readRows(filename.toString()));
}
datasetService.insertAll(destination, rows, null);
FileSystems.delete(sourceFiles);
return new JobStatus().setState("DONE");
}
开发者ID:apache,项目名称:beam,代码行数:37,代码来源:FakeJobService.java
示例11: runCopyJob
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private JobStatus runCopyJob(JobConfigurationTableCopy copy)
throws InterruptedException, IOException {
List<TableReference> sources = copy.getSourceTables();
TableReference destination = copy.getDestinationTable();
WriteDisposition writeDisposition = WriteDisposition.valueOf(copy.getWriteDisposition());
CreateDisposition createDisposition = CreateDisposition.valueOf(copy.getCreateDisposition());
Table existingTable = datasetService.getTable(destination);
if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
TimePartitioning partitioning = null;
TableSchema schema = null;
boolean first = true;
List<TableRow> allRows = Lists.newArrayList();
for (TableReference source : sources) {
Table table = checkNotNull(datasetService.getTable(source));
if (!first) {
if (partitioning != table.getTimePartitioning()) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
if (schema != table.getSchema()) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
}
partitioning = table.getTimePartitioning();
schema = table.getSchema();
first = false;
allRows.addAll(datasetService.getAllRows(
source.getProjectId(), source.getDatasetId(), source.getTableId()));
}
datasetService.createTable(new Table()
.setTableReference(destination)
.setSchema(schema)
.setTimePartitioning(partitioning));
datasetService.insertAll(destination, allRows, null);
return new JobStatus().setState("DONE");
}
开发者ID:apache,项目名称:beam,代码行数:38,代码来源:FakeJobService.java
示例12: writeAllTablesToBigQuery
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
/**
* @param bqrows
* @param webresourceRowsUnindexed
* @param webresourceDeduped
* @param options
*/
private static void writeAllTablesToBigQuery(PCollectionTuple bqrows,
PCollection<TableRow> webresourceRowsUnindexed, PCollection<TableRow> webresourceDeduped,
IndexerPipelineOptions options) {
PCollection<TableRow> webresourceRows = bqrows.get(PipelineTags.webresourceTag);
PCollection<TableRow> documentRows = bqrows.get(PipelineTags.documentTag);
PCollection<TableRow> sentimentRows = bqrows.get(PipelineTags.sentimentTag);
// Now write to BigQuery
WriteDisposition dispo = options.getWriteTruncate() ?
WriteDisposition.WRITE_TRUNCATE: WriteDisposition.WRITE_APPEND;
//Merge all collections with WebResource table records
PCollectionList<TableRow> webresourceRowsList = (webresourceDeduped == null) ?
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed) :
PCollectionList.of(webresourceRows).and(webresourceRowsUnindexed).and(webresourceDeduped);
PCollection<TableRow> allWebresourceRows =
webresourceRowsList.apply(Flatten.<TableRow>pCollections());
allWebresourceRows = !options.isStreaming() ?
allWebresourceRows.apply("Reshuffle Webresources", new Reshuffle<TableRow>()) :
allWebresourceRows;
allWebresourceRows
.apply("Write to webresource",
BigQueryIO.writeTableRows()
.to(getWebResourcePartitionedTableRef(options))
.withSchema(getWebResourceSchema())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(dispo));
documentRows = !options.isStreaming() ?
documentRows.apply("Reshuffle Documents", new Reshuffle<TableRow>()):
documentRows;
documentRows
.apply("Write to document",
BigQueryIO.writeTableRows()
.to(getDocumentPartitionedTableRef(options))
.withSchema(getDocumentTableSchema())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(dispo));
sentimentRows = !options.isStreaming() ?
sentimentRows.apply("Reshuffle Sentiments", new Reshuffle<TableRow>()):
sentimentRows;
sentimentRows
.apply("Write to sentiment",
BigQueryIO.writeTableRows()
.to(getSentimentPartitionedTableRef(options))
.withSchema(getSentimentSchema())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(dispo));
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:62,代码来源:IndexerPipeline.java
示例13: main
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
public static void main(String[] args) {
// Setup Dataflow options
StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StreamingOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
//BQ table setup
TableSchema bqTableSchema;
try {
bqTableSchema = createTableSchema(tableSchema);
} catch (IOException e){
e.printStackTrace();
return;
}
String tableName = projectId + ":" + datasetId + "." + tableId;
Pipeline p = Pipeline.create(options);
// Read message from Pub/Sub
p.apply("ReadFromPubSub", PubsubIO.readStrings()
.fromTopic(readTopic))
// Format tweets for BigQuery - convert string to table row
.apply("Format for BigQuery", ParDo.of(new StringToRowConverter()))
// Write tweets to BigQuery
.apply("write to BQ", BigQueryIO.writeTableRows()
.to(tableName)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
.withSchema(bqTableSchema));
//run pipeline
PipelineResult result = p.run();
}
开发者ID:yuriatgoogle,项目名称:basicpipeline,代码行数:44,代码来源:basicpipeline.java
示例14: copy
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private void copy(
JobService jobService,
DatasetService datasetService,
String jobIdPrefix,
TableReference ref,
List<TableReference> tempTables,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@Nullable String tableDescription) throws InterruptedException, IOException {
JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
.setSourceTables(tempTables)
.setDestinationTable(ref)
.setWriteDisposition(writeDisposition.name())
.setCreateDisposition(createDisposition.name());
String projectId = ref.getProjectId();
Job lastFailedCopyJob = null;
for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startCopyJob(jobRef, copyConfig);
Job copyJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
datasetService.patchTableDescription(ref, tableDescription);
}
return;
case UNKNOWN:
throw new RuntimeException(String.format(
"UNKNOWN status of copy job [%s]: %s.", jobId,
BigQueryHelpers.jobToPrettyString(copyJob)));
case FAILED:
lastFailedCopyJob = copyJob;
continue;
default:
throw new IllegalStateException(String.format(
"Unexpected status [%s] of load job: %s.",
jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
}
}
throw new RuntimeException(String.format(
"Failed to create copy job with id prefix %s, "
+ "reached max retries: %d, last failed copy job: %s.",
jobIdPrefix,
BatchLoads.MAX_RETRY_JOBS,
BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
}
开发者ID:apache,项目名称:beam,代码行数:52,代码来源:WriteRename.java
示例15: processElement
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
DestinationT destination = c.element().getKey().getKey();
TableSchema tableSchema;
if (firstPaneCreateDisposition == CreateDisposition.CREATE_NEVER) {
tableSchema = null;
} else if (jsonSchemas.containsKey(destination)) {
tableSchema =
BigQueryHelpers.fromJsonString(jsonSchemas.get(destination), TableSchema.class);
} else {
tableSchema = dynamicDestinations.getSchema(destination);
checkArgument(
tableSchema != null,
"Unless create disposition is %s, a schema must be specified, i.e. "
+ "DynamicDestinations.getSchema() may not return null. "
+ "However, create disposition is %s, and %s returned null for destination %s",
CreateDisposition.CREATE_NEVER,
firstPaneCreateDisposition,
dynamicDestinations,
destination);
jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
}
TableDestination tableDestination = dynamicDestinations.getTable(destination);
checkArgument(
tableDestination != null,
"DynamicDestinations.getTable() may not return null, "
+ "but %s returned null for destination %s",
dynamicDestinations,
destination);
TableReference tableReference = tableDestination.getTableReference();
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
tableReference.setProjectId(
c.getPipelineOptions().as(BigQueryOptions.class).getProject());
tableDestination = new TableDestination(
tableReference, tableDestination.getTableDescription());
}
Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
String jobIdPrefix = BigQueryHelpers.createJobId(
c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
if (!singlePartition) {
tableReference.setTableId(jobIdPrefix);
}
WriteDisposition writeDisposition =
(c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
CreateDisposition createDisposition =
(c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
load(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
tableReference,
tableDestination.getTimePartitioning(),
tableSchema,
partitionFiles,
writeDisposition,
createDisposition,
tableDestination.getTableDescription());
c.output(
mainOutputTag, KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
for (String file : partitionFiles) {
c.output(temporaryFilesTag, file);
}
}
开发者ID:apache,项目名称:beam,代码行数:70,代码来源:WriteTables.java
示例16: load
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; //导入依赖的package包/类
private void load(
JobService jobService,
DatasetService datasetService,
String jobIdPrefix,
TableReference ref,
TimePartitioning timePartitioning,
@Nullable TableSchema schema,
List<String> gcsUris,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@Nullable String tableDescription)
throws InterruptedException, IOException {
JobConfigurationLoad loadConfig =
new JobConfigurationLoad()
.setDestinationTable(ref)
.setSchema(schema)
.setSourceUris(gcsUris)
.setWriteDisposition(writeDisposition.name())
.setCreateDisposition(createDisposition.name())
.setSourceFormat("NEWLINE_DELIMITED_JSON");
if (timePartitioning != null) {
loadConfig.setTimePartitioning(timePartitioning);
}
String projectId = ref.getProjectId();
Job lastFailedLoadJob = null;
for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
datasetService.patchTableDescription(
ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
tableDescription);
}
return;
case UNKNOWN:
throw new RuntimeException(
String.format(
"UNKNOWN status of load job [%s]: %s.",
jobId, BigQueryHelpers.jobToPrettyString(loadJob)));
case FAILED:
lastFailedLoadJob = loadJob;
continue;
default:
throw new IllegalStateException(
String.format(
"Unexpected status [%s] of load job: %s.",
jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
}
}
throw new RuntimeException(
String.format(
"Failed to create load job with id prefix %s, "
+ "reached max retries: %d, last failed load job: %s.",
jobIdPrefix,
BatchLoads.MAX_RETRY_JOBS,
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
}
开发者ID:apache,项目名称:beam,代码行数:63,代码来源:WriteTables.java
注:本文中的org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论