本文整理汇总了Java中backtype.storm.scheduler.Topologies类的典型用法代码示例。如果您正苦于以下问题:Java Topologies类的具体用法?Java Topologies怎么用?Java Topologies使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Topologies类属于backtype.storm.scheduler包,在下文中一共展示了Topologies类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: assignSlots
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> slots) {
if (slots.size() == 0) {
LOG.debug("assignSlots: no slots passed in, nothing to do");
return;
}
for (Map.Entry<String, Collection<WorkerSlot>> topologyToSlots : slots.entrySet()) {
String topologyId = topologyToSlots.getKey();
for (WorkerSlot slot : topologyToSlots.getValue()) {
TopologyDetails details = topologies.getById(topologyId);
LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}",
topologyId, slot, MesosCommon.topologyWorkerCpu(_conf, details), MesosCommon.topologyWorkerMem(_conf, details));
}
}
synchronized (_offersLock) {
computeLaunchList(topologies, slots);
}
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:19,代码来源:MesosNimbus.java
示例2: getSpyCluster
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
private Cluster getSpyCluster(int numWorkers, int numExecutors) {
Cluster spyCluster = getSpyCluster();
List<MesosWorkerSlot> mesosWorkerSlots = this.generateMesosWorkerSlots(numWorkers);
initializeMesosWorkerSlotMap(mesosWorkerSlots);
Set<ExecutorDetails> executorsToAssign = this.generateExecutorDetailsSet(numExecutors);
List<WorkerSlot> workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots);
topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, numWorkers, 0.1, 100));
this.topologies = new Topologies(topologyMap);
doReturn(workerSlotList).when(spyCluster).getAvailableSlots();
doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class));
return spyCluster;
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:17,代码来源:DefaultSchedulerTest.java
示例3: initialize
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Before
public void initialize() {
defaultScheduler = new DefaultScheduler();
Map<String, Object> mesosStormConf = new HashMap<>();
defaultScheduler.prepare(mesosStormConf);
rotatingMap = new RotatingMap<>(2);
topologiesMissingAssignments = new HashSet<>();
topologiesMissingAssignments.add("test-topology1-65-1442255385");
topologiesMissingAssignments.add("test-topology1-65-1442255385");
existingSupervisors = new ArrayList<>();
existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(sampleHost, "test-topology1-65-1442255385"), sampleHost, null, null));
existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(sampleHost, "test-topology10-65-1442255385"), sampleHost, null, null));
topologyMap = new HashMap<>();
topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, 1, 0.1, 100));
topologies = new Topologies(topologyMap);
mesosWorkerSlotMap = new HashMap<>();
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:23,代码来源:DefaultSchedulerTest.java
示例4: schedule
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void schedule(Topologies topologies, Cluster cluster) {
logger.info("Online Scheduler");
logger.info("+++++++++++++++++++++++++++");
if (!topologies.getTopologies().isEmpty()) {
int rescheduleTimeout = DEFAULT_RESCHEDULE_TIMEOUT;
for (TopologyDetails topology : topologies.getTopologies()) {
rescheduleTimeout = Integer.parseInt(topology.getConf().get(Utils.RESCHEDULE_TIMEOUT).toString());
}
long now = System.currentTimeMillis();
long elapsedTime = (now - lastRescheduling) / 1000; // s
if (lastRescheduling == 0 || elapsedTime >= rescheduleTimeout)
doSchedule(topologies, cluster);
else
logger.info("It's not time to reschedule yet, " + elapsedTime + " seconds have passed, other " + (rescheduleTimeout - elapsedTime) + " seconds have to pass");
}
logger.info("---------------------------");
logger.info("Calling EvenScheduler to schedule remaining executors...");
new EvenScheduler().schedule(topologies, cluster);
logger.info("Ok, EvenScheduler succesfully called");
assignmentTracker.checkAssignment(topologies, cluster);
}
开发者ID:leonardoaniello,项目名称:storm-adaptive-schedulers,代码行数:27,代码来源:OnlineScheduler.java
示例5: testSynTopologies
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
public void testSynTopologies() throws Exception {
Topologies topologies = TopologyHelper.getTopologyDetails(conf);
System.out.println("total topology count: " + topologies.getTopologies().size());
topologyListener.synTopologies(topologies);
Utils.sleep(10000);
String topoId = TopologyHelper.getTopologyId("wc", conf);
Map<String, Integer> newAssignment = Collections.singletonMap("split", 5);
byte[] data = Utils.serialize(new HashMap<>(newAssignment));
if (zk.checkExists().forPath("/resa/" + topoId) == null) {
zk.create().creatingParentsIfNeeded().forPath("/resa/" + topoId, data);
System.out.println("Set assignment");
}
Utils.sleep(1000);
topologyListener.synTopologies(topologies);
System.out.println("Reset assignment");
zk.setData().forPath("/resa/" + topoId, data);
Utils.sleep(31000);
TopologyHelper.killTopology("wc", 0, conf);
Utils.sleep(10000);
topologies = TopologyHelper.getTopologyDetails(conf);
System.out.println("total topology count: " + topologies.getTopologies().size());
topologyListener.synTopologies(topologies);
}
开发者ID:ADSC-Resa,项目名称:resa,代码行数:26,代码来源:TopologyListenerTest.java
示例6: getTopologyDetails
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
/**
* Get all running topology's details.
*
* @param conf storm's conf, used to connect nimbus
* @return null if nimbus is not available, otherwise all TopologyDetails
*/
public static Topologies getTopologyDetails(Map<String, Object> conf) {
NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf);
try {
Nimbus.Client nimbus = nimbusClient.getClient();
Map<String, TopologyDetails> topologies = nimbus.getClusterInfo().get_topologies().stream()
.map(topoSummary -> getTopologyDetails(nimbus, topoSummary))
.filter(Objects::nonNull)
.collect(Collectors.toMap(topoDetails -> topoDetails.getId(), topoDetails -> topoDetails));
return new Topologies(topologies);
} catch (TException e) {
} finally {
nimbusClient.close();
}
return null;
}
开发者ID:ADSC-Resa,项目名称:resa,代码行数:22,代码来源:TopologyHelper.java
示例7: schedule
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
/**
* Schedule function looks in the "mesosWorkerSlotMap" to determine which topology owns the particular
* WorkerSlot and assigns the executors accordingly.
*/
@Override
public void schedule(Topologies topologies, Cluster cluster) {
List<WorkerSlot> workerSlots = cluster.getAvailableSlots();
Map<String, List<MesosWorkerSlot>> perTopologySlotList = getMesosWorkerSlotPerTopology(workerSlots);
// So far we know how many MesosSlots each of the topologies have got. Lets assign executors for each of them
for (String topologyId : perTopologySlotList.keySet()) {
TopologyDetails topologyDetails = topologies.getById(topologyId);
List<MesosWorkerSlot> mesosWorkerSlots = perTopologySlotList.get(topologyId);
int countSlotsRequested = topologyDetails.getNumWorkers();
int countSlotsAssigned = cluster.getAssignedNumWorkers(topologyDetails);
if (mesosWorkerSlots.size() == 0) {
log.warn("No slots found for topology {} while scheduling", topologyId);
continue;
}
int countSlotsAvailable = Math.min(mesosWorkerSlots.size(), (countSlotsRequested - countSlotsAssigned));
List<List<ExecutorDetails>> executorsPerWorkerList = executorsPerWorkerList(cluster, topologyDetails, countSlotsAvailable);
for (int i = 0; i < countSlotsAvailable; i++) {
cluster.assign(mesosWorkerSlots.remove(0), topologyId, executorsPerWorkerList.remove(0));
}
}
mesosWorkerSlotMap.clear();
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:33,代码来源:DefaultScheduler.java
示例8: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(
Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
synchronized (_offersLock) {
return _mesosStormScheduler.allSlotsAvailableForScheduling(
_offers,
existingSupervisors,
topologies,
topologiesMissingAssignments);
}
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:12,代码来源:MesosNimbus.java
示例9: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(
Collection<SupervisorDetails> existingSupervisors,
Topologies topologies, Set<String> topologiesMissingAssignments) {
Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
for (SupervisorDetails detail : existingSupervisors) {
for (Integer port : detail.getAllPorts())
result.add(new WorkerSlot(detail.getId(), port));
}
return result;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:12,代码来源:DefaultInimbus.java
示例10: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(
Collection<SupervisorDetails> existingSupervisors,
Topologies topologies, Set<String> topologiesMissingAssignments) {
// TODO Auto-generated method stub
Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
for (SupervisorDetails detail : existingSupervisors) {
for (Integer port : detail.getAllPorts())
result.add(new WorkerSlot(detail.getId(), port));
}
return result;
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:13,代码来源:DefaultInimbus.java
示例11: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies,
Set<String> topologiesMissingAssignments) {
// TODO Auto-generated method stub
Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
for (SupervisorDetails detail : existingSupervisors) {
for (Integer port : detail.getAllPorts())
result.add(new WorkerSlot(detail.getId(), port));
}
return result;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:12,代码来源:DefaultInimbus.java
示例12: synTopologies
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
/**
* Synchronize the watching topologies with alive topologies in the cluster. Dead topology will
* be removed from the watching list.
*
* @param topologies alive topologies
*/
public void synTopologies(Topologies topologies) {
Set<String> aliveTopoIds = topologies.getTopologies().stream().map(TopologyDetails::getId)
.collect(Collectors.toSet());
// remove topologies that dead
watchingTopologies.keySet().retainAll(aliveTopoIds);
topologies.getTopologies().stream().forEach(this::addOrUpdateTopology);
}
开发者ID:ADSC-Resa,项目名称:resa,代码行数:14,代码来源:TopologyListener.java
示例13: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors,
Topologies topologies,
Set<String> topologiesMissingAssignments) {
Collection<WorkerSlot> result = new HashSet<>();
for (SupervisorDetails detail : existingSupervisors) {
for (Integer port : detail.getAllPorts())
result.add(new WorkerSlot(detail.getId(), port));
}
return result;
}
开发者ID:alibaba,项目名称:jstorm,代码行数:12,代码来源:DefaultInimbus.java
示例14: testComputeResourcesForSlot
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Test
public void testComputeResourcesForSlot() throws Exception {
MesosNimbus mesosNimbus = new MesosNimbus();
mesosNimbus._configUrl = new URI("http://127.0.0.1/");
OfferID offerId = OfferID.newBuilder().setValue("derp").build();
RotatingMap<OfferID, Offer> offers = new RotatingMap<>(
new RotatingMap.ExpiredCallback<OfferID, Offer>() {
@Override
public void expire(OfferID key, Offer val) {
}
}
);
offers.put(
offerId,
TestUtils.buildOfferWithPorts("offer1", "host1.west", 2.0, 2048, 1000, 1000)
);
HashMap<String, TopologyDetails> topologyMap = new HashMap<>();
Map conf = new HashMap<>();
conf.put(MesosCommon.WORKER_CPU_CONF, 1);
conf.put(MesosCommon.WORKER_MEM_CONF, 1024);
conf.put(MesosCommon.EXECUTOR_CPU_CONF, 1);
conf.put(MesosCommon.EXECUTOR_MEM_CONF, 1024);
conf.put(MesosNimbus.CONF_EXECUTOR_URI, "");
mesosNimbus._conf = conf;
topologyMap.put("t1", new TopologyDetails("t1", conf, new StormTopology(), 5));
HashMap<OfferID, List<LaunchTask>> launchList = new HashMap<>();
HashMap<OfferID, List<WorkerSlot>> slotList = new HashMap<>();
slotList.put(offerId, Arrays.asList(new WorkerSlot("", 1000)));
Topologies topologies = new Topologies(topologyMap);
mesosNimbus.computeResourcesForSlot(
offers,
topologies,
launchList,
"t1",
slotList,
OfferID.newBuilder().setValue("derp").build()
);
assertEquals(1, launchList.size());
assertEquals(1, launchList.get(offerId).size());
assertEquals(
TestUtils.buildScalarResource("cpus", 1.0),
launchList.get(offerId).get(0).getTask().getResources(0)
);
assertEquals(0, offers.get(offerId).getResourcesCount());
}
开发者ID:PacktPublishing,项目名称:Mastering-Mesos,代码行数:54,代码来源:MesosNimbusTest.java
示例15: assignSlots
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies,
Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:DefaultInimbus.java
示例16: assignSlots
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies,
Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
// TODO Auto-generated method stub
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:7,代码来源:DefaultInimbus.java
示例17: schedule
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void schedule(Topologies topologies, Cluster cluster) {
evenScheduler.schedule(topologies, cluster);
assignmentTracker.checkAssignment(topologies, cluster);
}
开发者ID:leonardoaniello,项目名称:storm-adaptive-schedulers,代码行数:6,代码来源:DefaultScheduler.java
示例18: assignSlots
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
// TODO Auto-generated method stub
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:6,代码来源:DefaultInimbus.java
示例19: allSlotsAvailableForScheduling
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(
Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
synchronized (OFFERS_LOCK) {
LOG.info("Currently have " + _offers.size() + " offers buffered");
if (!topologiesMissingAssignments.isEmpty()) {
LOG.info("Topologies that need assignments: " + topologiesMissingAssignments.toString());
} else {
LOG.info("Declining offers because no topologies need assignments");
_offers.clear();
}
}
Double cpu = null;
Double mem = null;
// TODO: maybe this isn't the best approach. if a topology raises #cpus keeps failing,
// it will mess up scheduling on this cluster permanently
for (String id : topologiesMissingAssignments) {
TopologyDetails details = topologies.getById(id);
double tcpu = MesosCommon.topologyCpu(_conf, details);
double tmem = MesosCommon.topologyMem(_conf, details);
if (cpu == null || tcpu > cpu) {
cpu = tcpu;
}
if (mem == null || tmem > mem) {
mem = tmem;
}
}
// need access to how many slots are currently used to limit number of slots taken up
List<WorkerSlot> allSlots = new ArrayList();
if (cpu != null && mem != null) {
synchronized (OFFERS_LOCK) {
for (Offer offer : _offers.values()) {
allSlots.addAll(toSlots(offer, cpu, mem));
}
}
}
LOG.info("Number of available slots: " + allSlots.size());
return allSlots;
}
开发者ID:deric,项目名称:storm-mesos,代码行数:44,代码来源:MesosNimbus.java
示例20: assignSlots
import backtype.storm.scheduler.Topologies; //导入依赖的package包/类
@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
}
开发者ID:alibaba,项目名称:jstorm,代码行数:4,代码来源:DefaultInimbus.java
注:本文中的backtype.storm.scheduler.Topologies类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论