This article collects typical usage examples of the Java class org.apache.beam.sdk.metrics.Metrics: what the class is for, how it is used, and what real-world code that uses it looks like.
The Metrics class belongs to the org.apache.beam.sdk.metrics package. Twenty code examples are shown below, ordered by popularity by default.
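Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the typical Metrics workflow: create a Counter or Distribution inside a DoFn, update it per element, then query the reported values from the PipelineResult after the run. The namespace "example" and the class name MetricsExample are illustrative; the sketch assumes a runner such as the DirectRunner is on the classpath, and uses attempted values because some runners do not report committed metrics.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class MetricsExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of(1, 5, 12, 20))
        .apply("CountAndMeasure", ParDo.of(new DoFn<Integer, Integer>() {
          // A Counter accumulates a single long; a Distribution tracks min/max/sum/count.
          private final Counter elements = Metrics.counter("example", "elements");
          private final Distribution values = Metrics.distribution("example", "values");

          @ProcessElement
          public void processElement(ProcessContext c) {
            elements.inc();
            values.update(c.element());
            c.output(c.element());
          }
        }));

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    // Query the metric values back from the runner by namespace.
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.inNamespace("example"))
            .build());
    for (MetricResult<Long> counter : metrics.counters()) {
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}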
Example 1: snoop
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
/**
 * Return a transform to pass-through events, but count them as they go by.
 */
public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
  return ParDo.of(new DoFn<Event, Event>() {
    final Counter eventCounter = Metrics.counter(name, "events");
    final Counter newPersonCounter = Metrics.counter(name, "newPersons");
    final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
    final Counter bidCounter = Metrics.counter(name, "bids");
    final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");
    @ProcessElement
    public void processElement(ProcessContext c) {
      eventCounter.inc();
      if (c.element().newPerson != null) {
        newPersonCounter.inc();
      } else if (c.element().newAuction != null) {
        newAuctionCounter.inc();
      } else if (c.element().bid != null) {
        bidCounter.inc();
      } else {
        endOfStreamCounter.inc();
      }
      info("%s snooping element %s", name, c.element());
      c.output(c.element());
    }
  });
}
Developer: apache, Project: beam, Lines: 29, Source: NexmarkUtils.java
Example 2: applyTyped
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
private PCollection<Event> applyTyped(PCollection<Event> events) {
  final Coder<Event> coder = events.getCoder();
  return events
      // Force round trip through coder.
      .apply(name + ".Serialize",
          ParDo.of(new DoFn<Event, Event>() {
            private final Counter bytesMetric = Metrics.counter(name, "bytes");
            @ProcessElement
            public void processElement(ProcessContext c) throws CoderException, IOException {
              ByteArrayOutputStream outStream = new ByteArrayOutputStream();
              coder.encode(c.element(), outStream, Coder.Context.OUTER);
              byte[] byteArray = outStream.toByteArray();
              bytesMetric.inc((long) byteArray.length);
              ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
              Event event = coder.decode(inStream, Coder.Context.OUTER);
              c.output(event);
            }
          }));
}
Developer: apache, Project: beam, Lines: 22, Source: Query0.java
Example 3: logBytesMetric
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
private PTransform<? super PCollection<BeamRecord>, PCollection<BeamRecord>> logBytesMetric(
    final BeamRecordCoder coder) {
  return ParDo.of(new DoFn<BeamRecord, BeamRecord>() {
    private final Counter bytesMetric = Metrics.counter(name, "bytes");
    @ProcessElement
    public void processElement(ProcessContext c) throws CoderException, IOException {
      ByteArrayOutputStream outStream = new ByteArrayOutputStream();
      coder.encode(c.element(), outStream, Coder.Context.OUTER);
      byte[] byteArray = outStream.toByteArray();
      bytesMetric.inc((long) byteArray.length);
      ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
      BeamRecord record = coder.decode(inStream, Coder.Context.OUTER);
      c.output(record);
    }
  });
}
Developer: apache, Project: beam, Lines: 19, Source: SqlQuery0.java
Example 4: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(ProcessContext c) {
  GenomicsFactory factory = GenomicsFactory.builder().build();
  Genomics genomics = factory.fromOfflineAuth(auth);
  processApiCall(genomics, c, c.element());
  Metrics.counter(GenomicsApiReader.class, "Genomics API Initialized Request Count")
      .inc(factory.initializedRequestsCount());
  Metrics.counter(GenomicsApiReader.class, "Genomics API Unsuccessful Response Count")
      .inc(factory.unsuccessfulResponsesCount());
  Metrics.counter(GenomicsApiReader.class, "Genomics API IOException Response Count")
      .inc(factory.ioExceptionsCount());
  LOG.info("ApiReader processed " + factory.initializedRequestsCount() + " requests ("
      + factory.unsuccessfulResponsesCount() + " server errors and "
      + factory.ioExceptionsCount() + " IO exceptions)");
}
Developer: googlegenomics, Project: dataflow-java, Lines: 18, Source: GenomicsApiReader.java
Example 5: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException, InterruptedException {
  Metrics.counter(RetrieveVariants.class, "Initialized Shard Count").inc();
  Stopwatch stopWatch = Stopwatch.createStarted();
  Iterator<StreamVariantsResponse> iter = VariantStreamIterator.enforceShardBoundary(auth, c.element(), shardBoundary, fields);
  while (iter.hasNext()) {
    StreamVariantsResponse variantResponse = iter.next();
    c.output(variantResponse.getVariantsList());
  }
  stopWatch.stop();
  Metrics.distribution(RetrieveVariants.class, "Shard Processing Time (sec)")
      .update(stopWatch.elapsed(TimeUnit.SECONDS));
  Metrics.counter(RetrieveVariants.class, "Finished Shard Count").inc();
  stats.addValue(stopWatch.elapsed(TimeUnit.SECONDS));
  LOG.info("Shard Duration in Seconds - Min: " + stats.getMin() + " Max: " + stats.getMax() +
      " Avg: " + stats.getMean() + " StdDev: " + stats.getStandardDeviation());
}
Developer: googlegenomics, Project: dataflow-java, Lines: 18, Source: VariantStreamer.java
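The example above records a Distribution but never reads it back. As a hedged sketch of how those shard times could be queried after the run (the helper class ShardTimeReport and its method are hypothetical, not part of the project above, and attempted values are used because some runners do not report committed metrics):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class ShardTimeReport {
  // Hypothetical helper: print a recorded distribution, e.g.
  // printDistribution(result, RetrieveVariants.class, "Shard Processing Time (sec)").
  public static void printDistribution(PipelineResult result, Class<?> namespace, String name) {
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(namespace, name))
            .build());
    for (MetricResult<DistributionResult> dist : metrics.distributions()) {
      DistributionResult d = dist.getAttempted();
      double mean = d.getCount() == 0 ? 0.0 : (double) d.getSum() / d.getCount();
      System.out.printf("%s: count=%d min=%d max=%d mean=%.1f%n",
          dist.getName(), d.getCount(), d.getMin(), d.getMax(), mean);
    }
  }
}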
Example 6: startBundle
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@StartBundle
public void startBundle(DoFn<Read, String>.StartBundleContext c) throws IOException {
  LOG.info("Starting bundle ");
  storage = Transport.newStorageClient(c.getPipelineOptions().as(GCSOptions.class)).build().objects();
  Metrics.counter(WriteBAMFn.class, "Initialized Write Shard Count").inc();
  stopWatch = Stopwatch.createStarted();
  options = c.getPipelineOptions().as(Options.class);
  readCount = 0;
  unmappedReadCount = 0;
  headerInfo = null;
  prevRead = null;
  minAlignment = Long.MAX_VALUE;
  maxAlignment = Long.MIN_VALUE;
  hadOutOfOrder = false;
}
Developer: googlegenomics, Project: dataflow-java, Lines: 19, Source: WriteBAMFn.java
Example 7: finishBundle
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@FinishBundle
public void finishBundle(DoFn<Read, String>.FinishBundleContext c) throws IOException {
  bw.close();
  Metrics.distribution(WriteBAMFn.class, "Maximum Write Shard Processing Time (sec)")
      .update(stopWatch.elapsed(TimeUnit.SECONDS));
  LOG.info("Finished writing " + shardContig);
  Metrics.counter(WriteBAMFn.class, "Finished Write Shard Count").inc();
  final long bytesWritten = ts.getBytesWrittenExceptingTruncation();
  LOG.info("Wrote " + readCount + " reads, " + unmappedReadCount + " unmapped, into " + shardName +
      (hadOutOfOrder ? "ignored out of order" : "") + ", wrote " + bytesWritten + " bytes");
  Metrics.counter(WriteBAMFn.class, "Written reads").inc(readCount);
  Metrics.counter(WriteBAMFn.class, "Written unmapped reads").inc(unmappedReadCount);
  final long totalReadCount = (long) readCount + (long) unmappedReadCount;
  Metrics.distribution(WriteBAMFn.class, "Maximum Reads Per Shard").update(totalReadCount);
  c.output(shardName, window.maxTimestamp(), window);
  c.output(SEQUENCE_SHARD_SIZES_TAG, KV.of(sequenceIndex, bytesWritten),
      window.maxTimestamp(), window);
}
Developer: googlegenomics, Project: dataflow-java, Lines: 19, Source: WriteBAMFn.java
Example 8: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(DoFn<String, Integer>.ProcessContext context) throws Exception {
  String variantId = context.element();
  // Call the deletion operation via exponential backoff so that "Rate Limit Exceeded"
  // quota issues do not cause the pipeline to fail.
  ExponentialBackOff backoff = new ExponentialBackOff.Builder().build();
  while (true) {
    try {
      genomics.variants().delete(variantId).execute();
      Metrics.counter(DeleteVariantFn.class, "Number of variants deleted").inc();
      context.output(1);
      return;
    } catch (Exception e) {
      if (e.getMessage().startsWith("429 Too Many Requests")) {
        LOG.warn("Backing-off per: ", e);
        long backOffMillis = backoff.nextBackOffMillis();
        if (backOffMillis == BackOff.STOP) {
          throw e;
        }
        Thread.sleep(backOffMillis);
      } else {
        throw e;
      }
    }
  }
}
Developer: googlegenomics, Project: dataflow-java, Lines: 27, Source: DeleteVariants.java
Example 9: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(DoFn<Read, KV<Contig, Read>>.ProcessContext c)
    throws Exception {
  final Read read = c.element();
  long pos = read.getAlignment().getPosition().getPosition();
  minPos = Math.min(minPos, pos);
  maxPos = Math.max(maxPos, pos);
  count++;
  c.output(
      KV.of(
          shardKeyForRead(read, lociPerShard),
          read));
  Metrics.counter(KeyReadsFn.class, "Keyed reads").inc();
  if (isUnmapped(read)) {
    Metrics.counter(KeyReadsFn.class, "Keyed unmapped reads").inc();
  }
}
Developer: googlegenomics, Project: dataflow-java, Lines: 18, Source: KeyReadsFn.java
Example 10: expand
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@Override
public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
  // Get the sum of scores for each user.
  PCollection<KV<String, Integer>> sumScores = userScores
      .apply("UserSum", Sum.<String>integersPerKey());
  // Extract the score from each element, and use it to find the global mean.
  final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
      .apply(Mean.<Integer>globally().asSingletonView());
  // Filter the user sums using the global mean.
  PCollection<KV<String, Integer>> filtered = sumScores
      .apply("ProcessAndFilter", ParDo
          // use the derived mean total score as a side input
          .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
            private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
            @ProcessElement
            public void processElement(ProcessContext c) {
              Integer score = c.element().getValue();
              Double gmc = c.sideInput(globalMeanScore);
              if (score > (gmc * SCORE_WEIGHT)) {
                LOG.info("user " + c.element().getKey() + " spammer score " + score
                    + " with mean " + gmc);
                numSpammerUsers.inc();
                c.output(c.element());
              }
            }
          }).withSideInputs(globalMeanScore));
  return filtered;
}
Developer: apache, Project: beam, Lines: 32, Source: GameStats.java
Example 11: devNull
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
/**
 * Return a transform to count and discard each element.
 */
public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
  return ParDo.of(new DoFn<T, Void>() {
    final Counter discardedCounterMetric = Metrics.counter(name, "discarded");
    @ProcessElement
    public void processElement(ProcessContext c) {
      discardedCounterMetric.inc();
    }
  });
}
Developer: apache, Project: beam, Lines: 14, Source: NexmarkUtils.java
Example 12: format
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
/**
 * Return a transform to format each element as a string.
 */
public static <T> ParDo.SingleOutput<T, String> format(final String name) {
  return ParDo.of(new DoFn<T, String>() {
    final Counter recordCounterMetric = Metrics.counter(name, "records");
    @ProcessElement
    public void processElement(ProcessContext c) {
      recordCounterMetric.inc();
      c.output(c.element().toString());
    }
  });
}
Developer: apache, Project: beam, Lines: 15, Source: NexmarkUtils.java
Example 13: JoinDoFn
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
private JoinDoFn(String name, int maxAuctionsWaitingTime) {
  this.name = name;
  this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
  newAuctionCounter = Metrics.counter(name, "newAuction");
  newPersonCounter = Metrics.counter(name, "newPerson");
  newNewOutputCounter = Metrics.counter(name, "newNewOutput");
  newOldOutputCounter = Metrics.counter(name, "newOldOutput");
  oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
  fatalCounter = Metrics.counter(name, "fatal");
}
Developer: apache, Project: beam, Lines: 11, Source: Query3.java
Example 14: NexmarkQuery
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
  super(name);
  this.configuration = configuration;
  if (configuration.debug) {
    eventMonitor = new Monitor<>(name + ".Events", "event");
    resultMonitor = new Monitor<>(name + ".Results", "result");
    endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
    fatalCounter = Metrics.counter(name, "fatal");
  } else {
    eventMonitor = null;
    resultMonitor = null;
    endOfStreamMonitor = null;
    fatalCounter = null;
  }
}
Developer: apache, Project: beam, Lines: 16, Source: NexmarkQuery.java
Example 15: testRunPTransform
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@Test
@Category(NeedsRunner.class)
public void testRunPTransform() {
  final String namespace = PipelineRunnerTest.class.getName();
  final Counter counter = Metrics.counter(namespace, "count");
  final PipelineResult result = PipelineRunner.create().run(
      new PTransform<PBegin, POutput>() {
        @Override
        public POutput expand(PBegin input) {
          PCollection<Double> output = input
              .apply(Create.<Integer>of(1, 2, 3, 4))
              .apply("ScaleByTwo", MapElements.via(new ScaleFn<Integer>(2.0, counter)));
          PAssert.that(output).containsInAnyOrder(2.0, 4.0, 6.0, 8.0);
          return output;
        }
      }
  );
  // Checking counters to verify the pipeline actually ran.
  assertThat(
      result.metrics().queryMetrics(
          MetricsFilter.builder()
              .addNameFilter(MetricNameFilter.inNamespace(namespace))
              .build()
      ).counters(),
      hasItem(metricsResult(namespace, "count", "ScaleByTwo", 4L, true))
  );
}
Developer: apache, Project: beam, Lines: 29, Source: PipelineRunnerTest.java
Example 16: ReduceFnRunner
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
public ReduceFnRunner(
    K key,
    WindowingStrategy<?, W> windowingStrategy,
    ExecutableTriggerStateMachine triggerStateMachine,
    StateInternals stateInternals,
    TimerInternals timerInternals,
    OutputWindowedValue<KV<K, OutputT>> outputter,
    SideInputReader sideInputReader,
    ReduceFn<K, InputT, OutputT, W> reduceFn,
    PipelineOptions options) {
  this.key = key;
  this.timerInternals = timerInternals;
  this.paneInfoTracker = new PaneInfoTracker(timerInternals);
  this.stateInternals = stateInternals;
  this.outputter = outputter;
  this.reduceFn = reduceFn;
  this.droppedDueToClosedWindow = Metrics.counter(ReduceFnRunner.class,
      DROPPED_DUE_TO_CLOSED_WINDOW);
  @SuppressWarnings("unchecked")
  WindowingStrategy<Object, W> objectWindowingStrategy =
      (WindowingStrategy<Object, W>) windowingStrategy;
  this.windowingStrategy = objectWindowingStrategy;
  this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
  // Note this may incur I/O to load persisted window set data.
  this.activeWindows = createActiveWindowSet();
  this.contextFactory =
      new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy,
          stateInternals, this.activeWindows, timerInternals, sideInputReader, options);
  this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
  this.triggerRunner =
      new TriggerStateMachineRunner<>(
          triggerStateMachine,
          new TriggerStateMachineContextFactory<>(
              windowingStrategy.getWindowFn(), stateInternals, activeWindows));
}
Developer: apache, Project: beam, Lines: 41, Source: ReduceFnRunner.java
Example 17: LateDataFilter
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
public LateDataFilter(
    WindowingStrategy<?, ?> windowingStrategy,
    TimerInternals timerInternals) {
  this.windowingStrategy = windowingStrategy;
  this.timerInternals = timerInternals;
  this.droppedDueToLateness = Metrics.counter(LateDataDroppingDoFnRunner.class,
      DROPPED_DUE_TO_LATENESS);
}
Developer: apache, Project: beam, Lines: 9, Source: LateDataDroppingDoFnRunner.java
Example 18: GroupAlsoByWindowEvaluator
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
public GroupAlsoByWindowEvaluator(
    final EvaluationContext evaluationContext,
    CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
    final AppliedPTransform<
        PCollection<KeyedWorkItem<K, V>>,
        PCollection<KV<K, Iterable<V>>>,
        DirectGroupAlsoByWindow<K, V>> application) {
  this.evaluationContext = evaluationContext;
  this.application = application;
  structuralKey = inputBundle.getKey();
  stepContext = evaluationContext
      .getExecutionContext(application, inputBundle.getKey())
      .getStepContext(
          evaluationContext.getStepName(application));
  windowingStrategy =
      (WindowingStrategy<?, BoundedWindow>)
          application.getTransform().getInputWindowingStrategy();
  outputBundles = new ArrayList<>();
  unprocessedElements = ImmutableList.builder();
  Coder<V> valueCoder =
      application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
  reduceFn = SystemReduceFn.buffering(valueCoder);
  droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class,
      GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
  droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class,
      GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
}
Developer: apache, Project: beam, Lines: 31, Source: GroupAlsoByWindowEvaluatorFactory.java
Example 19: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(ProcessContext c) throws IOException, GeneralSecurityException {
  Metrics.counter(RetrieveReads.class, "Initialized Shard Count").inc();
  Stopwatch stopWatch = Stopwatch.createStarted();
  Iterator<StreamReadsResponse> iter = ReadStreamIterator.enforceShardBoundary(auth, c.element(), shardBoundary, fields);
  while (iter.hasNext()) {
    StreamReadsResponse readResponse = iter.next();
    c.output(readResponse.getAlignmentsList());
  }
  stopWatch.stop();
  Metrics.distribution(RetrieveReads.class, "Shard Processing Time (sec)")
      .update(stopWatch.elapsed(TimeUnit.SECONDS));
  Metrics.counter(RetrieveReads.class, "Finished Shard Count").inc();
}
Developer: googlegenomics, Project: dataflow-java, Lines: 15, Source: ReadStreamer.java
Example 20: processElement
import org.apache.beam.sdk.metrics.Metrics; // import the required package/class
@ProcessElement
public void processElement(ProcessContext c) throws java.lang.Exception {
  final Reader reader = new Reader(storage, options, c.element(), c);
  reader.process();
  Metrics.counter(ReadBAMTransform.class, "Processed records").inc(reader.recordsProcessed);
  Metrics.counter(ReadBAMTransform.class, "Reads generated").inc(reader.readsGenerated);
  Metrics.counter(ReadBAMTransform.class, "Skipped start").inc(reader.recordsBeforeStart);
  Metrics.counter(ReadBAMTransform.class, "Skipped end").inc(reader.recordsAfterEnd);
  Metrics.counter(ReadBAMTransform.class, "Ref mismatch").inc(reader.mismatchedSequence);
}
Developer: googlegenomics, Project: dataflow-java, Lines: 12, Source: ReadBAMTransform.java
Note: The org.apache.beam.sdk.metrics.Metrics examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets come from open-source projects contributed by their developers; copyright of the source code remains with the original authors. Consult each project's license before distributing or reusing the code; do not republish without permission.