This article collects typical usage examples of the Java class org.apache.kafka.streams.processor.TimestampExtractor. If you are wondering how the TimestampExtractor class is used in practice, the selected code examples below may help.
The TimestampExtractor class belongs to the org.apache.kafka.streams.processor package. Ten code examples of the class are shown below, sorted by popularity by default.
Example 1: doTable
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
        final Serde<K> keySerde,
        final Serde<V> valSerde,
        final TimestampExtractor timestampExtractor,
        final String topic,
        final StateStoreSupplier<KeyValueStore> storeSupplier,
        final boolean isQueryable) {
    final String source = newName(KStreamImpl.SOURCE_NAME);
    final String name = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
    addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
            valSerde == null ? null : valSerde.deserializer(),
            topic);
    addProcessor(name, processorSupplier, source);
    final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
    addStateStore(storeSupplier, name);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);
    return kTable;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source file: KStreamBuilder.java
Example 2: defaultTimestampExtractor
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public TimestampExtractor defaultTimestampExtractor() {
    TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    if (timestampExtractor == null) {
        return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    }
    return timestampExtractor;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source file: StreamsConfig.java
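The lookup order in Example 2 can be tried out without a Kafka dependency. The following is a minimal sketch, not the StreamsConfig implementation: the nested Extractor interface is a hypothetical stand-in for TimestampExtractor, a plain Map replaces the configuration object, and the two key strings are assumed to correspond to the TIMESTAMP_EXTRACTOR_CLASS_CONFIG and DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG constants.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch; none of these types come from Kafka.
final class ExtractorConfigSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.processor.TimestampExtractor.
    interface Extractor {
        long extract(long recordTimestamp, long previousTimestamp);
    }

    // Assumed key names for the two config constants used in Example 2.
    static final String LEGACY_KEY = "timestamp.extractor";
    static final String DEFAULT_KEY = "default.timestamp.extractor";

    // Prefer the extractor stored under the legacy key; otherwise use the default key.
    static Extractor resolve(Map<String, Extractor> config) {
        Extractor configured = config.get(LEGACY_KEY);
        return configured != null ? configured : config.get(DEFAULT_KEY);
    }

    public static void main(String[] args) {
        Map<String, Extractor> config = new HashMap<>();
        config.put(DEFAULT_KEY, (ts, prev) -> ts);
        System.out.println(resolve(config).extract(42L, 7L)); // prints 42

        config.put(LEGACY_KEY, (ts, prev) -> prev);
        System.out.println(resolve(config).extract(42L, 7L)); // prints 7
    }
}
```

Checking the legacy key first means a value set under the older name still takes effect when both keys are present.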
Example 3: RecordQueue
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
RecordQueue(final TopicPartition partition,
        final SourceNode source,
        final TimestampExtractor timestampExtractor) {
    this.partition = partition;
    this.source = source;
    this.timestampExtractor = timestampExtractor;
    this.fifoQueue = new ArrayDeque<>();
    this.timeTracker = new MinTimestampTracker<>();
    this.recordDeserializer = new SourceNodeRecordDeserializer(source);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source file: RecordQueue.java
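The constructor in Example 3 pairs a FIFO queue with a MinTimestampTracker. The sketch below illustrates one way such a pairing can report the smallest timestamp among buffered records; it is an assumption about the general technique (a monotonic deque), not the Kafka class itself, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;

// Stand-alone sketch of a FIFO buffer that also tracks its minimum timestamp.
final class MinTimestampSketch {
    private final ArrayDeque<Long> fifo = new ArrayDeque<>();       // arrival order
    private final ArrayDeque<Long> ascending = new ArrayDeque<>();  // non-decreasing candidates for the minimum
    private long lastKnown = -1L;                                   // reported when the buffer is empty

    // Buffer a timestamp and drop candidates that can never be the minimum again.
    void add(long ts) {
        fifo.addLast(ts);
        while (!ascending.isEmpty() && ascending.peekLast() > ts) {
            ascending.removeLast();
        }
        ascending.addLast(ts);
    }

    // Remove the oldest buffered timestamp; callers must not poll an empty buffer.
    long poll() {
        long ts = fifo.removeFirst();
        if (ascending.peekFirst() == ts) {
            ascending.removeFirst();
        }
        if (ascending.isEmpty()) {
            lastKnown = ts;
        }
        return ts;
    }

    // Smallest buffered timestamp, or the last polled one if nothing is buffered.
    long min() {
        return ascending.isEmpty() ? lastKnown : ascending.peekFirst();
    }

    public static void main(String[] args) {
        MinTimestampSketch q = new MinTimestampSketch();
        q.add(5L);
        q.add(3L);
        q.add(8L);
        System.out.println(q.min()); // prints 3
        q.poll();
        q.poll();
        System.out.println(q.min()); // prints 8
    }
}
```

Each timestamp enters and leaves the candidate deque at most once, so add and poll stay amortized constant time.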
Example 4: SourceNode
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
    super(name);
    this.topics = topics;
    this.timestampExtractor = timestampExtractor;
    this.keyDeserializer = ensureExtended(keyDeserializer);
    this.valDeserializer = ensureExtended(valDeserializer);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source file: SourceNode.java
Example 5: table
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
* Create a {@link KTable} for the specified topic.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* Note that the specified input topic must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
* {@code storeName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
* committed offsets are available
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
        final TimestampExtractor timestampExtractor,
        final Serde<K> keySerde,
        final Serde<V> valSerde,
        final String topic,
        final String queryableStoreName) {
    final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
    final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
            keySerde,
            valSerde,
            false,
            Collections.<String, String>emptyMap(),
            true);
    return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 51, Source file: KStreamBuilder.java
Example 6: doGlobalTable
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
* Create a {@link GlobalKTable} for the specified topic.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
* {@code storeName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
* @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link GlobalKTable} for the specified topic
*/
@SuppressWarnings("unchecked")
private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
        final Serde<V> valSerde,
        final TimestampExtractor timestampExtractor,
        final String topic,
        final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    final String sourceName = newName(KStreamImpl.SOURCE_NAME);
    final String processorName = newName(KTableImpl.SOURCE_NAME);
    final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
    final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
    addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
    return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 49, Source file: KStreamBuilder.java
Example 7: StreamTask
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
* Create {@link StreamTask} with its assigned partitions
* @param id the ID of this task
* @param applicationId the ID of the stream processing application
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param changelogReader the instance of {@link ChangelogReader} used for restoring state
* @param config the {@link StreamsConfig} specified by the user
* @param metrics the {@link StreamsMetrics} created by the thread
* @param stateDirectory the {@link StateDirectory} created by the thread
* @param producer the instance of {@link Producer} used to produce records
*/
public StreamTask(final TaskId id,
        final String applicationId,
        final Collection<TopicPartition> partitions,
        final ProcessorTopology topology,
        final Consumer<byte[], byte[]> consumer,
        final ChangelogReader changelogReader,
        final StreamsConfig config,
        final StreamsMetrics metrics,
        final StateDirectory stateDirectory,
        final ThreadCache cache,
        final Time time,
        final Producer<byte[], byte[]> producer) {
    super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config);
    punctuationQueue = new PunctuationQueue();
    maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
    this.metrics = new TaskMetrics(metrics);
    // create queues for each assigned partition and associate them
    // to corresponding source nodes in the processor topology
    final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
    final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
    for (final TopicPartition partition : partitions) {
        final SourceNode source = topology.source(partition.topic());
        final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
        final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor);
        partitionQueues.put(partition, queue);
    }
    partitionGroup = new PartitionGroup(partitionQueues);
    // initialize the consumed offset cache
    consumedOffsets = new HashMap<>();
    this.producer = producer;
    recordCollector = createRecordCollector();
    // initialize the topology with its own context
    processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
    this.time = time;
    log.debug("{} Initializing", logPrefix);
    initializeStateStores();
    stateMgr.registerGlobalStateStores(topology.globalStateStores());
    if (eosEnabled) {
        this.producer.initTransactions();
        this.producer.beginTransaction();
        transactionInFlight = true;
    }
    initTopology();
    processorContext.initialized();
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 65, Source file: StreamTask.java
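The loop in Example 7 picks, for each partition, the extractor set on its source node and falls back to the configured default when the source has none. A minimal stand-alone sketch of that selection follows. It assumes a LongUnaryOperator as a hypothetical stand-in for TimestampExtractor and a topic-keyed Map in place of the processor topology; neither is part of the Kafka API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongUnaryOperator;

// Stand-alone sketch; LongUnaryOperator stands in for a timestamp extractor.
final class PerSourceExtractorSketch {

    // For every topic, keep its own extractor if one is set, otherwise use the fallback.
    static Map<String, LongUnaryOperator> resolve(Map<String, LongUnaryOperator> perSource,
                                                  LongUnaryOperator fallback) {
        Map<String, LongUnaryOperator> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, LongUnaryOperator> entry : perSource.entrySet()) {
            LongUnaryOperator own = entry.getValue();
            resolved.put(entry.getKey(), own != null ? own : fallback);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, LongUnaryOperator> perSource = new HashMap<>();
        perSource.put("orders", ts -> ts + 1); // hypothetical topic with its own extractor
        perSource.put("clicks", null);         // hypothetical topic without one

        Map<String, LongUnaryOperator> resolved = resolve(perSource, ts -> ts);
        System.out.println(resolved.get("orders").applyAsLong(10L)); // prints 11
        System.out.println(resolved.get("clicks").applyAsLong(10L)); // prints 10
    }
}
```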
Example 8: getTimestampExtractor
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
public TimestampExtractor getTimestampExtractor() {
    return timestampExtractor;
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 4, Source file: SourceNode.java
Example 9: stream
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* <p>
* If multiple topics are specified there is no ordering guarantee for records from different topics.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case it is the user's responsibility to repartition the data before any key based operation
* (like aggregation or join) is applied to the returned {@link KStream}.
*
* @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
* if not specified the default extractor defined in the configs will be used
* @param keySerde key serde used to read this source {@link KStream}, if not specified the default
* serde defined in the configs will be used
* @param valSerde value serde used to read this source {@link KStream},
* if not specified the default serde defined in the configs will be used
* @param topics the topic names; must contain at least one topic name
* @return a {@link KStream} for the specified topics
*/
public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
        final Serde<K> keySerde,
        final Serde<V> valSerde,
        final String... topics) {
    return stream(null, timestampExtractor, keySerde, valSerde, topics);
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source file: KStreamBuilder.java
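The Javadoc above calls the extractor passed to stream() stateless. As an illustration of what such an extractor might look like, here is a minimal sketch that runs without Kafka on the classpath. The Record class and Extractor interface are hypothetical stand-ins modelled loosely on an extract(record, previousTimestamp) style of method, and the rule "use the payload if it is a non-negative Long, otherwise the previous timestamp" is an assumption chosen for the example.

```java
// Stand-alone sketch; Record and Extractor are hypothetical stand-ins, not Kafka types.
final class PayloadTimeExtractorSketch {

    // Minimal record: a payload plus the timestamp attached to the record itself.
    static final class Record {
        final Object value;
        final long timestamp;

        Record(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    interface Extractor {
        long extract(Record record, long previousTimestamp);
    }

    // Stateless: the result depends only on the arguments, never on earlier calls.
    static final Extractor PAYLOAD_TIME = (record, previousTimestamp) -> {
        if (record.value instanceof Long && (Long) record.value >= 0) {
            return (Long) record.value;   // payload carries the event time in epoch millis
        }
        return previousTimestamp;         // unusable payload: reuse the previous timestamp
    };

    public static void main(String[] args) {
        System.out.println(PAYLOAD_TIME.extract(new Record(1500L, 99L), 7L));  // prints 1500
        System.out.println(PAYLOAD_TIME.extract(new Record("oops", 99L), 7L)); // prints 7
    }
}
```

Keeping the extractor free of fields makes it safe to share one instance across several sources.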
Example 10: globalTable
import org.apache.kafka.streams.processor.TimestampExtractor; // import the required package/class
/**
* Create a {@link GlobalKTable} for the specified topic.
* The default {@link TimestampExtractor} and default key and value deserializers as specified in
* the {@link StreamsConfig config} are used.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
* {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}
* @return a {@link GlobalKTable} for the specified topic
*/
@SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
        final Serde<V> valSerde,
        final TimestampExtractor timestampExtractor,
        final String topic,
        final String queryableStoreName) {
    final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
    return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
            keySerde,
            valSerde,
            false,
            Collections.<String, String>emptyMap(),
            true));
}
Developer ID: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 45, Source file: KStreamBuilder.java
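Note: The org.apache.kafka.streams.processor.TimestampExtractor class examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright in the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.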