本文整理汇总了Java中org.apache.kafka.clients.consumer.OffsetAndTimestamp类的典型用法代码示例。如果您正苦于以下问题:Java OffsetAndTimestamp类的具体用法?Java OffsetAndTimestamp怎么用?Java OffsetAndTimestamp使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
OffsetAndTimestamp类属于org.apache.kafka.clients.consumer包,在下文中一共展示了OffsetAndTimestamp类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: seek
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
/**
* Seek consumer to specific timestamp
* @param timestamp Unix timestamp in milliseconds to seek to.
*/
public ConsumerState seek(final long timestamp) {
// Find offsets for timestamp
final Map<TopicPartition, Long> timestampMap = new HashMap<>();
for (final TopicPartition topicPartition: getAllPartitions()) {
timestampMap.put(topicPartition, timestamp);
}
final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);
// Build map of partition => offset
final Map<Integer, Long> partitionOffsetMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry: offsetMap.entrySet()) {
partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
}
// Now lets seek to those offsets
return seek(partitionOffsetMap);
}
开发者ID:SourceLabOrg,项目名称:kafka-webview,代码行数:22,代码来源:WebKafkaConsumer.java
示例2: seekToTimestamp
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
/**
* Seek consumer to specific timestamp
* @param timestamp Unix timestamp in milliseconds to seek to.
*/
private void seekToTimestamp(final long timestamp) {
// Find offsets for timestamp
final Map<TopicPartition, Long> timestampMap = new HashMap<>();
for (final TopicPartition topicPartition: getAllPartitions()) {
timestampMap.put(topicPartition, timestamp);
}
final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);
// Build map of partition => offset
final Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry: offsetMap.entrySet()) {
partitionOffsetMap.put(entry.getKey(), entry.getValue().offset());
}
// Now lets seek to those offsets
seek(partitionOffsetMap);
}
开发者ID:SourceLabOrg,项目名称:kafka-webview,代码行数:22,代码来源:SocketKafkaConsumer.java
示例3: seekToMissingTransactions
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
private void seekToMissingTransactions(Map<TopicPartition, List<Long>> txByPartition) {
Map<TopicPartition, Long> timestamps = txByPartition.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> Collections.min(entry.getValue())
));
Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(timestamps);
Map<TopicPartition, OffsetAndMetadata> toCommit = foundOffsets.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
long offset = entry.getValue() != null? entry.getValue().offset() : 0;
return new OffsetAndMetadata(offset);
}
));
consumer.commitSync(toCommit);
}
开发者ID:epam,项目名称:Lagerta,代码行数:18,代码来源:ReconcilerImpl.java
示例4: seekToTransaction
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
String groupId) {
String topic = config.getLocalTopic();
Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);
try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
}
consumer.assign(seekMap.keySet());
Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
if (entry.getValue() != null) {
offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
}
}
consumer.commitSync(offsetsToCommit);
}
}
开发者ID:epam,项目名称:Lagerta,代码行数:25,代码来源:PublisherKafkaService.java
示例5: main
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = createConsumer();
consumer.subscribe(Arrays.asList(TOPIC));
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
Set<TopicPartition> assignments = consumer.assignment();
Map<TopicPartition, Long> query = new HashMap<>();
for (TopicPartition topicPartition : assignments) {
query.put(
topicPartition,
Instant.now().minus(10, MINUTES).toEpochMilli());
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
result.entrySet()
.stream()
.forEach(entry ->
consumer.seek(
entry.getKey(),
Optional.ofNullable(entry.getValue())
.map(OffsetAndTimestamp::offset)
.orElse(new Long(0))));
flag = false;
}
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
开发者ID:jeqo,项目名称:post-kafka-rewind-consumer-offset,代码行数:40,代码来源:KafkaConsumerFromTime.java
示例6: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
final Map<TopicPartition, Long> timestampsToSearch) {
return Retries.tryMe(new Callable<Map<TopicPartition, OffsetAndTimestamp>>() {
@Override
public Map<TopicPartition, OffsetAndTimestamp> call() throws Exception {
return inner.offsetsForTimes(timestampsToSearch);
}
}, strategy());
}
开发者ID:epam,项目名称:Lagerta,代码行数:11,代码来源:ConsumerProxyRetry.java
示例7: getOffsetsByTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
long timeout) {
Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size());
for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
OffsetData data = entry.getValue();
if (data == null)
offsetsByTimes.put(entry.getKey(), null);
else
offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp));
}
return offsetsByTimes;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:Fetcher.java
示例8: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch) {
return consumer.offsetsForTimes(timestampsToSearch);
}
开发者ID:opentracing-contrib,项目名称:java-kafka-client,代码行数:6,代码来源:TracingKafkaConsumer.java
示例9: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch) {
return activeConsumer().offsetsForTimes(timestampsToSearch);
}
开发者ID:epam,项目名称:Lagerta,代码行数:5,代码来源:ConsumerForTests.java
示例10: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch) {
return Collections.emptyMap();
}
开发者ID:epam,项目名称:Lagerta,代码行数:5,代码来源:ConsumerAdapter.java
示例11: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
return _kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
开发者ID:becketqin,项目名称:likafka-clients,代码行数:5,代码来源:LiKafkaConsumerImpl.java
示例12: testOffsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
@Test
public void testOffsetsForTimes(TestContext ctx) throws Exception {
String topicName = "testOffsetsForTimes";
String consumerId = topicName;
Async batch = ctx.async();
AtomicInteger index = new AtomicInteger();
int numMessages = 1000;
long beforeProduce = System.currentTimeMillis();
kafkaCluster.useTo().produceStrings(numMessages, batch::complete, () ->
new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
batch.awaitSuccess(20000);
long produceDuration = System.currentTimeMillis() - beforeProduce;
Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Context context = vertx.getOrCreateContext();
consumer = createConsumer(context, config);
consumer.exceptionHandler(ctx::fail);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
// Test contains two sub-tests
Async done = ctx.async(2);
consumer.handler(handler -> {
// nothing to do in this test
});
consumer.subscribe(Collections.singleton(topicName), ctx.asyncAssertSuccess(subscribeRes -> {
// search by timestamp
// take timestamp BEFORE start of ingestion and add half of the ingestion duration to it
long searchTimestamp = beforeProduce + (produceDuration / 2);
consumer.offsetsForTimes(Collections.singletonMap(topicPartition, searchTimestamp), ctx.asyncAssertSuccess(offsetAndTimestamps -> {
OffsetAndTimestamp offsetAndTimestamp = offsetAndTimestamps.get(topicPartition);
ctx.assertEquals(1, offsetAndTimestamps.size());
// Offset must be somewhere between beginningOffset and endOffset
ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
"Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
// Timestamp of returned offset must be at >= searchTimestamp
ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
done.countDown();
}));
consumer.offsetsForTimes(topicPartition, searchTimestamp, ctx.asyncAssertSuccess(offsetAndTimestamp -> {
// Offset must be somewhere between beginningOffset and endOffset
ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
"Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
// Timestamp of returned offset must be at >= searchTimestamp
ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
done.countDown();
}));
}));
}
开发者ID:vert-x3,项目名称:vertx-kafka-client,代码行数:55,代码来源:ConsumerTestBase.java
示例13: offsetsForTimes
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; //导入依赖的package包/类
/**
* Look up the offsets for the given partitions by timestamp.
* @param topicPartitionTimestamps A map with pairs of (TopicPartition, Timestamp).
* @param handler handler called on operation completed
*/
void offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps, Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler);
开发者ID:vert-x3,项目名称:vertx-kafka-client,代码行数:7,代码来源:KafkaReadStream.java
注:本文中的org.apache.kafka.clients.consumer.OffsetAndTimestamp类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论