本文整理汇总了Java中org.apache.kafka.streams.state.ReadOnlyKeyValueStore类的典型用法代码示例。如果您正苦于以下问题:Java ReadOnlyKeyValueStore类的具体用法?Java ReadOnlyKeyValueStore怎么用?Java ReadOnlyKeyValueStore使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ReadOnlyKeyValueStore类属于org.apache.kafka.streams.state包,在下文中一共展示了ReadOnlyKeyValueStore类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: worker
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public ReadOnlyKeyValueStore<Long, byte[]> worker() {
Properties config = super.configBuilder()//
.put(StreamsConfig.APPLICATION_ID_CONFIG, MallConstants.ORDER_COMMITED_TOPIC)//
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass())//
.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass())//
.build();
StreamsBuilder builder = new StreamsBuilder();
KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config));
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
// TODO Auto-generated method stub
log.error(e.getMessage());
});
streams.start();
return this.worker = // k-v query
streams.store(queryableStoreName, QueryableStoreTypes.<Long, byte[]>keyValueStore());
}
开发者ID:jiumao-org,项目名称:wechat-mall,代码行数:21,代码来源:OrderTable.java
示例2: start
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public void start() throws Exception {
execute(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX, (query, keySerde, valueSerde) -> {
ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
MultiValuedKeyValueQueryResponse response;
try (KeyValueIterator<Object, Object> result = kvStore.all()) {
if (result.hasNext()) {
Map<String, String> results = new HashMap<>();
while (result.hasNext()) {
KeyValue<Object, Object> kvEntry = result.next();
results.put(base64Encode(keySerde, kvEntry.key), base64Encode(valueSerde, kvEntry.value));
}
return new MultiValuedKeyValueQueryResponse(results);
} else {
return new MultiValuedKeyValueQueryResponse(Collections.emptyMap());
}
}
});
}
开发者ID:ftrossbach,项目名称:kiqr,代码行数:24,代码来源:AllKeyValuesQueryVerticle.java
示例3: notFoundWithNoResult
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Test
public void notFoundWithNoResult(TestContext context){
KafkaStreams streamMock = mock(KafkaStreams.class);
ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
when(storeMock.range(any(), any())).thenReturn(iterator);
rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{
RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());
rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{
context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
context.assertEquals(0, response.getResults().size());
context.assertTrue(iterator.closed);
}));
}));
}
开发者ID:ftrossbach,项目名称:kiqr,代码行数:27,代码来源:RangeKeyValuesQueryVerticleTest.java
示例4: getLocalMetrics
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
/**
* Query local state store to extract metrics
*
* @return local Metrics
*/
private Metrics getLocalMetrics() {
HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
String source = thisInstance.host() + ":" + thisInstance.port();
Metrics localMetrics = new Metrics();
ReadOnlyKeyValueStore<String, Double> averageStore = ks
.store(storeName,
QueryableStoreTypes.<String, Double>keyValueStore());
LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());
KeyValueIterator<String, Double> storeIterator = averageStore.all();
while (storeIterator.hasNext()) {
KeyValue<String, Double> kv = storeIterator.next();
localMetrics.add(source, kv.key, String.valueOf(kv.value));
}
LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
return localMetrics;
}
开发者ID:abhirockzz,项目名称:docker-kafka-streams,代码行数:28,代码来源:MetricsResource.java
示例5: get
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public V get(final K key) {
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
for (ReadOnlyKeyValueStore<K, V> store : stores) {
try {
final V result = store.get(key);
if (result != null) {
return result;
}
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
return null;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:CompositeReadOnlyKeyValueStore.java
示例6: verifyStateStore
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
ReadOnlyKeyValueStore<Long, Long> store = null;
final long maxWaitingTime = System.currentTimeMillis() + 300000L;
while (System.currentTimeMillis() < maxWaitingTime) {
try {
store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
break;
} catch (final InvalidStateStoreException okJustRetry) {
try {
Thread.sleep(5000L);
} catch (final Exception ignore) { }
}
}
final KeyValueIterator<Long, Long> it = store.all();
while (it.hasNext()) {
assertTrue(expectedStoreContent.remove(it.next()));
}
assertTrue(expectedStoreContent.isEmpty());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:23,代码来源:EosIntegrationTest.java
示例7: verifyCanGetByKey
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
private void verifyCanGetByKey(final String[] keys,
final Set<KeyValue<String, Long>> expectedWindowState,
final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
final ReadOnlyKeyValueStore<String, Long> myCount)
throws InterruptedException {
final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
final long timeout = System.currentTimeMillis() + 30000;
while ((windowState.size() < keys.length ||
countState.size() < keys.length) &&
System.currentTimeMillis() < timeout) {
Thread.sleep(10);
for (final String key : keys) {
windowState.addAll(fetch(windowStore, key));
final Long value = myCount.get(key);
if (value != null) {
countState.add(new KeyValue<>(key, value));
}
}
}
assertThat(windowState, equalTo(expectedWindowState));
assertThat(countState, equalTo(expectedCount));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:QueryableStateIntegrationTest.java
示例8: start
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public void start() throws Exception {
execute(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX, (abstractQuery, keySerde, valueSerde) -> {
RangeKeyValueQuery query = (RangeKeyValueQuery) abstractQuery;
ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
MultiValuedKeyValueQueryResponse response;
try (KeyValueIterator<Object, Object> result = kvStore.range(deserializeObject(keySerde, query.getFrom()), deserializeObject(keySerde, query.getTo()))) {
if (result.hasNext()) {
Map<String, String> results = new HashMap<>();
while (result.hasNext()) {
KeyValue<Object, Object> kvEntry = result.next();
results.put(base64Encode(keySerde, kvEntry.key), base64Encode(valueSerde, kvEntry.value));
}
return new MultiValuedKeyValueQueryResponse(results);
} else {
return new MultiValuedKeyValueQueryResponse(Collections.emptyMap());
}
}
} );
}
开发者ID:ftrossbach,项目名称:kiqr,代码行数:25,代码来源:RangeKeyValueQueryVerticle.java
示例9: start
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public void start() throws Exception {
execute(Config.KEY_VALUE_QUERY_ADDRESS_PREFIX, (abstractQuery, keySerde, valueSerde) -> {
KeyBasedQuery query = (KeyBasedQuery) abstractQuery;
Object deserializedKey = deserializeObject(keySerde, query.getKey());
ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
Object result = kvStore.get(deserializedKey);
if (result != null) {
return new ScalarKeyValueQueryResponse(base64Encode(valueSerde, result));
} else {
throw new ScalarValueNotFoundException(String.format("Key %s not found in store %s", deserializedKey.toString(), abstractQuery.getStoreName()));
}
});
}
开发者ID:ftrossbach,项目名称:kiqr,代码行数:18,代码来源:ScalarKeyValueQueryVerticle.java
示例10: notFoundWithNoResult
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Test
public void notFoundWithNoResult(TestContext context){
KafkaStreams streamMock = mock(KafkaStreams.class);
ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
when(storeMock.all()).thenReturn(iterator);
rule.vertx().deployVerticle(new AllKeyValuesQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{
StoreWideQuery query = new StoreWideQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName());
rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{
context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
context.assertEquals(0, response.getResults().size());
context.assertTrue(iterator.closed);
}));
}));
}
开发者ID:ftrossbach,项目名称:kiqr,代码行数:27,代码来源:AllKeyValuesQueryVerticleTest.java
示例11: getTweets
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public List<String> getTweets(String filter) {
List<String> tweetsResult = new ArrayList<>();
final ReadOnlyKeyValueStore<Long, String> kvs = streams.store(
STORE_NAME,
QueryableStoreTypes.<Long, String>keyValueStore());
kvs.all().forEachRemaining(kv -> tweetsResult.add(kv.value));
return tweetsResult
.stream()
.filter(s -> s.toLowerCase().contains(filter.toLowerCase()))
.collect(toList());
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:13,代码来源:KafkaTweetRepository.java
示例12: CompositeReadOnlyKeyValueStore
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
final String storeName) {
this.storeProvider = storeProvider;
this.storeType = storeType;
this.storeName = storeName;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:CompositeReadOnlyKeyValueStore.java
示例13: range
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
try {
return store.range(from, to);
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:CompositeReadOnlyKeyValueStore.java
示例14: all
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public KeyValueIterator<K, V> all() {
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
try {
return store.all();
} catch (InvalidStateStoreException e) {
throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
}
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:CompositeReadOnlyKeyValueStore.java
示例15: approximateNumEntries
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Override
public long approximateNumEntries() {
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
long total = 0;
for (ReadOnlyKeyValueStore<K, V> store : stores) {
total += store.approximateNumEntries();
}
return total < 0 ? Long.MAX_VALUE : total;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:CompositeReadOnlyKeyValueStore.java
示例16: concurrentAccesses
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Test
public void concurrentAccesses() throws Exception {
final int numIterations = 500000;
final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
final Thread producerThread = new Thread(producerRunnable);
kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);
kafkaStreams.start();
producerThread.start();
try {
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
final ReadOnlyKeyValueStore<String, Long>
keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
final Map<String, Long> expectedWindowState = new HashMap<>();
final Map<String, Long> expectedCount = new HashMap<>();
while (producerRunnable.getCurrIteration() < numIterations) {
verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
expectedCount, windowStore, keyValueStore, false);
}
// finally check if all keys are there
verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
expectedCount, windowStore, keyValueStore, true);
} finally {
producerRunnable.shutdown();
producerThread.interrupt();
producerThread.join();
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:38,代码来源:QueryableStateIntegrationTest.java
示例17: verifyRangeAndAll
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyKeyValueStore<String, Long> myCount) {
final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>>
expectedRangeResults =
new TreeSet<>(stringLongComparator);
expectedRangeResults.addAll(Arrays.asList(
new KeyValue<>("hello", 1L),
new KeyValue<>("go", 1L),
new KeyValue<>("goodbye", 1L),
new KeyValue<>("kafka", 1L)
));
try (final KeyValueIterator<String, Long> range = myCount.range("go", "kafka")) {
while (range.hasNext()) {
countRangeResults.add(range.next());
}
}
try (final KeyValueIterator<String, Long> all = myCount.all()) {
while (all.hasNext()) {
countAllResults.add(all.next());
}
}
assertThat(countRangeResults, equalTo(expectedRangeResults));
assertThat(countAllResults, equalTo(expectedCount));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:31,代码来源:QueryableStateIntegrationTest.java
示例18: shouldReturnSingleItemListIfStoreExists
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Test
public void shouldReturnSingleItemListIfStoreExists() throws Exception {
final GlobalStateStoreProvider provider =
new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", new NoOpReadOnlyStore<>()));
final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
assertEquals(stores.size(), 1);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:GlobalStateStoreProviderTest.java
示例19: shouldReturnEmptyItemListIfStoreDoesntExist
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@Test
public void shouldReturnEmptyItemListIfStoreDoesntExist() throws Exception {
final GlobalStateStoreProvider provider =
new GlobalStateStoreProvider(Collections.<String, StateStore>emptyMap());
final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
assertTrue(stores.isEmpty());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:8,代码来源:GlobalStateStoreProviderTest.java
示例20: init
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; //导入依赖的package包/类
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:5,代码来源:KTableSourceValueGetterSupplier.java
注:本文中的org.apache.kafka.streams.state.ReadOnlyKeyValueStore类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论