
Java FsStateBackend Class Code Examples


This article collects and summarizes typical usage examples of the Java class org.apache.flink.runtime.state.filesystem.FsStateBackend. If you have been wondering what the FsStateBackend class does, how to use it, or what working examples look like, the curated class examples below should help.



The FsStateBackend class belongs to the org.apache.flink.runtime.state.filesystem package. The 20 code examples below show typical uses of the class, sorted by popularity by default.
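Before the individual examples, here is a minimal sketch of the pattern nearly all of them share: construct an FsStateBackend over a file-system URI and register it on the streaming environment. The checkpoint path below is a hypothetical placeholder; any URI Flink's file systems can resolve (file://, hdfs://, ...) works.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsStateBackendSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// checkpoint metadata and state files are written under this URI
		// (hypothetical path); the second argument enables asynchronous snapshots
		env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", true));

		// state is only snapshotted once checkpointing is enabled
		env.enableCheckpointing(1000); // every 1000 ms

		// ... build and execute the job ...
	}
}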

Example 1: initStateBackend

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Before
public void initStateBackend() throws IOException {
	switch (stateBackendEnum) {
		case MEM:
			this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
			break;
		case FILE: {
			String backups = tempFolder.newFolder().getAbsolutePath();
			this.stateBackend = new FsStateBackend("file://" + backups);
			break;
		}
		case ROCKSDB_FULLY_ASYNC: {
			String rocksDb = tempFolder.newFolder().getAbsolutePath();
			RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
			rdb.setDbStoragePath(rocksDb);
			this.stateBackend = rdb;
			break;
		}
	}
}
 
Developer: axbaretto | Project: flink | Lines: 22 | Source: EventTimeWindowCheckpointingITCase.java
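The snippet above relies on fields of the surrounding test class. A minimal sketch of the assumed declarations (names inferred from the snippet; the constant's value and the JUnit TemporaryFolder rule are assumptions):

import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.flink.runtime.state.AbstractStateBackend;

enum StateBackendEnum { MEM, FILE, ROCKSDB_FULLY_ASYNC }

// assumed fields of the test class:
static final int MAX_MEM_STATE_SIZE = 16 * 1024 * 1024; // hypothetical size cap for MemoryStateBackend

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

StateBackendEnum stateBackendEnum;   // injected by the parameterized test runner
AbstractStateBackend stateBackend;   // set by initStateBackend()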


Example 2: main

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse arguments
	ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

	// create streaming environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// enable event time processing
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// enable fault-tolerance
	env.enableCheckpointing(1000);

	// enable restarts
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));

	env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));

	// run each operator separately
	env.disableOperatorChaining();

	// get data from Kafka
	Properties kParams = params.getProperties();
	kParams.setProperty("group.id", UUID.randomUUID().toString());
	DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
		.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
			@Override
			public long extractTimestamp(ObjectNode jsonNodes) {
				return jsonNodes.get("timestamp_ms").asLong();
			}
		}).name("Timestamp extractor");

	// filter out records without lang field
	DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");

	// select only lang = "en" tweets
	DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");

	// write to file system
	RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
	rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // do a bucket for each minute
	englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");

	// build aggregates (count per language) using window (10 seconds tumbling):
	DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
		.timeWindow(Time.seconds(10))
		.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Language (10 seconds tumbling)");

	// write window aggregate to ElasticSearch
	List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
	ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());

	languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");

	// word-count on the tweet stream
	DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
		// get text from tweets
		.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
		// split text into (word, 1) tuples
		.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
				String[] splits = s.split(" ");
				for (String sp : splits) {
					collector.collect(new Tuple2<>(sp, 1L));
				}
			}
		}).name("Tokenize words")
		// group by word
		.keyBy(0)
		// build 1 min windows, compute every 10 seconds --> count word frequency
		.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
		// build top n every 10 seconds
		.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");

	// write top Ns to Kafka topic
	topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");

	env.execute("Streaming ETL");

}
 
Developer: rmetzger | Project: flink-streaming-etl | Lines: 82 | Source: StreamingETL.java


Example 3: main

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);
	env.enableCheckpointing(1000);
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
	env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));

	CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
		.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
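		// the write-ahead log buffers records and commits them when a checkpoint completes, for exactly-once writes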
		.enableWriteAheadLog()
		.setClusterBuilder(new ClusterBuilder() {

			private static final long serialVersionUID = 2793938419775311824L;

			@Override
			public Cluster buildCluster(Cluster.Builder builder) {
				return builder.addContactPoint("127.0.0.1").build();
			}
		})
		.build();

	sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");

	env.execute();
}
 
Developer: axbaretto | Project: flink | Lines: 26 | Source: CassandraTupleWriteAheadSinkExample.java


Example 4: parameters

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Parameterized.Parameters
public static Collection<AbstractStateBackend> parameters() throws IOException {
	TemporaryFolder tempFolder = new TemporaryFolder();
	tempFolder.create();

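	// the boolean constructor argument selects synchronous (false) or asynchronous (true) snapshots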
	MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
	MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);

	FsStateBackend syncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), false);
	FsStateBackend asyncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), true);

	RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
	fullRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());

	RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
	incRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());

	return Arrays.asList(
		syncMemBackend,
		asyncMemBackend,
		syncFsBackend,
		asyncFsBackend,
		fullRocksDbBackend,
		incRocksDbBackend);
}
 
Developer: axbaretto | Project: flink | Lines: 26 | Source: PartitionedStateCheckpointingITCase.java


Example 5: verifyExternalizedCheckpointRestore

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
private void verifyExternalizedCheckpointRestore(
		CompletedCheckpoint checkpoint,
		Map<JobVertexID, ExecutionJobVertex> jobVertices,
		ExecutionVertex... vertices) throws IOException {

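	// resolve the checkpoint's external pointer back into a handle on the checkpoint metadata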
	String pointer = checkpoint.getExternalPointer();
	StreamStateHandle metadataHandle = new FsStateBackend(tmp.getRoot().toURI()).resolveCheckpoint(pointer);

	CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(
			checkpoint.getJobId(),
			jobVertices,
			checkpoint.getExternalPointer(),
			metadataHandle,
			Thread.currentThread().getContextClassLoader(),
			false);

	for (ExecutionVertex vertex : vertices) {
		for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
			assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
		}
	}
}
 
Developer: axbaretto | Project: flink | Lines: 23 | Source: CheckpointExternalResumeTest.java


Example 6: main

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);
	env.enableCheckpointing(1000);
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
	env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));

	CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
		.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
		.enableWriteAheadLog()
		.setClusterBuilder(new ClusterBuilder() {
			@Override
			public Cluster buildCluster(Cluster.Builder builder) {
				return builder.addContactPoint("127.0.0.1").build();
			}
		})
		.build();

	sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");

	env.execute();
}
 
Developer: axbaretto | Project: flink | Lines: 23 | Source: CassandraTupleWriteAheadSinkExample.java


Example 7: main

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public static void main(String[] args) throws Exception {

	ParameterTool params = ParameterTool.fromArgs(args);
	final String input = params.getRequired("input");
	final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second

	// set up streaming execution environment
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// set up checkpointing
	env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
	env.enableCheckpointing(1000);
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)));

	DataStream<TaxiRide> rides = env.addSource(new CheckpointedTaxiRideSource(input, servingSpeedFactor));

	DataStream<TaxiRide> longRides = rides
			.filter(new RideCleansing.NYCFilter())
			.keyBy("rideId")
			.process(new MatchFunction());

	longRides.print();

	env.execute("Long Taxi Rides (checkpointed)");
}
 
Developer: dataArtisans | Project: flink-training-exercises | Lines: 27 | Source: CheckpointedLongRides.java


Example 8: main

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public static void main(String[] args) throws Exception {

	final ParameterTool params = ParameterTool.fromArgs(args);
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.getConfig().setGlobalJobParameters(params);
	env.setParallelism(2);
	env.enableCheckpointing(5000);
	env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
	env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

	env.setStateBackend(new FsStateBackend("file:///Users/zhouzhou/Binary/flink-1.3.2/testcheckpoints/"));
	RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
	Properties configProps = new Properties();
	configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
	configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
	configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
	configProps.put(ConfigConstants.LOG_PROJECT, sProject);
	configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
	configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
	configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
	configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product");
	DataStream<RawLogGroupList> logTestStream = env.addSource(
			new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps)
	);

	logTestStream.writeAsText("/Users/zhouzhou/Binary/flink-1.3.2/data/newb.txt." + System.nanoTime());
	env.execute("flink log connector");
}
 
Developer: aliyun | Project: aliyun-log-flink-connector | Lines: 29 | Source: ConsumerSample.java


Example 9: testExternalizedFSCheckpointsStandalone

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testExternalizedFSCheckpointsStandalone() throws Exception {
	final File checkpointDir = temporaryFolder.newFolder();
	testExternalizedCheckpoints(
		checkpointDir,
		null,
		new FsStateBackend(checkpointDir.toURI().toString(), true));
}
 
Developer: axbaretto | Project: flink | Lines: 10 | Source: ExternalizedCheckpointITCase.java


Example 10: testExternalizedFSCheckpointsZookeeper

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testExternalizedFSCheckpointsZookeeper() throws Exception {
	TestingServer zkServer = new TestingServer();
	zkServer.start();
	try {
		final File checkpointDir = temporaryFolder.newFolder();
		testExternalizedCheckpoints(
			checkpointDir,
			zkServer.getConnectString(),
			new FsStateBackend(checkpointDir.toURI().toString(), true));
	} finally {
		zkServer.stop();
	}
}
 
Developer: axbaretto | Project: flink | Lines: 15 | Source: ExternalizedCheckpointITCase.java


Example 11: testCheckpointedStreamingProgramIncrementalRocksDB

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testCheckpointedStreamingProgramIncrementalRocksDB() throws Exception {
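	// FsStateBackend(basePath, 16): state smaller than 16 bytes is stored inline in the checkpoint metadata;
	// the second RocksDBStateBackend argument ('true') enables incremental checkpoints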
	testCheckpointedStreamingProgram(
		new RocksDBStateBackend(
			new FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), 16),
			true));
}
 
Developer: axbaretto | Project: flink | Lines: 8 | Source: JobManagerHACheckpointRecoveryITCase.java


Example 12: getStateBackend

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Override
protected RocksDBStateBackend getStateBackend() throws IOException {
	dbPath = tempFolder.newFolder().getAbsolutePath();
	String checkpointPath = tempFolder.newFolder().toURI().toString();
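	// RocksDB keeps its working database under dbPath; completed checkpoints go through the wrapped FsStateBackend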
	RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
	backend.setDbStoragePath(dbPath);
	return backend;
}
 
Developer: axbaretto | Project: flink | Lines: 9 | Source: RocksDBStateBackendTest.java


Example 13: testRocksDbReconfigurationCopiesExistingValues

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
	final FsStateBackend checkpointBackend = new FsStateBackend(tempFolder.newFolder().toURI().toString());
	final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();

	final RocksDBStateBackend original = new RocksDBStateBackend(checkpointBackend, incremental);

	// these must not be the default options
	final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
	assertNotEquals(predOptions, original.getPredefinedOptions());
	original.setPredefinedOptions(predOptions);

	final OptionsFactory optionsFactory = mock(OptionsFactory.class);
	original.setOptions(optionsFactory);

	final String[] localDirs = new String[] {
			tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() };
	original.setDbStoragePaths(localDirs);

	RocksDBStateBackend copy = original.configure(new Configuration());

	assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
	assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
	assertEquals(original.getOptions(), copy.getOptions());
	assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());

	FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
	assertEquals(checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath());
	assertEquals(checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath());
}
 
Developer: axbaretto | Project: flink | Lines: 31 | Source: RocksDBStateBackendConfigTest.java


Example 14: testLoadFileSystemStateBackendMixed

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
/**
 * Validates taking the application-defined file system state backend and adding with additional
 * parameters from the cluster configuration, but giving precedence to application-defined
 * parameters over configuration-defined parameters.
 */
@Test
public void testLoadFileSystemStateBackendMixed() throws Exception {
	final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString();
	final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
	final String savepointDir = new Path(tmp.newFolder().toURI()).toString();

	final Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir));
	final Path expectedSavepointsPath = new Path(savepointDir);

	final int threshold = 1000000;

	final FsStateBackend backend = new FsStateBackend(new URI(appCheckpointDir), threshold);

	final Configuration config = new Configuration();
	config.setString(backendKey, "jobmanager"); // this should not be picked up 
	config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
	config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
	config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up

	final StateBackend loadedBackend =
			StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
	assertTrue(loadedBackend instanceof FsStateBackend);

	final FsStateBackend fs = (FsStateBackend) loadedBackend;
	assertEquals(expectedCheckpointsPath, fs.getCheckpointPath());
	assertEquals(expectedSavepointsPath, fs.getSavepointPath());
	assertEquals(threshold, fs.getMinFileSizeThreshold());
}
 
Developer: axbaretto | Project: flink | Lines: 34 | Source: StateBackendLoadingTest.java


Example 15: getFlinkJob

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
private JobGraph getFlinkJob(
        final int sourceParallelism,
        final String streamName,
        final int numElements) throws IOException {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(sourceParallelism);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));

    // to make the test faster, we combine fast triggering of checkpoints
    // with some pauses after completed checkpoints
    env.getCheckpointConfig().setCheckpointInterval(100);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);

    // we currently need this to work around the case where tasks are
    // started too late, a checkpoint was already triggered, and some tasks
    // never see the checkpoint event
    env.getCheckpointConfig().setCheckpointTimeout(5000);

    // checkpoint to files (but aggregate state below 1 MB) and don't do any async checkpoints
    env.setStateBackend(new FsStateBackend(tmpFolder.newFolder().toURI(), 1024 * 1024, false));

    // the Pravega reader
    final FlinkPravegaReader<Integer> pravegaSource = new FlinkPravegaReader<>(
            SETUP_UTILS.getControllerUri(),
            SETUP_UTILS.getScope(),
            Collections.singleton(streamName),
            0,
            new IntDeserializer(),
            "my_reader_name");

    env
            .addSource(pravegaSource)

            // hook in the notifying mapper
            .map(new NotifyingMapper<>())
            .setParallelism(1)

            // the sink validates that the exactly-once semantics hold
            // it must be non-parallel so that it sees all elements and can trivially
            // check for duplicates
            .addSink(new IntSequenceExactlyOnceValidator(numElements))
            .setParallelism(1);
    
    return env.getStreamGraph().getJobGraph();
}
 
Developer: pravega | Project: flink-connectors | Lines: 47 | Source: FlinkPravegaReaderSavepointTest.java


Example 16: setStateBackend

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
public StreamExecutionEnvBuilder setStateBackend(String path) throws IOException {
	env.setStateBackend(new FsStateBackend(path, true));
	return this;
}
 
Developer: ehabqadah | Project: in-situ-processing-datAcron | Lines: 6 | Source: StreamExecutionEnvBuilder.java
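A hypothetical call site for this builder might look as follows; only setStateBackend(String) appears in the snippet above, so the constructor and build() method are assumptions:

// hypothetical usage of the project's builder; constructor and build() are assumed
StreamExecutionEnvironment env = new StreamExecutionEnvBuilder()
	.setStateBackend("file:///tmp/checkpoints") // asynchronous FsStateBackend at this path
	.build();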


Example 17: createStateBackend

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Override
protected AbstractStateBackend createStateBackend() throws Exception {
	return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
}
 
Developer: axbaretto | Project: flink | Lines: 5 | Source: NonHAQueryableStateITCaseFsBackend.java


Example 18: testValueStateDefaultValueFsBackend

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
/**
 * Tests simple value state queryable state instance with a default value
 * set using the {@link FsStateBackend}.
 */
@Test(expected = UnknownKeyOrNamespace.class)
public void testValueStateDefaultValueFsBackend() throws Exception {
	testValueStateDefault(new FsStateBackend(
		temporaryFolder.newFolder().toURI().toString()));
}
 
Developer: axbaretto | Project: flink | Lines: 10 | Source: QueryableStateITCase.java


Example 19: testWithFsBackendSync

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testWithFsBackendSync() throws Exception {
	FsStateBackend syncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), false);
	testProgramWithBackend(syncFsBackend);
}
 
Developer: axbaretto | Project: flink | Lines: 6 | Source: KeyedStateCheckpointingITCase.java


Example 20: testWithFsBackendAsync

import org.apache.flink.runtime.state.filesystem.FsStateBackend; // import the required package/class
@Test
public void testWithFsBackendAsync() throws Exception {
	FsStateBackend asyncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), true);
	testProgramWithBackend(asyncFsBackend);
}
 
Developer: axbaretto | Project: flink | Lines: 6 | Source: KeyedStateCheckpointingITCase.java



Note: the org.apache.flink.runtime.state.filesystem.FsStateBackend class examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. Copyright for each snippet remains with its original author; consult the corresponding project's license before using or redistributing the code. Please do not republish without permission.

