本文整理汇总了Java中org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher类的典型用法代码示例。如果您正苦于以下问题:Java SystemMetricsPublisher类的具体用法?Java SystemMetricsPublisher怎么用?Java SystemMetricsPublisher使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
SystemMetricsPublisher类属于org.apache.hadoop.yarn.server.resourcemanager.metrics包,在下文中一共展示了SystemMetricsPublisher类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: mockRMContext
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
throws IOException {
Dispatcher dispatcher = mock(Dispatcher.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
EventHandler eventHandler = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
queInfo.setQueueName("testqueue");
when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
.thenReturn(queInfo);
when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))
.thenThrow(new IOException("queue does not exist"));
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);
when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
getSchedulerApps(apps));
ResourceScheduler rs = mock(ResourceScheduler.class);
when(rmContext.getScheduler()).thenReturn(rs);
}
开发者ID:naver,项目名称:hadoop,代码行数:25,代码来源:TestClientRMService.java
示例2: mockRMContext
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
throws IOException {
Dispatcher dispatcher = mock(Dispatcher.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
EventHandler eventHandler = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
queInfo.setQueueName("testqueue");
when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
.thenReturn(queInfo);
when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))
.thenThrow(new IOException("queue does not exist"));
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);
when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
getSchedulerApps(apps));
ResourceScheduler rs = mock(ResourceScheduler.class);
when(rmContext.getScheduler()).thenReturn(rs);
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:26,代码来源:TestClientRMService.java
示例3: setUp
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Override
@Before
public void setUp() throws Exception {
super.setUp();
context = new MockRMContext();
context.setDispatcher(TestObjectFactory.getMockDispatcher());
context.setSystemMetricsPublisher(new SystemMetricsPublisher());
nodeOne = TestObjectFactory.getRMNode("localhost-one", 8800, Resource.newInstance(1024, 2));
nodeTwo = TestObjectFactory.getRMNode("localhost-two", 8800, Resource.newInstance(2048, 4));
sNodeOne = new FSSchedulerNode(nodeOne, false);
sNodeTwo = new FSSchedulerNode(nodeTwo, false);
store = new NodeStore();
store.add(sNodeOne);
store.add(sNodeTwo);
containerOne = TestObjectFactory.getRMContainer(nodeOne, context, 1, 2, 1024);
}
开发者ID:apache,项目名称:incubator-myriad,代码行数:20,代码来源:NodeTest.java
示例4: mockRMContext
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
public RMContext mockRMContext(int n, long time) {
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
map.put(app.getApplicationId(), app);
}
Dispatcher rmDispatcher = new AsyncDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
rmDispatcher);
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null, writer) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;
}
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
metricsPublisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)context).setSystemMetricsPublisher(metricsPublisher);
return context;
}
开发者ID:naver,项目名称:hadoop,代码行数:28,代码来源:TestAppManager.java
示例5: testAppAttemptMetrics
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
FifoScheduler scheduler = new FifoScheduler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer, scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
Configuration conf = new Configuration();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
QueueMetrics metrics = scheduler.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
scheduler.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
scheduler.stop();
}
开发者ID:naver,项目名称:hadoop,代码行数:39,代码来源:TestFifoScheduler.java
示例6: mockRMContext
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
public RMContext mockRMContext(int n, long time) {
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
map.put(app.getApplicationId(), app);
}
Dispatcher rmDispatcher = new AsyncDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
rmDispatcher);
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;
}
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
metricsPublisher = mock(SystemMetricsPublisher.class);
context.setSystemMetricsPublisher(metricsPublisher);
context.setRMApplicationHistoryWriter(writer);
return context;
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:29,代码来源:TestAppManager.java
示例7: setUp
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
null, null, null, null, null);
rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
//DO NOTHING
}
return null;
}
}
).when(scheduler).handle(any(SchedulerEvent.class));
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:32,代码来源:TestRMAppLogAggregationStatus.java
示例8: testStoreAllContainerMetrics
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test (timeout = 180000)
public void testStoreAllContainerMetrics() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
MockRM rm1 = new MockRM(conf);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
rm1.getRMContext().setSystemMetricsPublisher(publisher);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(1024);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 = ContainerId.newContainerId(
am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
.getAllocatedContainers();
rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
rm1.stop();
// RMContainer should be publishing system metrics for all containers.
// Since there is 1 AM container and 1 non-AM container, there should be 2
// container created events and 2 container finished events.
verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong());
verify(publisher, times(2)).containerFinished(any(RMContainer.class), anyLong());
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:39,代码来源:TestRMContainerImpl.java
示例9: testStoreOnlyAMContainerMetrics
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test (timeout = 180000)
public void testStoreOnlyAMContainerMetrics() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
false);
MockRM rm1 = new MockRM(conf);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
rm1.getRMContext().setSystemMetricsPublisher(publisher);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(1024);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 = ContainerId.newContainerId(
am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
.getAllocatedContainers();
rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED);
rm1.stop();
// RMContainer should be publishing system metrics only for AM container.
verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong());
verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong());
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:37,代码来源:TestRMContainerImpl.java
示例10: testAppAttemptMetrics
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
FifoScheduler scheduler = new FifoScheduler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
Configuration conf = new Configuration();
((RMContextImpl) rmContext).setScheduler(scheduler);
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
QueueMetrics metrics = scheduler.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
scheduler.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
scheduler.stop();
}
开发者ID:aliyun-beta,项目名称:aliyun-oss-hadoop-fs,代码行数:40,代码来源:TestFifoScheduler.java
示例11: testAppAttemptMetrics
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new Configuration();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
QueueMetrics metrics = scheduler.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
scheduler.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
scheduler.stop();
}
开发者ID:Nextzero,项目名称:hadoop-2.6.0-cdh5.4.3,代码行数:38,代码来源:TestFifoScheduler.java
示例12: setUp
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.baseStateStoreDirectory = "/tmp/yarn-node-capacity-manager-test";
context = new MockRMContext();
context.setDispatcher(TestObjectFactory.getMockDispatcher());
context.setSystemMetricsPublisher(new SystemMetricsPublisher());
nodeOne = TestObjectFactory.getRMNode("localhost-one", 8800, Resource.newInstance(2048, 4));
nodeTwo = TestObjectFactory.getRMNode("localhost-two", 8800, Resource.newInstance(1024, 2));
sNodeOne = new FSSchedulerNode(nodeOne, false);
sNodeTwo = new FSSchedulerNode(nodeTwo, false);
containerOne = TestObjectFactory.getRMContainer(nodeOne, context, 1, 2, 1024);
store = new NodeStore();
store.add(sNodeOne);
store.add(sNodeTwo);
MyriadDriver driver = TestObjectFactory.getMyriadDriver(new MockSchedulerDriver());
olManager = new OfferLifecycleManager(store, driver);
state = TestObjectFactory.getSchedulerState(new MyriadConfiguration(), "/tmp/yarn-node-capacity-manager-test");
MyriadFairScheduler scheduler = TestObjectFactory.getMyriadFairScheduler(context);
scheduler.addNode(sNodeOne);
scheduler.addNode(sNodeTwo);
manager = new YarnNodeCapacityManager(new CompositeInterceptor(), scheduler,
context, driver, olManager, store, state, new TaskUtils(this.cfg));
}
开发者ID:apache,项目名称:incubator-myriad,代码行数:30,代码来源:YarnNodeCapacityManagerTest.java
示例13: setSystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
开发者ID:naver,项目名称:hadoop,代码行数:7,代码来源:RMActiveServiceContext.java
示例14: getSystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
开发者ID:naver,项目名称:hadoop,代码行数:6,代码来源:RMActiveServiceContext.java
示例15: createSystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher();
}
开发者ID:naver,项目名称:hadoop,代码行数:4,代码来源:ResourceManager.java
示例16: setSystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
}
开发者ID:naver,项目名称:hadoop,代码行数:6,代码来源:RMContextImpl.java
示例17: getSystemMetricsPublisher
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return activeServiceContext.getSystemMetricsPublisher();
}
开发者ID:naver,项目名称:hadoop,代码行数:5,代码来源:RMContextImpl.java
示例18: testExpireWhileRunning
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test
public void testExpireWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
appAttemptEventHandler);
drainDispatcher.register(RMNodeEventType.class, generic);
drainDispatcher.init(new YarnConfiguration());
drainDispatcher.start();
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
Resource resource = BuilderUtils.newResource(512, 1, 1);
Priority priority = BuilderUtils.newPriority(5);
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
assertEquals(resource, rmContainer.getAllocatedResource());
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
verify(writer).containerStarted(any(RMContainer.class));
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
drainDispatcher.await();
assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
rmContainer.getLogURL());
// In RUNNING state. Verify EXPIRE and associated actions.
reset(appAttemptEventHandler);
ContainerStatus containerStatus = SchedulerUtils
.createAbnormalContainerStatus(containerId,
SchedulerUtils.EXPIRED_CONTAINER);
rmContainer.handle(new RMContainerFinishedEvent(containerId,
containerStatus, RMContainerEventType.EXPIRE));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
verify(writer, never()).containerFinished(any(RMContainer.class));
verify(publisher, never()).containerFinished(any(RMContainer.class),
anyLong());
}
开发者ID:naver,项目名称:hadoop,代码行数:73,代码来源:TestRMContainerImpl.java
示例19: setUp
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
AuthenticationMethod authMethod = AuthenticationMethod.SIMPLE;
if (isSecurityEnabled) {
authMethod = AuthenticationMethod.KERBEROS;
}
SecurityUtil.setAuthenticationMethod(authMethod, conf);
UserGroupInformation.setConfiguration(conf);
rmDispatcher = new DrainDispatcher();
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
writer = mock(RMApplicationHistoryWriter.class);
DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class);
RMContext realRMContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
renewer, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),
writer);
((RMContextImpl)realRMContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)realRMContext).setSystemMetricsPublisher(publisher);
this.rmContext = spy(realRMContext);
ResourceScheduler resourceScheduler = mock(ResourceScheduler.class);
doReturn(null).when(resourceScheduler)
.getAppResourceUsageReport((ApplicationAttemptId)Matchers.any());
doReturn(resourceScheduler).when(rmContext).getScheduler();
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext));
rmDispatcher.register(RMAppEventType.class,
new TestApplicationEventDispatcher(rmContext));
rmDispatcher.register(RMAppManagerEventType.class,
new TestApplicationManagerEventDispatcher());
schedulerDispatcher = new TestSchedulerEventDispatcher();
rmDispatcher.register(SchedulerEventType.class,
schedulerDispatcher);
rmDispatcher.init(conf);
rmDispatcher.start();
}
开发者ID:naver,项目名称:hadoop,代码行数:54,代码来源:TestRMAppTransitions.java
示例20: testNodeLocalAssignment
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; //导入依赖的package包/类
@Test(timeout=2000)
public void testNodeLocalAssignment() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(1024 * 64), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
int _appId = 1;
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
createMockRMApp(appAttemptId, rmContext);
AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
int memory = 64;
int nConts = 3;
int priority = 20;
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest nodeLocal = createResourceRequest(memory,
node0.getHostName(), priority, nConts);
ResourceRequest rackLocal = createResourceRequest(memory,
node0.getRackName(), priority, nConts);
ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
nConts);
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
// Before the node update event, there are 3 local requests outstanding
Assert.assertEquals(3, nodeLocal.getNumContainers());
scheduler.handle(node0Update);
// After the node update event, check that there are no more local requests
// outstanding
Assert.assertEquals(0, nodeLocal.getNumContainers());
//Also check that the containers were scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(3, info.getLiveContainers().size());
scheduler.stop();
}
开发者ID:naver,项目名称:hadoop,代码行数:76,代码来源:TestFifoScheduler.java
注:本文中的org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论