
Java ReadOnlyKeyValueStore Class Code Examples


This article collects typical usage examples of org.apache.kafka.streams.state.ReadOnlyKeyValueStore in Java. If you are wondering how ReadOnlyKeyValueStore is used in practice, or are looking for working examples of it, the selected code examples below may help.



ReadOnlyKeyValueStore is an interface in the org.apache.kafka.streams.state package. The 20 code examples below show how it is used; they are sorted by popularity by default.

Example 1: worker

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public ReadOnlyKeyValueStore<Long, byte[]> worker() {
    Properties config = super.configBuilder()//
            .put(StreamsConfig.APPLICATION_ID_CONFIG, MallConstants.ORDER_COMMITED_TOPIC)//
            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
            .put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass())//
            .put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass())//
            .build();

    StreamsBuilder builder = new StreamsBuilder();
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config));
    streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
        // Log the error raised by a stream thread.
        log.error(e.getMessage());
    });
    streams.start();

    return this.worker = // k-v query
            streams.store(queryableStoreName, QueryableStoreTypes.<Long, byte[]>keyValueStore());
}
 
Developer ID: jiumao-org, Project: wechat-mall, Lines of code: 21, Source file: OrderTable.java


Example 2: start

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public void start() throws Exception {

    execute(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX, (query, keySerde, valueSerde) -> {

        ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
        // try-with-resources closes the iterator once all entries have been read.
        try (KeyValueIterator<Object, Object> result = kvStore.all()) {
            if (result.hasNext()) {
                Map<String, String> results = new HashMap<>();
                while (result.hasNext()) {
                    KeyValue<Object, Object> kvEntry = result.next();

                    results.put(base64Encode(keySerde, kvEntry.key), base64Encode(valueSerde, kvEntry.value));
                }
                return new MultiValuedKeyValueQueryResponse(results);
            } else {
                return new MultiValuedKeyValueQueryResponse(Collections.emptyMap());
            }
        }
    });

}
 
Developer ID: ftrossbach, Project: kiqr, Lines of code: 24, Source file: AllKeyValuesQueryVerticle.java


Example 3: notFoundWithNoResult

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Test
public void notFoundWithNoResult(TestContext context){
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.range(any(), any())).thenReturn(iterator);


    rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{

        RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());

        rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{

            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            context.assertTrue(iterator.closed);

        }));

    }));

}
 
Developer ID: ftrossbach, Project: kiqr, Lines of code: 27, Source file: RangeKeyValuesQueryVerticleTest.java


Example 4: getLocalMetrics

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
/**
 * Query local state store to extract metrics
 *
 * @return local Metrics
 */
private Metrics getLocalMetrics() {
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();

    String source = thisInstance.host() + ":" + thisInstance.port();
    Metrics localMetrics = new Metrics();

    ReadOnlyKeyValueStore<String, Double> averageStore = ks
            .store(storeName,
                    QueryableStoreTypes.<String, Double>keyValueStore());

    LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());
    // The iterator must be closed after use, so iterate inside try-with-resources.
    try (KeyValueIterator<String, Double> storeIterator = averageStore.all()) {
        while (storeIterator.hasNext()) {
            KeyValue<String, Double> kv = storeIterator.next();
            localMetrics.add(source, kv.key, String.valueOf(kv.value));
        }
    }
    LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
    return localMetrics;
}
 
Developer ID: abhirockzz, Project: docker-kafka-streams, Lines of code: 28, Source file: MetricsResource.java


Example 5: get

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public V get(final K key) {
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        try {
            final V result = store.get(key);
            if (result != null) {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
        }

    }
    return null;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source file: CompositeReadOnlyKeyValueStore.java
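
The lookup in Example 5 asks each underlying store in turn and returns the first non-null value. The sketch below reproduces only that control flow without any Kafka dependency: plain `Map` instances stand in for the stores, and the class and method names (`CompositeLookupSketch`, `get`) are made up for illustration. It is not the Kafka Streams implementation.

```java
import java.util.List;
import java.util.Map;

/**
 * Dependency-free sketch of a "first non-null value wins" lookup.
 * Assumption: plain Maps stand in for the underlying read-only stores.
 */
final class CompositeLookupSketch {

    /** Returns the first non-null value found for the key, or null if no store holds it. */
    static <K, V> V get(List<Map<K, V>> stores, K key) {
        for (Map<K, V> store : stores) {
            V result = store.get(key);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> stores = List.of(
                Map.of("hello", 1L),
                Map.of("kafka", 2L));
        System.out.println(get(stores, "kafka"));   // value comes from the second map
        System.out.println(get(stores, "missing")); // no map holds this key
    }
}
```

Because a null result is indistinguishable from "key absent", a caller that needs to tell the two apart would have to check for the key separately.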


Example 6: verifyStateStore

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
    ReadOnlyKeyValueStore<Long, Long> store = null;

    final long maxWaitingTime = System.currentTimeMillis() + 300000L;
    while (System.currentTimeMillis() < maxWaitingTime) {
        try {
            store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
            break;
        } catch (final InvalidStateStoreException okJustRetry) {
            try {
                Thread.sleep(5000L);
            } catch (final Exception ignore) { }
        }
    }

    // Close the iterator after verification so the store resources are released.
    try (final KeyValueIterator<Long, Long> it = store.all()) {
        while (it.hasNext()) {
            assertTrue(expectedStoreContent.remove(it.next()));
        }
    }

    assertTrue(expectedStoreContent.isEmpty());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source file: EosIntegrationTest.java
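
Example 6 retries the store lookup until it stops throwing InvalidStateStoreException or a deadline passes. The same pattern can be sketched as a small reusable helper. Everything here is an assumption made so the snippet compiles without Kafka: `StoreNotReadyException` is a hypothetical stand-in for InvalidStateStoreException, and `waitForStore` and its parameters are illustrative names. Unlike the original loop, this sketch rethrows the last failure on timeout instead of continuing with a null store.

```java
import java.util.function.Supplier;

/** Dependency-free sketch of "retry the lookup until the store is queryable". */
final class StoreRetrySketch {

    /** Hypothetical stand-in for InvalidStateStoreException. */
    static final class StoreNotReadyException extends RuntimeException {
        StoreNotReadyException(String message) {
            super(message);
        }
    }

    /** Calls lookup until it succeeds; rethrows the last failure once timeoutMs has elapsed. */
    static <T> T waitForStore(Supplier<T> lookup, long timeoutMs, long backoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (StoreNotReadyException notReadyYet) {
                if (System.currentTimeMillis() >= deadline) {
                    throw notReadyYet; // give up and surface the last failure
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while waiting for the store", interrupted);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated lookup: fails on the first two calls, then returns a placeholder handle.
        String store = waitForStore(() -> {
            if (++attempts[0] < 3) {
                throw new StoreNotReadyException("still rebalancing");
            }
            return "store-handle";
        }, 1_000L, 10L);
        System.out.println(store + " after " + attempts[0] + " attempts");
    }
}
```

With Kafka Streams on the classpath, the supplier would wrap the `streams.store(...)` call and the catch clause would name InvalidStateStoreException instead of the stand-in.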


Example 7: verifyCanGetByKey

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
private void verifyCanGetByKey(final String[] keys,
                               final Set<KeyValue<String, Long>> expectedWindowState,
                               final Set<KeyValue<String, Long>> expectedCount,
                               final ReadOnlyWindowStore<String, Long> windowStore,
                               final ReadOnlyKeyValueStore<String, Long> myCount)
    throws InterruptedException {
    final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
    final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);

    final long timeout = System.currentTimeMillis() + 30000;
    while ((windowState.size() < keys.length ||
        countState.size() < keys.length) &&
        System.currentTimeMillis() < timeout) {
        Thread.sleep(10);
        for (final String key : keys) {
            windowState.addAll(fetch(windowStore, key));
            final Long value = myCount.get(key);
            if (value != null) {
                countState.add(new KeyValue<>(key, value));
            }
        }
    }
    assertThat(windowState, equalTo(expectedWindowState));
    assertThat(countState, equalTo(expectedCount));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source file: QueryableStateIntegrationTest.java


Example 8: start

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public void start() throws Exception {

    execute(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX, (abstractQuery, keySerde, valueSerde) -> {

        RangeKeyValueQuery query = (RangeKeyValueQuery) abstractQuery;
        ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
        // try-with-resources closes the range iterator once all entries have been read.
        try (KeyValueIterator<Object, Object> result = kvStore.range(deserializeObject(keySerde, query.getFrom()), deserializeObject(keySerde, query.getTo()))) {
            if (result.hasNext()) {
                Map<String, String> results = new HashMap<>();
                while (result.hasNext()) {
                    KeyValue<Object, Object> kvEntry = result.next();
                    results.put(base64Encode(keySerde, kvEntry.key), base64Encode(valueSerde, kvEntry.value));
                }
                return new MultiValuedKeyValueQueryResponse(results);
            } else {
                return new MultiValuedKeyValueQueryResponse(Collections.emptyMap());
            }
        }
    } );


}
 
Developer ID: ftrossbach, Project: kiqr, Lines of code: 25, Source file: RangeKeyValueQueryVerticle.java


Example 9: start

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public void start() throws Exception {

    execute(Config.KEY_VALUE_QUERY_ADDRESS_PREFIX, (abstractQuery, keySerde, valueSerde) -> {
        KeyBasedQuery query = (KeyBasedQuery) abstractQuery;

        Object deserializedKey = deserializeObject(keySerde, query.getKey());
        ReadOnlyKeyValueStore<Object, Object> kvStore = streams.store(query.getStoreName(), QueryableStoreTypes.keyValueStore());
        Object result = kvStore.get(deserializedKey);
        if (result != null) {
            return new ScalarKeyValueQueryResponse(base64Encode(valueSerde, result));
        } else {
            throw new ScalarValueNotFoundException(String.format("Key %s not found in store %s", deserializedKey.toString(), abstractQuery.getStoreName()));
        }
    });

}
 
Developer ID: ftrossbach, Project: kiqr, Lines of code: 18, Source file: ScalarKeyValueQueryVerticle.java


Example 10: notFoundWithNoResult

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Test
public void notFoundWithNoResult(TestContext context){
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.all()).thenReturn(iterator);


    rule.vertx().deployVerticle(new AllKeyValuesQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{

        StoreWideQuery query = new StoreWideQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName());

        rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{

            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            context.assertTrue(iterator.closed);

        }));

    }));

}
 
Developer ID: ftrossbach, Project: kiqr, Lines of code: 27, Source file: AllKeyValuesQueryVerticleTest.java


Example 11: getTweets

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public List<String> getTweets(String filter) {
    List<String> tweetsResult = new ArrayList<>();
    final ReadOnlyKeyValueStore<Long, String> kvs = streams.store(
            STORE_NAME,
            QueryableStoreTypes.<Long, String>keyValueStore());
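    // Close the iterator once all values have been collected.
    try (KeyValueIterator<Long, String> all = kvs.all()) {
        all.forEachRemaining(kv -> tweetsResult.add(kv.value));
    }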
    return tweetsResult
            .stream()
            .filter(s -> s.toLowerCase().contains(filter.toLowerCase()))
            .collect(toList());
}
 
Developer ID: jeqo, Project: talk-kafka-messaging-logs, Lines of code: 13, Source file: KafkaTweetRepository.java


Example 12: CompositeReadOnlyKeyValueStore

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
                                      final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
                                      final String storeName) {
    this.storeProvider = storeProvider;
    this.storeType = storeType;
    this.storeName = storeName;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source file: CompositeReadOnlyKeyValueStore.java


Example 13: range

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
    final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.range(from, to);
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source file: CompositeReadOnlyKeyValueStore.java


Example 14: all

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public KeyValueIterator<K, V> all() {
    final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.all();
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source file: CompositeReadOnlyKeyValueStore.java


Example 15: approximateNumEntries

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Override
public long approximateNumEntries() {
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    long total = 0;
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        total += store.approximateNumEntries();
    }
    return total < 0 ? Long.MAX_VALUE : total;
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 10, Source file: CompositeReadOnlyKeyValueStore.java
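
The final line of Example 15 guards against `long` overflow: when the per-store counts add up to more than Long.MAX_VALUE, the sum wraps to a negative number, and the method reports Long.MAX_VALUE instead. The sketch below isolates that arithmetic. The class and method names are illustrative. It also checks inside the loop rather than once at the end, which is a small variation on the original and assumes every count is non-negative.

```java
/** Dependency-free sketch of summing entry counts with saturation at Long.MAX_VALUE. */
final class SaturatingCountSketch {

    /**
     * Sums non-negative per-store counts. Adding two non-negative longs can only
     * overflow into the negative range, so a negative running total signals overflow.
     */
    static long total(long... perStoreCounts) {
        long total = 0;
        for (long count : perStoreCounts) {
            total += count;
            if (total < 0) {
                return Long.MAX_VALUE; // saturate instead of returning a wrapped value
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(total(3, 4));                                   // ordinary sum
        System.out.println(total(Long.MAX_VALUE, 1) == Long.MAX_VALUE);    // overflow is saturated
    }
}
```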


Example 16: concurrentAccesses

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Test
public void concurrentAccesses() throws Exception {

    final int numIterations = 500000;

    final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
    final Thread producerThread = new Thread(producerRunnable);
    kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);

    kafkaStreams.start();
    producerThread.start();

    try {
        waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);

        final ReadOnlyKeyValueStore<String, Long>
            keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());

        final ReadOnlyWindowStore<String, Long> windowStore =
            kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());


        final Map<String, Long> expectedWindowState = new HashMap<>();
        final Map<String, Long> expectedCount = new HashMap<>();
        while (producerRunnable.getCurrIteration() < numIterations) {
            verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
                expectedCount, windowStore, keyValueStore, false);
        }
        // finally check if all keys are there
        verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
            expectedCount, windowStore, keyValueStore, true);
    } finally {
        producerRunnable.shutdown();
        producerThread.interrupt();
        producerThread.join();
    }
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 38, Source file: QueryableStateIntegrationTest.java


Example 17: verifyRangeAndAll

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                               final ReadOnlyKeyValueStore<String, Long> myCount) {
    final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
    final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
    final Set<KeyValue<String, Long>>
        expectedRangeResults =
        new TreeSet<>(stringLongComparator);

    expectedRangeResults.addAll(Arrays.asList(
        new KeyValue<>("hello", 1L),
        new KeyValue<>("go", 1L),
        new KeyValue<>("goodbye", 1L),
        new KeyValue<>("kafka", 1L)
    ));

    try (final KeyValueIterator<String, Long> range = myCount.range("go", "kafka")) {
        while (range.hasNext()) {
            countRangeResults.add(range.next());
        }
    }

    try (final KeyValueIterator<String, Long> all = myCount.all()) {
        while (all.hasNext()) {
            countAllResults.add(all.next());
        }
    }

    assertThat(countRangeResults, equalTo(expectedRangeResults));
    assertThat(countAllResults, equalTo(expectedCount));
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 31, Source file: QueryableStateIntegrationTest.java
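
The expected range results in Example 17 contain both endpoints, "go" and "kafka", plus every key that sorts between them, which suggests that both bounds of `range(from, to)` are inclusive. The sketch below models that behaviour with a `TreeMap`, which is only an assumption standing in for the store: a real store orders keys by their serialized bytes, which matches `String` ordering for plain ASCII keys like these. The extra words "streams" and "world" are made up to show keys that fall outside the range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Dependency-free sketch of an inclusive key-range query over sorted keys. */
final class InclusiveRangeSketch {

    /** Returns all keys k with from <= k <= to, in ascending order. */
    static List<String> keysInRange(NavigableMap<String, Long> store, String from, String to) {
        // subMap with both flags set to true includes the two endpoints.
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }

    public static void main(String[] args) {
        NavigableMap<String, Long> counts = new TreeMap<>();
        for (String word : new String[] {"hello", "go", "goodbye", "kafka", "streams", "world"}) {
            counts.put(word, 1L);
        }
        // Keys between "go" and "kafka", endpoints included.
        System.out.println(keysInRange(counts, "go", "kafka"));
    }
}
```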


Example 18: shouldReturnSingleItemListIfStoreExists

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Test
public void shouldReturnSingleItemListIfStoreExists() throws Exception {
    final GlobalStateStoreProvider provider =
            new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", new NoOpReadOnlyStore<>()));
    final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
    // JUnit's assertEquals takes the expected value first, then the actual value.
    assertEquals(1, stores.size());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source file: GlobalStateStoreProviderTest.java


Example 19: shouldReturnEmptyItemListIfStoreDoesntExist

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@Test
public void shouldReturnEmptyItemListIfStoreDoesntExist() throws Exception {
    final GlobalStateStoreProvider provider =
            new GlobalStateStoreProvider(Collections.<String, StateStore>emptyMap());
    final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
    assertTrue(stores.isEmpty());
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source file: GlobalStateStoreProviderTest.java


Example 20: init

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; // import the required package/class
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    store = (ReadOnlyKeyValueStore<K, V>) context.getStateStore(storeName);
}
 
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 5, Source file: KTableSourceValueGetterSupplier.java



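Note: The org.apache.kafka.streams.state.ReadOnlyKeyValueStore examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.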

