本文整理汇总了Java中org.apache.beam.sdk.PipelineResult类的典型用法代码示例。如果您正苦于以下问题:Java PipelineResult类的具体用法?Java PipelineResult怎么用?Java PipelineResult使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
PipelineResult类属于org.apache.beam.sdk包,在下文中一共展示了PipelineResult类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // forced for this demo
Pipeline p = Pipeline.create(options);
// register Avro coders for serializing our messages
Coders.registerAvroCoders(p, ExtendedRecord.class, UntypedOccurrence.class);
PCollection<UntypedOccurrence> verbatimRecords = p.apply(
"Read Avro", AvroIO.read(UntypedOccurrence.class).from("demo/output/data*"));
verbatimRecords.apply("Write file per Genus",
AvroIO.write(UntypedOccurrence.class)
.to("demo/output-split/data*") // prefix, is required but overwritten
.to(new GenusDynamicAvroDestinations(
FileSystems.matchNewResource("demo/output-split/data*", true))));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:24,代码来源:MultiAvroOutDemo.java
示例2: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
Configuration conf = new Configuration(); // assume defaults on CP
conf.setClass("mapreduce.job.inputformat.class", DwCAInputFormat.class, InputFormat.class);
conf.setStrings("mapreduce.input.fileinputformat.inputdir", "hdfs://ha-nn/tmp/dwca-lep5.zip");
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", ExtendedRecord.class, Object.class);
Pipeline p = newPipeline(args, conf);
Coders.registerAvroCoders(p, UntypedOccurrence.class, TypedOccurrence.class, ExtendedRecord.class);
PCollection<KV<Text, ExtendedRecord>> rawRecords =
p.apply("Read DwC-A", HadoopInputFormatIO.<Text, ExtendedRecord>read().withConfiguration(conf));
PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
"Convert to Avro", ParDo.of(fromExtendedRecordKVP()));
verbatimRecords.apply(
"Write Avro files", AvroIO.write(UntypedOccurrence.class).to("hdfs://ha-nn/tmp/dwca-lep5.avro"));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:25,代码来源:DwCA2AvroPipeline.java
示例3: getDistributionMetric
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Return the current value for a long counter, or a default value if can't be retrieved.
* Note this uses only attempted metrics because some runners don't support committed metrics.
*/
private long getDistributionMetric(PipelineResult result, String namespace, String name,
DistributionType distType, long defaultValue) {
MetricQueryResults metrics = result.metrics().queryMetrics(
MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions();
try {
MetricResult<DistributionResult> distributionResult = distributions.iterator().next();
switch (distType) {
case MIN:
return distributionResult.attempted().min();
case MAX:
return distributionResult.attempted().max();
default:
return defaultValue;
}
} catch (NoSuchElementException e) {
LOG.error(
"Failed to get distribution metric {} for namespace {}",
name,
namespace);
}
return defaultValue;
}
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:NexmarkLauncher.java
示例4: verifyPAssertsSucceeded
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Verifies all {{@link PAssert PAsserts}} in the pipeline have been executed and were successful.
*
* <p>Note this only runs for runners which support Metrics. Runners which do not should verify
* this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
*/
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
if (MetricsEnvironment.isMetricsSupported()) {
long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);
long successfulAssertions = 0;
Iterable<MetricResult<Long>> successCounterResults =
pipelineResult.metrics().queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
.build())
.counters();
for (MetricResult<Long> counter : successCounterResults) {
if (counter.attempted() > 0) {
successfulAssertions++;
}
}
assertThat(String
.format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions,
successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions));
}
}
开发者ID:apache,项目名称:beam,代码行数:29,代码来源:TestPipeline.java
示例5: testBoundedSourceMetrics
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Test
@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetrics() {
long numElements = 1000;
pipeline.apply(GenerateSequence.from(0).to(numElements));
PipelineResult pipelineResult = pipeline.run();
MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(ELEMENTS_READ.namespace(), ELEMENTS_READ.name()))
.build());
assertThat(metrics.counters(), hasItem(
attemptedMetricsResult(
ELEMENTS_READ.namespace(),
ELEMENTS_READ.name(),
"Read(BoundedCountingSource)",
1000L)));
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:MetricsTest.java
示例6: matchesSafely
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Override
protected boolean matchesSafely(PipelineResult pipelineResult) {
pipelineResult.waitUntilFinish();
Session session = cluster.connect();
ResultSet result = session.execute("select id,name from " + CassandraTestDataSet.KEYSPACE
+ "." + tableName);
List<Row> rows = result.all();
if (rows.size() != 1000) {
return false;
}
for (Row row : rows) {
if (!row.getString("name").matches("Name.*")) {
return false;
}
}
return true;
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:CassandraIOIT.java
示例7: testWithValidProvidedContext
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
private void testWithValidProvidedContext(JavaSparkContext jsc) throws Exception {
SparkContextOptions options = getSparkContextOptions(jsc);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
// Run test from pipeline
PipelineResult result = p.run();
TestPipeline.verifyPAssertsSucceeded(p, result);
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:ProvidedSparkContextTest.java
示例8: run
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Override
public PipelineResult run(Pipeline pipeline) {
try {
return delegate.run(pipeline);
} catch (Throwable t) {
// Special case hack to pull out assertion errors from PAssert; instead there should
// probably be a better story along the lines of UserCodeException.
UserCodeException innermostUserCodeException = null;
Throwable current = t;
for (; current.getCause() != null; current = current.getCause()) {
if (current instanceof UserCodeException) {
innermostUserCodeException = ((UserCodeException) current);
}
}
if (innermostUserCodeException != null) {
current = innermostUserCodeException.getCause();
}
if (current instanceof AssertionError) {
throw (AssertionError) current;
}
throw new PipelineExecutionException(current);
}
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:TestFlinkRunner.java
示例9: testWaitUntilFinishTimeout
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Test
public void testWaitUntilFinishTimeout() throws Exception {
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setBlockOnRun(false);
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
p
.apply(Create.of(1L))
.apply(ParDo.of(
new DoFn<Long, Long>() {
@ProcessElement
public void hang(ProcessContext context) throws InterruptedException {
// Hangs "forever"
Thread.sleep(Long.MAX_VALUE);
}
}));
PipelineResult result = p.run();
// The pipeline should never complete;
assertThat(result.getState(), is(State.RUNNING));
// Must time out, otherwise this test will never complete
result.waitUntilFinish(Duration.millis(1L));
assertThat(result.getState(), is(State.RUNNING));
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:DirectRunnerTest.java
示例10: getSample
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Override
public void getSample(int limit, Consumer<IndexedRecord> consumer) {
// Create a pipeline using the input component to get records.
DirectOptions options = BeamLocalRunnerOption.getOptions();
final Pipeline p = Pipeline.create(options);
// Create an input runtime based on the properties.
BigQueryInputRuntime inputRuntime = new BigQueryInputRuntime();
BigQueryInputProperties inputProperties = new BigQueryInputProperties(null);
inputProperties.init();
inputProperties.setDatasetProperties(properties);
inputRuntime.initialize(new BeamJobRuntimeContainer(options), inputProperties);
try (DirectConsumerCollector<IndexedRecord> collector = DirectConsumerCollector.of(consumer)) {
// Collect a sample of the input records.
p.apply(inputRuntime) //
.apply(Sample.<IndexedRecord> any(limit)).apply(collector);
PipelineResult pr = p.run();
pr.waitUntilFinish();
}
}
开发者ID:Talend,项目名称:components,代码行数:22,代码来源:BigQueryDatasetRuntime.java
示例11: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // forced for this demo
Pipeline p = Pipeline.create(options);
// register Avro coders for serializing our messages
Coders.registerAvroCoders(p, ExtendedRecord.class, UntypedOccurrence.class);
// Read the DwC-A using our custom reader
PCollection<ExtendedRecord> rawRecords = p.apply(
"Read from Darwin Core Archive", DwCAIO.Read.withPaths("demo/dwca.zip", "demo/target/tmp"));
// Convert the ExtendedRecord into an UntypedOccurrence record
DoFn<ExtendedRecord,UntypedOccurrence> fn = BeamFunctions.beamify(FunctionFactory.untypedOccurrenceBuilder());
// TODO: Explore the generics as to why the coder registry does not find it and we need to set the coder explicitly
PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
"Convert the objects into untyped DwC style records",ParDo.of(fn))
.setCoder(AvroCoder.of(UntypedOccurrence.class));
// Write the result as an Avro file
verbatimRecords.apply(
"Save the records as Avro", AvroIO.write(UntypedOccurrence.class).to("demo/output/data"));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:30,代码来源:DwCA2AvroPipeline.java
示例12: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
Configuration conf = new Configuration(); // assume defaults on CP
Pipeline p = newPipeline(args, conf);
Coders.registerAvroCoders(p, UntypedOccurrenceLowerCase.class, TypedOccurrence.class, ExtendedRecord.class);
// Read Avro files
PCollection<UntypedOccurrenceLowerCase> verbatimRecords = p.apply(
"Read Avro files", AvroIO.read(UntypedOccurrenceLowerCase.class).from(SOURCE_PATH));
// Convert the objects (interpretation)
PCollection<TypedOccurrence> interpreted = verbatimRecords.apply(
"Interpret occurrence records", ParDo.of(BeamFunctions.beamify(FunctionFactory.interpretOccurrenceLowerCase())))
.setCoder(AvroCoder.of(TypedOccurrence.class));
// Do the nub lookup
PCollection<TypedOccurrence> matched = interpreted.apply(
"Align to backbone using species/match", ParDo.of(
BeamFunctions.beamify(FunctionFactory.gbifSpeciesMatch())))
.setCoder(AvroCoder.of(TypedOccurrence.class));
// Write the file to SOLR
final SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration
.create(SOLR_HOST);
PCollection<SolrInputDocument> inputDocs = matched.apply(
"Convert to SOLR", ParDo.of(new SolrDocBuilder()));
inputDocs.apply(SolrIO.write().to("beam-demo1").withConnectionConfiguration(conn));
// instruct the writer to use a provided document ID
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:37,代码来源:Avro2SolrPipeline.java
示例13: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
Configuration conf = new Configuration(); // assume defaults on CP
Pipeline p = newPipeline(args, conf);
Coders.registerAvroCoders(p, UntypedOccurrenceLowerCase.class, TypedOccurrence.class, ExtendedRecord.class);
// Read Avro files
PCollection<UntypedOccurrenceLowerCase> verbatimRecords = p.apply(
"Read Avro files", AvroIO.read(UntypedOccurrenceLowerCase.class).from(SOURCE_PATH));
// Convert the objects (interpretation)
PCollection<TypedOccurrence> interpreted = verbatimRecords.apply(
"Interpret occurrence records", ParDo.of(BeamFunctions.beamify(FunctionFactory.interpretOccurrenceLowerCase())))
.setCoder(AvroCoder.of(TypedOccurrence.class));
// Do the nub lookup
PCollection<TypedOccurrence> matched = interpreted.apply(
"Align to backbone using species/match", ParDo.of(
BeamFunctions.beamify(FunctionFactory.gbifSpeciesMatch())))
.setCoder(AvroCoder.of(TypedOccurrence.class));
// Convert to JSON
PCollection<String> json = matched.apply(
"Convert to JSON", ParDo.of(BeamFunctions.asJson(TypedOccurrence.class)));
// Write the file to ES
ElasticsearchIO.ConnectionConfiguration conn = ElasticsearchIO.ConnectionConfiguration
.create(ES_HOSTS,ES_INDEX, ES_TYPE);
// Index in ES
json.apply(ElasticsearchIO.write().withConnectionConfiguration(conn).withMaxBatchSize(BATCH_SIZE));
// instruct the writer to use a provided document ID
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:39,代码来源:Avro2ElasticSearchPipeline.java
示例14: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // forced for this demo
Pipeline p = Pipeline.create(options);
// register Avro coders for serializing our messages
Coders.registerAvroCoders(p, ExtendedRecord.class, UntypedOccurrence.class);
// Read the DwC-A using our custom reader
PCollection<ExtendedRecord> rawRecords = p.apply(
"Read from Darwin Core Archive", DwCAIO.Read.withPaths("/tmp/dwca-s-bryophytes-v4.1.zip", "demo/target/tmp"));
// Convert the ExtendedRecord into an UntypedOccurrence record
PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
"Convert the objects into untyped DwC style records",
ParDo.of(BeamFunctions.beamify(FunctionFactory.untypedOccurrenceBuilder())))
.setCoder(AvroCoder.of(UntypedOccurrence.class));
// Write the file to SOLR
final SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration
.create(SOLR_HOSTS);
PCollection<SolrInputDocument> inputDocs = verbatimRecords.apply(
"Convert to SOLR", ParDo.of(new SolrDocBuilder()));
inputDocs.apply(SolrIO.write().to("beam-demo1").withConnectionConfiguration(conn));
LOG.info("Starting the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
LOG.info("Pipeline finished with state: {} ", result.getState());
}
开发者ID:gbif,项目名称:pipelines,代码行数:33,代码来源:DwCA2SolrPipeline.java
示例15: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Sets up and starts streaming pipeline.
*
* @throws IOException if there is a problem setting up resources
*/
public static void main(String[] args) throws IOException {
StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StreamingWordExtractOptions.class);
options.setStreaming(true);
options.setBigQuerySchema(StringToRowConverter.getSchema());
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
String tableSpec = new StringBuilder()
.append(options.getProject()).append(":")
.append(options.getBigQueryDataset()).append(".")
.append(options.getBigQueryTable())
.toString();
pipeline
.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWords()))
.apply(ParDo.of(new Uppercase()))
.apply(ParDo.of(new StringToRowConverter()))
.apply(BigQueryIO.writeTableRows().to(tableSpec)
.withSchema(StringToRowConverter.getSchema()));
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline before the program exists.
exampleUtils.waitToFinish(result);
}
开发者ID:apache,项目名称:beam,代码行数:36,代码来源:StreamingWordExtract.java
示例16: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Sets up and starts streaming pipeline.
*
* @throws IOException if there is a problem setting up resources
*/
public static void main(String[] args) throws IOException {
TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TrafficRoutesOptions.class);
options.setBigQuerySchema(FormatStatsFn.getSchema());
// Using ExampleUtils to set up required resources.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setProjectId(options.getProject());
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());
pipeline
.apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
// row... => <station route, station speed> ...
.apply(ParDo.of(new ExtractStationSpeedFn()))
// map the incoming data stream into sliding windows.
.apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
.apply(new TrackSpeed())
.apply(BigQueryIO.writeTableRows().to(tableRef)
.withSchema(FormatStatsFn.getSchema()));
// Run the pipeline.
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exists.
exampleUtils.waitToFinish(result);
}
开发者ID:apache,项目名称:beam,代码行数:40,代码来源:TrafficRoutes.java
示例17: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Sets up and starts streaming pipeline.
*
* @throws IOException if there is a problem setting up resources
*/
public static void main(String[] args) throws IOException {
TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TrafficMaxLaneFlowOptions.class);
options.setBigQuerySchema(FormatMaxesFn.getSchema());
// Using ExampleUtils to set up required resources.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setProjectId(options.getProject());
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());
pipeline
.apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
// row... => <station route, station speed> ...
.apply(ParDo.of(new ExtractFlowInfoFn()))
// map the incoming data stream into sliding windows.
.apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
.apply(new MaxLaneFlow())
.apply(BigQueryIO.writeTableRows().to(tableRef)
.withSchema(FormatMaxesFn.getSchema()));
// Run the pipeline.
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exists.
exampleUtils.waitToFinish(result);
}
开发者ID:apache,项目名称:beam,代码行数:39,代码来源:TrafficMaxLaneFlow.java
示例18: main
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TrafficFlowOptions.class);
options.setStreaming(true);
options.setBigQuerySchema(getSchema());
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = getTableReference(options.getProject(),
options.getBigQueryDataset(), options.getBigQueryTable());
PCollectionList<TableRow> resultList = pipeline
.apply("ReadMyFile", TextIO.read().from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(ParDo.of(new ExtractFlowInfo()))
.apply(new CalculateTotalFlow(options.getWindowDuration()));
for (int i = 0; i < resultList.size(); i++){
resultList.get(i).apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(getSchema()));
}
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exits.
exampleUtils.waitToFinish(result);
}
开发者ID:apache,项目名称:beam,代码行数:34,代码来源:TriggerExample.java
示例19: matchesSafely
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
@Override
public boolean matchesSafely(PipelineResult pipelineResult) {
try {
// Load output data
List<String> outputLines = new ArrayList<>();
for (ShardedFile outputFile : outputFiles) {
outputLines.addAll(
outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
}
// Since the windowing is nondeterministic we only check the sums
actualCounts = new TreeMap<>();
for (String line : outputLines) {
String[] splits = line.split(": ");
String word = splits[0];
long count = Long.parseLong(splits[1]);
Long current = actualCounts.get(word);
if (current == null) {
actualCounts.put(word, count);
} else {
actualCounts.put(word, current + count);
}
}
return actualCounts.equals(expectedWordCounts);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to read from sharded output: %s due to exception",
outputFiles), e);
}
}
开发者ID:apache,项目名称:beam,代码行数:33,代码来源:WindowedWordCountIT.java
示例20: getCounterMetric
import org.apache.beam.sdk.PipelineResult; //导入依赖的package包/类
/**
* Return the current value for a long counter, or a default value if can't be retrieved.
* Note this uses only attempted metrics because some runners don't support committed metrics.
*/
private long getCounterMetric(PipelineResult result, String namespace, String name,
long defaultValue) {
MetricQueryResults metrics = result.metrics().queryMetrics(
MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build());
Iterable<MetricResult<Long>> counters = metrics.counters();
try {
MetricResult<Long> metricResult = counters.iterator().next();
return metricResult.attempted();
} catch (NoSuchElementException e) {
LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
}
return defaultValue;
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:NexmarkLauncher.java
注:本文中的org.apache.beam.sdk.PipelineResult类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论