This article collects typical usage examples of the Java class org.apache.beam.sdk.io.FileBasedSink. If you are wondering what FileBasedSink is for and how to use it, the curated class code examples below may help.
The FileBasedSink class belongs to the org.apache.beam.sdk.io package. Twenty code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.
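Before diving into the examples, here is a minimal, self-contained sketch written for this page (it is not taken from any of the projects below; the class and method names are illustrative, and it assumes a Beam 2.x SDK). It shows the pattern that recurs throughout these examples: converting a path string into a ResourceId with FileBasedSink.convertToFileResourceIfPossible and handing it to a file-based write.
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class FileBasedSinkSketch {
  /** Writes each element of {@code lines} to sharded text files under the given prefix. */
  public static PDone writeLines(PCollection<String> lines, String filenamePrefix) {
    // Parse the prefix into a ResourceId; the string is treated as a file unless it
    // clearly denotes a directory (for example, when it ends with "/").
    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
    return lines.apply(
        TextIO.write()
            .to(resource)          // shard files are named after this base resource
            .withSuffix(".txt"));  // append a suffix to every shard
  }
}
Most of the examples below elaborate on this pattern with windowed writes, custom filename policies, or dynamic destinations.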
Example 1: testEncodedProto
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Test
public void testEncodedProto() throws Exception {
  RunnerApi.WriteFilesPayload payload =
      WriteFilesTranslation.payloadForWriteFiles(writeFiles, SdkComponents.create());
  assertThat(
      payload.getRunnerDeterminedSharding(),
      equalTo(
          writeFiles.getNumShardsProvider() == null
              && writeFiles.getComputeNumShards() == null));
  assertThat(payload.getWindowedWrites(), equalTo(writeFiles.getWindowedWrites()));
  assertThat(
      (FileBasedSink<String, Void, String>)
          WriteFilesTranslation.sinkFromProto(payload.getSink()),
      equalTo(writeFiles.getSink()));
}
Developer: apache, Project: beam, Lines: 19, Source: WriteFilesTranslationTest.java
Example 2: testDestinationFunction_generatesProperFileParams
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Test
public void testDestinationFunction_generatesProperFileParams() {
  SerializableFunction<BillingEvent, Params> destinationFunction =
      InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
  BillingEvent billingEvent = mock(BillingEvent.class);
  // We mock BillingEvent to make the test independent of the implementation of toFilename()
  when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
  assertThat(destinationFunction.apply(billingEvent))
      .isEqualTo(
          new Params()
              .withShardTemplate("")
              .withSuffix(".csv")
              .withBaseFilename(
                  FileBasedSink.convertToFileResourceIfPossible(
                      "my/directory/invoice_details_2017-10_registrar_tld")));
}
Developer: google, Project: nomulus, Lines: 19, Source: InvoicingUtilsTest.java
Example 3: expand
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public PDone expand(PCollection<String> input) {
  // Verify that the input has a compatible window type.
  checkArgument(
      input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
  // filenamePrefix may contain a directory and a filename component. Pull out only the filename
  // component from that path for the PerWindowFiles.
  String prefix = "";
  ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
  if (!resource.isDirectory()) {
    prefix = verifyNotNull(
        resource.getFilename(),
        "A non-directory resource should have a non-null filename: %s",
        resource);
  }
  return input.apply(
      TextIO.write()
          .to(resource.getCurrentDirectory())
          .withFilenamePolicy(new PerWindowFiles(prefix))
          .withWindowedWrites()
          .withNumShards(3));
}
Developer: GoogleCloudPlatform, Project: DataflowSDK-examples, Lines: 25, Source: WriteToText.java
Example 4: expand
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public PDone expand(PCollection<String> input) {
  // filenamePrefix may contain a directory and a filename component. Pull out only the filename
  // component from that path for the PerWindowFiles.
  String prefix = "";
  ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
  if (!resource.isDirectory()) {
    prefix = verifyNotNull(
        resource.getFilename(),
        "A non-directory resource should have a non-null filename: %s",
        resource);
  }
  TextIO.Write write = TextIO.write()
      .to(resource.getCurrentDirectory())
      .withFilenamePolicy(new PerWindowFiles(prefix))
      .withWindowedWrites();
  if (numShards != null) {
    write = write.withNumShards(numShards);
  }
  return input.apply(write);
}
Developer: GoogleCloudPlatform, Project: DataflowSDK-examples, Lines: 24, Source: WriteOneFilePerWindow.java
Example 5: getFilenamePolicy
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(String genus) {
  return DefaultFilenamePolicy.fromStandardParameters(
      ValueProvider.StaticValueProvider.of(
          baseDir.resolve(genus, RESOLVE_FILE)),
      ShardNameTemplate.INDEX_OF_MAX,
      ".avro",
      false);
}
Developer: gbif, Project: pipelines, Lines: 10, Source: MultiAvroOutDemo.java
Example 6: expand
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public PDone expand(PCollection<String> input) {
  // Verify that the input has a compatible window type.
  checkArgument(
      input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
  ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
  return input.apply(
      TextIO.write()
          .to(new PerWindowFiles(resource))
          .withTempDirectory(resource.getCurrentDirectory())
          .withWindowedWrites()
          .withNumShards(3));
}
Developer: apache, Project: beam, Lines: 16, Source: WriteToText.java
Example 7: expand
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public PDone expand(PCollection<String> input) {
  ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
  TextIO.Write write =
      TextIO.write()
          .to(new PerWindowFiles(resource))
          .withTempDirectory(resource.getCurrentDirectory())
          .withWindowedWrites();
  if (numShards != null) {
    write = write.withNumShards(numShards);
  }
  return input.apply(write);
}
Developer: apache, Project: beam, Lines: 14, Source: WriteOneFilePerWindow.java
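Examples 3, 4, 6, and 7 all pass a PerWindowFiles object as the filename policy, and Example 8 calls its filenamePrefixForWindow method, but the class itself is not reproduced on this page. The sketch below is a plausible minimal version written purely for illustration (the class body and the filename format are assumptions, not the actual code from those projects); it follows the FilenamePolicy method signatures shown in Examples 12, 15, and 16 and the ResourceId-based constructor used in Examples 6 and 7.
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

/** Illustrative filename policy that produces one group of shards per window. */
class PerWindowFilesSketch extends FilenamePolicy {
  private final ResourceId baseFilename;

  PerWindowFilesSketch(ResourceId baseFilename) {
    this.baseFilename = baseFilename;
  }

  /** Builds a per-window prefix such as "output-<windowStart>-<windowEnd>". */
  String filenamePrefixForWindow(IntervalWindow window) {
    String prefix = baseFilename.isDirectory() ? "" : baseFilename.getFilename();
    if (prefix == null) {
      prefix = "";
    }
    return String.format("%s-%s-%s", prefix, window.start(), window.end());
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      FileBasedSink.OutputFileHints outputFileHints) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    // Encode the window bounds and the shard index into the filename.
    String filename =
        String.format(
            "%s-%s-of-%s%s",
            filenamePrefixForWindow(intervalWindow),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return baseFilename
        .getCurrentDirectory()
        .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
    // This policy is only meant for windowed writes.
    throw new UnsupportedOperationException("Unsupported.");
  }
}
With a policy like this, TextIO.write().to(policy).withTempDirectory(...).withWindowedWrites() (as in Examples 6 and 7) produces one set of shards per IntervalWindow.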
Example 8: testWindowedWordCountPipeline
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
private void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception {
  String outputPrefix = options.getOutput();
  PerWindowFiles filenamePolicy =
      new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
  List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
  for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
    final Instant windowStart =
        new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
    expectedOutputFiles.add(
        new NumberedShardedFile(
            filenamePolicy.filenamePrefixForWindow(
                new IntervalWindow(
                    windowStart, windowStart.plus(Duration.standardMinutes(10)))) + "*"));
  }
  ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
  // For this integration test, input is tiny and we can build the expected counts
  SortedMap<String, Long> expectedWordCounts = new TreeMap<>();
  for (String line :
      inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
    String[] words = line.split(ExampleUtils.TOKENIZER_PATTERN);
    for (String word : words) {
      if (!word.isEmpty()) {
        expectedWordCounts.put(
            word, MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L);
      }
    }
  }
  options.setOnSuccessMatcher(
      new WordCountsMatcher(expectedWordCounts, expectedOutputFiles));
  WindowedWordCount.main(TestPipeline.convertToArgs(options));
}
Developer: apache, Project: beam, Lines: 41, Source: WindowedWordCountIT.java
Example 9: sinkFromProto
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@VisibleForTesting
static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
  checkArgument(
      sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
      "Cannot extract %s instance from %s with URN %s",
      FileBasedSink.class.getSimpleName(),
      FunctionSpec.class.getSimpleName(),
      sinkProto.getSpec().getUrn());
  byte[] serializedSink = sinkProto.getSpec().getPayload().toByteArray();
  return (FileBasedSink<?, ?, ?>)
      SerializableUtils.deserializeFromByteArray(
          serializedSink, FileBasedSink.class.getSimpleName());
}
Developer: apache, Project: beam, Lines: 16, Source: WriteFilesTranslation.java
Example 10: getSink
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
    AppliedPTransform<
            PCollection<UserT>, WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  return (FileBasedSink<UserT, DestinationT, OutputT>)
      sinkFromProto(getWriteFilesPayload(transform).getSink());
}
Developer: apache, Project: beam, Lines: 10, Source: WriteFilesTranslation.java
Example 11: writeWithRunnerDeterminedSharding
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Test
public void writeWithRunnerDeterminedSharding() {
  ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
  FilenamePolicy policy =
      DefaultFilenamePolicy.fromStandardParameters(
          StaticValueProvider.of(outputDirectory),
          DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
          "",
          false);
  WriteFiles<Integer, Void, Integer> write =
      WriteFiles.to(
          new FileBasedSink<Integer, Void, Integer>(
              StaticValueProvider.of(outputDirectory),
              DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
            @Override
            public WriteOperation<Void, Integer> createWriteOperation() {
              return null;
            }
          });
  assertThat(
      PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
      is(true));
  WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3);
  assertThat(
      PTransformMatchers.writeWithRunnerDeterminedSharding()
          .matches(appliedWrite(withStaticSharding)),
      is(false));
  WriteFiles<Integer, Void, Integer> withCustomSharding =
      write.withSharding(Sum.integersGlobally().asSingletonView());
  assertThat(
      PTransformMatchers.writeWithRunnerDeterminedSharding()
          .matches(appliedWrite(withCustomSharding)),
      is(false));
}
Developer: apache, Project: beam, Lines: 37, Source: PTransformMatchersTest.java
Example 12: windowedFilename
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public ResourceId windowedFilename(
    int shardNumber,
    int numShards,
    BoundedWindow window,
    PaneInfo paneInfo,
    FileBasedSink.OutputFileHints outputFileHints) {
  throw new UnsupportedOperationException("should not be called");
}
Developer: apache, Project: beam, Lines: 10, Source: PTransformMatchersTest.java
Example 13: getReplacementTransform
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
    getReplacementTransform(
        AppliedPTransform<
                PCollection<UserT>, WriteFilesResult<DestinationT>,
                WriteFiles<UserT, DestinationT, OutputT>>
            transform) {
  // By default, if numShards is not set WriteFiles will produce one file per bundle. In
  // streaming, there are large numbers of small bundles, resulting in many tiny files.
  // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
  // (current_num_workers * 2 might be a better choice, but that value is not easily available
  // today).
  // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
  int numShards;
  if (options.getMaxNumWorkers() > 0) {
    numShards = options.getMaxNumWorkers() * 2;
  } else if (options.getNumWorkers() > 0) {
    numShards = options.getNumWorkers() * 2;
  } else {
    numShards = DEFAULT_NUM_SHARDS;
  }
  try {
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);
    WriteFiles<UserT, DestinationT, OutputT> replacement =
        WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }
    return PTransformReplacement.of(
        PTransformReplacements.getSingletonMainInput(transform),
        replacement.withNumShards(numShards));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
Developer: apache, Project: beam, Lines: 39, Source: DataflowRunner.java
Example 14: withNoShardingSpecifiedReturnsNewTransform
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
  ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
  PTransform<PCollection<Object>, WriteFilesResult<Void>> original =
      WriteFiles.to(
          new FileBasedSink<Object, Void, Object>(
              StaticValueProvider.of(outputDirectory),
              DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
            @Override
            public WriteOperation<Void, Object> createWriteOperation() {
              throw new IllegalArgumentException("Should not be used");
            }
          });
  @SuppressWarnings("unchecked")
  PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
  AppliedPTransform<
          PCollection<Object>, WriteFilesResult<Void>,
          PTransform<PCollection<Object>, WriteFilesResult<Void>>>
      originalApplication =
          AppliedPTransform.of(
              "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
  assertThat(
      factory.getReplacementTransform(originalApplication).getTransform(),
      not(equalTo((Object) original)));
}
Developer: apache, Project: beam, Lines: 29, Source: WriteWithShardingFactoryTest.java
Example 15: windowedFilename
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Override
public ResourceId windowedFilename(
    int shardNumber,
    int numShards,
    BoundedWindow window,
    PaneInfo paneInfo,
    FileBasedSink.OutputFileHints outputFileHints) {
  throw new IllegalArgumentException("Should not be used");
}
Developer: apache, Project: beam, Lines: 10, Source: WriteWithShardingFactoryTest.java
Example 16: unwindowedFilename
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Nullable
@Override
public ResourceId unwindowedFilename(
    int shardNumber,
    int numShards,
    FileBasedSink.OutputFileHints outputFileHints) {
  throw new IllegalArgumentException("Should not be used");
}
Developer: apache, Project: beam, Lines: 9, Source: WriteWithShardingFactoryTest.java
Example 17: writeDetailReports
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
/** Returns an IO transform that writes detail reports to registrar-tld keyed CSV files. */
private TextIO.TypedWrite<BillingEvent, Params> writeDetailReports(
    ValueProvider<String> yearMonthProvider) {
  return TextIO.<BillingEvent>writeCustomType()
      // TODO(larryruili): Replace with billing bucket/yyyy-MM after verifying 2017-12 output.
      .to(
          InvoicingUtils.makeDestinationFunction(beamBucket + "/results", yearMonthProvider),
          InvoicingUtils.makeEmptyDestinationParams(beamBucket + "/results"))
      .withFormatFunction(BillingEvent::toCsv)
      .withoutSharding()
      .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(beamBucket + "/temporary"))
      .withHeader(BillingEvent.getHeader())
      .withSuffix(".csv");
}
Developer: google, Project: nomulus, Lines: 15, Source: InvoicingPipeline.java
Example 18: makeDestinationFunction
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
/**
 * Returns a function mapping from {@code BillingEvent} to filename {@code Params}.
 *
 * <p>Beam uses this to determine which file a given {@code BillingEvent} should get placed into.
 *
 * @param outputBucket the GCS bucket we're outputting reports to
 * @param yearMonthProvider a runtime provider for the yyyy-MM we're generating the invoice for
 */
static SerializableFunction<BillingEvent, Params> makeDestinationFunction(
    String outputBucket, ValueProvider<String> yearMonthProvider) {
  return billingEvent ->
      new Params()
          .withShardTemplate("")
          .withSuffix(".csv")
          .withBaseFilename(
              NestedValueProvider.of(
                  yearMonthProvider,
                  yearMonth ->
                      FileBasedSink.convertToFileResourceIfPossible(
                          String.format(
                              "%s/%s", outputBucket, billingEvent.toFilename(yearMonth)))));
}
Developer: google, Project: nomulus, Lines: 23, Source: InvoicingUtils.java
Example 19: makeEmptyDestinationParams
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
/**
 * Returns the default filename parameters for an unmappable {@code BillingEvent}.
 *
 * <p>The "failed" file should only be populated when an error occurs, which warrants further
 * investigation.
 */
static Params makeEmptyDestinationParams(String outputBucket) {
  return new Params()
      .withBaseFilename(
          FileBasedSink.convertToFileResourceIfPossible(
              String.format("%s/%s", outputBucket, "FAILURES")));
}
Developer: google, Project: nomulus, Lines: 13, Source: InvoicingUtils.java
Example 20: testEmptyDestinationParams
import org.apache.beam.sdk.io.FileBasedSink; // import the required package/class
@Test
public void testEmptyDestinationParams() {
  assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
      .isEqualTo(
          new Params()
              .withBaseFilename(
                  FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
}
Developer: google, Project: nomulus, Lines: 9, Source: InvoicingUtilsTest.java
Note: The org.apache.beam.sdk.io.FileBasedSink class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets are taken from open-source projects contributed by their respective developers; the copyright of the source code belongs to the original authors. Please follow each project's license when distributing or using these snippets, and do not repost without permission.