
Java KafkaSpout Class Code Examples


This article collects typical usage examples of the Java class org.apache.storm.kafka.KafkaSpout. If you are wondering what the KafkaSpout class does, or how to use it in practice, the curated examples below should help.



The KafkaSpout class belongs to the org.apache.storm.kafka package. Twenty code examples are shown below, sorted by popularity by default. You can upvote the examples you find useful; your feedback helps surface better Java samples.
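All of the examples below follow the same basic pattern from the storm-kafka integration: build a SpoutConfig from a ZooKeeper-backed BrokerHosts, attach a Scheme for message deserialization, wrap it in a KafkaSpout, and register the spout with a TopologyBuilder. As a minimal sketch of that shared skeleton (the ZooKeeper address, topic name, and component id here are placeholders, not taken from any one example):

```java
import java.util.UUID;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutSketch {
    public static void main(String[] args) {
        // ZooKeeper connection string and topic name are placeholders
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "my-topic", "/my-topic", UUID.randomUUID().toString());
        // deserialize Kafka messages as plain strings
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // downstream bolts would be attached here, e.g.
        // builder.setBolt("my-bolt", new MyBolt()).shuffleGrouping("kafka-spout");
    }
}
```

The individual examples vary mainly in which bolts they wire downstream of the spout and whether they submit to a LocalCluster or via StormSubmitter.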

Example 1: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Config config = new Config();
	config.setNumWorkers(3);
	TopologyBuilder topologyBuilder = new TopologyBuilder();

	String zkConnString = "localhost:2181";
	String topicName = "sensor-data";

	BrokerHosts hosts = new ZkHosts(zkConnString);
	SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
	spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

	KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
	topologyBuilder.setSpout("spout", kafkaSpout, 1);
	topologyBuilder.setBolt("es-bolt", new ElasticSearchBolt(), 1).shuffleGrouping("spout");

	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("storm-es-example", config, topologyBuilder.createTopology());
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 20 | Source: SensorTopology.java


Example 2: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	Config config = new Config();
	config.setNumWorkers(3);
	TopologyBuilder topologyBuilder = new TopologyBuilder();
	
	String zkConnString = "localhost:2181";
	String topicName = "storm-diy";
	
	BrokerHosts hosts = new ZkHosts(zkConnString);
	SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName , "/" + topicName, UUID.randomUUID().toString());
	spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

	KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
	topologyBuilder.setSpout("spout", kafkaSpout, 1);
	topologyBuilder.setBolt("parser", new ParseAndUsageBolt(), 1).shuffleGrouping("spout");
	topologyBuilder.setBolt("usageCassandra", new UsageCassandraBolt("localhost", "usage"), 1).shuffleGrouping("parser", "usagestream");
	topologyBuilder.setBolt("tdrCassandra", new TDRCassandraBolt("localhost", "tdr"), 1).shuffleGrouping("parser", "tdrstream");
	
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("storm-diy", config, topologyBuilder.createTopology());
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 22 | Source: TelecomProcessorTopology.java


Example 3: getTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int viewBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_VIEW_BOLT_NUM);
  final int cntBoltNum = BenchmarkUtils.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);

  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.ONE), viewBoltNum)
         .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(COUNT_ID, new WordCount.Count(), cntBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
  return builder.createTopology();
}
 
Author: MBtech | Project: stormbenchmark | Lines: 19 | Source: PageViewCount.java


Example 4: getTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {
  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
  final int filterBoltNum = BenchmarkUtils.getInt(config, FILTER_NUM, DEFAULT_FILTER_BOLT_NUM);
  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.STATUS, Item.ALL), pvBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(FILTER_ID, new FilterBolt<Integer>(404), filterBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.STATUS.toString()));
  return builder.createTopology();
}
 
Author: MBtech | Project: stormbenchmark | Lines: 17 | Source: DataClean.java


Example 5: getTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int matBoltNum = BenchmarkUtils.getInt(config, FM_NUM, DEFAULT_MAT_BOLT_NUM);
  final int cntBoltNum = BenchmarkUtils.getInt(config, CM_NUM, DEFAULT_CNT_BOLT_NUM);
  final String ptnString = (String) Utils.get(config, PATTERN_STRING, DEFAULT_PATTERN_STR);

  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(FM_ID, new FindMatchingSentence(ptnString), matBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(CM_ID, new CountMatchingSentence(), cntBoltNum)
          .fieldsGrouping(FM_ID, new Fields(FindMatchingSentence.FIELDS));

  return builder.createTopology();
}
 
Author: MBtech | Project: stormbenchmark | Lines: 20 | Source: Grep.java


Example 6: getTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int spBoltNum = BenchmarkUtils.getInt(config, SPLIT_NUM, DEFAULT_SP_BOLT_NUM);
  final int rcBoltNum = BenchmarkUtils.getInt(config, COUNTER_NUM, DEFAULT_RC_BOLT_NUM);
  final int windowLength = BenchmarkUtils.getInt(config, WINDOW_LENGTH,
          RollingBolt.DEFAULT_SLIDING_WINDOW_IN_SECONDS);
  final int emitFreq = BenchmarkUtils.getInt(config, EMIT_FREQ,
          RollingBolt.DEFAULT_EMIT_FREQUENCY_IN_SECONDS);

  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();

  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(SPLIT_ID, new WordCount.SplitSentence(), spBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(COUNTER_ID, new RollingCountBolt(windowLength, emitFreq), rcBoltNum)
          .fieldsGrouping(SPLIT_ID, new Fields(WordCount.SplitSentence.FIELDS));
  return builder.createTopology();
}
 
Author: MBtech | Project: stormbenchmark | Lines: 24 | Source: RollingCountKafka.java


Example 7: getTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology getTopology(Config config) {

  final int spoutNum = BenchmarkUtils.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
  final int pvBoltNum = BenchmarkUtils.getInt(config, VIEW_NUM, DEFAULT_PV_BOLT_NUM);
  final int uvBoltNum = BenchmarkUtils.getInt(config, UNIQUER_NUM, DEFAULT_UV_BOLT_NUM);
  final int winLen = BenchmarkUtils.getInt(config, WINDOW_LENGTH, DEFAULT_WINDOW_LENGTH_IN_SEC);
  final int emitFreq = BenchmarkUtils.getInt(config, EMIT_FREQ, DEFAULT_EMIT_FREQ_IN_SEC);
  spout = new KafkaSpout(KafkaUtils.getSpoutConfig(
          config, new SchemeAsMultiScheme(new StringScheme())));

  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_ID, spout, spoutNum);
  builder.setBolt(VIEW_ID, new PageViewBolt(Item.URL, Item.USER), pvBoltNum)
          .localOrShuffleGrouping(SPOUT_ID);
  builder.setBolt(UNIQUER_ID, new UniqueVisitorBolt(winLen, emitFreq), uvBoltNum)
          .fieldsGrouping(VIEW_ID, new Fields(Item.URL.toString()));
  return builder.createTopology();
}
 
Author: MBtech | Project: stormbenchmark | Lines: 20 | Source: UniqueVisitor.java


Example 8: createTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology createTopology() {
    LOGGER.info("Creating OpenTSDB topology");
    TopologyBuilder tb = new TopologyBuilder();

    final String topic = config.getKafkaOtsdbTopic();
    final String spoutId = topic + "-spout";
    final String boltId = topic + "-bolt";
    checkAndCreateTopic(topic);

    KafkaSpout kafkaSpout = createKafkaSpout(topic, spoutId);
    tb.setSpout(spoutId, kafkaSpout);

    tb.setBolt(boltId, new OpenTSDBFilterBolt())
            .shuffleGrouping(spoutId);

    OpenTsdbClient.Builder tsdbBuilder = OpenTsdbClient
            .newBuilder(config.getOpenTsDBHosts())
            .sync(config.getOpenTsdbTimeout())
            .returnDetails();
    OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(tsdbBuilder, TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER)
            .withBatchSize(10)
            .withFlushInterval(2)
            .failTupleForFailedMetrics();
    tb.setBolt("opentsdb", openTsdbBolt, config.getParallelism())
            .shuffleGrouping(boltId);

    return tb.createTopology();
}
 
Author: telstra | Project: open-kilda | Lines: 30 | Source: OpenTSDBTopology.java


Example 9: createTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
@Override
public StormTopology createTopology() {
    logger.info("Creating Topology: {}", topologyName);

    final Integer parallelism = config.getParallelism();
    TopologyBuilder builder = new TopologyBuilder();

    final String kafkaSpoutId = StatsComponentType.STATS_OFS_KAFKA_SPOUT.toString();
    KafkaSpout kafkaSpout = createKafkaSpout(config.getKafkaStatsTopic(), kafkaSpoutId);
    builder.setSpout(kafkaSpoutId, kafkaSpout, parallelism);

    SpeakerBolt speakerBolt = new SpeakerBolt();
    final String statsOfsBolt = StatsComponentType.STATS_OFS_BOLT.toString();
    builder.setBolt(statsOfsBolt, speakerBolt, parallelism)
            .shuffleGrouping(kafkaSpoutId);

    builder.setBolt(PORT_STATS_METRIC_GEN.name(), new PortMetricGenBolt(), parallelism)
            .fieldsGrouping(statsOfsBolt, StatsStreamType.PORT_STATS.toString(), fieldMessage);
    builder.setBolt(METER_CFG_STATS_METRIC_GEN.name(), new MeterConfigMetricGenBolt(), parallelism)
            .fieldsGrouping(statsOfsBolt, StatsStreamType.METER_CONFIG_STATS.toString(), fieldMessage);
    builder.setBolt(FLOW_STATS_METRIC_GEN.name(), new FlowMetricGenBolt(), parallelism)
            .fieldsGrouping(statsOfsBolt, StatsStreamType.FLOW_STATS.toString(), fieldMessage);

    final String openTsdbTopic = config.getKafkaOtsdbTopic();
    checkAndCreateTopic(openTsdbTopic);
    builder.setBolt("stats-opentsdb", createKafkaBolt(openTsdbTopic))
            .shuffleGrouping(PORT_STATS_METRIC_GEN.name())
            .shuffleGrouping(METER_CFG_STATS_METRIC_GEN.name())
            .shuffleGrouping(FLOW_STATS_METRIC_GEN.name());

    createHealthCheckHandler(builder, ServiceType.STATS_TOPOLOGY.getId());

    return builder.createTopology();
}
 
Author: telstra | Project: open-kilda | Lines: 35 | Source: StatsTopology.java


Example 10: getPirkTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
/**
 * Creates the Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt.
 * Requires a KafkaConfig to initialize the KafkaSpout.
 *
 * @param kafkaConfig spout configuration for the Kafka input topic
 * @return the assembled StormTopology
 */
public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
{
  // Create spout and bolts
  KafkaSpout spout = new KafkaSpout(kafkaConfig);
  PartitionDataBolt partitionDataBolt = new PartitionDataBolt();
  EncRowCalcBolt ercbolt = new EncRowCalcBolt();
  EncColMultBolt ecmbolt = new EncColMultBolt();
  OutputBolt outputBolt = new OutputBolt();

  // Build Storm topology
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism);

  builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID,
      new Fields(StormConstants.HASH_FIELD));

  // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3).
  BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, ercbolt, encrowcalcboltParallelism)
      .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD))
      .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END)
      .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));

  // b2.setMemoryLoad(5000);
  // b2.setCPULoad(150.0);

  BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
      .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
          new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
      .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
  // b3.setMemoryLoad(5000);
  // b3.setCPULoad(500.0);

  builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID);

  return builder.createTopology();
}
 
Author: apache | Project: incubator-pirk | Lines: 43 | Source: PirkTopology.java


Example 11: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args)
		throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {

	String topologyName = "TSAS"; // topology name
	// ZooKeeper host addresses; one of them is selected automatically
	ZkHosts zkHosts = new ZkHosts("192.168.230.128:2181,192.168.230.129:2181,192.168.230.131:2181");
	String topic = "trademx";
	String zkRoot = "/storm"; // root path for Storm in ZooKeeper
	String id = "tsaPro";

	// Create the SpoutConfig object
	SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, zkRoot, id);

	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("kafka", new KafkaSpout(spoutConfig), 2);
	builder.setBolt("AccBolt", new AccBolt()).shuffleGrouping("kafka");
	builder.setBolt("ToDbBolt", new ToDbBolt()).shuffleGrouping("AccBolt");

	Config config = new Config();
	config.setDebug(false);

	if (args.length == 0) { // run locally, for testing
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology(topologyName, config, builder.createTopology());
		Thread.sleep(1000 * 3600);
		localCluster.killTopology(topologyName);
		localCluster.shutdown();
	} else { // submit to the cluster for execution
		StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
	}

}
 
Author: monsonlee | Project: BigData | Lines: 33 | Source: StormKafkaProcess.java


Example 12: configureKafkaSpout

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public int configureKafkaSpout(TopologyBuilder builder) {
    KafkaSpout kafkaSpout = constructKafkaSpout();

    // parallelism could also be read from topologyConfig:
    //int spoutCount = Integer.valueOf(topologyConfig.getProperty("spout.thread.count"));
    //int boltCount = Integer.valueOf(topologyConfig.getProperty("bolt.thread.count"));

    int spoutCount = 1;
    int boltCount = 1;

    builder.setSpout("kafkaSpout", kafkaSpout, spoutCount);
    return boltCount;
}
 
Author: bucaojit | Project: RealEstate-Streaming | Lines: 13 | Source: PhoenixTest.java


Example 13: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	
	if (args.length < 1) {
		System.out.println("Please specify the deployment mode: local or cluster");
		System.exit(1);
	}
	
	String deploymentMode = args[0];
	
	Config config = new Config();
	config.setNumWorkers(3);
	TopologyBuilder topologyBuilder = new TopologyBuilder();
	
	String zkConnString = "localhost:2181";
	String topicName = "vehicle-data";
	String hcHostName = "localhost";
	String hcPort = "5701";
	String esClusterName = "cluster.name";
	String esApplicationName = "my-application";
	String esHostName = "localhost";
	int esPort = 9300;
	
	BrokerHosts hosts = new ZkHosts(zkConnString);
	SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName , "/" + topicName, UUID.randomUUID().toString());
	spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

	KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
	topologyBuilder.setSpout("spout", kafkaSpout, 1);
	topologyBuilder.setBolt("parser", new ParseBolt(), 1).shuffleGrouping("spout");
	topologyBuilder.setBolt("checkAndAlert", new CheckDistanceAndAlertBolt(hcHostName, hcPort), 1).shuffleGrouping("parser","parsedstream");
	topologyBuilder.setBolt("saveTDR", new ElasticSearchBolt("vehicle-tdr", "tdr",esClusterName, esApplicationName,esHostName, esPort),1).shuffleGrouping("parser","parsedstream");
	topologyBuilder.setBolt("generateAlert", new GenerateAlertBolt(hcHostName, hcPort), 1).shuffleGrouping("checkAndAlert", "alertInfo");
	topologyBuilder.setBolt("saveAlert", new ElasticSearchBolt("vehicle-alert", "alert",esClusterName, esApplicationName,esHostName, esPort), 1).shuffleGrouping("generateAlert", "generatedAlertInfo");
	
	LocalCluster cluster = new LocalCluster();
	if (deploymentMode.equalsIgnoreCase("local")) {
		System.out.println("Submitting topology on local");
		cluster.submitTopology(topicName, config, topologyBuilder.createTopology());
	} else {
        try {
        	System.out.println("Submitting topology on cluster");
			StormSubmitter.submitTopology(topicName, config, topologyBuilder.createTopology());
		} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
			e.printStackTrace();
		}
	}
}
 
Author: PacktPublishing | Project: Practical-Real-time-Processing-and-Analytics | Lines: 48 | Source: GeoFencingProcessorTopology.java


Example 14: createTopology

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
/**
     * {@inheritDoc}
     */
    @Override
    public StormTopology createTopology() throws NameCollisionException {
        logger.info("Creating Topology: {}", topologyName);

        initKafkaTopics();

        Integer parallelism = config.getParallelism();

        TopologyBuilder builder = new TopologyBuilder();
        List<CtrlBoltRef> ctrlTargets = new ArrayList<>();
        BoltDeclarer boltSetup;

        KafkaSpout kafkaSpout;
        /*
         * Receives cache from storage.
         */
        kafkaSpout = createKafkaSpout(config.getKafkaTopoCacheTopic(), SPOUT_ID_COMMON);
        builder.setSpout(SPOUT_ID_COMMON, kafkaSpout, parallelism);

// (carmine) - as part of the 0.8 refactor, inputs were merged into one topic, so this isn't necessary
//        /*
//         * Receives cache updates from WFM topology.
//         */
//        kafkaSpout = createKafkaSpout(config.getKafkaTopoCacheTopic(), SPOUT_ID_TOPOLOGY);
//        builder.setSpout(SPOUT_ID_TOPOLOGY, kafkaSpout, parallelism);

        /*
         * Stores network cache.
         */
        CacheBolt cacheBolt = new CacheBolt(config.getDiscoveryTimeout());
        boltSetup = builder.setBolt(BOLT_ID_CACHE, cacheBolt, parallelism)
                .shuffleGrouping(SPOUT_ID_COMMON)
// (carmine) as per above comment, only a single input stream
//                .shuffleGrouping(SPOUT_ID_TOPOLOGY)
        ;
        ctrlTargets.add(new CtrlBoltRef(BOLT_ID_CACHE, cacheBolt, boltSetup));

        KafkaBolt kafkaBolt;
        /*
         * Sends network events to storage.
         */
        kafkaBolt = createKafkaBolt(config.getKafkaTopoEngTopic());
        builder.setBolt(BOLT_ID_COMMON_OUTPUT, kafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.TPE.toString());

        /*
         * Sends cache dump and reroute requests to WFM topology.
         */
        kafkaBolt = createKafkaBolt(config.getKafkaFlowTopic());
        builder.setBolt(BOLT_ID_TOPOLOGY_OUTPUT, kafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.WFM_DUMP.toString());

        /*
         * Sends requests for ISL to OFE topology.
         */
        KafkaBolt oFEKafkaBolt = createKafkaBolt(config.getKafkaFlowTopic());
        builder.setBolt(BOLD_ID_OFE, oFEKafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.OFE.toString());

        createCtrlBranch(builder, ctrlTargets);
        createHealthCheckHandler(builder, ServiceType.CACHE_TOPOLOGY.getId());

        return builder.createTopology();
    }
 
Author: telstra | Project: open-kilda | Lines: 68 | Source: CacheTopology.java


Example 15: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    String zks = "root2:2181,root4:2181,root5:2181";// default zookeeper configuration
    String topic= "tweetswordtopic3";// default kafka topic configuration
    String zkRoot = "/stormkafka"; // default zookeeper root configuration for storm
    String id = "DStreamTopology";// default application ID

    BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");// default kafka BrokerHosts
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new MyScheme());
    spoutConf.ignoreZkOffsets = true;
    spoutConf.zkServers = Arrays.asList(new String[] {"root2", "root4", "root5"});
    spoutConf.zkPort = 2181;
    //      spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    KafkaSpout kafkaSpout=new KafkaSpout(spoutConf);

    WordCounterBolt wordCounterBolt=new WordCounterBolt();
    MyAggregatorBolt aggregatorBolt=new MyAggregatorBolt();

    SchedulingTopologyBuilder builder=new SchedulingTopologyBuilder();
    Integer numworkers=Integer.valueOf(args[1]);

    builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout, 9);
    builder.setBalancingScheduling(KAFKA_SPOUT_ID,"word");
    builder.setBolt(WORDCOUNTER_BOLT_ID,wordCounterBolt, 36).fieldsGrouping(SPLITTER_BOLT_ID+builder.getSchedulingNum(), Constraints.nohotFileds, new Fields(Constraints.wordFileds)).shuffleGrouping(SPLITTER_BOLT_ID+builder.getSchedulingNum(), Constraints.hotFileds);
    builder.setBolt(AGGREGATOR_BOLT_ID, aggregatorBolt, 36).fieldsGrouping(WORDCOUNTER_BOLT_ID, new Fields(Constraints.wordFileds));
    //Topology config
    Config config=new Config();
    config.setNumWorkers(numworkers);//config numworkers
    if(args[0].equals("local")){
        LocalCluster localCluster=new LocalCluster();

        localCluster.submitTopology(TOPOLOGY_NAME,config,builder.createTopology());
        Utils.sleep(50*1000);//50s
        localCluster.killTopology(TOPOLOGY_NAME);
        localCluster.shutdown();
    }else {
        StormSubmitter.submitTopology(args[0],config,builder.createTopology());
    }

}
 
Author: DStream-Storm | Project: DStream | Lines: 42 | Source: DStreamTopology.java


Example 16: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	// zookeeper hosts for the Kafka cluster
	BrokerHosts zkHosts = new ZkHosts("localhost:2181");

	// Create the KafkaReadSpout configuration
	// Second argument is the topic name
	// Third argument is the zookeeper root for Kafka
	// Fourth argument is consumer group id
	SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "dataTopic", "",
			"id7");

	// Specify that the kafka messages are String
	kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

	// We want to consume from the beginning of the topic every time
	// we run the topology, to help with debugging. In production, this
	// property should be false
	kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

	// Now we create the topology
	TopologyBuilder builder = new TopologyBuilder();

	// set the kafka spout class
	builder.setSpout("KafkaReadSpout", new KafkaSpout(kafkaConfig), 1);

	// use "," as the field delimiter
	RecordFormat format = new DelimitedRecordFormat()
			.withFieldDelimiter(",");

	// sync the filesystem after every 1k tuples
	SyncPolicy syncPolicy = new CountSyncPolicy(1000);

	// rotate files when they reach 5MB
	FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f,
			Units.MB);

	FileNameFormat fileNameFormatHDFS = new DefaultFileNameFormat()
			.withPath("/hdfs-bolt-output/");

	HdfsBolt hdfsBolt2 = new HdfsBolt().withFsUrl("hdfs://127.0.0.1:8020")
			.withFileNameFormat(fileNameFormatHDFS)
			.withRecordFormat(format).withRotationPolicy(rotationPolicy)
			.withSyncPolicy(syncPolicy);

	//
	builder.setBolt("HDFS2", hdfsBolt2).shuffleGrouping("KafkaReadSpout");

	// create an instance of LocalCluster class for executing topology in
	// local mode.
	LocalCluster cluster = new LocalCluster();
	Config conf = new Config();

	// Submit topology for execution
	cluster.submitTopology("KafkaHadoop", conf, builder.createTopology());

	try {
		// Wait for some time before exiting
		System.out.println("Waiting to consume from kafka");
		Thread.sleep(6000000);
	} catch (Exception exception) {
		System.out.println("Thread interrupted exception : " + exception);
	}

	// kill the submitted topology
	cluster.killTopology("KafkaHadoop");

	// shut down the storm test cluster
	cluster.shutdown();

}
 
Author: PacktPublishing | Project: Mastering-Apache-Storm | Lines: 71 | Source: StormHDFSTopology.java


Example 17: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	// zookeeper hosts for the Kafka cluster
	BrokerHosts zkHosts = new ZkHosts("localhost:2181");

	// Create the KafkaSpout configuration
	// Second argument is the topic name
	// Third argument is the zookeeper root for Kafka
	// Fourth argument is consumer group id
	SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "twitterData", "",
			"id7");

	// Specify that the kafka messages are String
	kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

	// We want to consume from the beginning of the topic every time
	// we run the topology, to help with debugging. In production, this
	// property should be false
	kafkaConfig.startOffsetTime = kafka.api.OffsetRequest
			.EarliestTime();

	// Now we create the topology
	TopologyBuilder builder = new TopologyBuilder();

	// set the kafka spout class
	builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

	// use "," as the field delimiter
	RecordFormat format = new DelimitedRecordFormat()
			.withFieldDelimiter(",");

	// sync the filesystem after every 1k tuples
	SyncPolicy syncPolicy = new CountSyncPolicy(1000);

	// rotate files when they reach 5MB
	FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f,
			Units.MB);

	FileNameFormat fileNameFormatSentiment = new DefaultFileNameFormat()
	.withPath("/sentiment-tweet/");

	HdfsBolt hdfsBolt2 = new HdfsBolt().withFsUrl("hdfs://127.0.0.1:8020")
			.withFileNameFormat(fileNameFormatSentiment).withRecordFormat(format)
			.withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);


	//builder.setBolt("HDFSBolt", hdfsBolt).shuffleGrouping("KafkaSpout");
	builder.setBolt("json", new JSONParsingBolt()).shuffleGrouping("KafkaSpout");
	
	//
	builder.setBolt("sentiment", new SentimentBolt("/home/centos/Desktop/workspace/storm_twitter/src/main/resources/AFINN-111.txt")).shuffleGrouping("json","stream2");

	//
	builder.setBolt("HDFS2", hdfsBolt2).shuffleGrouping("sentiment");

	// create an instance of LocalCluster class for executing topology in
	// local mode.
	LocalCluster cluster = new LocalCluster();
	Config conf = new Config();

	// Submit topology for execution
	cluster.submitTopology("KafkaToplogy", conf, builder.createTopology());

	try {
		// Wait for some time before exiting
		System.out.println("Waiting to consume from kafka");
		Thread.sleep(6000000);
	} catch (Exception exception) {
		System.out.println("Thread interrupted exception : " + exception);
	}

	// kill the KafkaTopology
	cluster.killTopology("KafkaToplogy");

	// shut down the storm test cluster
	cluster.shutdown();

}
 
Author: PacktPublishing | Project: Mastering-Apache-Storm | Lines: 78 | Source: StormHDFSTopology.java


Example 18: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) {
	try {
		// zookeeper hosts for the Kafka cluster
		BrokerHosts zkHosts = new ZkHosts("10.191.208.89:2183");

		// Create the KafkaSpout configuration
		// Second argument is the topic name
		// Third argument is the zookeeper root for Kafka
		// Fourth argument is consumer group id
		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "new_topic", "",
				"id1");

		// Specify that the kafka messages are String
		// We want to consume from the beginning of the topic every time
		// we run the topology, to help with debugging. In production, this
		// property should be false
		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		kafkaConfig.startOffsetTime = kafka.api.OffsetRequest
				.EarliestTime();

		// Now we create the topology
		TopologyBuilder builder = new TopologyBuilder();

		// set the kafka spout class
		builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 2);

		// set the word and sentence bolt class
		builder.setBolt("WordBolt", new WordBolt(), 1).globalGrouping(
				"KafkaSpout");
		builder.setBolt("SentenceBolt", new SentenceBolt(), 1)
				.globalGrouping("WordBolt");

		// create an instance of LocalCluster class for executing topology
		// in local mode.
		LocalCluster cluster = new LocalCluster();
		Config conf = new Config();
		conf.setDebug(true);
		if (args.length > 0) {
			conf.setNumWorkers(2);
			conf.setMaxSpoutPending(5000);
			StormSubmitter.submitTopology("KafkaToplogy1", conf,
					builder.createTopology());

		} else {
			// Submit topology for execution
			cluster.submitTopology("KafkaToplogy1", conf,
					builder.createTopology());
			System.out.println("called1");
			Thread.sleep(1000000);
			// Wait for some time before exiting
			System.out.println("Waiting to consume from kafka");

			System.out.println("called2");
			// kill the KafkaTopology
			cluster.killTopology("KafkaToplogy1");
			System.out.println("called3");
			// shutdown the storm test cluster
			cluster.shutdown();
		}

	} catch (Exception exception) {
		System.out.println("Thread interrupted exception : " + exception);
	}
}
 
Author: PacktPublishing | Project: Mastering-Apache-Storm | Lines: 65 | Source: KafkaTopology.java


Example 19: main

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
public static void main(String[] args) throws Exception{
    Config config = new Config();
    IRedisDB _redis = RedisDB.getInstance("127.0.0.1", 6379);
    config.setDebug(true);
    config.put("mongodb.ip", "127.0.0.1");
    config.put("mongodb.port", 27017);
    config.put("redis.ip", "127.0.0.1");
    config.put("redis.port", 6379);
    config.put("kafka.properties", "127.0.0.1:9092");
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    String zkConnString = "localhost:2181";
    BrokerHosts brokerHosts = new ZkHosts(zkConnString);

    SpoutConfig eventConfig = new SpoutConfig(brokerHosts, "event", "/event","storm");
    eventConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    eventConfig.startOffsetTime = -1;
    
    SpoutConfig proceedConfig = new SpoutConfig(brokerHosts, "proceed", "/proceed", "storm");
    proceedConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    proceedConfig.startOffsetTime = -1;
    
    SpoutConfig orderConfig = new SpoutConfig(brokerHosts, "order", "/order", "storm");
    orderConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    orderConfig.startOffsetTime = -1;
    
    String topicTrigger = "trigger";
    SpoutConfig triggerConfig = new SpoutConfig(brokerHosts, topicTrigger, "/"+topicTrigger, "storm");
    triggerConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    triggerConfig.startOffsetTime = -1;
    
    String topicStatus = "status";
    SpoutConfig statusConfig = new SpoutConfig(brokerHosts, topicStatus, "/"+topicStatus, "storm");
    statusConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    statusConfig.startOffsetTime = -1;
    
    TopologyBuilder builder = new TopologyBuilder();
    
    builder.setSpout("event-spout", new KafkaSpout(eventConfig));
    builder.setSpout("proceed-spout", new KafkaSpout(proceedConfig));
    builder.setSpout("order-spout", new KafkaSpout(orderConfig));
    builder.setBolt("indexing-bolt", new IndexingBolt()).allGrouping("event-spout").allGrouping("proceed-spout").allGrouping("order-spout");
    builder.setBolt("staging-bolt", new StagingBolt()).fieldsGrouping("indexing-bolt", new Fields("roadMapId"));
    builder.setBolt("calling-trigger-bolt", new CallingTriggerBolt()).fieldsGrouping("staging-bolt", new Fields("roadMapId"));

    builder.setSpout("trigger-spout", new KafkaSpout(triggerConfig));
    builder.setSpout("status-spout", new KafkaSpout(statusConfig));
    builder.setBolt("scheduling-bolt", new SchedulingBolt())
            .shuffleGrouping("trigger-spout");
    builder.setBolt("status-bolt", new StatusBolt())
            .shuffleGrouping("status-spout");
    builder.setBolt("executing-bolt", new ExecutingBolt()).fieldsGrouping("scheduling-bolt",new Fields("roadMapId"));
    builder.setBolt("provisioning-bolt", new ProvisioningBolt()).fieldsGrouping("executing-bolt",new Fields("roadMapId"));
    builder.setBolt("calling-feed-bolt", new CallingFeedBolt()).fieldsGrouping("provisioning-bolt",new Fields("roadMapId"));
    _redis.deleteAllNodes();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("TriggerTopology", config, builder.createTopology());

}
 
Author: ENOW-IJI | Project: ENOW-server | Lines: 59 | Source: LocalSubmitter.java


Example 20: constructKafkaSpout

import org.apache.storm.kafka.KafkaSpout; // import the required package/class
private KafkaSpout constructKafkaSpout() {
    return new KafkaSpout(constructKafkaSpoutConf());
}
 
Author: bucaojit | Project: RealEstate-Streaming | Lines: 5 | Source: PhoenixTest.java



Note: The org.apache.storm.kafka.KafkaSpout examples in this article were collected from open-source projects on GitHub and similar code-hosting platforms. Copyright of each snippet remains with its original author; consult the corresponding project's license before reusing or redistributing the code.

