
Java KafkaBolt Class Code Examples


This article collects typical code examples of the Java class storm.kafka.bolt.KafkaBolt. If you are wondering how the KafkaBolt class works, how to use it, or where to find examples of it in practice, the curated samples below should help.



The KafkaBolt class belongs to the storm.kafka.bolt package. Eight code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you find useful; your ratings help the site recommend better Java code samples.
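Across the examples below, the producer settings handed to KafkaBolt follow the same pattern: build a java.util.Properties with the legacy producer keys `metadata.broker.list`, `serializer.class`, and `request.required.acks`, then store it in the topology config under KafkaBolt.KAFKA_BROKER_PROPERTIES. A minimal JDK-only sketch of that shared helper (the constant value and the broker address are placeholder assumptions, hard-coded here to avoid the storm-kafka dependency):

```java
import java.util.Properties;

public class KafkaBoltProps {
    // Key under which KafkaBolt looks up its producer settings; the literal
    // value is an assumption, standing in for KafkaBolt.KAFKA_BROKER_PROPERTIES.
    static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    // Build the legacy (pre-0.9 Scala producer) properties used by the examples below.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);                   // broker host:port list
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // message serializer
        props.put("request.required.acks", "1");                         // wait for leader ack
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

In the real examples this Properties object is placed into the Storm Config (e.g. `config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props)`) so the bolt can construct its producer at prepare time.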

Example 1: configureKafkaBolt

import storm.kafka.bolt.KafkaBolt; // import the required package/class
private void configureKafkaBolt(TopologyBuilder builder, Config config) {
	String topic = topologyConfig.getProperty("kafka.topic");
	Properties props = new Properties();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
	props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
	props.put("metadata.broker.list", brokerUrl);
	props.put("serializer.class", "kafka.serializer.StringEncoder");
	props.put("request.required.acks", "1");
	config.setMaxSpoutPending(20);
	config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
	KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>().withTopicSelector(new DefaultTopicSelector(topic))
									.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "log"));
	builder.setBolt("KafkaBolt", kafkaBolt, 3).shuffleGrouping(SPOUT_ID).setDebug(DEBUG);
}
 
Author: desp0916, Project: LearnStorm, Lines of code: 15, Source file: ApLogGenerator.java


Example 2: buildProducerTopology

import storm.kafka.bolt.KafkaBolt; // import the required package/class
/**
 * A topology that produces random sentences using {@link RandomSentenceSpout} and
 * publishes the sentences using a KafkaBolt to kafka "test" topic.
 *
 * @return the storm topology
 */
public StormTopology buildProducerTopology() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 2);
    /**
     * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
     * so that this gets written out as the message in the kafka topic.
     */
    KafkaBolt bolt = new KafkaBolt()
            .withTopicSelector(new DefaultTopicSelector(KAFKA_TOPIC))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
    builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
    return builder.createTopology();
}
 
Author: desp0916, Project: LearnStorm, Lines of code: 20, Source file: TridentKafkaWordCount.java


Example 3: getProducerConfig

import storm.kafka.bolt.KafkaBolt; // import the required package/class
/**
 * Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt.
 * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt.
 *
 * @return the topology config
 */
public Config getProducerConfig() {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerUrl);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
//  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
    conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
    return conf;
}
 
Author: desp0916, Project: LearnStorm, Lines of code: 20, Source file: TridentKafkaWordCount.java


Example 4: getTridentKafkaStateFactory

import storm.kafka.bolt.KafkaBolt; // import the required package/class
public static TridentKafkaStateFactory getTridentKafkaStateFactory(String topicName, String brokerList, String keyField, String messageField, Map topologyConfig) {
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    topologyConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

    return new TridentKafkaStateFactory()
            .withKafkaTopicSelector(new DefaultTopicSelector(topicName))
            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper(keyField, messageField));
}
 
Author: Parth-Brahmbhatt, Project: storm-smoke-test, Lines of code: 12, Source file: TridentConnectorUtil.java


Example 5: getKafkaBolt

import storm.kafka.bolt.KafkaBolt; // import the required package/class
public static KafkaBolt<String, String> getKafkaBolt(String topicName, String brokerList, String keyField,
                                                     String messageField, Map topologyConfig) {
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    props.put("request.required.acks", "1");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    topologyConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
    return new KafkaBolt<String, String>()
            .withTopicSelector(new DefaultTopicSelector(topicName))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>(keyField, messageField));
}
 
Author: Parth-Brahmbhatt, Project: storm-smoke-test, Lines of code: 12, Source file: ConnectorUtil.java


Example 6: main

import storm.kafka.bolt.KafkaBolt; // import the required package/class
/**
 * 
 * This example is very dangerous to the consistency of your bank accounts. Guess why, or read the
 * tutorial.
 * 
 * @throws AlreadyAliveException
 * @throws InvalidTopologyException
 * @throws AuthorizationException
 */
public static void main(String... args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

	// starting to build topology
	TopologyBuilder builder = new TopologyBuilder();

	// Kafka as a spout
	builder.setSpout(IDs.kafkaSpout, new KafkaSpoutBuilder(Conf.zookeeper, Conf.inputTopic).build());

	// bolt to println data
	builder.setBolt(IDs.printlnBolt, new PrintlnBolt()).shuffleGrouping(IDs.kafkaSpout);

	// bolt to perform transactions and simulate bank accounts
	builder.setBolt(IDs.userAccountBolt, new BankAccountBolt()).shuffleGrouping(IDs.kafkaSpout);

	// Kafka as a bolt -- sending messages to the output topic
	KafkaBolt<Object, Object> bolt = new KafkaBolt<>().withTopicSelector(new DefaultTopicSelector(Conf.outputTopic))
			.withTupleToKafkaMapper(new TransactionTupleToKafkaMapper());
	builder.setBolt(IDs.kafkaBolt, bolt).shuffleGrouping(IDs.userAccountBolt);

	// submit topology to local cluster
	new LocalCluster().submitTopology(IDs.kafkaAccountsTopology, topologyConfig(), builder.createTopology());

	// wait a while, then simulate random transaction stream to Kafka
	Sleep.seconds(5);
	KafkaProduceExample.start(2000);

}
 
Author: dzikowski, Project: simple-kafka-storm-java, Lines of code: 37, Source file: KafkaStormExample.java


Example 7: getKafkaConfig

import storm.kafka.bolt.KafkaBolt; // import the required package/class
private Map getKafkaConfig(Map options) {
  Map kafkaConfig = new HashMap();
  Map brokerConfig = new HashMap();
  String brokers = (String) Utils.get(options, BROKER_LIST, "localhost:9092");
  String topic = (String) Utils.get(options, TOPIC, KafkaUtils.DEFAULT_TOPIC);
  brokerConfig.put("metadata.broker.list", brokers);
  brokerConfig.put("serializer.class", "kafka.serializer.StringEncoder");
  brokerConfig.put("key.serializer.class", "kafka.serializer.StringEncoder");
  brokerConfig.put("request.required.acks", "1");
  kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig);
  kafkaConfig.put(KafkaBolt.TOPIC, topic);
  return kafkaConfig;
}
 
Author: manuzhang, Project: storm-benchmark, Lines of code: 14, Source file: KafkaProducer.java


Example 8: initializeKafkaBolt

import storm.kafka.bolt.KafkaBolt; // import the required package/class
private boolean initializeKafkaBolt(String name) {
	try {

		String messageUpstreamComponent = messageComponents
				.get(messageComponents.size() - 1);

		System.out.println("[OpenSOC] ------" + name
				+ " is initializing from " + messageUpstreamComponent);

		Map<String, String> kafka_broker_properties = new HashMap<String, String>();
		kafka_broker_properties.put("zk.connect",
				config.getString("kafka.zk"));
		kafka_broker_properties.put("metadata.broker.list",
				config.getString("kafka.br"));

		kafka_broker_properties.put("serializer.class",
				"com.opensoc.json.serialization.JSONKafkaSerializer");

		kafka_broker_properties.put("key.serializer.class",
				"kafka.serializer.StringEncoder");

		String output_topic = config.getString("bolt.kafka.topic");

		conf.put("kafka.broker.properties", kafka_broker_properties);
		conf.put("topic", output_topic);

		builder.setBolt(name, new KafkaBolt<String, JSONObject>(),
				config.getInt("bolt.kafka.parallelism.hint"))
				.shuffleGrouping(messageUpstreamComponent, "message")
				.setNumTasks(config.getInt("bolt.kafka.num.tasks"));
	} catch (Exception e) {
		e.printStackTrace();
		System.exit(0);
	}
	return true;
}
 
Author: OpenSOC, Project: opensoc-streaming, Lines of code: 37, Source file: TopologyRunner.java



Note: The storm.kafka.bolt.KafkaBolt class examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar source-code and documentation platforms. Copyright of the code snippets remains with their original authors; consult each project's license before redistributing or using the code. Do not reproduce this article without permission.

