This article collects typical usage examples of the Java class org.apache.flink.runtime.state.FunctionSnapshotContext. If you are wondering how FunctionSnapshotContext is used in practice, the selected examples below may help.
FunctionSnapshotContext belongs to the org.apache.flink.runtime.state package. The 20 code examples shown below are sorted by popularity by default.
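For orientation, the pattern shared by most of the examples can be sketched without a Flink dependency. In this sketch, `SnapshotContext` is a hypothetical stand-in that mirrors only the two accessors the examples call on FunctionSnapshotContext (`getCheckpointId()` and `getCheckpointTimestamp()`); `CountingFunction` and its list-backed state are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FunctionSnapshotContext: only the two accessors
// used by the examples on this page are mirrored here.
interface SnapshotContext {
    long getCheckpointId();
    long getCheckpointTimestamp();
}

// Illustrative function that keeps a counter and copies it into a
// list-backed "state" whenever a snapshot is requested.
class CountingFunction {
    private final List<Long> checkpointedState = new ArrayList<>();
    private long count;
    private long lastCheckpointId = -1L;

    void process() {
        count++;
    }

    // Clear-then-add: the state holds exactly one value per snapshot.
    void snapshotState(SnapshotContext context) {
        checkpointedState.clear();
        checkpointedState.add(count);
        lastCheckpointId = context.getCheckpointId();
    }

    List<Long> getCheckpointedState() {
        return checkpointedState;
    }

    long getLastCheckpointId() {
        return lastCheckpointId;
    }
}
```

In a real job the state list would be a Flink operator state handle obtained during initialization; the plain `ArrayList` here only makes the clear-then-add step visible.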
Example 1: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    super.snapshotState(context);
    nextTransactionalIdHintState.clear();
    // To avoid duplication, only the first subtask keeps track of the next transactional id hint.
    // Otherwise all of the subtasks would write exactly the same information.
    if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
        checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
        long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
        // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch.
        // In that case we adjust nextFreeTransactionalId by the range of transactionalIds that could be
        // used for this scaling up.
        if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
            nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
        }
        nextTransactionalIdHintState.add(new NextTransactionalIdHint(
            getRuntimeContext().getNumberOfParallelSubtasks(),
            nextFreeTransactionalId));
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 24, Source: FlinkKafkaProducer011.java
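The scale-up adjustment in Example 1 comes down to a small piece of arithmetic, which the following sketch isolates. `TransactionalIdHint.nextFreeId` and its parameter names are hypothetical helpers introduced for illustration; the multiplication is widened to `long` here, which the original snippet does not do.

```java
// Hypothetical helper that isolates the arithmetic from Example 1.
class TransactionalIdHint {
    // Returns the next free transactional id after a possible scale-up.
    // The id is advanced only when the current parallelism exceeds the
    // parallelism recorded in the previous hint.
    static long nextFreeId(long currentNextFreeId,
                           int lastParallelism,
                           int currentParallelism,
                           int producersPoolSize) {
        long next = currentNextFreeId;
        if (currentParallelism > lastParallelism) {
            // Reserve one pool of ids per current subtask.
            next += (long) currentParallelism * producersPoolSize;
        }
        return next;
    }
}
```

For instance, going from parallelism 2 to 4 with a pool size of 5 moves a next free id of 100 to 120, while an unchanged or reduced parallelism leaves it at 100.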
Example 2: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    // check for asynchronous errors and fail the checkpoint if necessary
    checkErroneous();
    if (flushOnCheckpoint) {
        // flushing is activated: we need to wait until pendingRecords is 0
        flush();
        synchronized (pendingRecordsLock) {
            if (pendingRecords != 0) {
                throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
            }
            // if the flushed requests have errors, we should propagate them as well and fail the checkpoint
            checkErroneous();
        }
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 19, Source: FlinkKafkaProducerBase.java
Example 3: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates,
        "The " + getClass().getSimpleName() + " has not been properly initialized.");
    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
    if (isWriterOpen) {
        bucketState.currentFile = currentPartPath.toString();
        bucketState.currentFileValidLength = writer.flush();
    }
    synchronized (bucketState.pendingFilesPerCheckpoint) {
        bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
    }
    bucketState.pendingFiles = new ArrayList<>();
    restoredBucketStates.clear();
    restoredBucketStates.add(bucketState);
    if (LOG.isDebugEnabled()) {
        LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 25, Source: RollingSink.java
Example 4: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    counterPartitions.clear();
    checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;
    // split the counter into NUM_PARTITIONS parts that differ by at most one
    int div = counter / NUM_PARTITIONS;
    int mod = counter % NUM_PARTITIONS;
    for (int i = 0; i < NUM_PARTITIONS; ++i) {
        int partitionValue = div;
        if (mod > 0) {
            --mod;
            ++partitionValue;
        }
        counterPartitions.add(partitionValue);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 20, Source: RescalingITCase.java
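The div/mod loop in Example 4 splits a counter into parts that differ by at most one, so that the parts can be redistributed on rescaling and still sum to the original value. A standalone sketch of that splitting step follows; `CounterSplitter` is a hypothetical name, and a non-negative counter is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the splitting logic from Example 4.
class CounterSplitter {
    // Splits a non-negative counter into numPartitions parts whose sum is
    // the counter; the first (counter % numPartitions) parts get one extra.
    static List<Integer> split(int counter, int numPartitions) {
        List<Integer> parts = new ArrayList<>(numPartitions);
        int div = counter / numPartitions;
        int mod = counter % numPartitions;
        for (int i = 0; i < numPartitions; ++i) {
            int value = div;
            if (mod > 0) {
                --mod;
                ++value;
            }
            parts.add(value);
        }
        return parts;
    }
}
```

With a counter of 10 and 3 partitions, the result is 4, 3, 3.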
Example 5: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction
    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}
Developer ID: axbaretto, Project: flink, Lines of code: 24, Source: TwoPhaseCommitSinkFunction.java
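The bookkeeping in Example 5 (park the current transaction under the checkpoint id, then open a new one) can be sketched in isolation. Everything in the sketch is hypothetical: transactions are plain strings, and the `notifyCheckpointComplete` step is an assumption about the complementary commit phase, which the example itself does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free sketch of two-phase-commit bookkeeping.
class TwoPhaseCommitSketch {
    // Pre-committed transactions, keyed by the checkpoint that parked them.
    private final TreeMap<Long, String> pendingCommitTransactions = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    private int nextTransactionNumber;
    private String currentTransaction;

    TwoPhaseCommitSketch() {
        currentTransaction = beginTransaction();
    }

    private String beginTransaction() {
        return "txn-" + (nextTransactionNumber++);
    }

    // Pre-commit: remember the current transaction and start a fresh one.
    void snapshotState(long checkpointId) {
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Assumed second phase: commit everything parked up to this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, String> ready = pendingCommitTransactions.headMap(checkpointId, true);
        committed.addAll(ready.values());
        ready.clear(); // the view is backed by the map, so this removes the entries
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return pendingCommitTransactions.size();
    }

    String currentTransaction() {
        return currentTransaction;
    }
}
```

Keying the pending transactions by checkpoint id is what allows a completion notification for checkpoint N to commit exactly the transactions that were pre-committed at or before N.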
Example 6: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkState(this.checkpointedState != null,
        "The " + getClass().getSimpleName() + " has not been properly initialized.");
    if (LOG.isDebugEnabled()) {
        LOG.debug("{} checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}",
            idsForCurrentCheckpoint, context.getCheckpointId(), context.getCheckpointTimestamp());
    }
    pendingCheckpoints.addLast(new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
    idsForCurrentCheckpoint = new HashSet<>(64);
    this.checkpointedState.clear();
    this.checkpointedState.add(SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer));
}
Developer ID: axbaretto, Project: flink, Lines of code: 17, Source: MessageAcknowledgingSourceBase.java
Example 7: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates,
        "The " + getClass().getSimpleName() + " has not been properly initialized.");
    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
    if (isWriterOpen) {
        bucketState.currentFile = currentPartPath.toString();
        bucketState.currentFileValidLength = writer.flush();
    }
    synchronized (bucketState.pendingFilesPerCheckpoint) {
        bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
    }
    bucketState.pendingFiles = new ArrayList<>();
    restoredBucketStates.clear();
    restoredBucketStates.add(bucketState);
    if (LOG.isDebugEnabled()) {
        LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 25, Source: RollingSink.java
Example 8: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    // keep flushing until the buffer is empty; note that this loop has no upper bound on retries
    do {
        try {
            flushDocumentBuffer();
        } catch (IOException e) {
            // if the request fails, that's fine, just retry on the next iteration
        }
    } while (!documentBuffer.isEmpty());
}
Developer ID: awslabs, Project: flink-stream-processing-refarch, Lines of code: 11, Source: ElasticsearchJestSink.java
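The loop in Example 8 retries indefinitely and discards the exception, so a permanently failing endpoint would block the checkpoint forever. One possible alternative, which is not what the original project does, is to bound the number of attempts and fail the snapshot afterwards. In the sketch below, `BoundedRetryFlush`, its `Buffer` interface, and the `maxAttempts` parameter are all hypothetical.

```java
import java.io.IOException;

// Hypothetical sketch: flush with a bounded number of retries.
class BoundedRetryFlush {
    // Hypothetical abstraction over a buffer that can be flushed.
    interface Buffer {
        void flush() throws IOException;
        boolean isEmpty();
    }

    // Tries to drain the buffer and returns the number of attempts used.
    // Throws IllegalStateException if the buffer is still non-empty after
    // maxAttempts, keeping the last I/O error (if any) as the cause.
    static int flushWithRetries(Buffer buffer, int maxAttempts) {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                buffer.flush();
            } catch (IOException e) {
                last = e; // remember the failure and retry on the next iteration
            }
            if (buffer.isEmpty()) {
                return attempt;
            }
        }
        throw new IllegalStateException(
            "Buffer not empty after " + maxAttempts + " attempts", last);
    }
}
```

Throwing from the snapshot method is how the other examples on this page signal that a checkpoint must fail, for instance the `IllegalStateException` thrown in Example 2.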
Example 9: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
    restoredBucketStates.clear();
    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();
            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }
            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
Developer ID: breakEval13, Project: rocketmq-flink-plugin, Lines of code: 29, Source: TODBucketingSink.java
Example 10: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkErrorAndRethrow();
    if (flushOnCheckpoint) {
        // flush repeatedly until no requests are pending, rethrowing any asynchronous error
        do {
            bulkProcessor.flush();
            checkErrorAndRethrow();
        } while (numPendingRequests.get() != 0);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 12, Source: ElasticsearchSinkBase.java
Example 11: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    isFlushed = false;
    super.snapshotState(ctx);
    // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
    if (flushOnCheckpoint && !isFlushed) {
        throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 12, Source: FlinkKafkaProducerBaseTest.java
Example 12: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // check for asynchronous errors and fail the checkpoint if necessary
    checkAndPropagateAsyncError();
    flushSync();
    if (producer.getOutstandingRecordsCount() > 0) {
        throw new IllegalStateException(
            "Number of outstanding records must be zero at this point: " + producer.getOutstandingRecordsCount());
    }
    // if the flushed requests have errors, we should propagate them as well and fail the checkpoint
    checkAndPropagateAsyncError();
}
Developer ID: axbaretto, Project: flink, Lines of code: 15, Source: FlinkKinesisProducer.java
Example 13: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    isFlushed = false;
    super.snapshotState(context);
    // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
    if (!isFlushed) {
        throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 12, Source: FlinkKinesisProducerTest.java
Example 14: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
    restoredBucketStates.clear();
    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();
            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }
            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 29, Source: BucketingSink.java
Example 15: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkState(this.checkpointedState != null,
        "The " + getClass().getSimpleName() + " state has not been properly initialized.");
    // replace the previous snapshot with the values that still have to be emitted
    this.checkpointedState.clear();
    for (Long v : this.valuesToEmit) {
        this.checkpointedState.add(v);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 11, Source: StatefulSequenceSource.java
Example 16: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkState(this.checkpointedState != null,
        "The " + getClass().getSimpleName() + " has not been properly initialized.");
    this.checkpointedState.clear();
    this.checkpointedState.add(this.numElementsEmitted);
}
Developer ID: axbaretto, Project: flink, Lines of code: 9, Source: FromElementsFunction.java
Example 17: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkState(this.checkpointedState != null,
        "The " + getClass().getSimpleName() + " state has not been properly initialized.");
    this.checkpointedState.clear();
    this.checkpointedState.add(this.globalModificationTime);
    if (LOG.isDebugEnabled()) {
        LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 13, Source: ContinuousFileMonitoringFunction.java
Example 18: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    if (flushOnCheckpoint) {
        // flushing is activated: we need to wait until pendingRecords is 0
        flush();
        synchronized (pendingRecordsLock) {
            if (pendingRecords != 0) {
                throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
            }
            // pending records count is 0. We can now confirm the checkpoint
        }
    }
}
Developer ID: axbaretto, Project: flink, Lines of code: 14, Source: FlinkKafkaProducerBase.java
Example 19: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    // call the actual snapshot state
    super.snapshotState(ctx);
    // notify test that snapshotting has been done
    snapshottingFinished.set(true);
}
Developer ID: axbaretto, Project: flink, Lines of code: 8, Source: AtLeastOnceProducerTest.java
Example 20: snapshotState
import org.apache.flink.runtime.state.FunctionSnapshotContext; // import the required package/class
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (logProducer != null) {
        // flush buffered logs, then wait for one package timeout so in-flight packages can be sent
        logProducer.flush();
        Thread.sleep(logProducer.getProducerConfig().packageTimeoutInMS);
    }
}
Developer ID: aliyun, Project: aliyun-log-flink-connector, Lines of code: 7, Source: FlinkLogProducer.java
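Note: The org.apache.flink.runtime.state.FunctionSnapshotContext examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects contributed by their respective developers. Copyright in the source code belongs to the original authors; distribution and use should follow the license of the corresponding project. Do not reproduce without permission.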