YARN-4078. Add getPendingResourceRequestForAttempt in YarnScheduler interface. Contributed by Naganarasimha G R

(cherry picked from commit 452079af8b)
This commit is contained in:
Jian He 2015-09-16 14:58:32 +08:00
parent b4bb8e1731
commit 9eda3ce3fa
7 changed files with 29 additions and 32 deletions

View File

@ -790,6 +790,9 @@ Release 2.8.0 - UNRELEASED
YARN-4151. Fix findbugs errors in hadoop-yarn-server-common module. YARN-4151. Fix findbugs errors in hadoop-yarn-server-common module.
(Meng Ding via wangda) (Meng Ding via wangda)
YARN-4078. Add getPendingResourceRequestForAttempt in YarnScheduler interface.
(Naganarasimha G R via jianhe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -687,6 +687,7 @@ public abstract class AbstractYarnScheduler
} }
} }
@Override
public List<ResourceRequest> getPendingResourceRequestsForAttempt( public List<ResourceRequest> getPendingResourceRequestsForAttempt(
ApplicationAttemptId attemptId) { ApplicationAttemptId attemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId); SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);

View File

@ -337,4 +337,10 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @throws YarnException * @throws YarnException
*/ */
void setClusterMaxPriority(Configuration conf) throws YarnException; void setClusterMaxPriority(Configuration conf) throws YarnException;
/**
* @param attemptId
*/
List<ResourceRequest> getPendingResourceRequestsForAttempt(
ApplicationAttemptId attemptId);
} }

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -177,9 +176,8 @@ public class AppInfo {
allocatedVCores = usedResources.getVirtualCores(); allocatedVCores = usedResources.getVirtualCores();
runningContainers = resourceReport.getNumUsedContainers(); runningContainers = resourceReport.getNumUsedContainers();
} }
resourceRequests = resourceRequests = rm.getRMContext().getScheduler()
((AbstractYarnScheduler) rm.getRMContext().getScheduler()) .getPendingResourceRequestsForAttempt(attempt.getAppAttemptId());
.getPendingResourceRequestsForAttempt(attempt.getAppAttemptId());
} }
} }

View File

@ -23,14 +23,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -62,10 +58,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@ -73,9 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -616,8 +610,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
Thread.sleep(3000); Thread.sleep(3000);
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm2.getResourceScheduler();
(AbstractYarnScheduler) rm2.getResourceScheduler();
// Previous AM failed, The failed AM should once again release the // Previous AM failed, The failed AM should once again release the
// just-recovered containers. // just-recovered containers.
assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
@ -669,8 +662,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
Thread.sleep(3000); Thread.sleep(3000);
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm2.getResourceScheduler();
(AbstractYarnScheduler) rm2.getResourceScheduler();
// scheduler should not recover containers for finished apps. // scheduler should not recover containers for finished apps.
assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
@ -724,9 +716,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt(); RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm1.getResourceScheduler();
((AbstractYarnScheduler) rm1.getResourceScheduler());
Assert.assertTrue(scheduler.getRMContainer( Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer()); attempt0.getMasterContainer().getId()).isAMContainer());
@ -742,7 +733,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler()); scheduler = rm2.getResourceScheduler();
Assert.assertTrue(scheduler.getRMContainer( Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer()); attempt0.getMasterContainer().getId()).isAMContainer());
} }

View File

@ -79,7 +79,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
try { try {
rm.start(); rm.start();
testMaximumAllocationMemoryHelper( testMaximumAllocationMemoryHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxMemory, node2MaxMemory, node3MaxMemory, node1MaxMemory, node2MaxMemory, node3MaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
@ -94,7 +94,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
try { try {
rm.start(); rm.start();
testMaximumAllocationMemoryHelper( testMaximumAllocationMemoryHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxMemory, node2MaxMemory, node3MaxMemory, node1MaxMemory, node2MaxMemory, node3MaxMemory,
configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
node2MaxMemory, node3MaxMemory, node2MaxMemory); node2MaxMemory, node3MaxMemory, node2MaxMemory);
@ -104,7 +104,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
} }
private void testMaximumAllocationMemoryHelper( private void testMaximumAllocationMemoryHelper(
AbstractYarnScheduler scheduler, YarnScheduler scheduler,
final int node1MaxMemory, final int node2MaxMemory, final int node1MaxMemory, final int node2MaxMemory,
final int node3MaxMemory, final int... expectedMaxMemory) final int node3MaxMemory, final int... expectedMaxMemory)
throws Exception { throws Exception {
@ -166,7 +166,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
try { try {
rm.start(); rm.start();
testMaximumAllocationVCoresHelper( testMaximumAllocationVCoresHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxVCores, node2MaxVCores, node3MaxVCores, node1MaxVCores, node2MaxVCores, node3MaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores); configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
@ -181,7 +181,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
try { try {
rm.start(); rm.start();
testMaximumAllocationVCoresHelper( testMaximumAllocationVCoresHelper(
(AbstractYarnScheduler) rm.getResourceScheduler(), rm.getResourceScheduler(),
node1MaxVCores, node2MaxVCores, node3MaxVCores, node1MaxVCores, node2MaxVCores, node3MaxVCores,
configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
node2MaxVCores, node3MaxVCores, node2MaxVCores); node2MaxVCores, node3MaxVCores, node2MaxVCores);
@ -191,7 +191,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
} }
private void testMaximumAllocationVCoresHelper( private void testMaximumAllocationVCoresHelper(
AbstractYarnScheduler scheduler, YarnScheduler scheduler,
final int node1MaxVCores, final int node2MaxVCores, final int node1MaxVCores, final int node2MaxVCores,
final int node3MaxVCores, final int... expectedMaxVCores) final int node3MaxVCores, final int... expectedMaxVCores)
throws Exception { throws Exception {
@ -494,7 +494,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
} }
private void verifyMaximumResourceCapability( private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) { Resource expectedMaximumResource, YarnScheduler scheduler) {
final Resource schedulerMaximumResourceCapability = scheduler final Resource schedulerMaximumResourceCapability = scheduler
.getMaximumResourceCapability(); .getMaximumResourceCapability();

View File

@ -113,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -1950,8 +1951,7 @@ public class TestCapacityScheduler {
@Test @Test
public void testMoveAllAppsInvalidDestination() throws Exception { public void testMoveAllAppsInvalidDestination() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
@ -2009,8 +2009,7 @@ public class TestCapacityScheduler {
@Test @Test
public void testMoveAllAppsInvalidSource() throws Exception { public void testMoveAllAppsInvalidSource() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
@ -2113,8 +2112,7 @@ public class TestCapacityScheduler {
@Test @Test
public void testKillAllAppsInvalidSource() throws Exception { public void testKillAllAppsInvalidSource() throws Exception {
MockRM rm = setUpMove(); MockRM rm = setUpMove();
AbstractYarnScheduler scheduler = YarnScheduler scheduler = rm.getResourceScheduler();
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app // submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");