YARN-4078. Add getPendingResourceRequestForAttempt in YarnScheduler interface. Contributed by Naganarasimha G R

This commit is contained in:
Jian He 2015-09-16 14:58:32 +08:00
parent 2ffe2db95e
commit 452079af8b
7 changed files with 29 additions and 32 deletions

View File

@ -842,6 +842,9 @@ Release 2.8.0 - UNRELEASED
YARN-4151. Fix findbugs errors in hadoop-yarn-server-common module. YARN-4151. Fix findbugs errors in hadoop-yarn-server-common module.
(Meng Ding via wangda) (Meng Ding via wangda)
YARN-4078. Add getPendingResourceRequestForAttempt in YarnScheduler interface.
(Naganarasimha G R via jianhe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -687,6 +687,7 @@ protected void refreshMaximumAllocation(Resource newMaxAlloc) {
} }
} }
@Override
public List<ResourceRequest> getPendingResourceRequestsForAttempt( public List<ResourceRequest> getPendingResourceRequestsForAttempt(
ApplicationAttemptId attemptId) { ApplicationAttemptId attemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId); SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);

View File

@ -337,4 +337,10 @@ public void updateApplicationPriority(Priority newPriority,
* @throws YarnException * @throws YarnException
*/ */
void setClusterMaxPriority(Configuration conf) throws YarnException; void setClusterMaxPriority(Configuration conf) throws YarnException;
/**
* @param attemptId
*/
List<ResourceRequest> getPendingResourceRequestsForAttempt(
ApplicationAttemptId attemptId);
} }

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -177,9 +176,8 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
allocatedVCores = usedResources.getVirtualCores(); allocatedVCores = usedResources.getVirtualCores();
runningContainers = resourceReport.getNumUsedContainers(); runningContainers = resourceReport.getNumUsedContainers();
} }
resourceRequests = resourceRequests = rm.getRMContext().getScheduler()
((AbstractYarnScheduler) rm.getRMContext().getScheduler()) .getPendingResourceRequestsForAttempt(attempt.getAppAttemptId());
.getPendingResourceRequestsForAttempt(attempt.getAppAttemptId());
} }
} }

View File

@ -23,14 +23,10 @@
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -62,10 +58,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@ -73,9 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -616,8 +610,7 @@ public void testAMfailedBetweenRMRestart() throws Exception {
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
Thread.sleep(3000); Thread.sleep(3000);
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm2.getResourceScheduler();
(AbstractYarnScheduler) rm2.getResourceScheduler();
// Previous AM failed, The failed AM should once again release the // Previous AM failed, The failed AM should once again release the
// just-recovered containers. // just-recovered containers.
assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
@ -669,8 +662,7 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception {
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
Thread.sleep(3000); Thread.sleep(3000);
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm2.getResourceScheduler();
(AbstractYarnScheduler) rm2.getResourceScheduler();
// scheduler should not recover containers for finished apps. // scheduler should not recover containers for finished apps.
assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
@ -724,9 +716,8 @@ public void testAMContainerStatusWithRMRestart() throws Exception {
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt(); RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm1.getResourceScheduler();
((AbstractYarnScheduler) rm1.getResourceScheduler());
Assert.assertTrue(scheduler.getRMContainer( Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer()); attempt0.getMasterContainer().getId()).isAMContainer());
@ -742,7 +733,7 @@ public void testAMContainerStatusWithRMRestart() throws Exception {
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler()); scheduler = rm2.getResourceScheduler();
Assert.assertTrue(scheduler.getRMContainer( Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer()); attempt0.getMasterContainer().getId()).isAMContainer());
} }

View File

@ -79,7 +79,7 @@ public void testMaximimumAllocationMemory() throws Exception {
try { try {
rm.start(); rm.start();
testMaximumAllocationMemoryHelper( testMaximumAllocationMemoryHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxMemory, node2MaxMemory, node3MaxMemory, node1MaxMemory, node2MaxMemory, node3MaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
@ -94,7 +94,7 @@ public void testMaximimumAllocationMemory() throws Exception {
try { try {
rm.start(); rm.start();
testMaximumAllocationMemoryHelper( testMaximumAllocationMemoryHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxMemory, node2MaxMemory, node3MaxMemory, node1MaxMemory, node2MaxMemory, node3MaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
node2MaxMemory, node3MaxMemory, node2MaxMemory); node2MaxMemory, node3MaxMemory, node2MaxMemory);
@ -104,7 +104,7 @@ public void testMaximimumAllocationMemory() throws Exception {
} }
private void testMaximumAllocationMemoryHelper( private void testMaximumAllocationMemoryHelper(
AbstractYarnScheduler scheduler, YarnScheduler scheduler,
final int node1MaxMemory, final int node2MaxMemory, final int node1MaxMemory, final int node2MaxMemory,
final int node3MaxMemory, final int... expectedMaxMemory) final int node3MaxMemory, final int... expectedMaxMemory)
throws Exception { throws Exception {
@ -166,7 +166,7 @@ public void testMaximimumAllocationVCores() throws Exception {
try { try {
rm.start(); rm.start();
testMaximumAllocationVCoresHelper( testMaximumAllocationVCoresHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxVCores, node2MaxVCores, node3MaxVCores, node1MaxVCores, node2MaxVCores, node3MaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores); configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
@ -181,7 +181,7 @@ public void testMaximimumAllocationVCores() throws Exception {
try { try {
rm.start(); rm.start();
testMaximumAllocationVCoresHelper( testMaximumAllocationVCoresHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxVCores, node2MaxVCores, node3MaxVCores, node1MaxVCores, node2MaxVCores, node3MaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
node2MaxVCores, node3MaxVCores, node2MaxVCores); node2MaxVCores, node3MaxVCores, node2MaxVCores);
@ -191,7 +191,7 @@ public void testMaximimumAllocationVCores() throws Exception {
} }
private void testMaximumAllocationVCoresHelper( private void testMaximumAllocationVCoresHelper(
AbstractYarnScheduler scheduler, YarnScheduler scheduler,
final int node1MaxVCores, final int node2MaxVCores, final int node1MaxVCores, final int node2MaxVCores,
final int node3MaxVCores, final int... expectedMaxVCores) final int node3MaxVCores, final int... expectedMaxVCores)
throws Exception { throws Exception {
@ -494,7 +494,7 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
} }
private void verifyMaximumResourceCapability( private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) { Resource expectedMaximumResource, YarnScheduler scheduler) {
final Resource schedulerMaximumResourceCapability = scheduler final Resource schedulerMaximumResourceCapability = scheduler
.getMaximumResourceCapability(); .getMaximumResourceCapability();

View File

@ -113,6 +113,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -1950,8 +1951,7 @@ public void testMoveAllApps() throws Exception {
@Test @Test
public void testMoveAllAppsInvalidDestination() throws Exception { public void testMoveAllAppsInvalidDestination() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
@ -2009,8 +2009,7 @@ public void testMoveAllAppsInvalidDestination() throws Exception {
@Test @Test
public void testMoveAllAppsInvalidSource() throws Exception { public void testMoveAllAppsInvalidSource() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
@ -2113,8 +2112,7 @@ public void testKillAllAppsInQueue() throws Exception {
@Test @Test
public void testKillAllAppsInvalidSource() throws Exception { public void testKillAllAppsInvalidSource() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");