本文整理汇总了Java中storm.kafka.bolt.KafkaBolt类的典型用法代码示例。如果您正苦于以下问题:Java KafkaBolt类的具体用法?Java KafkaBolt怎么用?Java KafkaBolt使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
KafkaBolt类属于storm.kafka.bolt包,在下文中一共展示了KafkaBolt类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: configureKafkaBolt
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
private void configureKafkaBolt(TopologyBuilder builder, Config config) {
String topic = topologyConfig.getProperty("kafka.topic");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
props.put("metadata.broker.list", brokerUrl);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
config.setMaxSpoutPending(20);
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>().withTopicSelector(new DefaultTopicSelector(topic))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "log"));
builder.setBolt("KafkaBolt", kafkaBolt, 3).shuffleGrouping(SPOUT_ID).setDebug(DEBUG);
}
开发者ID:desp0916,项目名称:LearnStorm,代码行数:15,代码来源:ApLogGenerator.java
示例2: buildProducerTopology
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
/**
* A topology that produces random sentences using {@link RandomSentenceSpout} and
* publishes the sentences using a KafkaBolt to kafka "test" topic.
*
* @return the storm topology
*/
public StormTopology buildProducerTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 2);
/**
* The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
* so that this gets written out as the message in the kafka topic.
*/
KafkaBolt bolt = new KafkaBolt()
.withTopicSelector(new DefaultTopicSelector(KAFKA_TOPIC))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
return builder.createTopology();
}
开发者ID:desp0916,项目名称:LearnStorm,代码行数:20,代码来源:TridentKafkaWordCount.java
示例3: getProducerConfig
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
/**
* Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt.
* The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
*
* @return the topology config
*/
public Config getProducerConfig() {
Config conf = new Config();
conf.setMaxSpoutPending(20);
Properties props = new Properties();
props.put("metadata.broker.list", brokerUrl);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
return conf;
}
开发者ID:desp0916,项目名称:LearnStorm,代码行数:20,代码来源:TridentKafkaWordCount.java
示例4: getTridentKafkaStateFactory
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
public static TridentKafkaStateFactory getTridentKafkaStateFactory(String topicName, String brokerList, String keyField, String messageField, Map topologyConfig) {
Properties props = new Properties();
props.put("metadata.broker.list", brokerList);
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
topologyConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
return new TridentKafkaStateFactory()
.withKafkaTopicSelector(new DefaultTopicSelector(topicName))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper(keyField, messageField));
}
开发者ID:Parth-Brahmbhatt,项目名称:storm-smoke-test,代码行数:12,代码来源:TridentConnectorUtil.java
示例5: getKafkaBolt
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
public static KafkaBolt<String, String> getKafkaBolt(String topicName, String brokerList, String keyField,
String messageField, Map topologyConfig) {
Properties props = new Properties();
props.put("metadata.broker.list", brokerList);
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
topologyConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
return new KafkaBolt<String, String>()
.withTopicSelector(new DefaultTopicSelector(topicName))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>(keyField, messageField));
}
开发者ID:Parth-Brahmbhatt,项目名称:storm-smoke-test,代码行数:12,代码来源:ConnectorUtil.java
示例6: main
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
/**
*
* This example is very dangerous to the consistency of your bank accounts. Guess why, or read the
* tutorial.
*
* @throws AlreadyAliveException
* @throws InvalidTopologyException
* @throws AuthorizationException
*/
public static void main(String... args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
// starting to build topology
TopologyBuilder builder = new TopologyBuilder();
// Kafka as a spout
builder.setSpout(IDs.kafkaSpout, new KafkaSpoutBuilder(Conf.zookeeper, Conf.inputTopic).build());
// bolt to println data
builder.setBolt(IDs.printlnBolt, new PrintlnBolt()).shuffleGrouping(IDs.kafkaSpout);
// bolt to perform transactions and simulate bank accounts
builder.setBolt(IDs.userAccountBolt, new BankAccountBolt()).shuffleGrouping(IDs.kafkaSpout);
// Kafka as a bolt -- sending messages to the output topic
KafkaBolt<Object, Object> bolt = new KafkaBolt<>().withTopicSelector(new DefaultTopicSelector(Conf.outputTopic))
.withTupleToKafkaMapper(new TransactionTupleToKafkaMapper());
builder.setBolt(IDs.kafkaBolt, bolt).shuffleGrouping(IDs.userAccountBolt);
// submit topolody to local cluster
new LocalCluster().submitTopology(IDs.kafkaAccountsTopology, topologyConfig(), builder.createTopology());
// wait a while, then simulate random transaction stream to Kafka
Sleep.seconds(5);
KafkaProduceExample.start(2000);
}
开发者ID:dzikowski,项目名称:simple-kafka-storm-java,代码行数:37,代码来源:KafkaStormExample.java
示例7: getKafkaConfig
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
private Map getKafkaConfig(Map options) {
Map kafkaConfig = new HashMap();
Map brokerConfig = new HashMap();
String brokers = (String) Utils.get(options, BROKER_LIST, "localhost:9092");
String topic = (String) Utils.get(options, TOPIC, KafkaUtils.DEFAULT_TOPIC);
brokerConfig.put("metadata.broker.list", brokers);
brokerConfig.put("serializer.class", "kafka.serializer.StringEncoder");
brokerConfig.put("key.serializer.class", "kafka.serializer.StringEncoder");
brokerConfig.put("request.required.acks", "1");
kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig);
kafkaConfig.put(KafkaBolt.TOPIC, topic);
return kafkaConfig;
}
开发者ID:manuzhang,项目名称:storm-benchmark,代码行数:14,代码来源:KafkaProducer.java
示例8: initializeKafkaBolt
import storm.kafka.bolt.KafkaBolt; //导入依赖的package包/类
private boolean initializeKafkaBolt(String name) {
try {
String messageUpstreamComponent = messageComponents
.get(messageComponents.size() - 1);
System.out.println("[OpenSOC] ------" + name
+ " is initializing from " + messageUpstreamComponent);
Map<String, String> kafka_broker_properties = new HashMap<String, String>();
kafka_broker_properties.put("zk.connect",
config.getString("kafka.zk"));
kafka_broker_properties.put("metadata.broker.list",
config.getString("kafka.br"));
kafka_broker_properties.put("serializer.class",
"com.opensoc.json.serialization.JSONKafkaSerializer");
kafka_broker_properties.put("key.serializer.class",
"kafka.serializer.StringEncoder");
String output_topic = config.getString("bolt.kafka.topic");
conf.put("kafka.broker.properties", kafka_broker_properties);
conf.put("topic", output_topic);
builder.setBolt(name, new KafkaBolt<String, JSONObject>(),
config.getInt("bolt.kafka.parallelism.hint"))
.shuffleGrouping(messageUpstreamComponent, "message")
.setNumTasks(config.getInt("bolt.kafka.num.tasks"));
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
return true;
}
开发者ID:OpenSOC,项目名称:opensoc-streaming,代码行数:37,代码来源:TopologyRunner.java
注:本文中的storm.kafka.bolt.KafkaBolt类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论