
Java KeyValueIterator Code Examples


This article collects typical usage examples of org.apache.kafka.streams.state.KeyValueIterator in Java. If you are wondering how KeyValueIterator is used in practice, or are looking for working examples of it, the selected code samples below may help.



KeyValueIterator is an interface in the org.apache.kafka.streams.state package. This article shows 20 code examples that use it, sorted by popularity by default.
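
Before the collected examples, it may help to see the basic contract in isolation: a KeyValueIterator is an Iterator that also holds store resources, so it should be closed once iteration ends. The sketch below illustrates that pattern without requiring kafka-streams on the classpath. `ClosableKeyValueIterator`, `InMemoryIterator`, and `IteratorUsageSketch.drain` are hypothetical stand-ins invented for this illustration and are not part of the Kafka API; in real code the iterator would come from a store method such as `all()` or `range(from, to)`, and entries would be `KeyValue` objects rather than `Map.Entry`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for org.apache.kafka.streams.state.KeyValueIterator:
// an Iterator that must also be closed. This is NOT the real Kafka type.
interface ClosableKeyValueIterator<K, V> extends Iterator<Map.Entry<K, V>>, Closeable {
    @Override
    void close(); // narrowed so callers do not have to handle IOException

    K peekNextKey(); // look at the next key without advancing
}

// Hypothetical in-memory implementation, used only for this illustration.
final class InMemoryIterator<K, V> implements ClosableKeyValueIterator<K, V> {
    private final List<Map.Entry<K, V>> entries;
    private int position = 0;
    boolean closed = false;

    InMemoryIterator(Map<K, V> source) {
        this.entries = new ArrayList<>(source.entrySet());
    }

    @Override
    public boolean hasNext() {
        return position < entries.size();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return entries.get(position++);
    }

    @Override
    public K peekNextKey() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return entries.get(position).getKey();
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class IteratorUsageSketch {
    // Copies all remaining entries into a map; try-with-resources closes
    // the iterator even if an exception is thrown while iterating.
    static <K, V> Map<K, V> drain(ClosableKeyValueIterator<K, V> iterator) {
        Map<K, V> result = new LinkedHashMap<>();
        try (ClosableKeyValueIterator<K, V> it = iterator) {
            while (it.hasNext()) {
                Map.Entry<K, V> entry = it.next();
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

Calling `IteratorUsageSketch.drain(new InMemoryIterator<>(someMap))` returns a copy of the map's entries and leaves the iterator closed. Several of the examples that follow iterate without closing; whether that matters depends on the store implementation behind the iterator.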

Example 1: notFoundWithNoResult

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void notFoundWithNoResult(TestContext context){
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.range(any(), any())).thenReturn(iterator);


    rule.vertx().deployVerticle(new RangeKeyValueQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{

        RangeKeyValueQuery query = new RangeKeyValueQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName(), "key".getBytes(), "key".getBytes());

        rule.vertx().eventBus().send(Config.RANGE_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{

            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            context.assertTrue(iterator.closed);

        }));

    }));

}
 
Developer: ftrossbach, Project: kiqr, Lines of code: 27, Source: RangeKeyValuesQueryVerticleTest.java


Example 2: getLocalMetrics

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
/**
 * Query local state store to extract metrics
 *
 * @return local Metrics
 */
private Metrics getLocalMetrics() {
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();

    String source = thisInstance.host() + ":" + thisInstance.port();
    Metrics localMetrics = new Metrics();

    ReadOnlyKeyValueStore<String, Double> averageStore = ks
            .store(storeName,
                    QueryableStoreTypes.<String, Double>keyValueStore());

    LOGGER.log(Level.INFO, "Entries in store {0}", averageStore.approximateNumEntries());

    // try-with-resources closes the iterator so the store's resources are released
    try (KeyValueIterator<String, Double> storeIterator = averageStore.all()) {
        while (storeIterator.hasNext()) {
            KeyValue<String, Double> kv = storeIterator.next();
            localMetrics.add(source, kv.key, String.valueOf(kv.value));
        }
    }
    LOGGER.log(Level.INFO, "Local store state {0}", localMetrics);
    return localMetrics;
}
 
Developer: abhirockzz, Project: docker-kafka-streams, Lines of code: 28, Source: MetricsResource.java


Example 3: hasNextCondition

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
    return new HasNextCondition() {
        @Override
        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
            while (iterator.hasNext()) {
                final Bytes bytes = iterator.peekNextKey();
                final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
                final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
                if (keyBytes.compareTo(binaryKeyFrom) >= 0
                    && keyBytes.compareTo(binaryKeyTo) <= 0
                    && time >= from
                    && time <= to) {
                    return true;
                }
                iterator.next();
            }
            return false;
        }
    };
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 22, Source: WindowKeySchema.java


Example 4: fetch

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher) {
    final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
    for (final ReadOnlySessionStore<K, V> store : stores) {
        try {
            final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store);
            if (!result.hasNext()) {
                result.close();
            } else {
                return result;
            }
        } catch (final InvalidStateStoreException ise) {
            throw new InvalidStateStoreException("State store  [" + storeName + "] is not available anymore" +
                                                 " and may have been migrated to another instance; " +
                                                 "please re-discover its location from the state metadata.");
        }
    }
    return KeyValueIterators.emptyIterator();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 19, Source: CompositeReadOnlySessionStore.java


Example 5: fetch

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
public <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K, V, IteratorType> fetcher) {
    final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
    for (ReadOnlyWindowStore<K, V> windowStore : stores) {
        try {
            final IteratorType result = fetcher.fetch(windowStore);
            if (!result.hasNext()) {
                result.close();
            } else {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException(
                "State store is not available anymore and may have been migrated to another instance; " +
                "please re-discover its location from the state metadata.");
        }
    }

    return fetcher.empty();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: CompositeReadOnlyWindowStore.java


Example 6: findSessions

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                       final long earliestSessionEndTime,
                                                       final long latestSessionStartTime) {
    validateStoreOpen();
    final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));

    final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(binarySessionId, earliestSessionEndTime));
    final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(binarySessionId, latestSessionStartTime));
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);

    final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(
        binarySessionId, earliestSessionEndTime, latestSessionStartTime
    );
    final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
                                                                         binarySessionId,
                                                                         earliestSessionEndTime,
                                                                         latestSessionStartTime);
    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
    return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: CachingSessionStore.java


Example 7: shouldFetchRangeCorrectlyAcrossSegments

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
    final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
    final Windowed<String> aa1 = new Windowed<>("aa", new SessionWindow(0, 0));
    final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
    final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
    final Windowed<String> aa3 = new Windowed<>("aa", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
    cachingStore.put(a1, 1L);
    cachingStore.put(aa1, 1L);
    cachingStore.put(a2, 2L);
    cachingStore.put(a3, 3L);
    cachingStore.put(aa3, 3L);
    cachingStore.flush();

    final KeyValueIterator<Windowed<String>, Long> rangeResults = cachingStore.findSessions("a", "aa", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
    assertEquals(a1, rangeResults.next().key);
    assertEquals(aa1, rangeResults.next().key);
    assertEquals(a2, rangeResults.next().key);
    assertEquals(a3, rangeResults.next().key);
    assertEquals(aa3, rangeResults.next().key);
    assertFalse(rangeResults.hasNext());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: CachingSessionStoreTest.java


Example 8: verifyStateStore

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
    ReadOnlyKeyValueStore<Long, Long> store = null;

    final long maxWaitingTime = System.currentTimeMillis() + 300000L;
    while (System.currentTimeMillis() < maxWaitingTime) {
        try {
            store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
            break;
        } catch (final InvalidStateStoreException okJustRetry) {
            try {
                Thread.sleep(5000L);
            } catch (final Exception ignore) { }
        }
    }

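    // Fail with a clear message rather than a NullPointerException if the store never became queryable
    assertTrue("state store " + storeName + " was not queryable within the timeout", store != null);

    // try-with-resources closes the iterator once the contents have been verified
    try (final KeyValueIterator<Long, Long> it = store.all()) {
        while (it.hasNext()) {
            assertTrue(expectedStoreContent.remove(it.next()));
        }
    }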

    assertTrue(expectedStoreContent.isEmpty());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 23, Source: EosIntegrationTest.java


Example 9: shouldMergeSessions

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldMergeSessions() throws Exception {
    context.setTime(0);
    final String sessionId = "mel";
    processor.process(sessionId, "first");
    assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());

    // move time beyond gap
    context.setTime(GAP_MS + 1);
    processor.process(sessionId, "second");
    assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
    // should still exist as not within gap
    assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
    // move time back
    context.setTime(GAP_MS / 2);
    processor.process(sessionId, "third");

    final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
    final KeyValue<Windowed<String>, Long> kv = iterator.next();

    assertEquals(Long.valueOf(3), kv.value);
    assertFalse(iterator.hasNext());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: KStreamSessionWindowAggregateProcessorTest.java


Example 10: shouldRemoveMergedSessionsFromStateStore

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
    context.setTime(0);
    processor.process("a", "1");

    // first ensure it is in the store
    final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0);
    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next());

    context.setTime(100);
    processor.process("a", "2");
    // a1 from above should have been removed
    // should have merged session in store
    final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100);
    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next());
    assertFalse(a2.hasNext());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: KStreamSessionWindowAggregateProcessorTest.java


Example 11: shouldIterateOverRange

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldIterateOverRange() throws Exception {
    final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
    for (int i = 0; i < bytes.length; i += 2) {
        store.put(Bytes.wrap(bytes[i]), bytes[i]);
        cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
    }

    final Bytes from = Bytes.wrap(new byte[]{2});
    final Bytes to = Bytes.wrap(new byte[]{9});
    final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);

    final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
    byte[][] values = new byte[8][];
    int index = 0;
    int bytesIndex = 2;
    while (iterator.hasNext()) {
        final byte[] value = iterator.next().value;
        values[index++] = value;
        assertArrayEquals(bytes[bytesIndex++], value);
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 24, Source: MergedSortedCacheKeyValueStoreIteratorTest.java


Example 12: shouldPutFetchFromCache

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldPutFetchFromCache() throws Exception {
    cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
    cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
    cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);

    assertEquals(3, cache.size());

    final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
    final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);

    assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a.next());
    assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next());
    assertFalse(a.hasNext());
    assertFalse(b.hasNext());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 17, Source: CachingSessionStoreTest.java


Example 13: punctuate

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public void punctuate(long timestamp) {
    // try-with-resources closes the iterator even if forwarding throws
    try (KeyValueIterator<Long, List<OrderGoods>> iter = this.kvStore.all()) {
        while (iter.hasNext()) {
            KeyValue<Long, List<OrderGoods>> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
    }

    context.commit();
}
 
Developer: jiumao-org, Project: wechat-mall, Lines of code: 13, Source: GoodOrderProcessor.java


Example 14: process

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public void process(final K key, final V value) {
    // if the key is null, we do not need proceed aggregating
    // the record with the table
    if (key == null) {
        return;
    }

    final long timestamp = context().timestamp();
    final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
    final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
    SessionWindow mergedWindow = newSessionWindow;
    T agg = initializer.apply();

    try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(),
                                                                              timestamp + windows.inactivityGap())) {
        while (iterator.hasNext()) {
            final KeyValue<Windowed<K>, T> next = iterator.next();
            merged.add(next);
            agg = sessionMerger.apply(key, agg, next.value);
            mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
        }
    }

    agg = aggregator.apply(key, value, agg);
    final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
    if (!mergedWindow.equals(newSessionWindow)) {
        for (final KeyValue<Windowed<K>, T> session : merged) {
            store.remove(session.key);
            tupleForwarder.maybeForward(session.key, null, session.value);
        }
    }
    store.put(sessionKey, agg);
    tupleForwarder.maybeForward(sessionKey, agg, null);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 36, Source: KStreamSessionWindowAggregate.java


Example 15: shouldIterateOverRange

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Test
public void shouldIterateOverRange() throws Exception {
    int items = addItemsToCache();
    final KeyValueIterator<String, String> range = store.range(String.valueOf(0), String.valueOf(items));
    final List<String> results = new ArrayList<>();
    while (range.hasNext()) {
        results.add(range.next().key);
    }
    assertEquals(items, results.size());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: CachingKeyValueStoreTest.java


Example 16: stateTable

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> stateTable(String name) throws EmptyInputException, NoTopologyException {
    return withProcessedDriver(driver -> {
        Map<K, V> result = new LinkedHashMap<>();
        // try-with-resources closes the iterator even if a cast or put fails
        try (KeyValueIterator<Object, Object> records = driver.getKeyValueStore(name).all()) {
            records.forEachRemaining(record -> result.put((K) record.key, (V) record.value));
        }
        return result;
    });
}
 
Developer: carlosmenezes, Project: mockafka, Lines of code: 12, Source: MockafkaBuilder.java


Example 17: MergedSortedCacheSessionStoreIterator

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                      final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
                                      final StateSerdes<K, AGG> serdes,
                                      final SegmentedCacheFunction cacheFunction) {
    super(cacheIterator, storeIterator);
    this.serdes = serdes;
    this.cacheFunction = cacheFunction;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: MergedSortedCacheSessionStoreIterator.java


Example 18: findSessions

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
    final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
        Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)),
        earliestSessionEndTime, latestSessionStartTime
    );
    return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: RocksDBSessionStore.java


Example 19: toList

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
    final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
    while (iterator.hasNext()) {
        final KeyValue<Bytes, byte[]> next = iterator.next();
        final KeyValue<Windowed<String>, Long> deserialized
                = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value));
        results.add(deserialized);
    }
    return results;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: RocksDBSegmentedBytesStoreTest.java


Example 20: range

import org.apache.kafka.streams.state.KeyValueIterator; // import the required package/class
@Override
public synchronized KeyValueIterator<K, V> range(K from, K to) {
    validateStoreOpen();
    // query rocksdb
    final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
    openIterators.add(rocksDBRangeIterator);
    return rocksDBRangeIterator;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: RocksDBStore.java



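Note: The org.apache.kafka.streams.state.KeyValueIterator examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects; copyright remains with the original authors, and distribution and use should follow each project's license. Do not reproduce without permission.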

