This article collects typical usage examples of the Java class backtype.storm.scheduler.TopologyDetails. If you are wondering what the TopologyDetails class does and how to use it, the curated class code examples below may help.
The TopologyDetails class belongs to the backtype.storm.scheduler package. 20 code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.
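Before looking at the examples, here is a minimal sketch of where TopologyDetails typically shows up: a custom IScheduler receives a Topologies collection and a Cluster, and reads each TopologyDetails for its id, requested worker count, and configuration. The Storm class and method names below (IScheduler, Topologies, Cluster, EvenScheduler, TopologyDetails) are the ones used throughout the examples; the scheduler itself is a hypothetical skeleton, not code from any of the listed projects.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;

// Hypothetical skeleton scheduler: inspect every TopologyDetails, then delegate to EvenScheduler.
public class SkeletonScheduler implements IScheduler {

  @Override
  public void prepare(Map conf) {
    // no state needed for this sketch
  }

  @Override
  public void schedule(Topologies topologies, Cluster cluster) {
    for (TopologyDetails td : topologies.getTopologies()) {
      String id = td.getId();                // topology id
      int workers = td.getNumWorkers();      // requested worker count
      Object isolated = td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES); // raw conf lookup
      System.out.printf("topology %s wants %d workers (isolated machines: %s)%n", id, workers, isolated);
    }
    // Let Storm's even scheduler do the actual assignment, as several examples below also do.
    new EvenScheduler().schedule(topologies, cluster);
  }
}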
Example 1: assignSlots

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> slots) {
  if (slots.size() == 0) {
    LOG.debug("assignSlots: no slots passed in, nothing to do");
    return;
  }
  for (Map.Entry<String, Collection<WorkerSlot>> topologyToSlots : slots.entrySet()) {
    String topologyId = topologyToSlots.getKey();
    for (WorkerSlot slot : topologyToSlots.getValue()) {
      TopologyDetails details = topologies.getById(topologyId);
      LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}",
          topologyId, slot, MesosCommon.topologyWorkerCpu(_conf, details), MesosCommon.topologyWorkerMem(_conf, details));
    }
  }
  synchronized (_offersLock) {
    computeLaunchList(topologies, slots);
  }
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 19, Source file: MesosNimbus.java
Example 2: getSpyCluster

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

private Cluster getSpyCluster(int numWorkers, int numExecutors) {
  Cluster spyCluster = getSpyCluster();
  List<MesosWorkerSlot> mesosWorkerSlots = this.generateMesosWorkerSlots(numWorkers);
  initializeMesosWorkerSlotMap(mesosWorkerSlots);
  Set<ExecutorDetails> executorsToAssign = this.generateExecutorDetailsSet(numExecutors);
  List<WorkerSlot> workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots);
  topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, numWorkers, 0.1, 100));
  this.topologies = new Topologies(topologyMap);
  doReturn(workerSlotList).when(spyCluster).getAvailableSlots();
  doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class));
  return spyCluster;
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 17, Source file: DefaultSchedulerTest.java
Example 3: testScheduleWithOneWorkerSlot

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Test
public void testScheduleWithOneWorkerSlot() {
  Cluster spyCluster = getSpyCluster();
  List<MesosWorkerSlot> mesosWorkerSlots = this.generateMesosWorkerSlots(1);
  initializeMesosWorkerSlotMap(mesosWorkerSlots);
  Set<ExecutorDetails> executorsToAssign = this.generateExecutorDetailsSet(4);
  List<WorkerSlot> workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots);
  doReturn(workerSlotList).when(spyCluster).getAvailableSlots();
  doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class));
  defaultScheduler.schedule(topologies, spyCluster);
  Set<ExecutorDetails> assignedExecutors = spyCluster.getAssignmentById(sampleTopologyId).getExecutors();
  assertEquals(executorsToAssign, assignedExecutors);
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 19, Source file: DefaultSchedulerTest.java
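Examples 2 and 3 stub a Mockito spy of the Cluster. The doReturn(...).when(spy).method() form is used instead of when(spy.method()).thenReturn(...) because, on a spy, the latter would invoke the real method while the stub is being set up. A tiny self-contained sketch of the same stubbing pattern (plain Mockito and JUnit, no Storm classes; the values are made up for illustration):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

public class SpyStubbingSketchTest {

  @Test
  public void doReturnDoesNotInvokeTheRealMethod() {
    List<String> spyList = spy(new ArrayList<String>());

    // when(spyList.get(0)).thenReturn("slot-1") would call the real get(0) on the empty list
    // and throw IndexOutOfBoundsException; doReturn(...).when(...) avoids invoking it.
    doReturn("slot-1").when(spyList).get(0);

    Assert.assertEquals("slot-1", spyList.get(0));
  }
}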
Example 4: schedule

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public void schedule(Topologies topologies, Cluster cluster) {
  logger.info("Online Scheduler");
  logger.info("+++++++++++++++++++++++++++");
  if (!topologies.getTopologies().isEmpty()) {
    int rescheduleTimeout = DEFAULT_RESCHEDULE_TIMEOUT;
    // Read the reschedule timeout from the topology configs (the last topology's value wins).
    for (TopologyDetails topology : topologies.getTopologies()) {
      rescheduleTimeout = Integer.parseInt(topology.getConf().get(Utils.RESCHEDULE_TIMEOUT).toString());
    }
    long now = System.currentTimeMillis();
    long elapsedTime = (now - lastRescheduling) / 1000; // seconds
    if (lastRescheduling == 0 || elapsedTime >= rescheduleTimeout)
      doSchedule(topologies, cluster);
    else
      logger.info("It's not time to reschedule yet, " + elapsedTime + " seconds have passed, another " + (rescheduleTimeout - elapsedTime) + " seconds have to pass");
  }
  logger.info("---------------------------");
  logger.info("Calling EvenScheduler to schedule remaining executors...");
  new EvenScheduler().schedule(topologies, cluster);
  logger.info("Ok, EvenScheduler successfully called");
  assignmentTracker.checkAssignment(topologies, cluster);
}

Author: leonardoaniello, Project: storm-adaptive-schedulers, Lines: 27, Source file: OnlineScheduler.java
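The example above throttles rescheduling: it reads Utils.RESCHEDULE_TIMEOUT from each topology's configuration (so the last topology read wins) and only calls doSchedule once enough seconds have elapsed. Note that the parseInt(...toString()) chain throws a NullPointerException if a topology does not set the key; a small defensive reader like the sketch below avoids that. The helper and the key name "reschedule.timeout.secs" are hypothetical, not part of the storm-adaptive-schedulers project.

import java.util.Map;

// Hypothetical helper: read a per-topology integer setting with a default instead of
// calling toString() on a value that may be null.
final class ConfReader {

  static int intConf(Map<?, ?> conf, String key, int defaultValue) {
    Object value = conf.get(key);
    if (value == null) {
      return defaultValue;                      // key not set: fall back to the default
    }
    if (value instanceof Number) {
      return ((Number) value).intValue();       // numeric values pass through directly
    }
    return Integer.parseInt(value.toString());  // string values are parsed
  }
}

With such a helper, the loop would read rescheduleTimeout = ConfReader.intConf(topology.getConf(), "reschedule.timeout.secs", DEFAULT_RESCHEDULE_TIMEOUT); and topologies that do not set the key would simply keep the default.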
Example 5: addTopology

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public void addTopology(TopologyDetails td) {
  String topId = td.getId();
  LOG.debug("Adding in Topology {}", topId);
  SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
  Set<Node> assignedNodes = new HashSet<Node>();
  if (assignment != null) {
    for (WorkerSlot ws : assignment.getSlots()) {
      Node n = _nodeIdToNode.get(ws.getNodeId());
      assignedNodes.add(n);
    }
  }
  _usedNodes += assignedNodes.size();
  _topologyIdToNodes.put(topId, assignedNodes);
  _tds.put(topId, td);
  if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
    _isolated.add(topId);
  }
}

Author: kkllwww007, Project: jstrom, Lines: 20, Source file: IsolatedPool.java
Example 6: getTopologyDetails

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * Get the details of all running topologies.
 *
 * @param conf storm conf, used to connect to nimbus
 * @return null if nimbus is not available, otherwise all TopologyDetails
 */
public static Topologies getTopologyDetails(Map<String, Object> conf) {
  NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf);
  try {
    Nimbus.Client nimbus = nimbusClient.getClient();
    Map<String, TopologyDetails> topologies = nimbus.getClusterInfo().get_topologies().stream()
        .map(topoSummary -> getTopologyDetails(nimbus, topoSummary))
        .filter(Objects::nonNull)
        .collect(Collectors.toMap(topoDetails -> topoDetails.getId(), topoDetails -> topoDetails));
    return new Topologies(topologies);
  } catch (TException e) {
    // nimbus is unreachable: fall through and return null as documented
  } finally {
    nimbusClient.close();
  }
  return null;
}

Author: ADSC-Resa, Project: resa, Lines: 22, Source file: TopologyHelper.java
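A minimal usage sketch for the helper above, assuming it lives in the TopologyHelper class named in the source line. Utils.readStormConfig() is the standard backtype.storm way to load the default config plus storm.yaml, and the null check follows the documented "nimbus not available" contract.

import java.util.Map;

import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.utils.Utils;

public class ListRunningTopologies {

  public static void main(String[] args) {
    Map conf = Utils.readStormConfig();  // defaults + storm.yaml
    Topologies topologies = TopologyHelper.getTopologyDetails(conf);
    if (topologies == null) {
      System.err.println("Nimbus is not available");
      return;
    }
    for (TopologyDetails td : topologies.getTopologies()) {
      System.out.printf("%s: %d workers, %d executors%n",
          td.getId(), td.getNumWorkers(), td.getExecutors().size());
    }
  }
}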
Example 7: componentToTasks

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * Get the task ids of each component from a TopologyDetails object.
 *
 * @param topoDetails the topology to inspect
 * @param p optional filter on (executor, component) entries; may be null
 * @return a map from component id to the list of its task ids
 */
private static Map<String, List<Integer>> componentToTasks(TopologyDetails topoDetails,
                                                           Predicate<Map.Entry<ExecutorDetails, String>> p) {
  Stream<Map.Entry<ExecutorDetails, String>> stream = topoDetails.getExecutorToComponent().entrySet().stream();
  if (p != null) {
    stream = stream.filter(p);
  }
  // Group by component and flatten each executor's task-id list into one list per component.
  return stream.collect(Collectors.groupingBy(Map.Entry::getValue,
      Collectors.mapping(e -> getTaskIds(e.getKey()),
          Collector.of(ArrayList::new,
              (all, l) -> all.addAll(l),
              (all, l) -> {
                all.addAll(l);
                return all;
              }))));
}

Author: ADSC-Resa, Project: resa, Lines: 25, Source file: TopologyHelper.java
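The custom Collector above flattens each executor's task-id list into a single list per component. For readers less familiar with Collector.of, a plain-loop sketch that builds the same Map<String, List<Integer>> (without the optional filter) is shown below. It uses ExecutorDetails.getStartTask()/getEndTask() directly instead of the project's getTaskIds helper, relying on the fact that an executor covers a contiguous task range; treat it as an illustration, not resa code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.TopologyDetails;

// Loop-based equivalent of the stream/Collector version above (same result shape).
static Map<String, List<Integer>> componentToTasksSimple(TopologyDetails topoDetails) {
  Map<String, List<Integer>> result = new HashMap<>();
  for (Map.Entry<ExecutorDetails, String> e : topoDetails.getExecutorToComponent().entrySet()) {
    ExecutorDetails executor = e.getKey();
    String component = e.getValue();
    List<Integer> tasks = result.computeIfAbsent(component, k -> new ArrayList<>());
    // An executor is responsible for the contiguous task range [startTask, endTask].
    for (int task = executor.getStartTask(); task <= executor.getEndTask(); task++) {
      tasks.add(task);
    }
  }
  return result;
}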
Example 8: addTopology

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public void addTopology(TopologyDetails td) {
  String topId = td.getId();
  LOG.debug("Adding in topology {}", topId);
  SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
  Set<Node> assignedNodes = new HashSet<>();
  if (assignment != null) {
    for (WorkerSlot ws : assignment.getSlots()) {
      Node n = _nodeIdToNode.get(ws.getNodeId());
      assignedNodes.add(n);
    }
  }
  _usedNodes += assignedNodes.size();
  _topologyIdToNodes.put(topId, assignedNodes);
  _tds.put(topId, td);
  if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
    _isolated.add(topId);
  }
}

Author: alibaba, Project: jstorm, Lines: 20, Source file: IsolatedPool.java
Example 9: schedule

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * The schedule function looks in the "mesosWorkerSlotMap" to determine which topology owns a particular
 * WorkerSlot and assigns the executors accordingly.
 */
@Override
public void schedule(Topologies topologies, Cluster cluster) {
  List<WorkerSlot> workerSlots = cluster.getAvailableSlots();
  Map<String, List<MesosWorkerSlot>> perTopologySlotList = getMesosWorkerSlotPerTopology(workerSlots);

  // At this point we know how many MesosWorkerSlots each topology has been given. Let's assign executors to each of them.
  for (String topologyId : perTopologySlotList.keySet()) {
    TopologyDetails topologyDetails = topologies.getById(topologyId);
    List<MesosWorkerSlot> mesosWorkerSlots = perTopologySlotList.get(topologyId);

    int countSlotsRequested = topologyDetails.getNumWorkers();
    int countSlotsAssigned = cluster.getAssignedNumWorkers(topologyDetails);

    if (mesosWorkerSlots.size() == 0) {
      log.warn("No slots found for topology {} while scheduling", topologyId);
      continue;
    }

    int countSlotsAvailable = Math.min(mesosWorkerSlots.size(), (countSlotsRequested - countSlotsAssigned));

    List<List<ExecutorDetails>> executorsPerWorkerList = executorsPerWorkerList(cluster, topologyDetails, countSlotsAvailable);
    for (int i = 0; i < countSlotsAvailable; i++) {
      cluster.assign(mesosWorkerSlots.remove(0), topologyId, executorsPerWorkerList.remove(0));
    }
  }
  mesosWorkerSlotMap.clear();
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 33, Source file: DefaultScheduler.java
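The example depends on a helper, executorsPerWorkerList(cluster, topologyDetails, countSlotsAvailable), that is not reproduced on this page. A plausible implementation simply spreads the topology's unassigned executors round-robin across the available slots; the sketch below illustrates that idea and is not the storm-mesos implementation.

import java.util.ArrayList;
import java.util.List;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.TopologyDetails;

// Sketch only: distribute the unassigned executors round-robin over `slots` buckets.
static List<List<ExecutorDetails>> executorsPerWorkerSketch(Cluster cluster,
                                                            TopologyDetails topologyDetails,
                                                            int slots) {
  List<List<ExecutorDetails>> buckets = new ArrayList<>();
  if (slots <= 0) {
    return buckets;
  }
  for (int i = 0; i < slots; i++) {
    buckets.add(new ArrayList<ExecutorDetails>());
  }
  int i = 0;
  for (ExecutorDetails executor : cluster.getUnassignedExecutors(topologyDetails)) {
    buckets.get(i % slots).add(executor);
    i++;
  }
  return buckets;
}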
Example 10: isFit

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

public static boolean isFit(Map mesosStormConf, OfferResources offerResources, TopologyDetails topologyDetails, boolean supervisorExists) {
  double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
  double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);
  // If no supervisor is running on the host yet, its executor overhead must fit into the offer as well.
  requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
  requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);
  return requestedWorkerCpu <= offerResources.getCpu() && requestedWorkerMem <= offerResources.getMem();
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 13, Source file: SchedulerUtils.java
Example 11: createWorkerSlotFromOfferResources

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

public static MesosWorkerSlot createWorkerSlotFromOfferResources(Map mesosStormConf, OfferResources offerResources,
                                                                 TopologyDetails topologyDetails, boolean supervisorExists) {
  double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
  double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);
  requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
  requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);

  if (requestedWorkerCpu > offerResources.getCpu()) {
    log.warn("Refusing to create worker slot. requestedWorkerCpu: {} but OfferedCpu: {} at node: {}",
             requestedWorkerCpu, offerResources.getCpu(), offerResources.getHostName());
    return null;
  }

  if (requestedWorkerMem > offerResources.getMem()) {
    log.warn("Refusing to create worker slot. requestedWorkerMem: {} but OfferedMem: {} at node: {}",
             requestedWorkerMem, offerResources.getMem(), offerResources.getHostName());
    return null;
  }

  long port = offerResources.getPort();
  if (port == -1) {
    log.warn("Refusing to create worker slot. There are no ports available with offer {}", offerResources.toString());
    return null;
  }

  offerResources.decCpu(requestedWorkerCpu);
  offerResources.decMem(requestedWorkerMem);
  return new MesosWorkerSlot(offerResources.getHostName(), port, topologyDetails.getId());
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 33, Source file: SchedulerUtils.java
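Examples 10 and 11 are meant to be used together: isFit pre-checks whether a Mesos offer can host one more worker (plus the supervisor overhead if none is running on that host yet), and createWorkerSlotFromOfferResources then carves a MesosWorkerSlot out of the offer, decrementing its remaining CPU and memory. An illustrative fragment (the surrounding variables mesosStormConf, offerResources, topologyDetails, and supervisorExists are assumed to be in scope; this is not code from the project):

// Pre-check the offer, then carve one MesosWorkerSlot out of it.
if (SchedulerUtils.isFit(mesosStormConf, offerResources, topologyDetails, supervisorExists)) {
  MesosWorkerSlot slot = SchedulerUtils.createWorkerSlotFromOfferResources(
      mesosStormConf, offerResources, topologyDetails, supervisorExists);
  if (slot != null) {
    // The offer's remaining CPU/memory have been decremented; the slot can now be
    // recorded for this topology (e.g. in mesosWorkerSlotMap) for the next schedule() pass.
    log.info("Created worker slot {} for topology {}", slot, topologyDetails.getId());
  }
}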
Example 12: constructTopologyDetails

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

private TopologyDetails constructTopologyDetails(String topologyName, int numWorkers) {
  Map<String, Object> topologyConf1 = new HashMap<>();
  StormTopology stormTopology = new StormTopology();
  TopologyDetails topologyDetails = new TopologyDetails(topologyName, topologyConf1, stormTopology, numWorkers);
  return topologyDetails;
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 9, Source file: DefaultSchedulerTest.java
Example 13: constructTopologyDetails

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

public static TopologyDetails constructTopologyDetails(String topologyName, int numWorkers, double numCpus, double memSize) {
  Map<String, Object> topologyConf = new HashMap<>();
  StormTopology stormTopology = new StormTopology();
  TopologyDetails topologyDetails = new TopologyDetails(topologyName, topologyConf, stormTopology, numWorkers);
  // Per-worker CPU and memory settings, read later by MesosCommon.topologyWorkerCpu/-Mem.
  topologyDetails.getConf().put(MesosCommon.WORKER_CPU_CONF, Double.valueOf(numCpus));
  topologyDetails.getConf().put(MesosCommon.WORKER_MEM_CONF, Double.valueOf(memSize));
  return topologyDetails;
}

Author: PacktPublishing, Project: Mastering-Mesos, Lines: 11, Source file: TestUtils.java
Example 14: addTopology

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public void addTopology(TopologyDetails td) {
  String topId = td.getId();
  LOG.debug("Adding in Topology {}", topId);
  _tds.put(topId, td);
  SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
  if (assignment != null) {
    for (WorkerSlot ws : assignment.getSlots()) {
      Node n = _nodeIdToNode.get(ws.getNodeId());
      _nodes.add(n);
    }
  }
}

Author: kkllwww007, Project: jstrom, Lines: 14, Source file: DefaultPool.java
Example 15: canAdd

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

@Override
public boolean canAdd(TopologyDetails td) {
  // Only add topologies that are not sharing nodes with other topologies
  String topId = td.getId();
  SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
  if (assignment != null) {
    for (WorkerSlot ws : assignment.getSlots()) {
      Node n = _nodeIdToNode.get(ws.getNodeId());
      if (n.getRunningTopologies().size() > 1) {
        return false;
      }
    }
  }
  return true;
}

Author: kkllwww007, Project: jstrom, Lines: 16, Source file: IsolatedPool.java
Example 16: getNodesForNotIsolatedTop

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * Get the nodes needed to schedule a non-isolated topology.
 *
 * @param td the topology to be scheduled
 * @param allNodes the nodes already scheduled for this topology. This will be updated to include new nodes if needed.
 * @param lesserPools node pools we can steal nodes from
 * @return the number of additional slots that should be used for scheduling.
 */
private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes, NodePool[] lesserPools) {
  String topId = td.getId();
  LOG.debug("Topology {} is not isolated", topId);
  int totalTasks = td.getExecutors().size();
  int origRequest = td.getNumWorkers();
  int slotsRequested = Math.min(totalTasks, origRequest);
  int slotsUsed = Node.countSlotsUsed(topId, allNodes);
  int slotsFree = Node.countFreeSlotsAlive(allNodes);
  // Check to see if we have enough slots before trying to get them
  int slotsAvailable = 0;
  if (slotsRequested > slotsFree) {
    slotsAvailable = NodePool.slotsAvailable(lesserPools);
  }
  int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
  LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
      new Object[] { slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse });
  if (slotsToUse <= 0) {
    _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
    return 0;
  }
  int slotsNeeded = slotsToUse - slotsFree;
  int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
  LOG.debug("Nodes... new {} used {} max {}", new Object[] { numNewNodes, _usedNodes, _maxNodes });
  if ((numNewNodes + _usedNodes) > _maxNodes) {
    _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. "
        + (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
    return 0;
  }
  Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
  _usedNodes += found.size();
  allNodes.addAll(found);
  return slotsToUse;
}

Author: kkllwww007, Project: jstrom, Lines: 43, Source file: IsolatedPool.java
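To make the slot arithmetic above concrete, suppose a non-isolated topology has 6 executors and requests 5 workers, 2 slots are already used for it, 1 free slot is alive in this pool, and lesser pools could give up 4 more. The worked numbers (illustrative only) follow the formulas in the method:

// Worked example of the slot arithmetic in getNodesForNotIsolatedTop (illustrative numbers).
int totalTasks     = 6;                                  // td.getExecutors().size()
int origRequest    = 5;                                  // td.getNumWorkers()
int slotsRequested = Math.min(totalTasks, origRequest);  // 5
int slotsUsed      = 2;                                  // Node.countSlotsUsed(...)
int slotsFree      = 1;                                  // Node.countFreeSlotsAlive(...)
int slotsAvailable = 4;                                  // NodePool.slotsAvailable(lesserPools), since 5 > 1
int slotsToUse     = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);  // min(3, 5) = 3
int slotsNeeded    = slotsToUse - slotsFree;             // 2 slots must be taken from lesser pools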
Example 17: synTopologies

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * Synchronize the watching topologies with the alive topologies in the cluster. Dead topologies
 * are removed from the watching list.
 *
 * @param topologies alive topologies
 */
public void synTopologies(Topologies topologies) {
  Set<String> aliveTopoIds = topologies.getTopologies().stream().map(TopologyDetails::getId)
      .collect(Collectors.toSet());
  // remove topologies that are dead
  watchingTopologies.keySet().retainAll(aliveTopoIds);
  topologies.getTopologies().stream().forEach(this::addOrUpdateTopology);
}

Author: ADSC-Resa, Project: resa, Lines: 14, Source file: TopologyListener.java
Example 18: addOrUpdateTopology

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

/**
 * Add a topology to the watching list, or update the corresponding TopologyDetails object
 * if the topology is already being watched.
 *
 * @param topoDetails details of the topology to watch or refresh
 */
public void addOrUpdateTopology(TopologyDetails topoDetails) {
  String topoId = topoDetails.getId();
  // For a newly joined topology, set a new watcher on it and add it to the watching list.
  // For a topology already being watched, update its running details.
  watchingTopologies.compute(topoId, (topologyId, context) ->
      (context == null ? watchTopology(topoDetails) : context.setTopologyDetails(topoDetails)));
}

Author: ADSC-Resa, Project: resa, Lines: 14, Source file: TopologyListener.java
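Examples 17 and 18 implement a common watch-list synchronization idiom: keySet().retainAll(...) drops entries for dead topologies, and Map.compute(...) either creates a fresh context or refreshes an existing one. The same idiom, with the Storm and ZooKeeper specifics stripped away, looks like the generic sketch below (an illustration, not resa code):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

// Generic sketch of the retainAll + compute synchronization idiom used in examples 17 and 18.
public class WatchList<K, D, C> {

  private final Map<K, C> watching = new HashMap<>();
  private final Function<D, C> creator;       // plays the role of watchTopology(...)
  private final BiFunction<C, D, C> updater;  // plays the role of context.setTopologyDetails(...)

  public WatchList(Function<D, C> creator, BiFunction<C, D, C> updater) {
    this.creator = creator;
    this.updater = updater;
  }

  /** Keep only contexts for alive keys, then create or refresh a context for each alive item. */
  public void sync(Map<K, D> alive) {
    watching.keySet().retainAll(alive.keySet());  // drop contexts of dead items
    alive.forEach((key, details) -> watching.compute(key,
        (k, ctx) -> ctx == null ? creator.apply(details) : updater.apply(ctx, details)));
  }

  public Set<K> watchedKeys() {
    return watching.keySet();
  }
}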
Example 19: watchTopology

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

private AllocationContext watchTopology(TopologyDetails topoDetails) {
  // get the current assignment and set a watcher on the zk node
  Map<String, Integer> compExecutors = getCompExecutorsAndWatch(topoDetails.getId());
  if (compExecutors == null) {
    return null;
  }
  LOG.info("Begin watching topology " + topoDetails.getId());
  return new AllocationContext(topoDetails, compExecutors);
}

Author: ADSC-Resa, Project: resa, Lines: 10, Source file: TopologyListener.java
Example 20: topologyCpu

import backtype.storm.scheduler.TopologyDetails; // import the required package/class

public static double topologyCpu(Map conf, TopologyDetails info) {
  conf = getFullTopologyConfig(conf, info);
  Object cpuObj = conf.get(CPU_CONF);
  if (cpuObj != null && !(cpuObj instanceof Number)) {
    LOG.warn("Topology has invalid mesos cpu configuration: " + cpuObj + " for topology " + info.getId());
    cpuObj = null;
  }
  return cpuObj == null ? DEFAULT_CPU : ((Number) cpuObj).doubleValue();
}

Author: deric, Project: storm-mesos, Lines: 11, Source file: MesosCommon.java
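topologyCpu is a defensive "numeric config value with a default" reader: a missing or non-numeric value is logged and replaced by DEFAULT_CPU. If the same pattern is needed for several keys, it can be factored into a small helper such as the hedged sketch below (a generic illustration, not part of MesosCommon):

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Generic sketch: read a numeric config value, falling back to a default on missing or invalid input.
final class NumericConf {

  private static final Logger LOG = LoggerFactory.getLogger(NumericConf.class);

  static double getDouble(Map<?, ?> conf, String key, double defaultValue) {
    Object value = conf.get(key);
    if (value == null) {
      return defaultValue;
    }
    if (!(value instanceof Number)) {
      LOG.warn("Invalid numeric configuration for {}: {} (using default {})", key, value, defaultValue);
      return defaultValue;
    }
    return ((Number) value).doubleValue();
  }
}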
Note: The backtype.storm.scheduler.TopologyDetails class examples in this article are collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects contributed by their respective authors, and the copyright of the source code remains with the original authors; for redistribution and use, please refer to the License of the corresponding project. Do not reproduce without permission.