本文整理汇总了Java中backtype.storm.generated.TopologySummary类的典型用法代码示例。如果您正苦于以下问题:Java TopologySummary类的具体用法?Java TopologySummary怎么用?Java TopologySummary使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TopologySummary类属于backtype.storm.generated包,在下文中一共展示了TopologySummary类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: getTopologyTPS
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
protected long getTopologyTPS(TopologySummary topology, Client client) throws NotAliveException, TException{
long topologyTps = 0l;
String topologyId = topology.get_id();
if(topologyId.startsWith("ClusterMonitor")){
return topologyTps;
}
TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
if(topologyInfo == null){
return topologyTps;
}
List<ExecutorSummary> executorSummaryList = topologyInfo.get_executors();
for(ExecutorSummary executor : executorSummaryList){
topologyTps += getComponentTPS(executor);
}
LOGGER.info("topology = " + topology.get_name() + ", tps = " + topologyTps);
return topologyTps;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:18,代码来源:ClusterInfoBolt.java
示例2: topologySummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* Convert thrift TopologySummary to UI bean TopologySumm
*
* @param ts
* @return
*/
public static List<TopologySumm> topologySummary(List<TopologySummary> ts) {
List<TopologySumm> tsumm = new ArrayList<TopologySumm>();
if (ts != null) {
for (TopologySummary t : ts) {
TopologySumm topologySumm = new TopologySumm();
topologySumm.setTopologyId(t.get_id());
topologySumm.setTopologyName(t.get_name());
topologySumm.setStatus(t.get_status());
topologySumm.setUptime(StatBuckets.prettyUptimeStr(t
.get_uptime_secs()));
topologySumm.setNumWorkers(String.valueOf(t.get_num_workers()));
topologySumm.setNumTasks(String.valueOf(t.get_num_tasks()));
topologySumm.setErrorInfo(t.get_error_info());
tsumm.add(topologySumm);
}
}
return tsumm;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:30,代码来源:UIUtils.java
示例3: topologyNameExists
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
private static boolean topologyNameExists(Map conf, String name) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
ClusterSummary summary = client.getClient().getClusterInfo();
for (TopologySummary s : summary.get_topologies()) {
if (s.get_name().equals(name)) {
return true;
}
}
return false;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
client.close();
}
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:18,代码来源:StormSubmitter.java
示例4: parseResults
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
private void parseResults(List<TopologySummary> topologies)
{
if (topologies == null)
{
return;
}
for (TopologySummary topology : topologies)
{
String[] result = new String[RESULTSHEAD.length];
result[0] = topology.get_name();
result[1] = topology.get_status();
result[2] = String.valueOf(topology.get_num_workers());
result[3] = String.valueOf(topology.get_uptime_secs());
results.add(result);
}
}
开发者ID:HuaweiBigData,项目名称:StreamCQL,代码行数:18,代码来源:StormApplicationResults.java
示例5: getComponents
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* @@@ Don't be compatible with Storm
*
* Here skip the logic
* @param client
* @param topology
* @return
* @throws Exception
*/
private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception {
HashSet<String> components = new HashSet<String>();
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts : clusterSummary.get_topologies()) {
if (topology.equals(ts.get_name())) {
topologySummary = ts;
break;
}
}
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
// GetInfoOptions getInfoOpts = new GetInfoOptions();
// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
// for (ExecutorSummary es: info.get_executors()) {
// components.add(es.get_component_id());
// }
}
return components;
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:33,代码来源:Monitor.java
示例6: topologyNameExists
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
private static boolean topologyNameExists(Map conf, String name) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
ClusterSummary summary = client.getClient().getClusterInfo();
for(TopologySummary s : summary.get_topologies()) {
if(s.get_name().equals(name)) {
return true;
}
}
return false;
} catch(Exception e) {
throw new RuntimeException(e);
} finally {
client.close();
}
}
开发者ID:metamx,项目名称:incubator-storm,代码行数:18,代码来源:StormSubmitter.java
示例7: cleanCluster
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public static void cleanCluster() {
try {
NimbusClient client = getNimbusClient(null);
ClusterSummary clusterSummary = client.getClient().getClusterInfo();
List<TopologySummary> topologySummaries = clusterSummary.get_topologies();
KillOptions killOption = new KillOptions();
killOption.set_wait_secs(1);
for (TopologySummary topologySummary : topologySummaries) {
client.getClient().killTopologyWithOpts(topologySummary.get_name(), killOption);
LOG.info("Successfully kill " + topologySummary.get_name());
}
} catch (Exception e) {
if (client != null) {
client.close();
client = null;
}
LOG.error("Failed to kill all topology ", e);
}
}
开发者ID:alibaba,项目名称:jstorm,代码行数:25,代码来源:JStormHelper.java
示例8: mkTopologySummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public static TopologySummary mkTopologySummary(Assignment assignment,
String topologyId, String topologyName, String status,
int uptime_secs, Map<Integer, String> lastErrTimeStamp) {
int num_workers = assignment.getWorkers().size();
int num_tasks = 0;
for (ResourceWorkerSlot worker : assignment.getWorkers()) {
num_tasks = num_tasks + worker.getTasks().size();
}
long currentTimeSecs = System.currentTimeMillis() / 1000;
String errorInfo = "";
if (lastErrTimeStamp != null)
{
for (Entry<Integer, String> entry : lastErrTimeStamp.entrySet()) {
if ((currentTimeSecs - Long.valueOf(entry.getValue())) < entry.getKey()) {
errorInfo = "Y";
break;
}
}
}
TopologySummary ret = new TopologySummary(topologyId, topologyName,
status, uptime_secs, num_tasks, num_workers, errorInfo);
return ret;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:29,代码来源:NimbusUtils.java
示例9: mkTopologySummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public static TopologySummary mkTopologySummary(Assignment assignment, String topologyId, String topologyName,
String status, int uptime_secs,
Map<Integer, String> lastErrTimeStamp) {
int num_workers = assignment.getWorkers().size();
int num_tasks = 0;
for (ResourceWorkerSlot worker : assignment.getWorkers()) {
num_tasks = num_tasks + worker.getTasks().size();
}
long currentTimeSecs = System.currentTimeMillis() / 1000;
String errorInfo = "";
if (lastErrTimeStamp != null) {
for (Entry<Integer, String> entry : lastErrTimeStamp.entrySet()) {
if ((currentTimeSecs - Long.valueOf(entry.getValue())) < entry.getKey()) {
errorInfo = "Y";
break;
}
}
}
TopologySummary ret = new TopologySummary(topologyId, topologyName, status, uptime_secs, num_tasks,
num_workers, errorInfo);
return ret;
}
开发者ID:songtk,项目名称:learn_jstorm,代码行数:28,代码来源:NimbusUtils.java
示例10: parseResults
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
private void parseResults(List<TopologySummary> topologies) {
if (topologies == null) {
return;
}
for (TopologySummary topology : topologies) {
String[] result = new String[RESULTSHEAD.length];
result[0] = topology.get_name();
result[1] = topology.get_status();
result[2] = String.valueOf(topology.get_numWorkers());
result[3] = String.valueOf(topology.get_uptimeSecs());
results.add(result);
}
}
开发者ID:HuaweiBigData,项目名称:StreamCQL,代码行数:15,代码来源:JStormApplicationResults.java
示例11: getClusterInfo
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
private void getClusterInfo(Client client) {
try {
ClusterSummary clusterSummary = client.getClusterInfo();
List<SupervisorSummary> supervisorSummaryList = clusterSummary.get_supervisors();
int totalWorkers = 0;
int usedWorkers = 0;
for(SupervisorSummary summary : supervisorSummaryList){
totalWorkers += summary.get_num_workers() ;
usedWorkers += summary.get_num_used_workers();
}
int freeWorkers = totalWorkers - usedWorkers;
LOGGER.info("cluster totalWorkers = " + totalWorkers
+ ", usedWorkers = " + usedWorkers
+ ", freeWorkers = " + freeWorkers);
HttpCatClient.sendMetric("ClusterMonitor", "freeSlots", "avg", String.valueOf(freeWorkers));
HttpCatClient.sendMetric("ClusterMonitor", "totalSlots", "avg", String.valueOf(totalWorkers));
List<TopologySummary> topologySummaryList = clusterSummary.get_topologies();
long clusterTPS = 0l;
for(TopologySummary topology : topologySummaryList){
long topologyTPS = getTopologyTPS(topology, client);
clusterTPS += topologyTPS;
if(topology.get_name().startsWith("ClusterMonitor")){
continue;
}
HttpCatClient.sendMetric(topology.get_name(), topology.get_name() + "-TPS", "avg", String.valueOf(topologyTPS));
}
HttpCatClient.sendMetric("ClusterMonitor", "ClusterEmitTPS", "avg", String.valueOf(clusterTPS));
} catch (TException e) {
initClient(configMap);
LOGGER.error("get client info error.", e);
}
catch(NotAliveException nae){
LOGGER.warn("topology is dead.", nae);
}
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:39,代码来源:ClusterInfoBolt.java
示例12: topologyExists
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* Returns whether a given topology exists within <code>summary</code>.
*
* @param summary the summary object
* @param topologyName the topology name
* @return <code>true</code> if the topology exists, <code>false</code> else
*/
private static boolean topologyExists(ClusterSummary summary, String topologyName) {
boolean result = false;
for (TopologySummary s : summary.get_topologies()) {
if (s.get_name().equals(topologyName)) {
result = true;
break;
}
}
return result;
}
开发者ID:QualiMaster,项目名称:Infrastructure,代码行数:18,代码来源:StormUtils.java
示例13: getTopologyInfo
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* Gets the topology info based on the pipeline name.
*
* @param pipelineName
* the pipeline to search for
* @return the topology info (<b>null</b> if not found)
*/
public TopologyInfo getTopologyInfo(String pipelineName) {
TopologyInfo result = null;
ClusterSummary summary;
try {
logger.info("The thrift connection is " + connection);
if (connection != null) {
summary = connection.getClusterSummary();
List<TopologySummary> topologies = summary.get_topologies();
for (int t = 0; t < topologies.size(); t++) {
TopologySummary topologySummary = topologies.get(t);
if (pipelineName.equals(topologySummary.get_name())) {
try {
logger.info("Obtaining the TopologyInfo for the pipeine: " + pipelineName);
result = connection.getTopologyInfo(topologySummary
.get_id());
logger.info("the TopologyInfo is " + result);
} catch (NotAliveException | TException e) {
}
}
}
}
} catch (TException e1) {
e1.printStackTrace();
}
return result;
}
开发者ID:QualiMaster,项目名称:Infrastructure,代码行数:34,代码来源:CollectingTopologyInfo.java
示例14: getTopologySummaryByName
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* Returns a topology summary by <code>name</code>.
*
* @param name the topology name
* @return the topology summary (may be <b>null</b>)
* @throws TException in case of problems accessing the remote topology info
*/
public TopologySummary getTopologySummaryByName(String name) throws TException {
TopologySummary result = null;
ClusterSummary summary = getClusterSummary();
List<TopologySummary> topologies = summary.get_topologies();
for (int t = 0; null == result && t < topologies.size(); t++) {
TopologySummary tSummary = topologies.get(t);
if (tSummary.get_name().equals(name)) {
result = tSummary;
}
}
return result;
}
开发者ID:QualiMaster,项目名称:Infrastructure,代码行数:20,代码来源:ThriftConnection.java
示例15: getTopologySummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public static TopologySummary getTopologySummary(ClusterSummary cs, String name) {
for (TopologySummary ts : cs.get_topologies()) {
if (name.equals(ts.get_name())) {
return ts;
}
}
return null;
}
开发者ID:manuzhang,项目名称:storm-benchmark,代码行数:9,代码来源:MetricsUtils.java
示例16: testGetTopologySummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
@Test
public void testGetTopologySummary() {
ClusterSummary cs = mock(ClusterSummary.class);
TopologySummary ts = mock(TopologySummary.class);
String tsName = "benchmarks";
String fakeName = "fake";
when(cs.get_topologies()).thenReturn(Lists.newArrayList(ts));
when(ts.get_name()).thenReturn(tsName);
assertThat(MetricsUtils.getTopologySummary(cs, tsName)).isEqualTo(ts);
assertThat(MetricsUtils.getTopologySummary(cs, fakeName)).isNull();
}
开发者ID:manuzhang,项目名称:storm-benchmark,代码行数:14,代码来源:MetricsUtilsTest.java
示例17: clusterSummary
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
/**
* Connvert thrift ClusterSummary to UI bean ClusterSumm
*
* @param summ
* @return
*/
public static List<ClusterSumm> clusterSummary(ClusterSummary summ,
NimbusClient client, Map conf) throws Exception {
// "Supervisors" "Used slots" "Free slots" "Total slots" "Running task"
List<SupervisorSummary> sups = summ.get_supervisors();
int supSize = 0;
int totalMemSlots = 0;
int useMemSlots = 0;
int freeMemSlots = 0;
int totalPortSlots = 0;
int usePortSlots = 0;
int freePortSlots = 0;
if (sups != null) {
supSize = sups.size();
for (SupervisorSummary ss : sups) {
totalPortSlots += ss.get_num_workers();
usePortSlots += ss.get_num_used_workers();
}
freeMemSlots = totalMemSlots - useMemSlots;
freePortSlots = totalPortSlots - usePortSlots;
}
// "Running tasks"
int totalTasks = 0;
List<TopologySummary> topos = summ.get_topologies();
if (topos != null) {
int topoSize = topos.size();
for (int j = 0; j < topoSize; j++) {
totalTasks += topos.get(j).get_num_tasks();
}
}
String nimbustime = StatBuckets.prettyUptimeStr(summ
.get_nimbus_uptime_secs());
List<ClusterSumm> clusumms = new ArrayList<ClusterSumm>();
ClusterSumm clusterSumm = new ClusterSumm();
String master = client.getMasterHost();
if (master.contains(":")) {
String firstPart = master.substring(0, master.indexOf(":") );
String lastPart = master.substring(master.indexOf(":"));
clusterSumm.setNimbusHostname(NetWorkUtils.ip2Host(firstPart) + lastPart);
clusterSumm.setNimbusIp(NetWorkUtils.host2Ip(firstPart));
} else {
clusterSumm.setNimbusHostname(master);
clusterSumm.setNimbusIp(NetWorkUtils.host2Ip(master));
}
int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
clusterSumm.setNimbusLogPort(String.valueOf(port));
clusterSumm.setNimbusUptime(nimbustime);
clusterSumm.setSupervisorNum(String.valueOf(supSize));
clusterSumm.setRunningTaskNum(String.valueOf(totalTasks));
clusterSumm.setTotalPortSlotNum(String.valueOf(totalPortSlots));
clusterSumm.setUsedPortSlotNum(String.valueOf(usePortSlots));
clusterSumm.setFreePortSlotNum(String.valueOf(freePortSlots));
clusterSumm.setVersion(summ.get_version());
clusumms.add(clusterSumm);
return clusumms;
}
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:76,代码来源:UIUtils.java
示例18: StormApplicationResults
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public StormApplicationResults(List<TopologySummary> topologies)
{
results = Lists.newArrayList();
parseResults(topologies);
}
开发者ID:HuaweiBigData,项目名称:StreamCQL,代码行数:6,代码来源:StormApplicationResults.java
示例19: JStormApplicationResults
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public JStormApplicationResults(List<TopologySummary> topologies) {
results = Lists.newArrayList();
parseResults(topologies);
}
开发者ID:HuaweiBigData,项目名称:StreamCQL,代码行数:5,代码来源:JStormApplicationResults.java
示例20: metrics
import backtype.storm.generated.TopologySummary; //导入依赖的package包/类
public void metrics(Nimbus.Client client, long now, MetricsState state) throws Exception {
long totalStatted = 0;
int componentParallelism = 0;
boolean streamFound = false;
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts : clusterSummary.get_topologies()) {
if (_topology.equals(ts.get_name())) {
topologySummary = ts;
break;
}
}
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + _topology + " not found");
} else {
// String id = topologySummary.get_id();
// GetInfoOptions getInfoOpts = new GetInfoOptions();
// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
// for (ExecutorSummary es: info.get_executors()) {
// if (_component.equals(es.get_component_id())) {
// componentParallelism ++;
// ExecutorStats stats = es.get_stats();
// if (stats != null) {
// Map<String,Map<String,Long>> statted =
// WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
// if ( statted != null) {
// Map<String, Long> e2 = statted.get(":all-time");
// if (e2 != null) {
// Long stream = e2.get(_stream);
// if (stream != null){
// streamFound = true;
// totalStatted += stream;
// }
// }
// }
// }
// }
// }
}
if (componentParallelism <= 0) {
HashSet<String> components = getComponents(client, _topology);
System.out.println("Available components for " + _topology + " :");
System.out.println("------------------");
for (String comp : components) {
System.out.println(comp);
}
System.out.println("------------------");
throw new IllegalArgumentException("component: " + _component + " not found");
}
if (!streamFound) {
throw new IllegalArgumentException("stream: " + _stream + " not found");
}
long timeDelta = now - state.getLastTime();
long stattedDelta = totalStatted - state.getLastStatted();
state.setLastTime(now);
state.setLastStatted(totalStatted);
double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta);
System.out.println(_topology + "\t" + _component + "\t" + componentParallelism + "\t" + _stream + "\t" + timeDelta + "\t" + stattedDelta + "\t"
+ throughput);
}
开发者ID:kkllwww007,项目名称:jstrom,代码行数:65,代码来源:Monitor.java
注:本文中的backtype.storm.generated.TopologySummary类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论