Java BoltDeclarer Class Code Examples


This article compiles typical usage examples of the Java class org.apache.storm.topology.BoltDeclarer. If you have been asking yourself how the BoltDeclarer class works, how to use it, or what real usage looks like, the examples selected here should help.



The BoltDeclarer class belongs to the org.apache.storm.topology package. A total of 13 BoltDeclarer code examples are shown below, sorted by popularity by default.
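Before the examples, here is a minimal sketch of where a BoltDeclarer comes from and what it is for: TopologyBuilder.setBolt() returns a BoltDeclarer, whose fluent methods declare the bolt's input groupings and per-component configuration. The component ids, MySpout, and MyCountBolt below are hypothetical placeholders for illustration only, not code from any of the projects cited in the examples.

import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();

// "sentences" and "counter" are placeholder component ids; MySpout and MyCountBolt
// stand in for any IRichSpout / IRichBolt implementations. MySpout is assumed to
// emit tuples that contain a "word" field.
builder.setSpout("sentences", new MySpout(), 2);

// setBolt() returns a BoltDeclarer; its methods can be chained to declare
// the bolt's input streams (groupings) and per-component configuration.
BoltDeclarer declarer = builder.setBolt("counter", new MyCountBolt(), 4);
declarer.fieldsGrouping("sentences", new Fields("word"))              // partition tuples by the "word" field
        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);  // component-level tick tuples

StormTopology topology = builder.createTopology();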

Example 1: createCtrlBranch

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
protected void createCtrlBranch(TopologyBuilder builder, List<CtrlBoltRef> targets)
        throws StreamNameCollisionException {
    checkAndCreateTopic(config.getKafkaCtrlTopic());

    org.apache.storm.kafka.KafkaSpout kafkaSpout;
    kafkaSpout = createKafkaSpout(config.getKafkaCtrlTopic(), SPOUT_ID_CTRL);
    builder.setSpout(SPOUT_ID_CTRL, kafkaSpout);

    RouteBolt route = new RouteBolt(getTopologyName());
    builder.setBolt(BOLT_ID_CTRL_ROUTE, route)
            .shuffleGrouping(SPOUT_ID_CTRL);

    KafkaBolt kafkaBolt = createKafkaBolt(config.getKafkaCtrlTopic());
    BoltDeclarer outputSetup = builder.setBolt(BOLT_ID_CTRL_OUTPUT, kafkaBolt)
            .shuffleGrouping(BOLT_ID_CTRL_ROUTE, route.STREAM_ID_ERROR);

    for (CtrlBoltRef ref : targets) {
        String boltId = ref.getBoltId();
        ref.getDeclarer().allGrouping(BOLT_ID_CTRL_ROUTE, route.registerEndpoint(boltId));
        outputSetup.shuffleGrouping(boltId, ref.getBolt().getCtrlStreamId());
    }
}
 
Developer: telstra | Project: open-kilda | Lines: 23 | Source: AbstractTopology.java


Example 2: setOperatorGrouping

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
private void setOperatorGrouping(IRichOperator operator)
    throws StreamingException
{
    BoltDeclarer bolt = createBoltDeclarer(operator);
    List<String> streams = operator.getInputStream();
    if (streams == null)
    {
        StreamingException exception = new StreamingException(ErrorCode.PLATFORM_INVALID_TOPOLOGY);
        LOG.error("The operator input streaming is null.");
        throw exception;
    }

    for (String streamName : operator.getInputStream())
    {
        GroupInfo groupInfo = operator.getGroupInfo().get(streamName);
        setBoltGrouping(bolt, streamName, groupInfo);
    }
}
 
Developer: HuaweiBigData | Project: StreamCQL | Lines: 19 | Source: StormApplication.java


Example 3: SetRemoteTopology

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
public static void SetRemoteTopology()
        throws Exception {
    String streamName = (String) conf.get(Config.TOPOLOGY_NAME);
    if (streamName == null) {
        String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
        streamName = className[className.length - 1];
    }
    
    TopologyBuilder builder = new TopologyBuilder();
    
    int spout_Parallelism_hint = Utils.getInt(conf.get(TOPOLOGY_SPOUT_PARALLELISM_HINT), 1);
    int bolt_Parallelism_hint = Utils.getInt(conf.get(TOPOLOGY_BOLT_PARALLELISM_HINT), 2);
    builder.setSpout("spout", new TestSpout(), spout_Parallelism_hint);
    
    BoltDeclarer boltDeclarer = builder.setBolt("bolt", new TestBolt(), bolt_Parallelism_hint);
    // localFirstGrouping is only for jstorm
    // boltDeclarer.localFirstGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
    boltDeclarer.shuffleGrouping("spout");
    // .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
    
    StormSubmitter.submitTopology(streamName, conf, builder.createTopology());
    
}
 
Developer: alibaba | Project: jstorm | Lines: 24 | Source: PerformanceTestTopology.java


Example 4: getPirkTopology

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
/***
 * Creates Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt Requires KafkaConfig to initialize KafkaSpout.
 *
 * @param kafkaConfig
 * @return
 */
public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
{
  // Create spout and bolts
  KafkaSpout spout = new KafkaSpout(kafkaConfig);
  PartitionDataBolt partitionDataBolt = new PartitionDataBolt();
  EncRowCalcBolt ercbolt = new EncRowCalcBolt();
  EncColMultBolt ecmbolt = new EncColMultBolt();
  OutputBolt outputBolt = new OutputBolt();

  // Build Storm topology
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism);

  builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID,
      new Fields(StormConstants.HASH_FIELD));

  // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3).
  BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, ercbolt, encrowcalcboltParallelism)
      .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD))
      .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END)
      .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));

  // b2.setMemoryLoad(5000);
  // b2.setCPULoad(150.0);

  BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
      .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
          new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
      .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
  // b3.setMemoryLoad(5000);
  // b3.setCPULoad(500.0);

  builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID);

  return builder.createTopology();
}
 
Developer: apache | Project: incubator-pirk | Lines: 43 | Source: PirkTopology.java


Example 5: setBoltGrouping

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
private void setBoltGrouping(BoltDeclarer bolt, String streamName, GroupInfo groupInfo)
    throws StreamingException
{
    if (null == groupInfo)
    {
        setDefaultBoltGrouping(bolt, streamName);
        return;
    }

    DistributeType distribute = groupInfo.getDitributeType();
    switch (distribute)
    {
        case FIELDS:
            Fields fields = new Fields(groupInfo.getFields());
            IRichOperator operator = getOperatorByOutputStreamName(streamName);
            if (operator == null)
            {
                StreamingException exception = new StreamingException(ErrorCode.PLATFORM_INVALID_TOPOLOGY);
                LOG.error("Can't find operator by stream name : {} .", streamName, exception);
                throw exception;
            }
            bolt.fieldsGrouping(operator.getOperatorId(), streamName, fields);
            break;
        case GLOBAL:
            break;
        case LOCALORSHUFFLE:
            break;
        case ALL:
            break;
        case DIRECT:
            break;
        case CUSTOM:
            break;
        case SHUFFLE:
        case NONE:
        default:
            setDefaultBoltGrouping(bolt, streamName);
    }
}
 
Developer: HuaweiBigData | Project: StreamCQL | Lines: 40 | Source: StormApplication.java


Example 6: setDefaultBoltGrouping

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
private void setDefaultBoltGrouping(BoltDeclarer bolt, String streamName)
    throws StreamingException
{
    IRichOperator operator = getOperatorByOutputStreamName(streamName);
    if (operator == null)
    {
        StreamingException exception = new StreamingException(ErrorCode.PLATFORM_INVALID_TOPOLOGY);
        LOG.error("Can't find operator by stream name : {} .", streamName, exception);
        throw exception;
    }
    bolt.shuffleGrouping(operator.getOperatorId(), streamName);
}
 
Developer: HuaweiBigData | Project: StreamCQL | Lines: 13 | Source: StormApplication.java


Example 7: CtrlBoltRef

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
public CtrlBoltRef(String boltId, ICtrlBolt bolt, BoltDeclarer declarer) {
    this.boltId = boltId;
    this.bolt = bolt;
    this.declarer = declarer;
}
 
Developer: telstra | Project: open-kilda | Lines: 6 | Source: CtrlBoltRef.java


Example 8: getDeclarer

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
public BoltDeclarer getDeclarer() {
    return declarer;
}
 
Developer: telstra | Project: open-kilda | Lines: 4 | Source: CtrlBoltRef.java


Example 9: createTopology

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
/**
     * {@inheritDoc}
     */
    @Override
    public StormTopology createTopology() throws NameCollisionException {
        logger.info("Creating Topology: {}", topologyName);

        initKafkaTopics();

        Integer parallelism = config.getParallelism();

        TopologyBuilder builder = new TopologyBuilder();
        List<CtrlBoltRef> ctrlTargets = new ArrayList<>();
        BoltDeclarer boltSetup;

        KafkaSpout kafkaSpout;
        /*
         * Receives cache from storage.
         */
        kafkaSpout = createKafkaSpout(config.getKafkaTopoCacheTopic(), SPOUT_ID_COMMON);
        builder.setSpout(SPOUT_ID_COMMON, kafkaSpout, parallelism);

// (carmine) - as part of 0.8 refactor, merged inputs to one topic, so this isn't neccessary
//        /*
//         * Receives cache updates from WFM topology.
//         */
//        kafkaSpout = createKafkaSpout(config.getKafkaTopoCacheTopic(), SPOUT_ID_TOPOLOGY);
//        builder.setSpout(SPOUT_ID_TOPOLOGY, kafkaSpout, parallelism);

        /*
         * Stores network cache.
         */
        CacheBolt cacheBolt = new CacheBolt(config.getDiscoveryTimeout());
        boltSetup = builder.setBolt(BOLT_ID_CACHE, cacheBolt, parallelism)
                .shuffleGrouping(SPOUT_ID_COMMON)
// (carmine) as per above comment, only a single input stream
//                .shuffleGrouping(SPOUT_ID_TOPOLOGY)
        ;
        ctrlTargets.add(new CtrlBoltRef(BOLT_ID_CACHE, cacheBolt, boltSetup));

        KafkaBolt kafkaBolt;
        /*
         * Sends network events to storage.
         */
        kafkaBolt = createKafkaBolt(config.getKafkaTopoEngTopic());
        builder.setBolt(BOLT_ID_COMMON_OUTPUT, kafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.TPE.toString());

        /*
         * Sends cache dump and reroute requests to WFM topology.
         */
        kafkaBolt = createKafkaBolt(config.getKafkaFlowTopic());
        builder.setBolt(BOLT_ID_TOPOLOGY_OUTPUT, kafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.WFM_DUMP.toString());

        /*
         * Sends requests for ISL to OFE topology.
         */
        KafkaBolt oFEKafkaBolt = createKafkaBolt(config.getKafkaFlowTopic());
        builder.setBolt(BOLD_ID_OFE, oFEKafkaBolt, parallelism)
                .shuffleGrouping(BOLT_ID_CACHE, StreamType.OFE.toString());

        createCtrlBranch(builder, ctrlTargets);
        createHealthCheckHandler(builder, ServiceType.CACHE_TOPOLOGY.getId());

        return builder.createTopology();
    }
 
Developer: telstra | Project: open-kilda | Lines: 68 | Source: CacheTopology.java


Example 10: createTopology

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
/**
 * The best place to look for detailed design information regarding this topology's
 * interactions is docs/design/usecase/network-discovery.md
 *
 * At a high level, it receives input from the speaker, and sends output to the
 * topology engine.
 *
 * @return
 * @throws StreamNameCollisionException
 */
public StormTopology createTopology() throws StreamNameCollisionException {
    logger.debug("Building Topology - " + this.getClass().getSimpleName());

    String kafkaTopoDiscoTopic = config.getKafkaTopoDiscoTopic();
    String kafkaTopoEngTopic = config.getKafkaTopoEngTopic();
    String kafkaSpeakerTopic = config.getKafkaSpeakerTopic();

    checkAndCreateTopic(kafkaTopoDiscoTopic);
    checkAndCreateTopic(kafkaTopoEngTopic);

    TopologyBuilder builder = new TopologyBuilder();
    List<CtrlBoltRef> ctrlTargets = new ArrayList<>();

    String spoutName = SPOUT_ID_INPUT;
    String boltName = BOLT_ID;

    builder.setSpout(spoutName, createKafkaSpout(kafkaTopoDiscoTopic, spoutName));

    IStatefulBolt bolt = new OFELinkBolt(config);

    // TODO: resolve the comments below; are there any state issues?
    // NB: with shuffleGrouping, we can't maintain state .. would need to parse first
    //      just to pull out switchID.
    // (crimi) - not sure I agree here .. state can be maintained, albeit distributed.
    //
    BoltDeclarer bd = builder.setBolt(boltName, bolt, config.getParallelism())
            .shuffleGrouping(spoutName);

    builder.setBolt(kafkaTopoEngTopic, createKafkaBolt(kafkaTopoEngTopic),
            config.getParallelism()).shuffleGrouping(boltName, kafkaTopoEngTopic);
    builder.setBolt(kafkaSpeakerTopic, createKafkaBolt(kafkaSpeakerTopic),
            config.getParallelism()).shuffleGrouping(boltName, kafkaSpeakerTopic);

    // TODO: verify this ctrlTarget after refactoring.
    ctrlTargets.add(new CtrlBoltRef(boltName, (ICtrlBolt) bolt, bd));
    createCtrlBranch(builder, ctrlTargets);
    // TODO: verify WFM_TOPOLOGY health check
    createHealthCheckHandler(builder, ServiceType.WFM_TOPOLOGY.getId());

    return builder.createTopology();
}
 
Developer: telstra | Project: open-kilda | Lines: 52 | Source: OFEventWFMTopology.java


Example 11: createBoltDeclarer

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
private BoltDeclarer createBoltDeclarer(IRichOperator operator)
    throws StreamingException
{
    IRichBolt bolt = ComponentCreator.createBolt(operator, stormConf);
    return builder.setBolt(operator.getOperatorId(), bolt, operator.getParallelNumber());
}
 
Developer: HuaweiBigData | Project: StreamCQL | Lines: 7 | Source: StormApplication.java


Example 12: buildStreams

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
protected void buildStreams(EcoExecutionContext executionContext, TopologyBuilder builder,
                            ObjectBuilder objectBuilder)
    throws IllegalAccessException, InstantiationException, ClassNotFoundException,
    NoSuchFieldException, InvocationTargetException {
  EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition();
  Map<String, ComponentStream> componentStreams = new HashMap<>();

  HashMap<String, BoltDeclarer> declarers = new HashMap<>();
  for (StreamDefinition stream : topologyDefinition.getStreams()) {
    Object boltObj = executionContext.getBolt(stream.getTo());
    BoltDeclarer declarer = declarers.get(stream.getTo());
    if (boltObj instanceof IRichBolt) {
      if (declarer == null) {
        declarer = builder.setBolt(stream.getTo(),
            (IRichBolt) boltObj,
            topologyDefinition.parallelismForBolt(stream.getTo()));
        declarers.put(stream.getTo(), declarer);
      }
    } else if (boltObj instanceof IBasicBolt) {
      if (declarer == null) {
        declarer = builder.setBolt(
            stream.getTo(),
            (IBasicBolt) boltObj,
            topologyDefinition.parallelismForBolt(stream.getTo()));
        declarers.put(stream.getTo(), declarer);
      }
    } else if (boltObj instanceof IWindowedBolt) {
      if (declarer == null) {
        declarer = builder.setBolt(
            stream.getTo(),
            (IWindowedBolt) boltObj,
            topologyDefinition.parallelismForBolt(stream.getTo()));
        declarers.put(stream.getTo(), declarer);
      }
    }  else {
      throw new IllegalArgumentException("Class does not appear to be a bolt: "
          + boltObj.getClass().getName());
    }

    GroupingDefinition grouping = stream.getGrouping();
    // if the streamId is defined, use it for the grouping,
    // otherwise assume default stream
    // Todo(joshfischer) Not sure if "default" is still valid
    String streamId = grouping.getStreamId() == null
        ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId();


    switch (grouping.getType()) {
      case SHUFFLE:
        declarer.shuffleGrouping(stream.getFrom(), streamId);
        break;
      case FIELDS:
        //TODO check for null grouping args
        List<String> groupingArgs = grouping.getArgs();
        if (groupingArgs == null) {
          throw new IllegalArgumentException("You must supply arguments for Fields grouping");
        }
        declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(groupingArgs));
        break;
      case ALL:
        declarer.allGrouping(stream.getFrom(), streamId);
        break;
      case GLOBAL:
        declarer.globalGrouping(stream.getFrom(), streamId);
        break;
      case NONE:
        declarer.noneGrouping(stream.getFrom(), streamId);
        break;
      case CUSTOM:
        declarer.customGrouping(stream.getFrom(), streamId,
            buildCustomStreamGrouping(stream.getGrouping().getCustomClass(),
                executionContext,
                objectBuilder));
        break;
      default:
        throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
    }
  }
  executionContext.setStreams(componentStreams);
}
 
Developer: twitter | Project: heron | Lines: 81 | Source: StreamBuilder.java


Example 13: main

import org.apache.storm.topology.BoltDeclarer; // import the required package/class
public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();

  SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 10);
  //set cpu requirement
  spout.setCPULoad(20);
  //set onheap and offheap memory requirement
  spout.setMemoryLoad(64, 16);

  BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
  //sets cpu requirement.  Not neccessary to set both CPU and memory.
  //For requirements not set, a default value will be used
  bolt1.setCPULoad(15);

  BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
  bolt2.setMemoryLoad(100);

  Config conf = new Config();
  conf.setDebug(true);

  /**
   * Used to limit the maximum amount of memory (in MB) allocated to one worker process.
   * Can be used to spread executors across multiple workers.
   */
  conf.setTopologyWorkerMaxHeapSize(1024.0);

  //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
  //Recommended range of 0-29 but no hard limit set.
  conf.setTopologyPriority(29);

  // Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
  conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);

  if (args != null && args.length > 0) {
    conf.setNumWorkers(3);

    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  }
  else {

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
  }
}
 
Developer: ziyunhx | Project: storm-net-adapter | Lines: 48 | Source: ResourceAwareExampleTopology.java



Note: the org.apache.storm.topology.BoltDeclarer examples in this article were collected from source code and documentation platforms such as GitHub. The code snippets come from open-source projects; copyright remains with the original authors, and distribution and use are subject to each project's license. Do not republish without permission.

