
Java DominantResourceCalculator Class Code Examples


This article collects typical usage examples of the Java class org.apache.hadoop.yarn.util.resource.DominantResourceCalculator. If you are wondering what DominantResourceCalculator is for or how to use it, the curated examples below should help.



The DominantResourceCalculator class belongs to the org.apache.hadoop.yarn.util.resource package. Eighteen code examples of the class are shown below, ordered by popularity.
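Before the examples, it may help to recall the idea behind the class: DominantResourceCalculator implements Dominant Resource Fairness (DRF), comparing resource requests by whichever dimension (memory or vcores) forms the largest share of the cluster. The sketch below is a hypothetical, self-contained simplification of that comparison logic — the class name `DrfSketch` and its two-dimensional model are illustrative assumptions, not the actual Hadoop API, which operates on `org.apache.hadoop.yarn.api.records.Resource` objects.

```java
// Minimal sketch of the dominant-resource comparison idea behind
// DominantResourceCalculator. Hypothetical simplification: real YARN code
// works on Resource objects and handles normalization, division by zero, etc.
public class DrfSketch {

    // Dominant share of a (memory, vcores) request relative to cluster totals:
    // the larger of the two per-dimension fractions.
    static double dominantShare(long mem, long vcores,
                                long clusterMem, long clusterVcores) {
        return Math.max((double) mem / clusterMem,
                        (double) vcores / clusterVcores);
    }

    // Compares two requests by dominant share, as a DRF comparator would:
    // negative if a is smaller, positive if larger, zero if equal.
    static int compare(long memA, long vcA, long memB, long vcB,
                       long clusterMem, long clusterVcores) {
        return Double.compare(
            dominantShare(memA, vcA, clusterMem, clusterVcores),
            dominantShare(memB, vcB, clusterMem, clusterVcores));
    }

    public static void main(String[] args) {
        // Cluster: 100 GB memory, 10 vcores.
        long cMem = 100 * 1024, cVc = 10;
        // Request A: 10 GB, 1 vcore  -> dominant share 0.1
        // Request B:  5 GB, 3 vcores -> dominant share 0.3 (CPU-dominant)
        System.out.println(compare(10 * 1024, 1, 5 * 1024, 3, cMem, cVc) < 0);
    }
}
```

This is why, in the normalization tests below, a request like (1536 MB, 0 cores) is compared and rounded per dimension rather than by memory alone.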

Example 1: buildEnv

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
private void buildEnv(String labelsConfig, String nodesConfig,
    String queuesConfig, String appsConfig,
    boolean useDominantResourceCalculator) throws IOException {
  if (useDominantResourceCalculator) {
    when(cs.getResourceCalculator()).thenReturn(
        new DominantResourceCalculator());
  }
  mockNodeLabelsManager(labelsConfig);
  mockSchedulerNodes(nodesConfig);
  for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
    when(cs.getSchedulerNode(nodeId)).thenReturn(
        nodeIdToSchedulerNodes.get(nodeId));
  }
  ParentQueue root = mockQueueHierarchy(queuesConfig);
  when(cs.getRootQueue()).thenReturn(root);
  when(cs.getClusterResource()).thenReturn(clusterResource);
  mockApplications(appsConfig);

  policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
      mClock);
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 22, Source: TestProportionalCapacityPreemptionPolicyForNodePartitions.java


Example 2: buildPolicy

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
    String[][] resData, boolean useDominantResourceCalculator) {
  if (useDominantResourceCalculator) {
    when(mCS.getResourceCalculator()).thenReturn(
        new DominantResourceCalculator());
  }
  ProportionalCapacityPreemptionPolicy policy =
      new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
  clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
      qData[2]);
  ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
  when(mCS.getRootQueue()).thenReturn(mRoot);

  setResourceAndNodeDetails();
  return policy;
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 17, Source: TestProportionalCapacityPreemptionPolicy.java


Example 3: buildEnv

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
public void buildEnv(String labelsConfig, String nodesConfig,
    String queuesConfig, String appsConfig,
    boolean useDominantResourceCalculator) throws IOException {
  if (useDominantResourceCalculator) {
    when(cs.getResourceCalculator()).thenReturn(
        new DominantResourceCalculator());
  }
  mockNodeLabelsManager(labelsConfig);
  mockSchedulerNodes(nodesConfig);
  for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
    when(cs.getSchedulerNode(nodeId)).thenReturn(
        nodeIdToSchedulerNodes.get(nodeId));
  }
  when(cs.getAllNodes()).thenReturn(nodeIdToSchedulerNodes);
  ParentQueue root = mockQueueHierarchy(queuesConfig);
  when(cs.getRootQueue()).thenReturn(root);
  when(cs.getClusterResource()).thenReturn(clusterResource);
  mockApplications(appsConfig);

  policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
      mClock);
}
 
Contributor: hopshadoop, Project: hops, Lines: 23, Source: ProportionalCapacityPreemptionPolicyMockFramework.java


Example 4: testNormalizeRequestWithDominantResourceCalculator

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test (timeout = 30000)
public void testNormalizeRequestWithDominantResourceCalculator() {
  ResourceCalculator resourceCalculator = new DominantResourceCalculator();
  
  Resource minResource = Resources.createResource(1024, 1, 0);
  Resource maxResource = Resources.createResource(10240, 10, 10);
  Resource clusterResource = Resources.createResource(10 * 1024, 10, 10);
  
  ResourceRequest ask = new ResourceRequestPBImpl();

  // case negative memory/vcores/gcores
  ask.setCapability(Resources.createResource(-1024, -1, -1));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(minResource, ask.getCapability());

  // case zero memory/vcores/gcores
  ask.setCapability(Resources.createResource(0, 0, 0));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(minResource, ask.getCapability());
  assertEquals(1, ask.getCapability().getVirtualCores());
  assertEquals(1024, ask.getCapability().getMemory());
  assertEquals(0, ask.getCapability().getGpuCores());

  // case non-zero memory & zero cores
  ask.setCapability(Resources.createResource(1536, 0, 0));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(Resources.createResource(2048, 1, 0), ask.getCapability());
  assertEquals(1, ask.getCapability().getVirtualCores());
  assertEquals(2048, ask.getCapability().getMemory());
  assertEquals(0, ask.getCapability().getGpuCores());
}
 
Contributor: naver, Project: hadoop, Lines: 35, Source: TestSchedulerUtils.java


Example 5: testAMLimitUsage

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 30000)
public void testAMLimitUsage() throws Exception {

  CapacitySchedulerConfiguration config =
      new CapacitySchedulerConfiguration();

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DefaultResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DominantResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);

}
 
Contributor: naver, Project: hadoop, Lines: 16, Source: TestCapacityScheduler.java


Example 6: testNormalizeRequestWithDominantResourceCalculator

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test (timeout = 30000)
public void testNormalizeRequestWithDominantResourceCalculator() {
  ResourceCalculator resourceCalculator = new DominantResourceCalculator();
  
  Resource minResource = Resources.createResource(1024, 1);
  Resource maxResource = Resources.createResource(10240, 10);
  Resource clusterResource = Resources.createResource(10 * 1024, 10);
  
  ResourceRequest ask = new ResourceRequestPBImpl();

  // case negative memory/vcores
  ask.setCapability(Resources.createResource(-1024, -1));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(minResource, ask.getCapability());

  // case zero memory/vcores
  ask.setCapability(Resources.createResource(0, 0));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(minResource, ask.getCapability());
  assertEquals(1, ask.getCapability().getVirtualCores());
  assertEquals(1024, ask.getCapability().getMemory());

  // case non-zero memory & zero cores
  ask.setCapability(Resources.createResource(1536, 0));
  SchedulerUtils.normalizeRequest(
      ask, resourceCalculator, clusterResource, minResource, maxResource);
  assertEquals(Resources.createResource(2048, 1), ask.getCapability());
  assertEquals(1, ask.getCapability().getVirtualCores());
  assertEquals(2048, ask.getCapability().getMemory());
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 33, Source: TestSchedulerUtils.java


Example 7: testAMLimitUsage

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 30000)
public void testAMLimitUsage() throws Exception {

  CapacitySchedulerConfiguration config =
      new CapacitySchedulerConfiguration();

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DefaultResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DominantResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 15, Source: TestCapacityScheduler.java


Example 8: testAMLimitUsage

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 60000)
public void testAMLimitUsage() throws Exception {

  CapacitySchedulerConfiguration config =
      new CapacitySchedulerConfiguration();

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DefaultResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);

  config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
      DominantResourceCalculator.class.getName());
  verifyAMLimitForLeafQueue(config);
}
 
Contributor: hopshadoop, Project: hops, Lines: 15, Source: TestCapacityScheduler.java


Example 9: testResourceTypes

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 3000000)
public void testResourceTypes() throws Exception {
  HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>> driver =
      new HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>>();

  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);
  YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf);
  testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testCapacityDefConf = new YarnConfiguration();
  testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testFairDefConf = new YarnConfiguration();
  testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    FairScheduler.class, ResourceScheduler.class);

  driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testCapacityDRConf,
    EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.GPU));
  driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testFairDefConf,
    EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU, SchedulerResourceTypes.GPU));

  for (Map.Entry<YarnConfiguration, EnumSet<SchedulerResourceTypes>> entry : driver
    .entrySet()) {
    EnumSet<SchedulerResourceTypes> expectedValue = entry.getValue();
    MockRM rm = new MockRM(entry.getKey());
    rm.start();
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
    RMApp app1 = rm.submitApp(2048);
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
    EnumSet<SchedulerResourceTypes> types = resp.getSchedulerResourceTypes();
    LOG.info("types = " + types.toString());
    Assert.assertEquals(expectedValue, types);
    rm.stop();
  }
}
 
Contributor: naver, Project: hadoop, Lines: 43, Source: TestApplicationMasterService.java


Example 10: testAppReservationWithDominantResourceCalculator

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 30000)
public void testAppReservationWithDominantResourceCalculator() throws Exception {
  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);

  YarnConfiguration conf = new YarnConfiguration(csconf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
    ResourceScheduler.class);

  MockRM rm = new MockRM(conf);
  rm.start();

  MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1, 1);

  // register extra nodes to bump up cluster resource
  MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4, 4);
  rm.registerNode("127.0.0.1:1236", 10 * GB, 4, 4);

  RMApp app1 = rm.submitApp(1024);
  // kick the scheduling
  nm1.nodeHeartbeat(true);
  RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
  MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
  am1.registerAppAttempt();
  SchedulerNodeReport report_nm1 =
      rm.getResourceScheduler().getNodeReport(nm1.getNodeId());

  // check node report
  Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
  Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemory());

  // add request for containers
  am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
  am1.schedule(); // send the request

  // kick the scheduler, container reservation should not happen
  nm1.nodeHeartbeat(true);
  Thread.sleep(1000);
  AllocateResponse allocResponse = am1.schedule();
  ApplicationResourceUsageReport report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());

  // container should get allocated on this node
  nm2.nodeHeartbeat(true);

  while (allocResponse.getAllocatedContainers().size() == 0) {
    Thread.sleep(100);
    allocResponse = am1.schedule();
  }
  report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());
  rm.stop();
}
 
Contributor: naver, Project: hadoop, Lines: 61, Source: TestCapacityScheduler.java


Example 11: testResourceTypes

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 3000000)
public void testResourceTypes() throws Exception {
  HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>> driver =
      new HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>>();

  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);
  YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf);
  testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testCapacityDefConf = new YarnConfiguration();
  testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testFairDefConf = new YarnConfiguration();
  testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    FairScheduler.class, ResourceScheduler.class);

  driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testCapacityDRConf,
    EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY));
  driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testFairDefConf,
    EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU));

  for (Map.Entry<YarnConfiguration, EnumSet<SchedulerResourceTypes>> entry : driver
    .entrySet()) {
    EnumSet<SchedulerResourceTypes> expectedValue = entry.getValue();
    MockRM rm = new MockRM(entry.getKey());
    rm.start();
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
    RMApp app1 = rm.submitApp(2048);
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
    EnumSet<SchedulerResourceTypes> types = resp.getSchedulerResourceTypes();
    LOG.info("types = " + types.toString());
    Assert.assertEquals(expectedValue, types);
    rm.stop();
  }
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 43, Source: TestApplicationMasterService.java


Example 12: testAppReservationWithDominantResourceCalculator

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 30000)
public void testAppReservationWithDominantResourceCalculator() throws Exception {
  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);

  YarnConfiguration conf = new YarnConfiguration(csconf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
    ResourceScheduler.class);

  MockRM rm = new MockRM(conf);
  rm.start();

  MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);

  // register extra nodes to bump up cluster resource
  MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
  rm.registerNode("127.0.0.1:1236", 10 * GB, 4);

  RMApp app1 = rm.submitApp(1024);
  // kick the scheduling
  nm1.nodeHeartbeat(true);
  RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
  MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
  am1.registerAppAttempt();
  SchedulerNodeReport report_nm1 =
      rm.getResourceScheduler().getNodeReport(nm1.getNodeId());

  // check node report
  Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
  Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemory());

  // add request for containers
  am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
  am1.schedule(); // send the request

  // kick the scheduler, container reservation should not happen
  nm1.nodeHeartbeat(true);
  Thread.sleep(1000);
  AllocateResponse allocResponse = am1.schedule();
  ApplicationResourceUsageReport report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());

  // container should get allocated on this node
  nm2.nodeHeartbeat(true);

  while (allocResponse.getAllocatedContainers().size() == 0) {
    Thread.sleep(100);
    allocResponse = am1.schedule();
  }
  report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());
  rm.stop();
}
 
Contributor: aliyun-beta, Project: aliyun-oss-hadoop-fs, Lines: 61, Source: TestCapacityScheduler.java


Example 13: allocate

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {

  // Make sure this application exists
  FSAppAttempt application = getSchedulerApp(appAttemptId);
  if (application == null) {
    LOG.info("Calling allocate on removed " +
        "or non existant application " + appAttemptId);
    return EMPTY_ALLOCATION;
  }

  // Sanity check
  SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
      clusterResource, minimumAllocation, getMaximumResourceCapability(),
      incrAllocation);

  // Record container allocation start time
  application.recordContainerRequestTime(getClock().getTime());

  // Set amResource for this app
  if (!application.getUnmanagedAM() && ask.size() == 1
      && application.getLiveContainers().isEmpty()) {
    application.setAMResource(ask.get(0).getCapability());
  }

  // Release containers
  releaseContainers(release, application);

  synchronized (application) {
    if (!ask.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: pre-update" +
            " applicationAttemptId=" + appAttemptId +
            " application=" + application.getApplicationId());
      }
      application.showRequests();

      // Update application requests
      application.updateResourceRequests(ask);

      application.showRequests();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: post-update" +
          " applicationAttemptId=" + appAttemptId +
          " #ask=" + ask.size() +
          " reservation= " + application.getCurrentReservation());

      LOG.debug("Preempting " + application.getPreemptionContainers().size()
          + " container(s)");
    }

    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
    for (RMContainer container : application.getPreemptionContainers()) {
      preemptionContainerIds.add(container.getContainerId());
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    ContainersAndNMTokensAllocation allocation =
        application.pullNewlyAllocatedContainersAndNMTokens();

    // Record container allocation time
    if (!(allocation.getContainerList().isEmpty())) {
      application.recordContainerAllocationTime(getClock().getTime());
    }

    return new Allocation(allocation.getContainerList(),
      application.getHeadroom(), preemptionContainerIds, null, null,
      allocation.getNMTokenList());
  }
}
 
Contributor: Nextzero, Project: hadoop-2.6.0-cdh5.4.3, Lines: 74, Source: FairScheduler.java


Example 14: allocate

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Override
  public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {

    // Make sure this application exists
    FSSchedulerApp application = applications.get(appAttemptId);
    if (application == null) {
      LOG.info("Calling allocate on removed " +
          "or non existant application " + appAttemptId);
      return EMPTY_ALLOCATION;
    }
    //correct!
    /**
    for (ResourceRequest a : ask) {
      LOG.info("resource before normalized: " + a.getCapability());
    }
     */

    // Sanity check
    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
//    for (ResourceRequest a : ask) {
//      LOG.info("resource after normalized: " + a.getCapability());
//    }

    // Release containers
    for (ContainerId releasedContainerId : release) {
      RMContainer rmContainer = getRMContainer(releasedContainerId);
      if (rmContainer == null) {
        RMAuditLogger.logFailure(application.getUser(),
            AuditConstants.RELEASE_CONTAINER,
            "Unauthorized access or invalid container", "FairScheduler",
            "Trying to release container not owned by app or with invalid id",
            application.getApplicationId(), releasedContainerId);
      }
      completedContainer(rmContainer,
          SchedulerUtils.createAbnormalContainerStatus(
              releasedContainerId,
              SchedulerUtils.RELEASED_CONTAINER),
          RMContainerEventType.RELEASED);
    }

    synchronized (application) {
      if (!ask.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("allocate: pre-update" +
              " applicationAttemptId=" + appAttemptId +
              " application=" + application.getApplicationId());
        }
        application.showRequests();

        // Update application requests
        application.updateResourceRequests(ask);

        LOG.debug("allocate: post-update");
        application.showRequests();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate:" +
            " applicationAttemptId=" + appAttemptId +
            " #ask=" + ask.size());

        LOG.debug("Preempting " + application.getPreemptionContainers().size()
            + " container(s)");
      }
      
      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
      for (RMContainer container : application.getPreemptionContainers()) {
        preemptionContainerIds.add(container.getContainerId());
      }
      
      return new Allocation(application.pullNewlyAllocatedContainers(),
          application.getHeadroom(), preemptionContainerIds);
    }
  }
 
Contributor: ict-carch, Project: hadoop-plus, Lines: 77, Source: FairScheduler.java


Example 15: testResourceTypes

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 3000000)
public void testResourceTypes() throws Exception {
  HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>> driver =
      new HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>>();

  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);
  YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf);
  testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testCapacityDefConf = new YarnConfiguration();
  testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    CapacityScheduler.class, ResourceScheduler.class);
  YarnConfiguration testFairDefConf = new YarnConfiguration();
  testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
    FairScheduler.class, ResourceScheduler.class);

  driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testCapacityDRConf,
      EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.GPU));
  driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY));
  driver.put(testFairDefConf,
      EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY));

  for (Map.Entry<YarnConfiguration, EnumSet<SchedulerResourceTypes>> entry : driver
    .entrySet()) {
    EnumSet<SchedulerResourceTypes> expectedValue = entry.getValue();
    MockRM rm = new MockRM(entry.getKey());
    rm.start();
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
    RMApp app1 = rm.submitApp(2048);
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
    EnumSet<SchedulerResourceTypes> types = resp.getSchedulerResourceTypes();
    LOG.info("types = " + types.toString());
    Assert.assertEquals(expectedValue, types);
    rm.stop();
  }
}
 
Contributor: hopshadoop, Project: hops, Lines: 43, Source: TestApplicationMasterService.java


Example 16: testAppReservationWithDominantResourceCalculator

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Test(timeout = 30000)
public void testAppReservationWithDominantResourceCalculator() throws Exception {
  CapacitySchedulerConfiguration csconf =
      new CapacitySchedulerConfiguration();
  csconf.setResourceComparator(DominantResourceCalculator.class);

  YarnConfiguration conf = new YarnConfiguration(csconf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
    ResourceScheduler.class);

  MockRM rm = new MockRM(conf);
  rm.start();

  MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1, 1);

  // register extra nodes to bump up cluster resource
  MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4, 4);
  rm.registerNode("127.0.0.1:1236", 10 * GB, 4, 4);

  RMApp app1 = rm.submitApp(1024);
  // kick the scheduling
  nm1.nodeHeartbeat(true);
  RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
  MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
  am1.registerAppAttempt();
  SchedulerNodeReport report_nm1 =
      rm.getResourceScheduler().getNodeReport(nm1.getNodeId());

  // check node report
  Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemorySize());
  Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemorySize());

  // add request for containers
  am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
  am1.schedule(); // send the request

  // kick the scheduler, container reservation should not happen
  nm1.nodeHeartbeat(true);
  Thread.sleep(1000);
  AllocateResponse allocResponse = am1.schedule();
  ApplicationResourceUsageReport report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());

  // container should get allocated on this node
  nm2.nodeHeartbeat(true);

  while (allocResponse.getAllocatedContainers().size() == 0) {
    Thread.sleep(100);
    allocResponse = am1.schedule();
  }
  report =
      rm.getResourceScheduler().getAppResourceUsageReport(
        attempt1.getAppAttemptId());
  Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
  Assert.assertEquals(0, report.getNumReservedContainers());
  rm.stop();
}
 
Contributor: hopshadoop, Project: hops, Lines: 61, Source: TestCapacityScheduler.java


Example 17: allocate

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {

  // Make sure this application exists
  FSSchedulerApp application = applications.get(appAttemptId);
  if (application == null) {
    LOG.info("Calling allocate on removed " +
        "or non existant application " + appAttemptId);
    return EMPTY_ALLOCATION;
  }

  // Sanity check
  SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
      clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);

  // Release containers
  for (ContainerId releasedContainerId : release) {
    RMContainer rmContainer = getRMContainer(releasedContainerId);
    if (rmContainer == null) {
      RMAuditLogger.logFailure(application.getUser(),
          AuditConstants.RELEASE_CONTAINER,
          "Unauthorized access or invalid container", "FairScheduler",
          "Trying to release container not owned by app or with invalid id",
          application.getApplicationId(), releasedContainerId);
    }
    completedContainer(rmContainer,
        SchedulerUtils.createAbnormalContainerStatus(
            releasedContainerId,
            SchedulerUtils.RELEASED_CONTAINER),
        RMContainerEventType.RELEASED);
  }

  synchronized (application) {
    if (!ask.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: pre-update" +
            " applicationAttemptId=" + appAttemptId +
            " application=" + application.getApplicationId());
      }
      application.showRequests();

      // Update application requests
      application.updateResourceRequests(ask);

      LOG.debug("allocate: post-update");
      application.showRequests();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate:" +
          " applicationAttemptId=" + appAttemptId +
          " #ask=" + ask.size());

      LOG.debug("Preempting " + application.getPreemptionContainers().size()
          + " container(s)");
    }
    
    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
    for (RMContainer container : application.getPreemptionContainers()) {
      preemptionContainerIds.add(container.getContainerId());
    }
    
    return new Allocation(application.pullNewlyAllocatedContainers(),
        application.getHeadroom(), preemptionContainerIds);
  }
}
 
Contributor: chendave, Project: hadoop-TCP, Lines: 68, Source: FairScheduler.java


Example 18: allocate

import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; // import the required package/class
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {

  // Make sure this application exists
  FSSchedulerApp application = getSchedulerApp(appAttemptId);
  if (application == null) {
    LOG.info("Calling allocate on removed " +
        "or non existant application " + appAttemptId);
    return EMPTY_ALLOCATION;
  }

  // Sanity check
  SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
      clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);

  // Release containers
  for (ContainerId releasedContainerId : release) {
    RMContainer rmContainer = getRMContainer(releasedContainerId);
    if (rmContainer == null) {
      RMAuditLogger.logFailure(application.getUser(),
          AuditConstants.RELEASE_CONTAINER,
          "Unauthorized access or invalid container", "FairScheduler",
          "Trying to release container not owned by app or with invalid id",
          application.getApplicationId(), releasedContainerId);
    }
    completedContainer(rmContainer,
        SchedulerUtils.createAbnormalContainerStatus(
            releasedContainerId,
            SchedulerUtils.RELEASED_CONTAINER),
        RMContainerEventType.RELEASED);
  }

  synchronized (application) {
    if (!ask.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: pre-update" +
            " applicationAttemptId=" + appAttemptId +
            " application=" + application.getApplicationId());
      }
      application.showRequests();

      // Update application requests
      application.updateResourceRequests(ask);

      LOG.debug("allocate: post-update");
      application.showRequests();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate:" +
          " applicationAttemptId=" + appAttemptId +
          " #ask=" + ask.size());

      LOG.debug("Preempting " + application.getPreemptionContainers().size()
          + " container(s)");
    }
    
    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
    for (RMContainer container : application.getPreemptionContainers()) {
      preemptionContainerIds.add(container.getContainerId());
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    ContainersAndNMTokensAllocation allocation =
        application.pullNewlyAllocatedContainersAndNMTokens();
    return new Allocation(allocation.getContainerList(),
      application.getHeadroom(), preemptionContainerIds, null, null,
      allocation.getNMTokenList());
  }
}
 
Contributor: Seagate, Project: hadoop-on-lustre2, Lines: 72, Source: FairScheduler.java



Note: the org.apache.hadoop.yarn.util.resource.DominantResourceCalculator examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before redistributing.

