diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 8fff7de5a19..5f0a622ec44 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1406,29 +1406,36 @@ public abstract class TaskAttemptImpl implements private static void updateMillisCounters(JobCounterUpdateEvent jce, TaskAttemptImpl taskAttempt) { - TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); + // if container/resource if not allocated, do not update + if (null == taskAttempt.container || + null == taskAttempt.container.getResource()) { + return; + } long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); - int mbRequired = - taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); - int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType); - + Resource allocatedResource = taskAttempt.container.getResource(); + int mbAllocated = allocatedResource.getMemory(); + int vcoresAllocated = allocatedResource.getVirtualCores(); int minSlotMemSize = taskAttempt.conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - - int simSlotsRequired = - minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired - / minSlotMemSize); + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int simSlotsAllocated = minSlotMemSize == 0 ? 0 : + (int) Math.ceil((float) mbAllocated / minSlotMemSize); + TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); if (taskType == TaskType.MAP) { - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration); - jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired); - jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired); + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, + simSlotsAllocated * duration); + jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbAllocated); + jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, + duration * vcoresAllocated); jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration); } else { - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration); - jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired); - jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired); + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, + simSlotsAllocated * duration); + jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, + duration * mbAllocated); + jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, + duration * vcoresAllocated); jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index fd0e8e2b9b6..b43a7b43f00 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -111,6 +111,10 @@ import org.junit.Assert; public class MRApp extends MRAppMaster { private static final Log LOG = LogFactory.getLog(MRApp.class); + /** + * The available resource of each container allocated. + */ + private Resource resource; int maps; int reduces; @@ -250,6 +254,7 @@ public class MRApp extends MRAppMaster { // the job can reaches the final state when MRAppMaster shuts down. this.successfullyUnregistered.set(unregistered); this.assignedQueue = assignedQueue; + this.resource = Resource.newInstance(1234, 2); } @Override @@ -589,7 +594,6 @@ public class MRApp extends MRAppMaster { ContainerId.newContainerId(getContext().getApplicationAttemptId(), containerCount++); NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT); - Resource resource = Resource.newInstance(1234, 2); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(cId, nodeId.toString(), "user", resource, System.currentTimeMillis() + 10000, 42, 42, @@ -712,6 +716,10 @@ public class MRApp extends MRAppMaster { } } + public void setAllocatedContainerResource(Resource resource) { + this.resource = resource; + } + class TestJob extends JobImpl { //override the init transition private final TestInitTransition initTransition = new TestInitTransition( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 504a5f7c71d..0f112381810 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -1744,19 +1744,24 @@ public class TestRecovery { expectedJobHistoryEvents.remove(0); } else if (current instanceof JobCounterUpdateEvent) { JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current; - - LOG.info("JobCounterUpdateEvent " - + jcue.getCounterUpdates().get(0).getCounterKey() - + " " + jcue.getCounterUpdates().get(0).getIncrementValue()); - if (jcue.getCounterUpdates().get(0).getCounterKey() == - JobCounter.NUM_FAILED_MAPS) { - totalFailedMaps += jcue.getCounterUpdates().get(0) - .getIncrementValue(); - } else if (jcue.getCounterUpdates().get(0).getCounterKey() == - JobCounter.TOTAL_LAUNCHED_MAPS) { - totalLaunchedMaps += jcue.getCounterUpdates().get(0) - .getIncrementValue(); + boolean containsUpdates = jcue.getCounterUpdates().size() > 0; + // there is no updates in a JobCounterUpdateEvent emitted on + // TaskAttempt recovery. Check that first. + if(containsUpdates) { + LOG.info("JobCounterUpdateEvent " + + jcue.getCounterUpdates().get(0).getCounterKey() + + " " + jcue.getCounterUpdates().get(0).getIncrementValue()); + if (jcue.getCounterUpdates().get(0).getCounterKey() == + JobCounter.NUM_FAILED_MAPS) { + totalFailedMaps += jcue.getCounterUpdates().get(0) + .getIncrementValue(); + } else if (jcue.getCounterUpdates().get(0).getCounterKey() == + JobCounter.TOTAL_LAUNCHED_MAPS) { + totalLaunchedMaps += jcue.getCounterUpdates().get(0) + .getIncrementValue(); + } } + } else if (current instanceof JobTaskEvent) { JobTaskEvent jte = (JobTaskEvent) current; assertEquals(jte.getState(), finalState); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 5a660617661..509f6af6129 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -33,6 +33,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -41,9 +47,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -82,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; @@ -250,22 +259,21 @@ public class TestTaskAttempt{ @Test public void testMillisCountersUpdate() throws Exception { - verifyMillisCounters(2048, 2048, 1024); - verifyMillisCounters(2048, 1024, 1024); - verifyMillisCounters(10240, 1024, 2048); + verifyMillisCounters(Resource.newInstance(1024, 1), 512); + verifyMillisCounters(Resource.newInstance(2048, 4), 1024); + verifyMillisCounters(Resource.newInstance(10240, 8), 2048); } - public void verifyMillisCounters(int mapMemMb, int reduceMemMb, + public void verifyMillisCounters(Resource containerResource, int minContainerSize) throws Exception { Clock actualClock = SystemClock.getInstance(); ControlledClock clock = new ControlledClock(actualClock); clock.setTime(10); MRApp app = new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock); + app.setAllocatedContainerResource(containerResource); Configuration conf = new Configuration(); - conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb); - conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, minContainerSize); app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1))); @@ -300,21 +308,24 @@ public class TestTaskAttempt{ Assert.assertEquals(rta.getFinishTime(), 11); Assert.assertEquals(rta.getLaunchTime(), 10); Counters counters = job.getAllCounters(); - Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize), + + int memoryMb = containerResource.getMemory(); + int vcores = containerResource.getVirtualCores(); + Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize), counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue()); - Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize), + Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize), counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue()); Assert.assertEquals(1, counters.findCounter(JobCounter.MILLIS_MAPS).getValue()); Assert.assertEquals(1, counters.findCounter(JobCounter.MILLIS_REDUCES).getValue()); - Assert.assertEquals(mapMemMb, + Assert.assertEquals(memoryMb, counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue()); - Assert.assertEquals(reduceMemMb, + Assert.assertEquals(memoryMb, counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue()); - Assert.assertEquals(1, + Assert.assertEquals(vcores, counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue()); - Assert.assertEquals(1, + Assert.assertEquals(vcores, counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue()); }