本文整理汇总了Java中org.apache.kafka.common.serialization.LongDeserializer类的典型用法代码示例。如果您正苦于以下问题:Java LongDeserializer类的具体用法?Java LongDeserializer怎么用?Java LongDeserializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
LongDeserializer类属于org.apache.kafka.common.serialization包,在下文中一共展示了LongDeserializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: worker
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Override
public KafkaConsumer<Long, byte[]> worker() {
Properties props = AbstractKafkaClient.configBuilder()//
.put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
.put(GROUP_ID_CONFIG, MallConstants.ORDER_GROUP)//
.put(ENABLE_AUTO_COMMIT_CONFIG, true)//
.put(MAX_POLL_RECORDS_CONFIG, "100")//
.put(SESSION_TIMEOUT_MS_CONFIG, "30000")//
.put(FETCH_MIN_BYTES_CONFIG, 1)//
.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS)//
.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())//
.put(VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName())//
.build();
this.worker = new KafkaConsumer<>(props);
this.worker.subscribe(topics);
System.out.printf("started");
return this.worker;
}
开发者ID:jiumao-org,项目名称:wechat-mall,代码行数:21,代码来源:OrderConsumer.java
示例2: createDefaultMessageFormats
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
/**
* Creates default message formats.
*/
private void createDefaultMessageFormats() {
final Map<String, String> defaultFormats = new HashMap<>();
defaultFormats.put("Short", ShortDeserializer.class.getName());
defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
defaultFormats.put("Bytes", BytesDeserializer.class.getName());
defaultFormats.put("Double", DoubleDeserializer.class.getName());
defaultFormats.put("Float", FloatDeserializer.class.getName());
defaultFormats.put("Integer", IntegerDeserializer.class.getName());
defaultFormats.put("Long", LongDeserializer.class.getName());
defaultFormats.put("String", StringDeserializer.class.getName());
// Create if needed.
for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
if (messageFormat == null) {
messageFormat = new MessageFormat();
}
messageFormat.setName(entry.getKey());
messageFormat.setClasspath(entry.getValue());
messageFormat.setJar("n/a");
messageFormat.setDefaultFormat(true);
messageFormatRepository.save(messageFormat);
}
}
开发者ID:SourceLabOrg,项目名称:kafka-webview,代码行数:28,代码来源:DataLoaderConfig.java
示例3: consumeRecords
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private static void consumeRecords(String bootstrapServers) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-array-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Consumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<Long, byte[]> records = consumer.poll(10000);
for (ConsumerRecord<Long, byte[]> record : records)
out.printf(
"key = %s value = %s%n",
record.key(),
new String(record.value()));
consumer.close();
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:23,代码来源:ProduceConsumeLongByteArrayRecord.java
示例4: setupConfigsAndUtils
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:JoinIntegrationTest.java
示例5: readResult
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
final String groupId) throws Exception {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
groupId,
LongDeserializer.class,
LongDeserializer.class,
new Properties() {
{
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
}),
SINGLE_PARTITION_OUTPUT_TOPIC,
numberOfRecords
);
}
// read uncommitted
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
SINGLE_PARTITION_OUTPUT_TOPIC,
numberOfRecords
);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:EosIntegrationTest.java
示例6: main
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.setProperty("bootstrap.servers", args[0]);
props.setProperty("group.id", UUID.randomUUID().toString());
props.setProperty("key.deserializer", LongDeserializer.class.getName());
props.setProperty("value.deserializer", TradeDeserializer.class.getName());
props.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(props);
List<String> topics = Arrays.asList(args[1]);
consumer.subscribe(topics);
System.out.println("Subscribed to topics " + topics);
long count = 0;
long start = System.nanoTime();
while (true) {
ConsumerRecords<Long, Trade> poll = consumer.poll(5000);
System.out.println("Partitions in batch: " + poll.partitions());
LongSummaryStatistics stats = StreamSupport.stream(poll.spliterator(), false)
.mapToLong(r -> r.value().getTime()).summaryStatistics();
System.out.println("Oldest record time: " + stats.getMin() + ", newest record: " + stats.getMax());
count += poll.count();
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long rate = (long) ((double) count / elapsed * 1000);
System.out.printf("Total count: %,d in %,dms. Average rate: %,d records/s %n", count, elapsed, rate);
}
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:27,代码来源:TradeTestConsumer.java
示例7: mkKafkaReadTransform
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
/**
* Creates a consumer with two topics, with 10 partitions each.
* numElements are (round-robin) assigned all the 20 partitions.
*/
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements,
int maxNumRecords,
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
List<String> topics = ImmutableList.of("topic_a", "topic_b");
KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
.withBootstrapServers("myServer1:9092,myServer2:9092")
.withTopics(topics)
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(maxNumRecords);
if (timestampFn != null) {
return reader.withTimestampFn(timestampFn);
} else {
return reader;
}
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:KafkaIOTest.java
示例8: testUnboundedSourceWithSingleTopic
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Test
public void testUnboundedSourceWithSingleTopic() {
// same as testUnboundedSource, but with single topic
int numElements = 1000;
String topic = "my_topic";
KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
.withBootstrapServers("none")
.withTopic("my_topic")
.withConsumerFactoryFn(new ConsumerFactoryFn(
ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
.withMaxNumRecords(numElements)
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class);
PCollection<Long> input = p
.apply(reader.withoutMetadata())
.apply(Values.<Long>create());
addCountingAsserts(input, numElements);
p.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:KafkaIOTest.java
示例9: testSourceWithExplicitPartitionsDisplayData
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
.withBootstrapServers("myServer1:9092,myServer2:9092")
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
new TopicPartition("test", 6)))
.withConsumerFactoryFn(new ConsumerFactoryFn(
Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(LongDeserializer.class);
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:KafkaIOTest.java
示例10: testInferKeyCoder
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Test
public void testInferKeyCoder() {
CoderRegistry registry = CoderRegistry.createDefault();
assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
instanceof VarLongCoder);
assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
instanceof StringUtf8Coder);
assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
instanceof InstantCoder);
assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
instanceof VarLongCoder);
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:KafkaIOTest.java
示例11: worker
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Override
public KafkaProducer<Long, byte[]> worker() {
Properties props = AbstractKafkaClient.configBuilder()//
.put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
.put(ACKS_CONFIG, "all").put(RETRIES_CONFIG, 3)//
.put(BATCH_SIZE_CONFIG, 16384)//
.put(LINGER_MS_CONFIG, 1)//
.put(BUFFER_MEMORY_CONFIG, 33554432)//
.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())//
.put(VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName())//
.build();
return this.worker = new KafkaProducer<>(props);
}
开发者ID:jiumao-org,项目名称:wechat-mall,代码行数:16,代码来源:OrderProducer.java
示例12: listTopics
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private Collection<String> listTopics() {
Properties consumerConfig = new Properties() {{
setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", LOCALHOST, kafkaPort));
setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
}};
try (Consumer<Long, Long> consumer = new KafkaConsumer<>(consumerConfig)) {
return consumer.listTopics().keySet();
}
}
开发者ID:epam,项目名称:Lagerta,代码行数:11,代码来源:EmbeddedKafkaRule.java
示例13: shouldCountHelper
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private void shouldCountHelper() throws Exception {
startStreams();
produceMessages(mockTime.milliseconds());
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1L),
KeyValue.pair("A", 2L),
KeyValue.pair("B", 1L),
KeyValue.pair("B", 2L),
KeyValue.pair("C", 1L),
KeyValue.pair("C", 2L),
KeyValue.pair("D", 1L),
KeyValue.pair("D", 2L),
KeyValue.pair("E", 1L),
KeyValue.pair("E", 2L)
)));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:30,代码来源:KStreamAggregationIntegrationTest.java
示例14: waitUntilAtLeastNumRecordProcessed
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
config,
topic,
numRecs,
120 * 1000);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:QueryableStateIntegrationTest.java
示例15: run
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
public void run(Configuration configuration, Environment environment) throws Exception {
// INSTRUMENTATION
// Metrics Instrumentation
final CollectorRegistry collectorRegistry = new CollectorRegistry();
collectorRegistry.register(new DropwizardExports(environment.metrics()));
environment.admin()
.addServlet("metrics", new MetricsServlet(collectorRegistry))
.addMapping("/metrics");
final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
.withCollectorRegistry(collectorRegistry)
.withConstLabel("service", getName())
.build();
// Tracing Instrumentation
final Tracer tracer = getTracer();
final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
GlobalTracer.register(metricsTracer);
final HttpHost httpHost = new HttpHost("tweets-elasticsearch", 9200);
final RestClientBuilder restClientBuilder =
RestClient.builder(httpHost).setHttpClientConfigCallback(new TracingHttpClientConfigCallback(metricsTracer));
final RestClient restClient = restClientBuilder.build();
final ElasticsearchTweetRepository elasticsearchRepository = new ElasticsearchTweetRepository(restClient);
final Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, getName());
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final KafkaConsumer<Long, String> kafkaConsumer = new KafkaConsumer<>(consumerConfigs, new LongDeserializer(), new StringDeserializer());
final TracingKafkaConsumer<Long, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(kafkaConsumer, metricsTracer);
final Runnable kafkaTweetEventConsumer = new KafkaTweetEventConsumer(tracingKafkaConsumer, elasticsearchRepository);
final ExecutorService executorService = environment.lifecycle().executorService("kafka-consumer").build();
executorService.submit(kafkaTweetEventConsumer);
}
开发者ID:jeqo,项目名称:talk-observing-distributed-systems,代码行数:37,代码来源:IndexerServiceApplication.java
示例16: getKafkaProperties
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerUrl);
props.setProperty("group.id", UUID.randomUUID().toString());
props.setProperty("key.deserializer", LongDeserializer.class.getName());
props.setProperty("value.deserializer", TradeDeserializer.class.getName());
props.setProperty("auto.offset.reset", offsetReset);
props.setProperty("max.poll.records", "32768");
//props.setProperty("metadata.max.age.ms", "5000");
return props;
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:12,代码来源:JetTradeMonitor.java
示例17: getKafkaProperties
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerUrl);
props.setProperty("group.id", UUID.randomUUID().toString());
props.setProperty("key.deserializer", LongDeserializer.class.getName());
props.setProperty("value.deserializer", TradeDeserializer.class.getName());
props.setProperty("auto.offset.reset", offsetReset);
props.setProperty("max.poll.records", "32768");
return props;
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:11,代码来源:FlinkTradeMonitor.java
示例18: getKafkaProperties
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
private static Map<String, Object> getKafkaProperties(String brokerUrl) {
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", brokerUrl);
props.put("group.id", UUID.randomUUID().toString());
props.put("key.deserializer", LongDeserializer.class);
props.put("value.deserializer", TradeDeserializer.class);
props.put("auto.offset.reset", "latest");
return props;
}
开发者ID:hazelcast,项目名称:big-data-benchmark,代码行数:10,代码来源:SparkTradeMonitor.java
示例19: testConsumerCustomDeserializer
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testConsumerCustomDeserializer() throws Exception {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
configurationProperties.setConfiguration(propertiesToOverride);
String testTopicName = "existing" + System.currentTimeMillis();
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
assertTrue("Expected StringDeserializer as a custom key deserializer",
consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
assertTrue("Expected LongDeserializer as a custom value deserializer",
consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-kafka,代码行数:31,代码来源:KafkaBinderTests.java
示例20: testUnreachableKafkaBrokers
import org.apache.kafka.common.serialization.LongDeserializer; //导入依赖的package包/类
@Test
public void testUnreachableKafkaBrokers() {
// Expect an exception when the Kafka brokers are not reachable on the workers.
// We specify partitions explicitly so that splitting does not involve server interaction.
// Set request timeout to 10ms so that test does not take long.
thrown.expect(Exception.class);
thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
int numElements = 1000;
PCollection<Long> input = p
.apply(KafkaIO.<Integer, Long>read()
.withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
.updateConsumerProperties(ImmutableMap.<String, Object>of(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
.withMaxNumRecords(10)
.withoutMetadata())
.apply(Values.<Long>create());
addCountingAsserts(input, numElements);
p.run();
}
开发者ID:apache,项目名称:beam,代码行数:29,代码来源:KafkaIOTest.java
注:本文中的org.apache.kafka.common.serialization.LongDeserializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论