本文整理汇总了Java中org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType类的典型用法代码示例。如果您正苦于以下问题:Java NodesListManagerEventType类的具体用法?Java NodesListManagerEventType怎么用?Java NodesListManagerEventType使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
NodesListManagerEventType类属于org.apache.hadoop.yarn.server.resourcemanager包,在下文中一共展示了NodesListManagerEventType类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: transition
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
开发者ID:naver,项目名称:hadoop,代码行数:26,代码来源:RMNodeImpl.java
示例2: reportNodeUnusable
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
/**
* Report node is UNUSABLE and update metrics.
* @param rmNode
* @param finalState
*/
public static void reportNodeUnusable(RMNodeImpl rmNode,
NodeState finalState) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:25,代码来源:RMNodeImpl.java
示例3: transition
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(finalState);
}
开发者ID:ict-carch,项目名称:hadoop-plus,代码行数:20,代码来源:RMNodeImpl.java
示例4: reportNodeUnusable
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
/**
* Report node is UNUSABLE and update metrics.
*
* @param rmNode
* @param finalState
*/
public static void reportNodeUnusable(RMNodeImpl rmNode,
NodeState finalState) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
开发者ID:hopshadoop,项目名称:hops,代码行数:26,代码来源:RMNodeImplNotDist.java
示例5: transition
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
//Add by ME
if (!initialState.equals(NodeState.UNHEALTHY) && !initialState.equals(NodeState.UNTRUST)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
开发者ID:chendave,项目名称:hadoop-TCP,代码行数:27,代码来源:RMNodeImpl.java
示例6: transition
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
}
}
开发者ID:Seagate,项目名称:hadoop-on-lustre2,代码行数:22,代码来源:RMNodeImpl.java
示例7: transition
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null;
NodeId nodeId = rmNode.nodeId;
if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
// Old node rejoining
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
rmNode.context.getInactiveRMNodes().remove(nodeId);
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
}
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:38,代码来源:RMNodeImpl.java
示例8: reportNodeUnusable
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
/**
* Report node is UNUSABLE and update metrics.
*
* @param rmNode
* @param finalState
*/
public void reportNodeUnusable(RMNodeImpl rmNode,
NodeState finalState) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
// if (rmNode.context.isDistributed() && !rmNode.context.isLeader()) {
//Add NodeRemovedSchedulerEvent to TransactionState
LOG.debug("HOP :: Added Pending event to TransactionState");
toCommit.addPendingEvent(PendingEvent.Type.NODE_REMOVED,
PendingEvent.Status.NEW);
// } else {
// rmNode.context.getDispatcher().getEventHandler()
// .handle(new NodeRemovedSchedulerEvent(rmNode));
// }
}
if(rmNode.context.isLeader()){
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
}
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
开发者ID:hopshadoop,项目名称:hops,代码行数:40,代码来源:RMNodeImplDist.java
示例9: statusUpdateWhenUnHealthyTransitionInternal
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
protected NodeState statusUpdateWhenUnHealthyTransitionInternal(
RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
rmNode.setAggregatedContainersUtilization(
statusEvent.getAggregatedContainersUtilization());
rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
// if (rmNode.context.isDistributed() && !rmNode.context.isLeader()) {
//Add NodeAddedSchedulerEvent to TransactionState
LOG.debug("HOP :: Added Pending event to TransactionState");
toCommit.addPendingEvent(PendingEvent.Type.NODE_ADDED,
PendingEvent.Status.NEW);
// } else {
// rmNode.context.getDispatcher().getEventHandler().handle(
// new NodeAddedSchedulerEvent(rmNode));
if(rmNode.context.isLeader()){
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
// ??? how about updating metrics before notifying to ensure that
// notifiers get update metadata because they will very likely query it
// upon notification
// Update metrics
rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY);
return NodeState.RUNNING;
}
return NodeState.UNHEALTHY;
}
开发者ID:hopshadoop,项目名称:hops,代码行数:39,代码来源:RMNodeImplDist.java
示例10: statusUpdateWhenUnHealthyTransitionInternal
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
protected NodeState statusUpdateWhenUnHealthyTransitionInternal(
RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
rmNode.setAggregatedContainersUtilization(
statusEvent.getAggregatedContainersUtilization());
rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
// ??? how about updating metrics before notifying to ensure that
// notifiers get update metadata because they will very likely query it
// upon notification
// Update metrics
rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY);
return NodeState.RUNNING;
}
return NodeState.UNHEALTHY;
}
开发者ID:hopshadoop,项目名称:hops,代码行数:30,代码来源:RMNodeImplNotDist.java
示例11: testNodeUsableEvent
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Test(timeout = 300000)
public void testNodeUsableEvent() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
final Dispatcher dispatcher = getDispatcher();
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 28000);
NodesListManager nodesListManager = rm.getNodesListManager();
Resource clusterResource = Resource.newInstance(28000, 8);
RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
// Create killing APP
RMApp killrmApp = rm.submitApp(200);
rm.killApp(killrmApp.getApplicationId());
rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED);
// Create finish APP
RMApp finshrmApp = rm.submitApp(2000);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
// Create submitted App
RMApp subrmApp = rm.submitApp(200);
// Fire Event for NODE_USABLE
nodesListManager.handle(new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmnode));
if (applist.size() > 0) {
Assert.assertTrue(
"Event based on running app expected " + subrmApp.getApplicationId(),
applist.contains(subrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on finish app not expected "
+ finshrmApp.getApplicationId(),
applist.contains(finshrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on killed app not expected "
+ killrmApp.getApplicationId(),
applist.contains(killrmApp.getApplicationId()));
} else {
Assert.fail("Events received should have beeen more than 1");
}
applist.clear();
// Fire Event for NODE_UNUSABLE
nodesListManager.handle(new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmnode));
if (applist.size() > 0) {
Assert.assertTrue(
"Event based on running app expected " + subrmApp.getApplicationId(),
applist.contains(subrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on finish app not expected "
+ finshrmApp.getApplicationId(),
applist.contains(finshrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on killed app not expected "
+ killrmApp.getApplicationId(),
applist.contains(killrmApp.getApplicationId()));
} else {
Assert.fail("Events received should have beeen more than 1");
}
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:77,代码来源:TestNodesListManager.java
示例12: testCachedResolverWithEvent
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Test
public void testCachedResolverWithEvent() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
MockRM rm = new MockRM(conf);
rm.init(conf);
NodesListManager nodesListManager = rm.getNodesListManager();
nodesListManager.init(conf);
nodesListManager.start();
NodesListManager.CachedResolver resolver =
(NodesListManager.CachedResolver)nodesListManager.getResolver();
resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost1", 1234);
RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost2", 1234);
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode1));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode2));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertNotEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:47,代码来源:TestNodesListManager.java
示例13: addNodeTransitionInternal
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
protected void addNodeTransitionInternal(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null;
NodeId nodeId = rmNode.nodeId;
RMNode previousRMNode =
rmNode.context.getInactiveRMNodes().remove(nodeId);
if (previousRMNode != null) {
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
NodeId unknownNodeId =
NodesListManager.createUnknownNodeId(nodeId.getHost());
previousRMNode =
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
if (previousRMNode != null) {
ClusterMetrics.getMetrics().decrDecommisionedNMs();
}
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
}
// if (rmNode.context.isDistributed() && !rmNode.context.isLeader()) {
//Add NodeAddedSchedulerEvent to TransactionState
toCommit.addPendingEvent(PendingEvent.Type.NODE_ADDED,
PendingEvent.Status.NEW);
// } else {
// rmNode.context.getDispatcher().getEventHandler()
// .handle(new NodeAddedSchedulerEvent(rmNode, containers));
if(rmNode.context.isLeader()){
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
开发者ID:hopshadoop,项目名称:hops,代码行数:51,代码来源:RMNodeImplDist.java
示例14: addNodeTransitionInternal
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Override
protected void addNodeTransitionInternal(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null;
NodeId nodeId = rmNode.nodeId;
RMNode previousRMNode =
rmNode.context.getInactiveRMNodes().remove(nodeId);
if (previousRMNode != null) {
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
NodeId unknownNodeId =
NodesListManager.createUnknownNodeId(nodeId.getHost());
previousRMNode =
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
if (previousRMNode != null) {
ClusterMetrics.getMetrics().decrDecommisionedNMs();
}
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
}
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
开发者ID:hopshadoop,项目名称:hops,代码行数:44,代码来源:RMNodeImplNotDist.java
示例15: testNodeUsableEvent
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; //导入依赖的package包/类
@Test(timeout = 300000)
public void testNodeUsableEvent() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
RMStorageFactory.setConfiguration(conf);
YarnAPIStorageFactory.setConfiguration(conf);
DBUtility.InitializeDB();
MockRM rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return getDispatcher();
}
};
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 28000);
NodesListManager nodesListManager = rm.getNodesListManager();
Resource clusterResource = Resource.newInstance(28000, 8);
RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
// Create killing APP
RMApp killrmApp = rm.submitApp(200);
rm.killApp(killrmApp.getApplicationId());
rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED);
// Create finish APP
RMApp finshrmApp = rm.submitApp(2000);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
// Create submitted App
RMApp subrmApp = rm.submitApp(200);
// Fire Event for NODE_USABLE
nodesListManager.handle(new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmnode));
if (applist.size() > 0) {
Assert.assertTrue(
"Event based on running app expected " + subrmApp.getApplicationId(),
applist.contains(subrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on finish app not expected "
+ finshrmApp.getApplicationId(),
applist.contains(finshrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on killed app not expected "
+ killrmApp.getApplicationId(),
applist.contains(killrmApp.getApplicationId()));
} else {
Assert.fail("Events received should have beeen more than 1");
}
applist.clear();
// Fire Event for NODE_UNUSABLE
nodesListManager.handle(new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmnode));
if (applist.size() > 0) {
Assert.assertTrue(
"Event based on running app expected " + subrmApp.getApplicationId(),
applist.contains(subrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on finish app not expected "
+ finshrmApp.getApplicationId(),
applist.contains(finshrmApp.getApplicationId()));
Assert.assertFalse(
"Event based on killed app not expected "
+ killrmApp.getApplicationId(),
applist.contains(killrmApp.getApplicationId()));
} else {
Assert.fail("Events received should have beeen more than 1");
}
}
开发者ID:hopshadoop,项目名称:hops,代码行数:81,代码来源:TestNodesListManager.java
注:本文中的org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论