• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java TopologyDetails类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中backtype.storm.scheduler.TopologyDetails的典型用法代码示例。如果您正苦于以下问题:Java TopologyDetails类的具体用法?Java TopologyDetails怎么用?Java TopologyDetails使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



TopologyDetails类属于backtype.storm.scheduler包,在下文中一共展示了TopologyDetails类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: assignSlots

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> slots) {
  if (slots.size() == 0) {
    LOG.debug("assignSlots: no slots passed in, nothing to do");
    return;
  }
  for (Map.Entry<String, Collection<WorkerSlot>> topologyToSlots : slots.entrySet()) {
    String topologyId = topologyToSlots.getKey();
    for (WorkerSlot slot : topologyToSlots.getValue()) {
      TopologyDetails details = topologies.getById(topologyId);
      LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}",
                topologyId, slot, MesosCommon.topologyWorkerCpu(_conf, details), MesosCommon.topologyWorkerMem(_conf, details));
    }
  }
  synchronized (_offersLock) {
    computeLaunchList(topologies, slots);
  }
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:19,代码来源:MesosNimbus.java


示例2: getSpyCluster

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
private Cluster getSpyCluster(int numWorkers, int numExecutors) {
  Cluster spyCluster = getSpyCluster();

  List<MesosWorkerSlot> mesosWorkerSlots = this.generateMesosWorkerSlots(numWorkers);
  initializeMesosWorkerSlotMap(mesosWorkerSlots);

  Set<ExecutorDetails> executorsToAssign = this.generateExecutorDetailsSet(numExecutors);
  List<WorkerSlot> workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots);
  topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, numWorkers, 0.1, 100));
  this.topologies = new Topologies(topologyMap);

  doReturn(workerSlotList).when(spyCluster).getAvailableSlots();
  doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class));

  return spyCluster;
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:17,代码来源:DefaultSchedulerTest.java


示例3: testScheduleWithOneWorkerSlot

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Test
public void testScheduleWithOneWorkerSlot() {
  Cluster spyCluster = getSpyCluster();

  List<MesosWorkerSlot> mesosWorkerSlots = this.generateMesosWorkerSlots(1);
  initializeMesosWorkerSlotMap(mesosWorkerSlots);

  Set<ExecutorDetails> executorsToAssign = this.generateExecutorDetailsSet(4);
  List<WorkerSlot> workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots);

  doReturn(workerSlotList).when(spyCluster).getAvailableSlots();
  doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class));

  defaultScheduler.schedule(topologies, spyCluster);

  Set<ExecutorDetails> assignedExecutors = spyCluster.getAssignmentById(sampleTopologyId).getExecutors();
  assertEquals(executorsToAssign, assignedExecutors);
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:19,代码来源:DefaultSchedulerTest.java


示例4: schedule

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public void schedule(Topologies topologies, Cluster cluster) {
	
	logger.info("Online Scheduler");
	logger.info("+++++++++++++++++++++++++++");
	if (!topologies.getTopologies().isEmpty()) {
		int rescheduleTimeout = DEFAULT_RESCHEDULE_TIMEOUT;
		for (TopologyDetails topology : topologies.getTopologies()) {
			rescheduleTimeout = Integer.parseInt(topology.getConf().get(Utils.RESCHEDULE_TIMEOUT).toString());
		}
		long now = System.currentTimeMillis();
		long elapsedTime = (now - lastRescheduling) / 1000; // s
		if (lastRescheduling == 0 || elapsedTime >= rescheduleTimeout)
			doSchedule(topologies, cluster);
		else
			logger.info("It's not time to reschedule yet, " + elapsedTime + " seconds have passed, other " + (rescheduleTimeout - elapsedTime) + " seconds have to pass");
	}

	logger.info("---------------------------");
	
	logger.info("Calling EvenScheduler to schedule remaining executors...");
	new EvenScheduler().schedule(topologies, cluster);
	logger.info("Ok, EvenScheduler succesfully called");
	
	assignmentTracker.checkAssignment(topologies, cluster);
}
 
开发者ID:leonardoaniello,项目名称:storm-adaptive-schedulers,代码行数:27,代码来源:OnlineScheduler.java


示例5: addTopology

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public void addTopology(TopologyDetails td) {
    String topId = td.getId();
    LOG.debug("Adding in Topology {}", topId);
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    Set<Node> assignedNodes = new HashSet<Node>();
    if (assignment != null) {
        for (WorkerSlot ws : assignment.getSlots()) {
            Node n = _nodeIdToNode.get(ws.getNodeId());
            assignedNodes.add(n);
        }
    }
    _usedNodes += assignedNodes.size();
    _topologyIdToNodes.put(topId, assignedNodes);
    _tds.put(topId, td);
    if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
        _isolated.add(topId);
    }
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:20,代码来源:IsolatedPool.java


示例6: getTopologyDetails

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Get all running topology's details.
 *
 * @param conf storm's conf, used to connect nimbus
 * @return null if nimbus is not available, otherwise all TopologyDetails
 */
public static Topologies getTopologyDetails(Map<String, Object> conf) {
    NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf);
    try {
        Nimbus.Client nimbus = nimbusClient.getClient();
        Map<String, TopologyDetails> topologies = nimbus.getClusterInfo().get_topologies().stream()
                .map(topoSummary -> getTopologyDetails(nimbus, topoSummary))
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(topoDetails -> topoDetails.getId(), topoDetails -> topoDetails));
        return new Topologies(topologies);
    } catch (TException e) {
    } finally {
        nimbusClient.close();
    }
    return null;
}
 
开发者ID:ADSC-Resa,项目名称:resa,代码行数:22,代码来源:TopologyHelper.java


示例7: componentToTasks

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Get component tasks from TopologyDetails object
 *
 * @param topoDetails
 * @return
 */
private static Map<String, List<Integer>> componentToTasks(TopologyDetails topoDetails,
                                                           Predicate<Map.Entry<ExecutorDetails, String>> p) {
    Stream<Map.Entry<ExecutorDetails, String>> stream = topoDetails.getExecutorToComponent().entrySet().stream();
    if (p != null) {
        stream = stream.filter(p);
    }
    return stream.collect(Collectors.groupingBy(Map.Entry::getValue,
                    Collectors.mapping(e -> getTaskIds(e.getKey()),
                            Collector.of(ArrayList::new, (all, l) -> all.addAll(l),
                                    (all, l) -> {
                                        all.addAll(l);
                                        return all;
                                    }
                            )
                    )
            )
    );
}
 
开发者ID:ADSC-Resa,项目名称:resa,代码行数:25,代码来源:TopologyHelper.java


示例8: addTopology

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public void addTopology(TopologyDetails td) {
    String topId = td.getId();
    LOG.debug("Adding in topology {}", topId);
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    Set<Node> assignedNodes = new HashSet<>();
    if (assignment != null) {
        for (WorkerSlot ws : assignment.getSlots()) {
            Node n = _nodeIdToNode.get(ws.getNodeId());
            assignedNodes.add(n);
        }
    }
    _usedNodes += assignedNodes.size();
    _topologyIdToNodes.put(topId, assignedNodes);
    _tds.put(topId, td);
    if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
        _isolated.add(topId);
    }
}
 
开发者ID:alibaba,项目名称:jstorm,代码行数:20,代码来源:IsolatedPool.java


示例9: schedule

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Schedule function looks in the "mesosWorkerSlotMap" to determine which topology owns the particular
 * WorkerSlot and assigns the executors accordingly.
 */
@Override
public void schedule(Topologies topologies, Cluster cluster) {
  List<WorkerSlot> workerSlots = cluster.getAvailableSlots();
  Map<String, List<MesosWorkerSlot>> perTopologySlotList = getMesosWorkerSlotPerTopology(workerSlots);

  // So far we know how many MesosSlots each of the topologies have got. Lets assign executors for each of them
  for (String topologyId : perTopologySlotList.keySet()) {
    TopologyDetails topologyDetails = topologies.getById(topologyId);
    List<MesosWorkerSlot> mesosWorkerSlots = perTopologySlotList.get(topologyId);

    int countSlotsRequested = topologyDetails.getNumWorkers();
    int countSlotsAssigned = cluster.getAssignedNumWorkers(topologyDetails);

    if (mesosWorkerSlots.size() == 0) {
      log.warn("No slots found for topology {} while scheduling", topologyId);
      continue;
    }

    int countSlotsAvailable = Math.min(mesosWorkerSlots.size(), (countSlotsRequested - countSlotsAssigned));

    List<List<ExecutorDetails>> executorsPerWorkerList = executorsPerWorkerList(cluster, topologyDetails, countSlotsAvailable);

    for (int i = 0; i < countSlotsAvailable; i++) {
      cluster.assign(mesosWorkerSlots.remove(0), topologyId, executorsPerWorkerList.remove(0));
    }
  }
  mesosWorkerSlotMap.clear();
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:33,代码来源:DefaultScheduler.java


示例10: isFit

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
public static boolean isFit(Map mesosStormConf, OfferResources offerResources, TopologyDetails topologyDetails, boolean supervisorExists) {
  double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
  double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

  requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
  requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);

  if (requestedWorkerCpu <= offerResources.getCpu() && requestedWorkerMem <= offerResources.getMem()) {
    return true;
  }
  return false;
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:13,代码来源:SchedulerUtils.java


示例11: createWorkerSlotFromOfferResources

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
public static MesosWorkerSlot createWorkerSlotFromOfferResources(Map mesosStormConf, OfferResources offerResources,
                                                          TopologyDetails topologyDetails, boolean supervisorExists) {
  double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
  double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

  requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
  requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);

  if (requestedWorkerCpu > offerResources.getCpu()) {
    log.warn("Refusing to create worker slot. requestedWorkerCpu: {} but OfferedCpu: {} at node: {}",
             requestedWorkerCpu, offerResources.getCpu(), offerResources.getHostName());
    return null;
  }

  if (requestedWorkerMem > offerResources.getMem()) {
    log.warn("Refusing to create worker slot. requestedWorkerMem: {} but OfferedMem: {} at node: {}",
             requestedWorkerMem, offerResources.getMem(), offerResources.getHostName());
    return null;
  }

  long port = offerResources.getPort();

  if (port == -1) {
    log.warn("Refusing to create worker slot. There are no ports available with offer {}", offerResources.toString());
    return null;
  }

  offerResources.decCpu(requestedWorkerCpu);
  offerResources.decMem(requestedWorkerMem);

  return new MesosWorkerSlot(offerResources.getHostName(), port, topologyDetails.getId());
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:33,代码来源:SchedulerUtils.java


示例12: constructTopologyDetails

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
private TopologyDetails constructTopologyDetails(String topologyName, int numWorkers) {
  Map<String, TopologyDetails> topologyConf1 = new HashMap<>();

  StormTopology stormTopology = new StormTopology();
  TopologyDetails topologyDetails= new TopologyDetails(topologyName, topologyConf1, stormTopology, numWorkers);

  return topologyDetails;
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:9,代码来源:DefaultSchedulerTest.java


示例13: constructTopologyDetails

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
public static TopologyDetails constructTopologyDetails(String topologyName, int numWorkers, double numCpus, double memSize) {
  Map<String, TopologyDetails> topologyConf = new HashMap<>();

  StormTopology stormTopology = new StormTopology();
  TopologyDetails topologyDetails= new TopologyDetails(topologyName, topologyConf, stormTopology, numWorkers);
  topologyDetails.getConf().put(MesosCommon.WORKER_CPU_CONF, Double.valueOf(numCpus));
  topologyDetails.getConf().put(MesosCommon.WORKER_MEM_CONF, Double.valueOf(memSize));

  return topologyDetails;
}
 
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:11,代码来源:TestUtils.java


示例14: addTopology

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public void addTopology(TopologyDetails td) {
    String topId = td.getId();
    LOG.debug("Adding in Topology {}", topId);
    _tds.put(topId, td);
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    if (assignment != null) {
        for (WorkerSlot ws : assignment.getSlots()) {
            Node n = _nodeIdToNode.get(ws.getNodeId());
            _nodes.add(n);
        }
    }
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:14,代码来源:DefaultPool.java


示例15: canAdd

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
@Override
public boolean canAdd(TopologyDetails td) {
    // Only add topologies that are not sharing nodes with other topologies
    String topId = td.getId();
    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
    if (assignment != null) {
        for (WorkerSlot ws : assignment.getSlots()) {
            Node n = _nodeIdToNode.get(ws.getNodeId());
            if (n.getRunningTopologies().size() > 1) {
                return false;
            }
        }
    }
    return true;
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:16,代码来源:IsolatedPool.java


示例16: getNodesForNotIsolatedTop

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Get the nodes needed to schedule a non-isolated topology.
 * 
 * @param td the topology to be scheduled
 * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
 * @param lesserPools node pools we can steal nodes from
 * @return the number of additional slots that should be used for scheduling.
 */
private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) {
    String topId = td.getId();
    LOG.debug("Topology {} is not isolated", topId);
    int totalTasks = td.getExecutors().size();
    int origRequest = td.getNumWorkers();
    int slotsRequested = Math.min(totalTasks, origRequest);
    int slotsUsed = Node.countSlotsUsed(topId, allNodes);
    int slotsFree = Node.countFreeSlotsAlive(allNodes);
    // Check to see if we have enough slots before trying to get them
    int slotsAvailable = 0;
    if (slotsRequested > slotsFree) {
        slotsAvailable = NodePool.slotsAvailable(lesserPools);
    }
    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable,
            slotsToUse });
    if (slotsToUse <= 0) {
        _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
        return 0;
    }
    int slotsNeeded = slotsToUse - slotsFree;
    int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
    LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes });
    if ((numNewNodes + _usedNodes) > _maxNodes) {
        _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " + (numNewNodes - (_maxNodes - _usedNodes))
                + " more nodes needed to run topology.");
        return 0;
    }

    Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
    _usedNodes += found.size();
    allNodes.addAll(found);
    return slotsToUse;
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:43,代码来源:IsolatedPool.java


示例17: synTopologies

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Synchronize the watching topologies with alive topologies in the cluster. Dead topology will
 * be removed from the watching list.
 *
 * @param topologies alive topologies
 */
public void synTopologies(Topologies topologies) {
    Set<String> aliveTopoIds = topologies.getTopologies().stream().map(TopologyDetails::getId)
            .collect(Collectors.toSet());
    // remove topologies that dead
    watchingTopologies.keySet().retainAll(aliveTopoIds);
    topologies.getTopologies().stream().forEach(this::addOrUpdateTopology);
}
 
开发者ID:ADSC-Resa,项目名称:resa,代码行数:14,代码来源:TopologyListener.java


示例18: addOrUpdateTopology

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
/**
 * Add a topology to the watching list or update corresponding TopologyDetails object
 * if this topology is under watching.
 *
 * @param topoDetails
 */
public void addOrUpdateTopology(TopologyDetails topoDetails) {
    String topoId = topoDetails.getId();
    // For a new joined topology, set a new watcher on it and add it to watching list
    // For a watching topology, update its running detail
    watchingTopologies.compute(topoId, (topologyId, context) ->
            (context == null ? watchTopology(topoDetails) : context.setTopologyDetails(topoDetails)));
}
 
开发者ID:ADSC-Resa,项目名称:resa,代码行数:14,代码来源:TopologyListener.java


示例19: watchTopology

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
private AllocationContext watchTopology(TopologyDetails topoDetails) {
    // get current assignment and set a watcher on the zk node
    Map<String, Integer> compExecutors = getCompExecutorsAndWatch(topoDetails.getId());
    if (compExecutors == null) {
        return null;
    }
    LOG.info("Begin to watching topology " + topoDetails.getId());
    return new AllocationContext(topoDetails, compExecutors);
}
 
开发者ID:ADSC-Resa,项目名称:resa,代码行数:10,代码来源:TopologyListener.java


示例20: topologyCpu

import backtype.storm.scheduler.TopologyDetails; //导入依赖的package包/类
public static double topologyCpu(Map conf, TopologyDetails info) {
    conf = getFullTopologyConfig(conf, info);
    Object cpuObj = conf.get(CPU_CONF);
    if(cpuObj!=null && !(cpuObj instanceof Number)) {
        LOG.warn("Topology has invalid mesos cpu configuration: " + cpuObj + " for topology " + info.getId());
        cpuObj = null;
    }
    if(cpuObj==null) return DEFAULT_CPU;
    else return ((Number)cpuObj).doubleValue();
}
 
开发者ID:deric,项目名称:storm-mesos,代码行数:11,代码来源:MesosCommon.java



注:本文中的backtype.storm.scheduler.TopologyDetails类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Layer类代码示例发布时间:2022-05-22
下一篇:
Java TLSProtocolConfigurer类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap