• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ValueProvider类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.beam.sdk.options.ValueProvider的典型用法代码示例。如果您正苦于以下问题:Java ValueProvider类的具体用法?Java ValueProvider怎么用?Java ValueProvider使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ValueProvider类属于org.apache.beam.sdk.options包,在下文中一共展示了ValueProvider类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: getTableWithDefaultProject

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * Returns the table to write, or {@code null} if writing with {@code tableFunction}.
 *
 * <p>If the table's project is not specified, use the executing project.
 */
@Nullable
ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
  ValueProvider<TableReference> table = getTable();
  if (table == null) {
    return table;
  }

  if (!table.isAccessible()) {
    LOG.info("Using a dynamic value for table input. This must contain a project"
            + " in the table reference: {}", table);
    return table;
  }
  if (Strings.isNullOrEmpty(table.get().getProjectId())) {
    // If user does not specify a project we assume the table to be located in
    // the default project.
    TableReference tableRef = table.get();
    tableRef.setProjectId(bqOptions.getProject());
    return NestedValueProvider.of(StaticValueProvider.of(
        BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
  }
  return table;
}
 
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:BigQueryIO.java


示例2: BatchLoads

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
           boolean singletonTable,
           DynamicDestinations<?, DestinationT> dynamicDestinations,
           Coder<DestinationT> destinationCoder,
           ValueProvider<String> customGcsTempLocation) {
  bigQueryServices = new BigQueryServicesImpl();
  this.writeDisposition = writeDisposition;
  this.createDisposition = createDisposition;
  this.singletonTable = singletonTable;
  this.dynamicDestinations = dynamicDestinations;
  this.destinationCoder = destinationCoder;
  this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
  this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
  this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
  this.triggeringFrequency = null;
  this.customGcsTempLocation = customGcsTempLocation;
}
 
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:BatchLoads.java


示例3: createIndexerPipeline

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * This function creates the DAG graph of transforms. It can be called from main()
 * as well as from the ControlPipeline.
 * @param options
 * @return
 * @throws Exception
 */
public static Pipeline createIndexerPipeline(FileIndexerPipelineOptions options) throws Exception {
	
    IndexerPipelineUtils.validateIndexerPipelineOptions(options);
	Pipeline pipeline = Pipeline.create(options);
	
	// PHASE: Read raw content from sources
	
	PCollection<InputContent> readContent = pipeline
			.apply("Read entire CSV file", org.apache.beam.sdk.io.Read.from(new RecordFileSource<String>(
				ValueProvider.StaticValueProvider.of(options.getInputFile()), 
				StringUtf8Coder.of(), RecordFileSource.DEFAULT_RECORD_SEPARATOR))) //
			.apply("Parse CSV file into InputContent objects", ParDo.of(new ParseCSVFile()));
	
	// Define the accumulators of all filters
	PCollection<InputContent> contentToIndex = readContent;
	
	// PHASE: Index documents (extract opinions and entities/tags). 
	// Return successfully indexed docs, and create a Bigtable write transform to store errors 
	// in Dead Letter table.
	PCollection<ContentIndexSummary> indexes = indexDocuments(options, contentToIndex);
	
	if (options.getRatioEnrichWithCNLP() > 0)
		indexes = enrichWithCNLP(indexes, options.getRatioEnrichWithCNLP());
	
	// PHASE: Write to BigQuery
	// For the Indexes that are unique ("filteredIndexes"), create records in webresource, document, and sentiment.
	// Then, merge resulting webresources with webresourceRowsUnindexed and webresourceDeduped
	indexes
		.apply(ParDo.of(new CreateCSVLineFromIndexSummaryFn()))
		.apply(TextIO.write()
			.to(options.getOutputFile()));
	
	
	return pipeline;
}
 
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:43,代码来源:FileIndexerPipeline.java


示例4: getFilenamePolicy

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(String genus) {
  return DefaultFilenamePolicy.fromStandardParameters(
    ValueProvider.StaticValueProvider.of(
      baseDir.resolve(genus, RESOLVE_FILE)),
    ShardNameTemplate.INDEX_OF_MAX,
    ".avro",
    false);
}
 
开发者ID:gbif,项目名称:pipelines,代码行数:10,代码来源:MultiAvroOutDemo.java


示例5: BigQueryQuerySource

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
private BigQueryQuerySource(
    String stepUuid,
    ValueProvider<String> query,
    Boolean flattenResults,
    Boolean useLegacySql,
    BigQueryServices bqServices,
    Coder<T> coder,
    SerializableFunction<SchemaAndRecord, T> parseFn) {
  super(stepUuid, bqServices, coder, parseFn);
  this.query = checkNotNull(query, "query");
  this.flattenResults = checkNotNull(flattenResults, "flattenResults");
  this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
  this.dryRunJobStats = new AtomicReference<>();
}
 
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:BigQueryQuerySource.java


示例6: newProvider

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be
 * accessible while the pipeline runs.
 */
public <T> ValueProvider<T> newProvider(T runtimeValue) {
  String uuid = UUID.randomUUID().toString();
  providerRuntimeValues.put(uuid, runtimeValue);
  return ValueProvider.NestedValueProvider.of(
      options.as(TestValueProviderOptions.class).getProviderRuntimeValues(),
      new GetFromRuntimeValues<T>(uuid));
}
 
开发者ID:apache,项目名称:beam,代码行数:12,代码来源:TestPipeline.java


示例7: TFRecordSink

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@VisibleForTesting
TFRecordSink(
    ValueProvider<ResourceId> outputPrefix,
    @Nullable String shardTemplate,
    @Nullable String suffix,
    Compression compression) {
  super(
      outputPrefix,
      DynamicFileDestinations.<byte[]>constant(
          DefaultFilenamePolicy.fromStandardParameters(
              outputPrefix, shardTemplate, suffix, false)),
      compression);
}
 
开发者ID:apache,项目名称:beam,代码行数:14,代码来源:TFRecordIO.java


示例8: FileBasedSink

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files.
 */
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
    ValueProvider<ResourceId> tempDirectoryProvider,
    DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
  this(tempDirectoryProvider, dynamicDestinations, Compression.UNCOMPRESSED);
}
 
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:FileBasedSink.java


示例9: XmlSource

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
XmlSource(
    ValueProvider<String> spec,
    XmlIO.MappingConfiguration<T> configuration,
    long minBundleSizeBytes) {
  super(spec, minBundleSizeBytes);
  this.configuration = configuration;
}
 
开发者ID:apache,项目名称:beam,代码行数:8,代码来源:XmlSource.java


示例10: testRuntimeOptionsNotCalledInApplyOutput

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@Test
public void testRuntimeOptionsNotCalledInApplyOutput() {
  p.enableAbandonedNodeEnforcement(false);

  BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
      .to(p.newProvider("some-table"))
      .withSchema(ValueProvider.NestedValueProvider.of(
          p.newProvider("some-schema"), new BigQueryHelpers.JsonSchemaToTableSchema()))
      .withoutValidation();
  p.apply(Create.empty(TableRowJsonCoder.of())).apply(write);
  // Test that this doesn't throw.
  DisplayData.from(write);
}
 
开发者ID:apache,项目名称:beam,代码行数:14,代码来源:BigQueryIOWriteTest.java


示例11: writeDetailReports

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports(
    ValueProvider<String> yearMonthProvider) {
  return TextIO.<BillingEvent>writeCustomType()
      // TODO(larryruili): Replace with billing bucket/yyyy-MM after verifying 2017-12 output.
      .to(
          InvoicingUtils.makeDestinationFunction(beamBucket + "/results", yearMonthProvider),
          InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
      .withFormatFunction(BillingEvent::toCsv)
      .withoutSharding()
      .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary"))
      .withHeader(BillingEvent.getHeader())
      .withSuffix(".csv");
}
 
开发者ID:google,项目名称:nomulus,代码行数:15,代码来源:InvoicingPipeline.java


示例12: testRuntimeValueProviderTopic

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@Test
public void testRuntimeValueProviderTopic() {
  TestPipeline pipeline = TestPipeline.create();
  ValueProvider<String> topic = pipeline.newProvider("projects/project/topics/topic");
  Read<String> pubsubRead = PubsubIO.readStrings().fromTopic(topic);
  pipeline.apply(pubsubRead);
  assertThat(pubsubRead.getTopicProvider(), not(nullValue()));
  assertThat(pubsubRead.getTopicProvider().isAccessible(), is(false));
}
 
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:PubsubIOTest.java


示例13: populateDisplayData

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
  Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
  for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
    String key = conf.getKey();
    if (!ignoredProducerPropertiesKeys.contains(key)) {
      Object value = DisplayData.inferType(conf.getValue()) != null
          ? conf.getValue() : String.valueOf(conf.getValue());
      builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
    }
  }
}
 
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:KafkaIO.java


示例14: to

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/** Like {@link #to(String)}. */
public TypedWrite<UserT, DestinationT, OutputT> to(ValueProvider<String> outputPrefix) {
  return toResource(
      NestedValueProvider.of(
          outputPrefix,
          // The function cannot be created as an anonymous class here since the enclosed class
          // may contain unserializable members.
          new OutputPrefixToResourceId()));
}
 
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:AvroIO.java


示例15: expand

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
  checkArgument(
      getFilenamePrefix() != null || getTempDirectory() != null,
      "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
          + "transform.");
  if (getFilenamePolicy() != null) {
    checkArgument(
        getShardTemplate() == null && getFilenameSuffix() == null,
        "shardTemplate and filenameSuffix should only be used with the default "
            + "filename policy");
  }
  if (getDynamicDestinations() != null) {
    checkArgument(
        getFormatFunction() == null,
        "A format function should not be specified "
            + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
  } else {
    checkArgument(
        getSchema() != null, "Unless using DynamicDestinations, .withSchema() is required.");
  }

  ValueProvider<ResourceId> tempDirectory = getTempDirectory();
  if (tempDirectory == null) {
    tempDirectory = getFilenamePrefix();
  }
  WriteFiles<UserT, DestinationT, OutputT> write =
      WriteFiles.to(
          new AvroSink<>(tempDirectory, resolveDynamicDestinations(), getGenericRecords()));
  if (getNumShards() > 0) {
    write = write.withNumShards(getNumShards());
  }
  if (getWindowedWrites()) {
    write = write.withWindowedWrites();
  }
  return input.apply("Write", write);
}
 
开发者ID:apache,项目名称:beam,代码行数:38,代码来源:AvroIO.java


示例16: fromTopic

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * Like {@code topic()} but with a {@link ValueProvider}.
 */
public Read<T> fromTopic(ValueProvider<String> topic) {
  if (topic.isAccessible()) {
    // Validate.
    PubsubTopic.fromPath(topic.get());
  }
  return toBuilder()
      .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()))
      .build();
}
 
开发者ID:apache,项目名称:beam,代码行数:13,代码来源:PubsubIO.java


示例17: WriterFn

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
WriterFn(
    PubsubClientFactory pubsubFactory,
    ValueProvider<TopicPath> topic,
    String timestampAttribute,
    String idAttribute,
    int publishBatchSize,
    int publishBatchBytes) {
  this.pubsubFactory = pubsubFactory;
  this.topic = topic;
  this.timestampAttribute = timestampAttribute;
  this.idAttribute = idAttribute;
  this.publishBatchSize = publishBatchSize;
  this.publishBatchBytes = publishBatchBytes;
}
 
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:PubsubUnboundedSink.java


示例18: PubsubUnboundedSink

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
public PubsubUnboundedSink(
    PubsubClientFactory pubsubFactory,
    ValueProvider<TopicPath> topic,
    String timestampAttribute,
    String idAttribute,
    int numShards) {
  this(pubsubFactory, topic, timestampAttribute, idAttribute, numShards,
       DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
       RecordIdMethod.RANDOM);
}
 
开发者ID:apache,项目名称:beam,代码行数:11,代码来源:PubsubUnboundedSink.java


示例19: makeDestinationFunction

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
/**
 * Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
 *
 * <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
 *
 * @param outputBucket the GCS bucket we're outputting reports to
 * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
 */
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
    String outputBucket, ValueProvider<String> yearMonthProvider) {
  return billingEvent ->
      new Params()
          .withShardTemplate("")
          .withSuffix(".csv")
          .withBaseFilename(
              NestedValueProvider.of(
                  yearMonthProvider,
                  yearMonth ->
                      FileBasedSink.convertToFileResourceIfPossible(
                          String.format(
                              "%s/%s", outputBucket, billingEvent.toFilename(yearMonth)))));
}
 
开发者ID:google,项目名称:nomulus,代码行数:23,代码来源:InvoicingUtils.java


示例20: ViaFileBasedSink

import org.apache.beam.sdk.options.ValueProvider; //导入依赖的package包/类
private ViaFileBasedSink(
    Write<DestinationT, UserT> spec) {
  super(
      ValueProvider.NestedValueProvider.of(
          spec.getTempDirectory(),
          new SerializableFunction<String, ResourceId>() {
            @Override
            public ResourceId apply(String input) {
              return FileSystems.matchNewResource(input, true /* isDirectory */);
            }
          }),
      new DynamicDestinationsAdapter<UserT, DestinationT, OutputT>(spec),
      spec.getCompression());
  this.spec = spec;
}
 
开发者ID:apache,项目名称:beam,代码行数:16,代码来源:FileIO.java



注:本文中的org.apache.beam.sdk.options.ValueProvider类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Namespaces类代码示例发布时间:2022-05-22
下一篇:
Java Extension类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap