Java TimestampExtractor Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.streams.processor.TimestampExtractor. If you are wondering what the TimestampExtractor class is for, how to use it, or where to find usage examples, the curated class code examples below may help.



The TimestampExtractor class belongs to the org.apache.kafka.streams.processor package. Ten code examples of the class are shown below, sorted by popularity by default.

Example 1: doTable

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
                                    final Serde<K> keySerde,
                                    final Serde<V> valSerde,
                                    final TimestampExtractor timestampExtractor,
                                    final String topic,
                                    final StateStoreSupplier<KeyValueStore> storeSupplier,
                                    final boolean isQueryable) {
    final String source = newName(KStreamImpl.SOURCE_NAME);
    final String name = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());

    addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
            valSerde == null ? null : valSerde.deserializer(),
            topic);
    addProcessor(name, processorSupplier, source);

    final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);

    addStateStore(storeSupplier, name);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);

    return kTable;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 25, Source: KStreamBuilder.java


Example 2: defaultTimestampExtractor

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public TimestampExtractor defaultTimestampExtractor() {
    TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    if (timestampExtractor == null) {
        return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    }
    return timestampExtractor;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 8, Source: StreamsConfig.java
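
The two keys checked above reflect the 0.11.0.0 deprecation of timestamp.extractor in favor of default.timestamp.extractor: the older key, if explicitly set, takes precedence. Below is a minimal sketch of a custom extractor that could be registered under either key; the Order payload type and its getEventTimeMs() accessor are hypothetical stand-ins for your own domain model, only the TimestampExtractor interface itself is from the Kafka Streams API.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: Order and getEventTimeMs() are hypothetical domain types.
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Order) {
            // use the event time embedded in the message payload
            return ((Order) value).getEventTimeMs();
        }
        // otherwise fall back to the last extracted timestamp so processing can continue
        return previousTimestamp;
    }
}

To make it the application-wide default, register it in the Streams configuration before constructing KafkaStreams, e.g. props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OrderTimestampExtractor.class).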


Example 3: RecordQueue

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
RecordQueue(final TopicPartition partition,
            final SourceNode source,
            final TimestampExtractor timestampExtractor) {
    this.partition = partition;
    this.source = source;
    this.timestampExtractor = timestampExtractor;
    this.fifoQueue = new ArrayDeque<>();
    this.timeTracker = new MinTimestampTracker<>();
    this.recordDeserializer = new SourceNodeRecordDeserializer(source);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 11, Source: RecordQueue.java


Example 4: SourceNode

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
    super(name);
    this.topics = topics;
    this.timestampExtractor = timestampExtractor;
    this.keyDeserializer = ensureExtended(keyDeserializer);
    this.valDeserializer = ensureExtended(valDeserializer);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 8, Source: SourceNode.java


Example 5: table

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
 * Create a {@link KTable} for the specified topic.
 * Input {@link KeyValue} pairs with {@code null} key will be dropped.
 * <p>
 * Note that the specified input topic must be partitioned by key.
 * If this is not the case the returned {@link KTable} will be corrupted.
 * <p>
 * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
 * {@code storeName}.
 * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
 * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
 * <p>
 * To query the local {@link KeyValueStore} it must be obtained via
 * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
 * <pre>{@code
 * KafkaStreams streams = ...
 * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 * String key = "some-key";
 * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
 * }</pre>
 * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
 * query the value of the key on a parallel running instance of your Kafka Streams application.
 *
 * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
 *                           committed offsets are available
 * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
 *                           if not specified the default extractor defined in the configs will be used
 * @param keySerde           key serde used to send key-value pairs,
 *                           if not specified the default key serde defined in the configuration will be used
 * @param valSerde           value serde used to send key-value pairs,
 *                           if not specified the default value serde defined in the configuration will be used
 * @param topic              the topic name; cannot be {@code null}
 * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)}.
 * @return a {@link KTable} for the specified topic
 */
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                 final TimestampExtractor timestampExtractor,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde,
                                 final String topic,
                                 final String queryableStoreName) {
    final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
    final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
            keySerde,
            valSerde,
            false,
            Collections.<String, String>emptyMap(),
            true);
    return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 51, Source: KStreamBuilder.java
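
A hedged usage sketch of this overload follows; the topic and store names are illustrative, and OrderTimestampExtractor is the custom extractor sketched under Example 2.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TopologyBuilder;

final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, Long> orders = builder.table(
        TopologyBuilder.AutoOffsetReset.EARLIEST,  // reset policy when no committed offsets exist
        new OrderTimestampExtractor(),             // per-source extractor, overrides the configured default
        Serdes.String(),
        Serdes.Long(),
        "orders",                                  // input topic; must be partitioned by key
        "orders-store");                           // queryable local state store name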


Example 6: doGlobalTable

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
 * Create a {@link GlobalKTable} for the specified topic.
 * Input {@link KeyValue} pairs with {@code null} key will be dropped.
 * <p>
 * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
 * {@code storeName}.
 * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
 * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
 * <p>
 * To query the local {@link KeyValueStore} it must be obtained via
 * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
 * <pre>{@code
 * KafkaStreams streams = ...
 * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 * String key = "some-key";
 * Long valueForKey = localStore.get(key);
 * }</pre>
 * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
 * regardless of the specified value in {@link StreamsConfig}.
 *
 * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
 *                           if not specified the default extractor defined in the configs will be used
 * @param keySerde           key serde used to send key-value pairs,
 *                           if not specified the default key serde defined in the configuration will be used
 * @param valSerde           value serde used to send key-value pairs,
 *                           if not specified the default value serde defined in the configuration will be used
 * @param topic              the topic name; cannot be {@code null}
 * @param storeSupplier      user defined state store supplier. Cannot be {@code null}.
 * @return a {@link GlobalKTable} for the specified topic
 */
@SuppressWarnings("unchecked")
private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
                                                final Serde<V> valSerde,
                                                final TimestampExtractor timestampExtractor,
                                                final String topic,
                                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    final String sourceName = newName(KStreamImpl.SOURCE_NAME);
    final String processorName = newName(KTableImpl.SOURCE_NAME);
    final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());


    final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

    addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
    return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 49, Source: KStreamBuilder.java


Example 7: StreamTask

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
 * Create {@link StreamTask} with its assigned partitions
 * @param id                    the ID of this task
 * @param applicationId         the ID of the stream processing application
 * @param partitions            the collection of assigned {@link TopicPartition}
 * @param topology              the instance of {@link ProcessorTopology}
 * @param consumer              the instance of {@link Consumer}
 * @param changelogReader       the instance of {@link ChangelogReader} used for restoring state
 * @param config                the {@link StreamsConfig} specified by the user
 * @param metrics               the {@link StreamsMetrics} created by the thread
 * @param stateDirectory        the {@link StateDirectory} created by the thread
 * @param producer              the instance of {@link Producer} used to produce records
 */
public StreamTask(final TaskId id,
                  final String applicationId,
                  final Collection<TopicPartition> partitions,
                  final ProcessorTopology topology,
                  final Consumer<byte[], byte[]> consumer,
                  final ChangelogReader changelogReader,
                  final StreamsConfig config,
                  final StreamsMetrics metrics,
                  final StateDirectory stateDirectory,
                  final ThreadCache cache,
                  final Time time,
                  final Producer<byte[], byte[]> producer) {
    super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config);
    punctuationQueue = new PunctuationQueue();
    maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
    this.metrics = new TaskMetrics(metrics);

    // create queues for each assigned partition and associate them
    // to corresponding source nodes in the processor topology
    final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();

    final TimestampExtractor defaultTimestampExtractor  = config.defaultTimestampExtractor();
    for (final TopicPartition partition : partitions) {
        final SourceNode source = topology.source(partition.topic());
        final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
        final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor);
        partitionQueues.put(partition, queue);
    }

    partitionGroup = new PartitionGroup(partitionQueues);

    // initialize the consumed offset cache
    consumedOffsets = new HashMap<>();

    this.producer = producer;
    recordCollector = createRecordCollector();

    // initialize the topology with its own context
    processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
    this.time = time;
    log.debug("{} Initializing", logPrefix);
    initializeStateStores();
    stateMgr.registerGlobalStateStores(topology.globalStateStores());
    if (eosEnabled) {
        this.producer.initTransactions();
        this.producer.beginTransaction();
        transactionInFlight = true;
    }
    initTopology();
    processorContext.initialized();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 65, Source: StreamTask.java


Example 8: getTimestampExtractor

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public TimestampExtractor getTimestampExtractor() {
    return timestampExtractor;
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 4, Source: SourceNode.java


Example 9: stream

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
 * Create a {@link KStream} from the specified topics.
 * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
 * <p>
 * If multiple topics are specified there is no ordering guarantee for records from different topics.
 * <p>
 * Note that the specified input topics must be partitioned by key.
 * If this is not the case it is the user's responsibility to repartition the data before any key based operation
 * (like aggregation or join) is applied to the returned {@link KStream}.
 *
 * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
 *                           if not specified the default extractor defined in the configs will be used
 * @param keySerde           key serde used to read this source {@link KStream}, if not specified the default
 *                           serde defined in the configs will be used
 * @param valSerde           value serde used to read this source {@link KStream},
 *                           if not specified the default serde defined in the configs will be used
 * @param topics             the topic names; must contain at least one topic name
 * @return a {@link KStream} for the specified topics
 */
public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
                                   final Serde<K> keySerde,
                                   final Serde<V> valSerde,
                                   final String... topics) {
    return stream(null, timestampExtractor, keySerde, valSerde, topics);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 26, Source: KStreamBuilder.java
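
A short usage sketch of this overload, under the same assumptions as before (illustrative topic names; OrderTimestampExtractor from Example 2):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> events = builder.stream(
        new OrderTimestampExtractor(),  // applied to every record read from these topics
        Serdes.String(),
        Serdes.String(),
        "events-a", "events-b");        // no ordering guarantee across the two topics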


Example 10: globalTable

import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
 * Create a {@link GlobalKTable} for the specified topic.
 * The default {@link TimestampExtractor} and default key and value deserializers as specified in
 * the {@link StreamsConfig config} are used.
 * Input {@link KeyValue} pairs with {@code null} key will be dropped.
 * <p>
 * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
 * {@code queryableStoreName}.
 * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
 * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
 * <p>
 * To query the local {@link KeyValueStore} it must be obtained via
 * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
 * <pre>{@code
 * KafkaStreams streams = ...
 * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
 * String key = "some-key";
 * Long valueForKey = localStore.get(key);
 * }</pre>
 * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
 * regardless of the specified value in {@link StreamsConfig}.
 *
 * @param keySerde  key serde used to send key-value pairs,
 *                  if not specified the default key serde defined in the configuration will be used
 * @param valSerde  value serde used to send key-value pairs,
 *                  if not specified the default value serde defined in the configuration will be used
 * @param topic     the topic name; cannot be {@code null}
 * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}.
 * @return a {@link GlobalKTable} for the specified topic
 */
@SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                             final Serde<V> valSerde,
                                             final TimestampExtractor timestampExtractor,
                                             final String topic,
                                             final String queryableStoreName) {
    final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
    return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
            keySerde,
            valSerde,
            false,
            Collections.<String, String>emptyMap(),
            true));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 45, Source: KStreamBuilder.java
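
A usage sketch pairing this overload with the built-in WallclockTimestampExtractor, which stamps each record with the processing-time wall clock rather than an embedded event time; the topic and store names are illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

final KStreamBuilder builder = new KStreamBuilder();
final GlobalKTable<String, Long> prices = builder.globalTable(
        Serdes.String(),
        Serdes.Long(),
        new WallclockTimestampExtractor(),  // built-in extractor: returns System.currentTimeMillis()
        "prices",                           // always consumed with "earliest" reset, regardless of config
        "prices-store");                    // queryable local store name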



Note: the org.apache.kafka.streams.processor.TimestampExtractor class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets are selected from open-source projects contributed by various developers; copyright in the source code remains with the original authors. Consult each project's license before distributing or using the code, and do not republish without permission.

