diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a33315e075d..596b51034e4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -206,6 +206,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity Scheduler (Sandy Ryza) + MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the + YARN resource model (Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 37c5064b182..78d70241fbe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1265,57 +1265,56 @@ public abstract class TaskAttemptImpl implements } } } - - private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { + + private static void updateMillisCounters(JobCounterUpdateEvent jce, + TaskAttemptImpl taskAttempt) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); - int slotMemoryReq = + long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); + int mbRequired = taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); + int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType); int minSlotMemSize = taskAttempt.conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); int simSlotsRequired = - minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq + minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired / minSlotMemSize); - long slotMillisIncrement = - simSlotsRequired - * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); - return slotMillisIncrement; + if (taskType == TaskType.MAP) { + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration); + jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired); + jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired); + jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration); + } else { + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration); + jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired); + jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired); + jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration); + } } private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded( TaskAttemptImpl taskAttempt) { - long slotMillis = computeSlotMillis(taskAttempt); TaskId taskId = taskAttempt.attemptId.getTaskId(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId()); - jce.addCounterUpdate( - taskId.getTaskType() == TaskType.MAP ? - JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, - slotMillis); + updateMillisCounters(jce, taskAttempt); return jce; } - + private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); - long slotMillisIncrement = computeSlotMillis(taskAttempt); - if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1); - if(!taskAlreadyCompleted) { - // dont double count the elapsed time - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); - } } else { jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1); - if(!taskAlreadyCompleted) { - // dont double count the elapsed time - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); - } + } + if (!taskAlreadyCompleted) { + updateMillisCounters(jce, taskAttempt); } return jce; } @@ -1325,20 +1324,13 @@ public abstract class TaskAttemptImpl implements TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); - long slotMillisIncrement = computeSlotMillis(taskAttempt); - if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1); - if(!taskAlreadyCompleted) { - // dont double count the elapsed time - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); - } } else { jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1); - if(!taskAlreadyCompleted) { - // dont double count the elapsed time - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); - } + } + if (!taskAlreadyCompleted) { + updateMillisCounters(jce, taskAttempt); } return jce; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 5858136d485..28babaa2d66 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -182,13 +183,13 @@ public class TestTaskAttempt{ } @Test - public void testSlotMillisCounterUpdate() throws Exception { - verifySlotMillis(2048, 2048, 1024); - verifySlotMillis(2048, 1024, 1024); - verifySlotMillis(10240, 1024, 2048); + public void testMillisCountersUpdate() throws Exception { + verifyMillisCounters(2048, 2048, 1024); + verifyMillisCounters(2048, 1024, 1024); + verifyMillisCounters(10240, 1024, 2048); } - public void verifySlotMillis(int mapMemMb, int reduceMemMb, + public void verifyMillisCounters(int mapMemMb, int reduceMemMb, int minContainerSize) throws Exception { Clock actualClock = new SystemClock(); ControlledClock clock = new ControlledClock(actualClock); @@ -232,13 +233,23 @@ public class TestTaskAttempt{ Assert.assertEquals(mta.getLaunchTime(), 10); Assert.assertEquals(rta.getFinishTime(), 11); Assert.assertEquals(rta.getLaunchTime(), 10); + Counters counters = job.getAllCounters(); Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize), - job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS) - .getValue()); - Assert.assertEquals( - (int) Math.ceil((float) reduceMemMb / minContainerSize), job - .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES) - .getValue()); + counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue()); + Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize), + counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue()); + Assert.assertEquals(1, + counters.findCounter(JobCounter.MILLIS_MAPS).getValue()); + Assert.assertEquals(1, + counters.findCounter(JobCounter.MILLIS_REDUCES).getValue()); + Assert.assertEquals(mapMemMb, + counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue()); + Assert.assertEquals(reduceMemMb, + counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue()); + Assert.assertEquals(1, + counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue()); + Assert.assertEquals(1, + counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue()); } private TaskAttemptImpl createMapTaskAttemptImplForTest( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java index f7a87d1ab88..2f09e86f2de 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java @@ -49,5 +49,11 @@ public enum JobCounter { TASKS_REQ_PREEMPT, CHECKPOINTS, CHECKPOINT_BYTES, - CHECKPOINT_TIME + CHECKPOINT_TIME, + MILLIS_MAPS, + MILLIS_REDUCES, + VCORES_MILLIS_MAPS, + VCORES_MILLIS_REDUCES, + MB_MILLIS_MAPS, + MB_MILLIS_REDUCES } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties index 42539a097b2..11547848e47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties @@ -25,9 +25,15 @@ DATA_LOCAL_MAPS.name= Data-local map tasks RACK_LOCAL_MAPS.name= Rack-local map tasks SLOTS_MILLIS_MAPS.name= Total time spent by all maps in occupied slots (ms) SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms) +MILLIS_MAPS.name= Total time spent by all map tasks (ms) +MILLIS_REDUCES.name= Total time spent by all reduce tasks (ms) +MB_MILLIS_MAPS.name= Total megabyte-seconds taken by all map tasks +MB_MILLIS_REDUCES.name= Total megabyte-seconds taken by all reduce tasks +VCORES_MILLIS_MAPS.name= Total vcore-seconds taken by all map tasks +VCORES_MILLIS_REDUCES.name= Total vcore-seconds taken by all reduce tasks FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt CHECKPOINTS.name= Number of checkpoints reported CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints -CHECKPOINT_TIME.name= Total time spent checkpointing (ms) \ No newline at end of file +CHECKPOINT_TIME.name= Total time spent checkpointing (ms)