From 312a7e71001d55f88781e56b331ab1b40a72a980 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 28 Sep 2011 07:31:03 +0000 Subject: [PATCH] MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for displaying on the RM Web-UI. Contributed by Vinod K V. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1176762 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-app/pom.xml | 6 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 4 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 29 +- .../v2/app/local/LocalContainerAllocator.java | 21 +- .../mapreduce/v2/app/rm/RMCommunicator.java | 54 +- .../v2/app/rm/RMContainerRequestor.java | 13 +- .../v2/app/TestRMContainerAllocator.java | 1069 ++++++++++------- .../mapreduce/v2/util/MRBuilderUtils.java | 32 +- .../hadoop-mapreduce-client-jobclient/pom.xml | 6 + .../hadoop-mapreduce-client/pom.xml | 6 + .../apache/hadoop/yarn/util/BuilderUtils.java | 24 +- .../pom.xml | 14 + .../server/resourcemanager/RMAppManager.java | 12 +- .../resourcemanager/ResourceManager.java | 16 +- .../server/resourcemanager/rmapp/RMApp.java | 2 +- .../rmapp/attempt/RMAppAttempt.java | 2 +- .../rmapp/attempt/RMAppAttemptImpl.java | 2 + .../scheduler/SchedulerApp.java | 15 +- .../event/NodeUpdateSchedulerEvent.java | 3 - .../scheduler/fifo/FifoScheduler.java | 4 +- .../yarn/server/resourcemanager/MockAM.java | 10 +- .../TestAMRMRPCResponseId.java | 13 +- .../resourcemanager/rmapp/MockRMApp.java | 1 + .../TestContainerTokenSecretManager.java | 9 +- 25 files changed, 822 insertions(+), 548 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 44c7d7c34a5..43fed6baa1c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1469,6 +1469,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3110. Fixed TestRPC failure. (vinodkv) + MAPREDUCE-3078. 
Ensure MapReduce AM reports progress correctly for + displaying on the RM Web-UI. (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml index 66ac197d842..0f12598fc17 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml @@ -55,6 +55,12 @@ hadoop-yarn-server-resourcemanager test + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test-jar + test + org.apache.hadoop hadoop-mapreduce-client-shuffle diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6bd1c47133e..8b7d578fc9b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -549,9 +549,9 @@ public class MRAppMaster extends CompositeService { // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); - /** create a job event for job intialization */ + // create a job event for job initialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); - /** send init to the job (this does NOT trigger job execution) */ + // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. 
jobEventDispatcher.handle(initJobEvent); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index a3f067d14c2..c26bc24695c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -92,6 +92,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -584,25 +585,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public JobReport getReport() { readLock.lock(); try { - JobReport report = recordFactory.newRecordInstance(JobReport.class); - report.setJobId(jobId); - report.setJobState(getState()); - - // TODO - Fix to correctly setup report and to check state - if (report.getJobState() == JobState.NEW) { - return report; - } - - report.setStartTime(startTime); - report.setFinishTime(finishTime); - report.setSetupProgress(setupProgress); - report.setCleanupProgress(cleanupProgress); - report.setMapProgress(computeProgress(mapTasks)); - report.setReduceProgress(computeProgress(reduceTasks)); - report.setJobName(jobName); - report.setUser(username); + JobState state = getState(); - return report; + if (getState() == 
JobState.NEW) { + return MRBuilderUtils.newJobReport(jobId, jobName, username, state, + startTime, finishTime, setupProgress, 0.0f, + 0.0f, cleanupProgress); + } + + return MRBuilderUtils.newJobReport(jobId, jobName, username, state, + startTime, finishTime, setupProgress, computeProgress(mapTasks), + computeProgress(reduceTasks), cleanupProgress); } finally { readLock.unlock(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index 18a0f2d5a6a..0261e18b56f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.local; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -30,15 +31,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; /** @@ -65,6 +70,20 @@ public class LocalContainerAllocator extends RMCommunicator this.appID = context.getApplicationID(); } + @Override + protected synchronized void heartbeat() throws Exception { + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( + this.applicationAttemptId, this.lastResponseID, super + .getApplicationProgress(), new ArrayList(), + new ArrayList()); + AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); + AMResponse response = allocateResponse.getAMResponse(); + if (response.getReboot()) { + // TODO + LOG.info("Event from RM: shutting down Application Master"); + } + } + @Override public void handle(ContainerAllocatorEvent event) { if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index db4a60b1dcc..15a7e3f6a5a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import 
java.io.IOException; import java.security.PrivilegedAction; -import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -42,17 +42,12 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -64,7 +59,7 @@ import org.apache.hadoop.yarn.service.AbstractService; /** * Registers/unregisters to RM and sends heartbeats to RM. 
*/ -public class RMCommunicator extends AbstractService { +public abstract class RMCommunicator extends AbstractService { private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private int rmPollInterval;//millis protected ApplicationId applicationId; @@ -74,7 +69,7 @@ public class RMCommunicator extends AbstractService { protected EventHandler eventHandler; protected AMRMProtocol scheduler; private final ClientService clientService; - private int lastResponseID; + protected int lastResponseID; private Resource minContainerCapability; private Resource maxContainerCapability; @@ -121,6 +116,34 @@ public class RMCommunicator extends AbstractService { return job; } + /** + * Get the appProgress. Can be used only after this component is started. + * @return the appProgress. + */ + protected float getApplicationProgress() { + // For now just a single job. In future when we have a DAG, we need an + // aggregate progress. + JobReport report = this.job.getReport(); + float setupWeight = 0.05f; + float cleanupWeight = 0.05f; + float mapWeight = 0.0f; + float reduceWeight = 0.0f; + int numMaps = this.job.getTotalMaps(); + int numReduces = this.job.getTotalReduces(); + if (numMaps == 0 && numReduces == 0) { + } else if (numMaps == 0) { + reduceWeight = 0.9f; + } else if (numReduces == 0) { + mapWeight = 0.9f; + } else { + mapWeight = reduceWeight = 0.45f; + } + return (report.getSetupProgress() * setupWeight + + report.getCleanupProgress() * cleanupWeight + + report.getMapProgress() * mapWeight + report.getReduceProgress() + * reduceWeight); + } + protected void register() { //Register String host = @@ -262,18 +285,5 @@ public class RMCommunicator extends AbstractService { }); } - protected synchronized void heartbeat() throws Exception { - AllocateRequest allocateRequest = - recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(applicationAttemptId); - allocateRequest.setResponseId(lastResponseID); - 
allocateRequest.addAllAsks(new ArrayList()); - allocateRequest.addAllReleases(new ArrayList()); - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); - if (response.getReboot()) { - LOG.info("Event from RM: shutting down Application Master"); - } - } - + protected abstract void heartbeat() throws Exception; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index b9f0c6ee45e..cda2ed678af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.BuilderUtils; /** * Keeps the data structures to send container requests to RM. 
@@ -107,15 +108,11 @@ public abstract class RMContainerRequestor extends RMCommunicator { LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); } - protected abstract void heartbeat() throws Exception; - protected AMResponse makeRemoteRequest() throws YarnRemoteException { - AllocateRequest allocateRequest = recordFactory - .newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(applicationAttemptId); - allocateRequest.setResponseId(lastResponseID); - allocateRequest.addAllAsks(new ArrayList(ask)); - allocateRequest.addAllReleases(new ArrayList(release)); + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( + applicationAttemptId, lastResponseID, super.getApplicationProgress(), + new ArrayList(ask), new ArrayList( + release)); AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); AMResponse response = allocateResponse.getAMResponse(); lastResponseID = response.getResponseId(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index cbf3ab0a658..a1eb928919e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -18,12 +18,15 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Map; 
import java.util.Set; import junit.framework.Assert; @@ -32,475 +35,651 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import 
org.apache.hadoop.yarn.api.records.ApplicationMaster; -import org.apache.hadoop.yarn.api.records.ApplicationStatus; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; -import org.junit.BeforeClass; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; import org.junit.Test; public class TestRMContainerAllocator { -// private static final Log LOG = 
LogFactory.getLog(TestRMContainerAllocator.class); -// private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); -// -// @BeforeClass -// public static void preTests() { -// DefaultMetricsSystem.shutdown(); -// } -// -// @Test -// public void testSimple() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, new Configuration()); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 10240); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// ContainerRequestEvent event1 = -// createReq(1, 1024, new String[]{"h1"}); -// allocator.sendRequest(event1); -// -// //send 1 more request with different resource req -// ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"}); -// allocator.sendRequest(event2); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //send another request with different resource and priority -// ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"}); -// allocator.sendRequest(event3); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event2, event3}, assigned, false); -// } -// -// //TODO: 
Currently Scheduler seems to have bug where it does not work -// //for Application asking for containers with different capabilities. -// //@Test -// public void testResource() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, new Configuration()); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 10240); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// ContainerRequestEvent event1 = -// createReq(1, 1024, new String[]{"h1"}); -// allocator.sendRequest(event1); -// -// //send 1 more request with different resource req -// ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"}); -// allocator.sendRequest(event2); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event2}, assigned, false); -// } -// -// @Test -// public void testMapReduceScheduling() throws Exception { -// FifoScheduler scheduler = createScheduler(); -// Configuration conf = new Configuration(); -// LocalRMContainerAllocator allocator = new LocalRMContainerAllocator( -// scheduler, conf); -// -// //add resources to scheduler -// RMNode nodeManager1 = addNode(scheduler, "h1", 1024); -// RMNode nodeManager2 = addNode(scheduler, "h2", 10240); -// RMNode nodeManager3 = addNode(scheduler, "h3", 10240); -// -// //create the container request -// //send MAP request 
-// ContainerRequestEvent event1 = -// createReq(1, 2048, new String[]{"h1", "h2"}, true, false); -// allocator.sendRequest(event1); -// -// //send REDUCE request -// ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true); -// allocator.sendRequest(event2); -// -// //send MAP request -// ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false); -// allocator.sendRequest(event3); -// -// //this tells the scheduler about the requests -// //as nodes are not added, no allocations -// List assigned = allocator.schedule(); -// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); -// -// //update resources in scheduler -// scheduler.nodeUpdate(nodeManager1); // Node heartbeat -// scheduler.nodeUpdate(nodeManager2); // Node heartbeat -// scheduler.nodeUpdate(nodeManager3); // Node heartbeat -// -// assigned = allocator.schedule(); -// checkAssignments( -// new ContainerRequestEvent[]{event1, event3}, assigned, false); -// -// //validate that no container is assigned to h1 as it doesn't have 2048 -// for (TaskAttemptContainerAssignedEvent assig : assigned) { -// Assert.assertFalse("Assigned count not correct", -// "h1".equals(assig.getContainer().getNodeId().getHost())); -// } -// } -// -// -// -// private RMNode addNode(FifoScheduler scheduler, -// String nodeName, int memory) { -// NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); -// nodeId.setHost(nodeName); -// nodeId.setPort(1234); -// Resource resource = recordFactory.newRecordInstance(Resource.class); -// resource.setMemory(memory); -// RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0, -// ResourceTrackerService.resolve(nodeName), resource); -// scheduler.addNode(nodeManager); // Node registration -// return nodeManager; -// } -// -// private FifoScheduler createScheduler() throws YarnRemoteException { -// FifoScheduler fsc = new FifoScheduler() { -// //override this to copy the objects -// //otherwise 
FifoScheduler updates the numContainers in same objects as kept by -// //RMContainerAllocator -// -// @Override -// public synchronized void allocate(ApplicationAttemptId applicationId, -// List ask) { -// List askCopy = new ArrayList(); -// for (ResourceRequest req : ask) { -// ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class); -// reqCopy.setPriority(req.getPriority()); -// reqCopy.setHostName(req.getHostName()); -// reqCopy.setCapability(req.getCapability()); -// reqCopy.setNumContainers(req.getNumContainers()); -// askCopy.add(reqCopy); -// } -// super.allocate(applicationId, askCopy); -// } -// }; -// try { -// fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null); -// fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class), -// recordFactory.newRecordInstance(ApplicationMaster.class), -// "test", null, null, StoreFactory.createVoidAppStore()); -// } catch(IOException ie) { -// LOG.info("add application failed with ", ie); -// assert(false); -// } -// return fsc; -// } -// -// private ContainerRequestEvent createReq( -// int attemptid, int memory, String[] hosts) { -// return createReq(attemptid, memory, hosts, false, false); -// } -// -// private ContainerRequestEvent createReq( -// int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { -// ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); -// appId.setClusterTimestamp(0); -// appId.setId(0); -// JobId jobId = recordFactory.newRecordInstance(JobId.class); -// jobId.setAppId(appId); -// jobId.setId(0); -// TaskId taskId = recordFactory.newRecordInstance(TaskId.class); -// taskId.setId(0); -// taskId.setJobId(jobId); -// if (reduce) { -// taskId.setTaskType(TaskType.REDUCE); -// } else { -// taskId.setTaskType(TaskType.MAP); -// } -// TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class); -// attemptId.setId(attemptid); -// attemptId.setTaskId(taskId); 
-// Resource containerNeed = recordFactory.newRecordInstance(Resource.class); -// containerNeed.setMemory(memory); -// if (earlierFailedAttempt) { -// return ContainerRequestEvent. -// createContainerRequestEventForFailedContainer(attemptId, containerNeed); -// } -// return new ContainerRequestEvent(attemptId, -// containerNeed, -// hosts, new String[] {NetworkTopology.DEFAULT_RACK}); -// } -// -// private void checkAssignments(ContainerRequestEvent[] requests, -// List assignments, -// boolean checkHostMatch) { -// Assert.assertNotNull("Container not assigned", assignments); -// Assert.assertEquals("Assigned count not correct", -// requests.length, assignments.size()); -// -// //check for uniqueness of containerIDs -// Set containerIds = new HashSet(); -// for (TaskAttemptContainerAssignedEvent assigned : assignments) { -// containerIds.add(assigned.getContainer().getId()); -// } -// Assert.assertEquals("Assigned containers must be different", -// assignments.size(), containerIds.size()); -// -// //check for all assignment -// for (ContainerRequestEvent req : requests) { -// TaskAttemptContainerAssignedEvent assigned = null; -// for (TaskAttemptContainerAssignedEvent ass : assignments) { -// if (ass.getTaskAttemptID().equals(req.getAttemptID())){ -// assigned = ass; -// break; -// } -// } -// checkAssignment(req, assigned, checkHostMatch); -// } -// } -// -// private void checkAssignment(ContainerRequestEvent request, -// TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { -// Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(), -// assigned); -// Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), -// assigned.getTaskAttemptID()); -// if (checkHostMatch) { -// Assert.assertTrue("Not assigned to requested host", Arrays.asList( -// request.getHosts()).contains( -// assigned.getContainer().getNodeId().toString())); -// } -// -// } -// -// //Mock RMContainerAllocator -// //Instead of talking to remote 
Scheduler,uses the local Scheduler -// public static class LocalRMContainerAllocator extends RMContainerAllocator { -// private static final List events = -// new ArrayList(); -// -// public static class AMRMProtocolImpl implements AMRMProtocol { -// -// private ResourceScheduler resourceScheduler; -// -// public AMRMProtocolImpl(ResourceScheduler resourceScheduler) { -// this.resourceScheduler = resourceScheduler; -// } -// -// @Override -// public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException { -// RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class); -// return response; -// } -// -// public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException { -// List ask = request.getAskList(); -// List release = request.getReleaseList(); -// try { -// AMResponse response = recordFactory.newRecordInstance(AMResponse.class); -// Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask); -// response.addAllNewContainers(allocation.getContainers()); -// response.setAvailableResources(allocation.getResourceLimit()); -// AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); -// allocateResponse.setAMResponse(response); -// return allocateResponse; -// } catch(IOException ie) { -// throw RPCUtil.getRemoteException(ie); -// } -// } -// -// @Override -// public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException { -// FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class); -// return response; -// } -// -// } -// -// private ResourceScheduler scheduler; -// LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) { -// super(null, new TestContext(events)); -// this.scheduler = scheduler; -// 
super.init(conf); -// super.start(); -// } -// -// protected AMRMProtocol createSchedulerProxy() { -// return new AMRMProtocolImpl(scheduler); -// } -// -// @Override -// protected void register() {} -// @Override -// protected void unregister() {} -// -// @Override -// protected Resource getMinContainerCapability() { -// Resource res = recordFactory.newRecordInstance(Resource.class); -// res.setMemory(1024); -// return res; -// } -// -// @Override -// protected Resource getMaxContainerCapability() { -// Resource res = recordFactory.newRecordInstance(Resource.class); -// res.setMemory(10240); -// return res; -// } -// -// public void sendRequest(ContainerRequestEvent req) { -// sendRequests(Arrays.asList(new ContainerRequestEvent[]{req})); -// } -// -// public void sendRequests(List reqs) { -// for (ContainerRequestEvent req : reqs) { -// handle(req); -// } -// } -// -// //API to be used by tests -// public List schedule() { -// //run the scheduler -// try { -// heartbeat(); -// } catch (Exception e) { -// LOG.error("error in heartbeat ", e); -// throw new YarnException(e); -// } -// -// List result = new ArrayList(events); -// events.clear(); -// return result; -// } -// -// protected void startAllocatorThread() { -// //override to NOT start thread -// } -// -// static class TestContext implements AppContext { -// private List events; -// TestContext(List events) { -// this.events = events; -// } -// @Override -// public Map getAllJobs() { -// return null; -// } -// @Override -// public ApplicationAttemptId getApplicationAttemptId() { -// return recordFactory.newRecordInstance(ApplicationAttemptId.class); -// } -// @Override -// public ApplicationId getApplicationID() { -// return recordFactory.newRecordInstance(ApplicationId.class); -// } -// @Override -// public EventHandler getEventHandler() { -// return new EventHandler() { -// @Override -// public void handle(Event event) { -// events.add((TaskAttemptContainerAssignedEvent) event); -// } -// }; -// } -// 
@Override -// public Job getJob(JobId jobID) { -// return null; -// } -// -// @Override -// public String getUser() { -// return null; -// } -// -// @Override -// public Clock getClock() { -// return null; -// } -// -// @Override -// public String getApplicationName() { -// return null; -// } -// -// @Override -// public long getStartTime() { -// return 0; -// } -// } -// } -// -// public static void main(String[] args) throws Exception { -// TestRMContainerAllocator t = new TestRMContainerAllocator(); -// t.testSimple(); -// //t.testResource(); -// t.testMapReduceScheduling(); -// } + + static final Log LOG = LogFactory + .getLog(TestRMContainerAllocator.class); + static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @After + public void tearDown() { + DefaultMetricsSystem.shutdown(); + } + + @Test + public void testSimple() throws Exception { + + LOG.info("Running testSimple"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM 
nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event1); + + // send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(jobId, 2, 1024, + new String[] { "h2" }); + allocator.sendRequest(event2); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // send another request with different resource and priority + ContainerRequestEvent event3 = createReq(jobId, 3, 1024, + new String[] { "h3" }); + allocator.sendRequest(event3); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + assigned, false); + } + + @Test + public void testResource() throws Exception { + + LOG.info("Running testResource"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + 
.getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + ContainerRequestEvent event1 = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event1); + + // send 1 more request with different resource req + ContainerRequestEvent event2 = createReq(jobId, 2, 2048, + new String[] { "h2" }); + allocator.sendRequest(event2); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event2 }, + assigned, false); + } + + @Test + public void testMapReduceScheduling() throws Exception { + + LOG.info("Running testMapReduceScheduling"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM 
amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, + 0, 0, 0, 0, 0, 0)); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); + MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); + MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); + dispatcher.await(); + + // create the container request + // send MAP request + ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { + "h1", "h2" }, true, false); + allocator.sendRequest(event1); + + // send REDUCE request + ContainerRequestEvent event2 = createReq(jobId, 2, 3000, + new String[] { "h1" }, false, true); + allocator.sendRequest(event2); + + // send MAP request + ContainerRequestEvent event3 = createReq(jobId, 3, 2048, + new String[] { "h3" }, false, false); + allocator.sendRequest(event3); + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // update resources in scheduler + nodeManager1.nodeHeartbeat(true); // Node heartbeat + nodeManager2.nodeHeartbeat(true); // Node heartbeat + nodeManager3.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + assigned = allocator.schedule(); + dispatcher.await(); + checkAssignments(new ContainerRequestEvent[] { event1, event3 }, + assigned, false); + + // validate that no container is assigned to h1 as it 
doesn't have 2048 + for (TaskAttemptContainerAssignedEvent assig : assigned) { + Assert.assertFalse("Assigned count not correct", "h1".equals(assig + .getContainer().getNodeId().getHost())); + } + } + + private static class MyResourceManager extends MockRM { + + public MyResourceManager(Configuration conf) { + super(conf); + } + + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + // Dispatch inline for test sanity + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + @Override + protected ResourceScheduler createScheduler() { + return new MyFifoScheduler(getRMContext()); + } + } + + private static class FakeJob extends JobImpl { + + public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf, + int numMaps, int numReduces) { + super(appAttemptID, conf, null, null, null, null, null, null, null, + null); + this.jobId = MRBuilderUtils + .newJobId(appAttemptID.getApplicationId(), 0); + this.numMaps = numMaps; + this.numReduces = numReduces; + } + + private float setupProgress; + private float mapProgress; + private float reduceProgress; + private float cleanupProgress; + private final int numMaps; + private final int numReduces; + private JobId jobId; + + void setProgress(float setupProgress, float mapProgress, + float reduceProgress, float cleanupProgress) { + this.setupProgress = setupProgress; + this.mapProgress = mapProgress; + this.reduceProgress = reduceProgress; + this.cleanupProgress = cleanupProgress; + } + + @Override + public int getTotalMaps() { return this.numMaps; } + @Override + public int getTotalReduces() { return this.numReduces;} + + @Override + public JobReport getReport() { + return MRBuilderUtils.newJobReport(this.jobId, "job", "user", + JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress, + this.reduceProgress, this.cleanupProgress); + } + } 
+ + @Test + public void testReportedAppProgress() throws Exception { + + LOG.info("Running testReportedAppProgress"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + FakeJob job = new FakeJob(appAttemptId, conf, 2, 2); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, job); + + allocator.schedule(); // Send heartbeat + dispatcher.await(); + Assert.assertEquals(0.0, app.getProgress(), 0.0); + + job.setProgress(100, 10, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(9.5f, app.getProgress(), 0.0); + + job.setProgress(100, 80, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(41.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 20, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(59.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 100, 100); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(100.0f, app.getProgress(), 0.0); + } + + @Test + public void testReportedAppProgressWithOnlyMaps() throws Exception { + + LOG.info("Running testReportedAppProgressWithOnlyMaps"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + 
amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + FakeJob job = new FakeJob(appAttemptId, conf, 2, 0); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, job); + + allocator.schedule(); // Send heartbeat + dispatcher.await(); + Assert.assertEquals(0.0, app.getProgress(), 0.0); + + job.setProgress(100, 10, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(14f, app.getProgress(), 0.0); + + job.setProgress(100, 60, 0, 0); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(59.0f, app.getProgress(), 0.0); + + job.setProgress(100, 100, 0, 100); + allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(100.0f, app.getProgress(), 0.0); + } + + private static class MyFifoScheduler extends FifoScheduler { + + public MyFifoScheduler(RMContext rmContext) { + super(); + try { + reinitialize(new Configuration(), new ContainerTokenSecretManager(), + rmContext); + } catch (IOException ie) { + LOG.info("add application failed with ", ie); + assert (false); + } + } + + // override this to copy the objects otherwise FifoScheduler updates the + // numContainers in same objects as kept by RMContainerAllocator + @Override + public synchronized Allocation allocate( + ApplicationAttemptId applicationAttemptId, List ask, + List release) { + List askCopy = new ArrayList(); + for (ResourceRequest req : ask) { + ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req + .getPriority(), req.getHostName(), req.getCapability(), req + .getNumContainers()); + askCopy.add(reqCopy); + } + return super.allocate(applicationAttemptId, askCopy, release); + } + } + + private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, + int memory, String[] hosts) { + return createReq(jobId, taskAttemptId, memory, hosts, false, false); + } + + private 
ContainerRequestEvent + createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts, + boolean earlierFailedAttempt, boolean reduce) { + TaskId taskId; + if (reduce) { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); + } else { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + } + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, + taskAttemptId); + Resource containerNeed = BuilderUtils.newResource(memory); + if (earlierFailedAttempt) { + return ContainerRequestEvent + .createContainerRequestEventForFailedContainer(attemptId, + containerNeed); + } + return new ContainerRequestEvent(attemptId, containerNeed, hosts, + new String[] { NetworkTopology.DEFAULT_RACK }); + } + + private void checkAssignments(ContainerRequestEvent[] requests, + List assignments, + boolean checkHostMatch) { + Assert.assertNotNull("Container not assigned", assignments); + Assert.assertEquals("Assigned count not correct", requests.length, + assignments.size()); + + // check for uniqueness of containerIDs + Set containerIds = new HashSet(); + for (TaskAttemptContainerAssignedEvent assigned : assignments) { + containerIds.add(assigned.getContainer().getId()); + } + Assert.assertEquals("Assigned containers must be different", assignments + .size(), containerIds.size()); + + // check for all assignment + for (ContainerRequestEvent req : requests) { + TaskAttemptContainerAssignedEvent assigned = null; + for (TaskAttemptContainerAssignedEvent ass : assignments) { + if (ass.getTaskAttemptID().equals(req.getAttemptID())) { + assigned = ass; + break; + } + } + checkAssignment(req, assigned, checkHostMatch); + } + } + + private void checkAssignment(ContainerRequestEvent request, + TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { + Assert.assertNotNull("Nothing assigned to attempt " + + request.getAttemptID(), assigned); + Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), + assigned.getTaskAttemptID()); + if 
(checkHostMatch) { + Assert.assertTrue("Not assigned to requested host", Arrays.asList( + request.getHosts()).contains( + assigned.getContainer().getNodeId().toString())); + } + } + + // Mock RMContainerAllocator + // Instead of talking to remote Scheduler,uses the local Scheduler + private static class MyContainerAllocator extends RMContainerAllocator { + static final List events + = new ArrayList(); + + private MyResourceManager rm; + + @SuppressWarnings("rawtypes") + private static AppContext createAppContext( + ApplicationAttemptId appAttemptId, Job job) { + AppContext context = mock(AppContext.class); + ApplicationId appId = appAttemptId.getApplicationId(); + when(context.getApplicationID()).thenReturn(appId); + when(context.getApplicationAttemptId()).thenReturn(appAttemptId); + when(context.getJob(isA(JobId.class))).thenReturn(job); + when(context.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + // Only capture interesting events. 
+ if (event instanceof TaskAttemptContainerAssignedEvent) { + events.add((TaskAttemptContainerAssignedEvent) event); + } + } + }); + return context; + } + + private static ClientService createMockClientService() { + ClientService service = mock(ClientService.class); + when(service.getBindAddress()).thenReturn( + NetUtils.createSocketAddr("localhost:4567")); + when(service.getHttpPort()).thenReturn(890); + return service; + } + + MyContainerAllocator(MyResourceManager rm, Configuration conf, + ApplicationAttemptId appAttemptId, Job job) { + super(createMockClientService(), createAppContext(appAttemptId, job)); + this.rm = rm; + super.init(conf); + super.start(); + } + + @Override + protected AMRMProtocol createSchedulerProxy() { + return this.rm.getApplicationMasterService(); + } + + @Override + protected void register() { + super.register(); + } + + @Override + protected void unregister() { + } + + @Override + protected Resource getMinContainerCapability() { + return BuilderUtils.newResource(1024); + } + + @Override + protected Resource getMaxContainerCapability() { + return BuilderUtils.newResource(10240); + } + + public void sendRequest(ContainerRequestEvent req) { + sendRequests(Arrays.asList(new ContainerRequestEvent[] { req })); + } + + public void sendRequests(List reqs) { + for (ContainerRequestEvent req : reqs) { + super.handle(req); + } + } + + // API to be used by tests + public List schedule() { + // run the scheduler + try { + super.heartbeat(); + } catch (Exception e) { + LOG.error("error in heartbeat ", e); + throw new YarnException(e); + } + + List result + = new ArrayList(events); + events.clear(); + return result; + } + + protected void startAllocatorThread() { + // override to NOT start thread + } + } + + public static void main(String[] args) throws Exception { + TestRMContainerAllocator t = new TestRMContainerAllocator(); + t.testSimple(); + t.testResource(); + t.testMapReduceScheduling(); + t.testReportedAppProgress(); + 
t.testReportedAppProgressWithOnlyMaps(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java index c429ca55b51..d710a6f7b88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java @@ -19,27 +19,25 @@ package org.apache.hadoop.mapreduce.v2.util; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Records; public class MRBuilderUtils { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - public static JobId newJobId(ApplicationId appId, int id) { - JobId jobId = recordFactory.newRecordInstance(JobId.class); + JobId jobId = Records.newRecord(JobId.class); jobId.setAppId(appId); jobId.setId(id); return jobId; } public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) { - TaskId taskId = recordFactory.newRecordInstance(TaskId.class); + TaskId taskId = Records.newRecord(TaskId.class); taskId.setJobId(jobId); taskId.setId(id); taskId.setTaskType(taskType); @@ -48,9 
+46,27 @@ public class MRBuilderUtils { public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) { TaskAttemptId taskAttemptId = - recordFactory.newRecordInstance(TaskAttemptId.class); + Records.newRecord(TaskAttemptId.class); taskAttemptId.setTaskId(taskId); taskAttemptId.setId(attemptId); return taskAttemptId; } + + public static JobReport newJobReport(JobId jobId, String jobName, + String userName, JobState state, long startTime, long finishTime, + float setupProgress, float mapProgress, float reduceProgress, + float cleanupProgress) { + JobReport report = Records.newRecord(JobReport.class); + report.setJobId(jobId); + report.setJobName(jobName); + report.setUser(userName); + report.setJobState(state); + report.setStartTime(startTime); + report.setFinishTime(finishTime); + report.setSetupProgress(setupProgress); + report.setCleanupProgress(cleanupProgress); + report.setMapProgress(mapProgress); + report.setReduceProgress(reduceProgress); + return report; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index 4b605cb2ae5..ef388fcd86a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -64,6 +64,12 @@ hadoop-yarn-server-resourcemanager test + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test-jar + test + org.apache.hadoop hadoop-yarn-server-common diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml index ab1ffcca988..2a5cef3cbc9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml @@ -88,6 +88,12 @@ hadoop-yarn-server-resourcemanager ${yarn.version} + 
+ org.apache.hadoop + hadoop-yarn-server-resourcemanager + ${yarn.version} + test-jar + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 9df37ee03a2..7ec367292e1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.util; import java.net.URI; import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -184,6 +186,13 @@ public class BuilderUtils { return id; } + public static NodeId newNodeId(String host, int port) { + NodeId nodeId = recordFactory.newRecordInstance(NodeId.class); + nodeId.setHost(host); + nodeId.setPort(port); + return nodeId; + } + public static Container newContainer(RecordFactory recordFactory, ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority) { @@ -266,5 +275,18 @@ public class BuilderUtils { url.setFile(file); return url; } - + + public static AllocateRequest newAllocateRequest( + ApplicationAttemptId applicationAttemptId, int responseID, + float appProgress, List resourceAsk, + List containersToBeReleased) { + AllocateRequest allocateRequest = recordFactory + .newRecordInstance(AllocateRequest.class); + allocateRequest.setApplicationAttemptId(applicationAttemptId); + allocateRequest.setResponseId(responseID); + 
allocateRequest.setProgress(appProgress); + allocateRequest.addAllAsks(resourceAsk); + allocateRequest.addAllReleases(containersToBeReleased); + return allocateRequest; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index e676485e922..d94f5973144 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -37,6 +37,20 @@ + + + + maven-jar-plugin + + + + test-jar + + test-compile + + + + maven-antrun-plugin diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d0cd0a7ff86..3f175a34a0a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; 
@@ -250,13 +251,10 @@ public class RMAppManager implements EventHandler { if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { - LOG.info("Application with id " + applicationId + - " is already present! Cannot add a duplicate!"); - // don't send event through dispatcher as it will be handled by app - // already present with this id. - application.handle(new RMAppRejectedEvent(applicationId, - "Application with this id is already present! " + - "Cannot add a duplicate!")); + String message = "Application with id " + applicationId + + " is already present! Cannot add a duplicate!"; + LOG.info(message); + throw RPCUtil.getRemoteException(message); } else { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.START)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d1515e4fb5b..8a56d504d69 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -98,7 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable { private ContainerAllocationExpirer containerAllocationExpirer; protected NMLivelinessMonitor nmLivelinessMonitor; protected NodesListManager nodesListManager; - private SchedulerEventDispatcher schedulerDispatcher; + private EventHandler schedulerDispatcher; protected RMAppManager rmAppManager; private WebApp webApp; @@ -119,7 +119,7 @@ public class 
ResourceManager extends CompositeService implements Recoverable { @Override public synchronized void init(Configuration conf) { - this.rmDispatcher = new AsyncDispatcher(); + this.rmDispatcher = createDispatcher(); addIfService(this.rmDispatcher); this.containerAllocationExpirer = new ContainerAllocationExpirer( @@ -138,8 +138,8 @@ public class ResourceManager extends CompositeService implements Recoverable { this.conf = new YarnConfiguration(conf); // Initialize the scheduler this.scheduler = createScheduler(); - this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler); - addService(this.schedulerDispatcher); + this.schedulerDispatcher = createSchedulerEventDispatcher(); + addIfService(this.schedulerDispatcher); this.rmDispatcher.register(SchedulerEventType.class, this.schedulerDispatcher); @@ -195,6 +195,14 @@ public class ResourceManager extends CompositeService implements Recoverable { super.init(conf); } + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler); + } + + protected Dispatcher createDispatcher() { + return new AsyncDispatcher(); + } + protected void addIfService(Object object) { if (object instanceof Service) { addService((Service) object); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 484a7a38ba4..6e63e2248d6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -32,7 
+32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; * look at {@link RMAppImpl} for its implementation. This interface * exposes methods to access various updates in application status/report. */ -public interface RMApp extends EventHandler{ +public interface RMApp extends EventHandler { /** * The application id for this {@link RMApp}. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 3164602f59f..aeb3d2af045 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; * {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific * implementation take a look at {@link RMAppAttemptImpl}. */ -public interface RMAppAttempt extends EventHandler{ +public interface RMAppAttempt extends EventHandler { /** * Get the application attempt id for this {@link RMAppAttempt}. 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 7f7f050bc4f..7f8ff82d6a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -685,6 +685,8 @@ public class RMAppAttemptImpl implements RMAppAttempt { public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + appAttempt.progress = 1.0f; + // Tell the app and the scheduler super.transition(appAttempt, event); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index b4037aaeaf7..10913e09999 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -207,13 +207,18 @@ public class SchedulerApp { .getDispatcher().getEventHandler(), this.rmContext .getContainerAllocationExpirer()); + // Add it 
to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + // Update consumption and track allocations - + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + // Inform the container rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.START)); - Resources.addTo(currentConsumption, container.getResource()); if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationAttemptId=" + container.getId().getApplicationAttemptId() @@ -223,12 +228,6 @@ public class SchedulerApp { RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, "SchedulerApp", getApplicationId(), container.getId()); - - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); - - appSchedulingInfo.allocate(type, node, priority, request, container); return rmContainer; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java index 9f3bc1cce7a..ff51d62d910 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java @@ -19,10 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import java.util.List; -import java.util.Map; 
-import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index dfa4965d5d9..7a90c5b6fac 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -291,7 +291,7 @@ public class FifoScheduler implements ResourceScheduler { @SuppressWarnings("unchecked") private synchronized void addApplication(ApplicationAttemptId appAttemptId, - String queueName, String user) { + String user) { // TODO: Fix store SchedulerApp schedulerApp = new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, @@ -628,7 +628,7 @@ public class FifoScheduler implements ResourceScheduler { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent - .getQueue(), appAddedEvent.getUser()); + .getUser()); } break; case APP_REMOVED: diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 9a9ae2f51cd..727cd1a2323 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; public class MockAM { @@ -128,7 +129,7 @@ public class MockAM { req.setHostName(resource); req.setNumContainers(containers); Priority pri = Records.newRecord(Priority.class); - pri.setPriority(1); + pri.setPriority(priority); req.setPriority(pri); Resource capability = Records.newRecord(Resource.class); capability.setMemory(memory); @@ -139,11 +140,8 @@ public class MockAM { public AMResponse allocate( List resourceRequest, List releases) throws Exception { - AllocateRequest req = Records.newRecord(AllocateRequest.class); - req.setResponseId(++responseId); - req.setApplicationAttemptId(attemptId); - req.addAllAsks(resourceRequest); - req.addAllReleases(releases); + AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId, + ++responseId, 0F, resourceRequest, releases); AllocateResponse resp = amRMProtocol.allocate(req); return resp.getAMResponse(); } diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 61d678ea01c..3bc55473423 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -77,13 +78,14 @@ public class TestAMRMRPCResponseId { am.registerAppAttempt(); - AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(attempt.getAppAttemptId()); + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt + .getAppAttemptId(), 0, 0F, null, null); AMResponse response = amService.allocate(allocateRequest).getAMResponse(); Assert.assertEquals(1, response.getResponseId()); Assert.assertFalse(response.getReboot()); - allocateRequest.setResponseId(response.getResponseId()); + allocateRequest = BuilderUtils.newAllocateRequest(attempt + .getAppAttemptId(), response.getResponseId(), 0F, null, null); response = 
amService.allocate(allocateRequest).getAMResponse(); Assert.assertEquals(2, response.getResponseId()); @@ -91,8 +93,9 @@ public class TestAMRMRPCResponseId { response = amService.allocate(allocateRequest).getAMResponse(); Assert.assertEquals(2, response.getResponseId()); - /** try sending old **/ - allocateRequest.setResponseId(0); + /** try sending old request again **/ + allocateRequest = BuilderUtils.newAllocateRequest(attempt + .getAppAttemptId(), 0, 0F, null, null); response = amService.allocate(allocateRequest).getAMResponse(); Assert.assertTrue(response.getReboot()); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 4fb6486c2c7..03229c34b48 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -162,6 +162,7 @@ public class MockRMApp implements RMApp { this.diagnostics = new StringBuilder(diag); } + @Override public void handle(RMAppEvent event) { } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java index 989f3483d91..1b681628c98 100644 --- 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; import org.junit.AfterClass; @@ -240,12 +241,8 @@ public class TestContainerTokenSecretManager { ask.add(rr); ArrayList release = new ArrayList(); - AllocateRequest allocateRequest = - recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationAttemptId(appAttempt.getAppAttemptId()); - allocateRequest.setResponseId(0); - allocateRequest.addAllAsks(ask); - allocateRequest.addAllReleases(release); + AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( + appAttempt.getAppAttemptId(), 0, 0F, ask, release); List allocatedContainers = scheduler.allocate(allocateRequest) .getAMResponse().getAllocatedContainers();