本文整理汇总了Java中org.apache.flink.runtime.state.filesystem.FsStateBackend类的典型用法代码示例。如果您正苦于以下问题:Java FsStateBackend类的具体用法?Java FsStateBackend怎么用?Java FsStateBackend使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
FsStateBackend类属于org.apache.flink.runtime.state.filesystem包,在下文中一共展示了FsStateBackend类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Before
public void initStateBackend() throws IOException {
switch (stateBackendEnum) {
case MEM:
this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
break;
case FILE: {
String backups = tempFolder.newFolder().getAbsolutePath();
this.stateBackend = new FsStateBackend("file://" + backups);
break;
}
case ROCKSDB_FULLY_ASYNC: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
rdb.setDbStoragePath(rocksDb);
this.stateBackend = rdb;
break;
}
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:EventTimeWindowCheckpointingITCase.java
示例2: main
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// parse arguments
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
// create streaming environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// enable fault-tolerance
env.enableCheckpointing(1000);
// enable restarts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));
env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));
// run each operator separately
env.disableOperatorChaining();
// get data from Kafka
Properties kParams = params.getProperties();
kParams.setProperty("group.id", UUID.randomUUID().toString());
DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
@Override
public long extractTimestamp(ObjectNode jsonNodes) {
return jsonNodes.get("timestamp_ms").asLong();
}
}).name("Timestamp extractor");
// filter out records without lang field
DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");
// select only lang = "en" tweets
DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");
// write to file system
RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // do a bucket for each minute
englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");
// build aggregates (count per language) using window (10 seconds tumbling):
DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
.timeWindow(Time.seconds(10))
.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Langauage (10 seconds tumbling)");
// write window aggregate to ElasticSearch
List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());
languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");
// word-count on the tweet stream
DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
// get text from tweets
.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
// split text into (word, 1) tuples
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
String[] splits = s.split(" ");
for (String sp : splits) {
collector.collect(new Tuple2<>(sp, 1L));
}
}
}).name("Tokenize words")
// group by word
.keyBy(0)
// build 1 min windows, compute every 10 seconds --> count word frequency
.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
// build top n every 10 seconds
.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");
// write top Ns to Kafka topic
topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");
env.execute("Streaming ETL");
}
开发者ID:rmetzger,项目名称:flink-streaming-etl,代码行数:82,代码来源:StreamingETL.java
示例3: main
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
env.execute();
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:CassandraTupleWriteAheadSinkExample.java
示例4: parameters
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Parameterized.Parameters
public static Collection<AbstractStateBackend> parameters() throws IOException {
TemporaryFolder tempFolder = new TemporaryFolder();
tempFolder.create();
MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
FsStateBackend syncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), false);
FsStateBackend asyncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), true);
RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
fullRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
incRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
return Arrays.asList(
syncMemBackend,
asyncMemBackend,
syncFsBackend,
asyncFsBackend,
fullRocksDbBackend,
incRocksDbBackend);
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:PartitionedStateCheckpointingITCase.java
示例5: verifyExternalizedCheckpointRestore
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
private void verifyExternalizedCheckpointRestore(
CompletedCheckpoint checkpoint,
Map<JobVertexID, ExecutionJobVertex> jobVertices,
ExecutionVertex... vertices) throws IOException {
String pointer = checkpoint.getExternalPointer();
StreamStateHandle metadataHandle = new FsStateBackend(tmp.getRoot().toURI()).resolveCheckpoint(pointer);
CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(
checkpoint.getJobId(),
jobVertices,
checkpoint.getExternalPointer(),
metadataHandle,
Thread.currentThread().getContextClassLoader(),
false);
for (ExecutionVertex vertex : vertices) {
for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
}
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:CheckpointExternalResumeTest.java
示例6: main
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
env.execute();
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:CassandraTupleWriteAheadSinkExample.java
示例7: main
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.getRequired("input");
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set up checkpointing
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)));
DataStream<TaxiRide> rides = env.addSource(new CheckpointedTaxiRideSource(input, servingSpeedFactor));
DataStream<TaxiRide> longRides = rides
.filter(new RideCleansing.NYCFilter())
.keyBy("rideId")
.process(new MatchFunction());
longRides.print();
env.execute("Long Taxi Rides (checkpointed)");
}
开发者ID:dataArtisans,项目名称:flink-training-exercises,代码行数:27,代码来源:CheckpointedLongRides.java
示例8: main
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(2);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///Users/zhouzhou/Binary/flink-1.3.2/testcheckpoints/"));
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
configProps.put(ConfigConstants.LOG_PROJECT, sProject);
configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product");
DataStream<RawLogGroupList> logTestStream = env.addSource(
new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps)
);
logTestStream.writeAsText("/Users/zhouzhou/Binary/flink-1.3.2/data/newb.txt." + System.nanoTime());
env.execute("flink log connector");
}
开发者ID:aliyun,项目名称:aliyun-log-flink-connector,代码行数:29,代码来源:ConsumerSample.java
示例9: testExternalizedFSCheckpointsStandalone
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testExternalizedFSCheckpointsStandalone() throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
null,
new FsStateBackend(checkpointDir.toURI().toString(), true));
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:ExternalizedCheckpointITCase.java
示例10: testExternalizedFSCheckpointsZookeeper
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testExternalizedFSCheckpointsZookeeper() throws Exception {
TestingServer zkServer = new TestingServer();
zkServer.start();
try {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
zkServer.getConnectString(),
new FsStateBackend(checkpointDir.toURI().toString(), true));
} finally {
zkServer.stop();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:15,代码来源:ExternalizedCheckpointITCase.java
示例11: testCheckpointedStreamingProgramIncrementalRocksDB
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testCheckpointedStreamingProgramIncrementalRocksDB() throws Exception {
testCheckpointedStreamingProgram(
new RocksDBStateBackend(
new FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), 16),
true));
}
开发者ID:axbaretto,项目名称:flink,代码行数:8,代码来源:JobManagerHACheckpointRecoveryITCase.java
示例12: getStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Override
protected RocksDBStateBackend getStateBackend() throws IOException {
dbPath = tempFolder.newFolder().getAbsolutePath();
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
backend.setDbStoragePath(dbPath);
return backend;
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:RocksDBStateBackendTest.java
示例13: testRocksDbReconfigurationCopiesExistingValues
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
final FsStateBackend checkpointBackend = new FsStateBackend(tempFolder.newFolder().toURI().toString());
final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
final RocksDBStateBackend original = new RocksDBStateBackend(checkpointBackend, incremental);
// these must not be the default options
final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
assertNotEquals(predOptions, original.getPredefinedOptions());
original.setPredefinedOptions(predOptions);
final OptionsFactory optionsFactory = mock(OptionsFactory.class);
original.setOptions(optionsFactory);
final String[] localDirs = new String[] {
tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() };
original.setDbStoragePaths(localDirs);
RocksDBStateBackend copy = original.configure(new Configuration());
assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
assertEquals(original.getOptions(), copy.getOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
assertEquals(checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath());
assertEquals(checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath());
}
开发者ID:axbaretto,项目名称:flink,代码行数:31,代码来源:RocksDBStateBackendConfigTest.java
示例14: testLoadFileSystemStateBackendMixed
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
/**
* Validates taking the application-defined file system state backend and adding with additional
* parameters from the cluster configuration, but giving precedence to application-defined
* parameters over configuration-defined parameters.
*/
@Test
public void testLoadFileSystemStateBackendMixed() throws Exception {
final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
final Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir));
final Path expectedSavepointsPath = new Path(savepointDir);
final int threshold = 1000000;
final FsStateBackend backend = new FsStateBackend(new URI(appCheckpointDir), threshold);
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager"); // this should not be picked up
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
final StateBackend loadedBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
assertTrue(loadedBackend instanceof FsStateBackend);
final FsStateBackend fs = (FsStateBackend) loadedBackend;
assertEquals(expectedCheckpointsPath, fs.getCheckpointPath());
assertEquals(expectedSavepointsPath, fs.getSavepointPath());
assertEquals(threshold, fs.getMinFileSizeThreshold());
}
开发者ID:axbaretto,项目名称:flink,代码行数:34,代码来源:StateBackendLoadingTest.java
示例15: getFlinkJob
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
private JobGraph getFlinkJob(
final int sourceParallelism,
final String streamName,
final int numElements) throws IOException {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(sourceParallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
// to make the test faster, we use a combination of fast triggering of checkpoints,
// but some pauses after completed checkpoints
env.getCheckpointConfig().setCheckpointInterval(100);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
// we currently need this to work around the case where tasks are
// started too late, a checkpoint was already triggered, and some tasks
// never see the checkpoint event
env.getCheckpointConfig().setCheckpointTimeout(5000);
// checkpoint to files (but aggregate state below 1 MB) and don't to any async checkpoints
env.setStateBackend(new FsStateBackend(tmpFolder.newFolder().toURI(), 1024 * 1024, false));
// the Pravega reader
final FlinkPravegaReader<Integer> pravegaSource = new FlinkPravegaReader<>(
SETUP_UTILS.getControllerUri(),
SETUP_UTILS.getScope(),
Collections.singleton(streamName),
0,
new IntDeserializer(),
"my_reader_name");
env
.addSource(pravegaSource)
// hook in the notifying mapper
.map(new NotifyingMapper<>())
.setParallelism(1)
// the sink validates that the exactly-once semantics hold
// it must be non-parallel so that it sees all elements and can trivially
// check for duplicates
.addSink(new IntSequenceExactlyOnceValidator(numElements))
.setParallelism(1);
return env.getStreamGraph().getJobGraph();
}
开发者ID:pravega,项目名称:flink-connectors,代码行数:47,代码来源:FlinkPravegaReaderSavepointTest.java
示例16: setStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
public StreamExecutionEnvBuilder setStateBackend(String path) throws IOException {
env.setStateBackend(new FsStateBackend(path, true));
return this;
}
开发者ID:ehabqadah,项目名称:in-situ-processing-datAcron,代码行数:6,代码来源:StreamExecutionEnvBuilder.java
示例17: createStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Override
protected AbstractStateBackend createStateBackend() throws Exception {
return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
}
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:NonHAQueryableStateITCaseFsBackend.java
示例18: testValueStateDefaultValueFsBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
/**
* Tests simple value state queryable state instance with a default value
* set using the {@link FsStateBackend}.
*/
@Test(expected = UnknownKeyOrNamespace.class)
public void testValueStateDefaultValueFsBackend() throws Exception {
testValueStateDefault(new FsStateBackend(
temporaryFolder.newFolder().toURI().toString()));
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:QueryableStateITCase.java
示例19: testWithFsBackendSync
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testWithFsBackendSync() throws Exception {
FsStateBackend syncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), false);
testProgramWithBackend(syncFsBackend);
}
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:KeyedStateCheckpointingITCase.java
示例20: testWithFsBackendAsync
import org.apache.flink.runtime.state.filesystem.FsStateBackend; //导入依赖的package包/类
@Test
public void testWithFsBackendAsync() throws Exception {
FsStateBackend asyncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), true);
testProgramWithBackend(asyncFsBackend);
}
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:KeyedStateCheckpointingITCase.java
注:本文中的org.apache.flink.runtime.state.filesystem.FsStateBackend类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论