MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the YARN resource model (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-28 21:05:03 +00:00
parent 55b56f3bcf
commit be75f21b6f
5 changed files with 64 additions and 46 deletions

View File

@ -58,6 +58,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
Scheduler (Sandy Ryza) Scheduler (Sandy Ryza)
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@ -1262,34 +1262,40 @@ private void computeRackAndLocality() {
} }
} }
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { private static void updateMillisCounters(JobCounterUpdateEvent jce,
TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq = long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
int mbRequired =
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType);
int minSlotMemSize = taskAttempt.conf.getInt( int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int simSlotsRequired = int simSlotsRequired =
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired
/ minSlotMemSize); / minSlotMemSize);
long slotMillisIncrement = if (taskType == TaskType.MAP) {
simSlotsRequired jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration);
* (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired);
return slotMillisIncrement; jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired);
jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
} else {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration);
jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired);
jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
}
} }
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded( private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) { TaskAttemptImpl taskAttempt) {
long slotMillis = computeSlotMillis(taskAttempt);
TaskId taskId = taskAttempt.attemptId.getTaskId(); TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId()); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate( updateMillisCounters(jce, taskAttempt);
taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
return jce; return jce;
} }
@ -1298,20 +1304,13 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
long slotMillisIncrement = computeSlotMillis(taskAttempt);
if (taskType == TaskType.MAP) { if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1); jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
}
} else { } else {
jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1); jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
if(!taskAlreadyCompleted) { }
// dont double count the elapsed time if (!taskAlreadyCompleted) {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); updateMillisCounters(jce, taskAttempt);
}
} }
return jce; return jce;
} }
@ -1321,20 +1320,13 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
long slotMillisIncrement = computeSlotMillis(taskAttempt);
if (taskType == TaskType.MAP) { if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1); jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
}
} else { } else {
jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1); jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
if(!taskAlreadyCompleted) { }
// dont double count the elapsed time if (!taskAlreadyCompleted) {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); updateMillisCounters(jce, taskAttempt);
}
} }
return jce; return jce;
} }

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -182,13 +183,13 @@ public void testHostResolveAttempt() throws Exception {
} }
@Test @Test
public void testSlotMillisCounterUpdate() throws Exception { public void testMillisCountersUpdate() throws Exception {
verifySlotMillis(2048, 2048, 1024); verifyMillisCounters(2048, 2048, 1024);
verifySlotMillis(2048, 1024, 1024); verifyMillisCounters(2048, 1024, 1024);
verifySlotMillis(10240, 1024, 2048); verifyMillisCounters(10240, 1024, 2048);
} }
public void verifySlotMillis(int mapMemMb, int reduceMemMb, public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
int minContainerSize) throws Exception { int minContainerSize) throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = new SystemClock();
ControlledClock clock = new ControlledClock(actualClock); ControlledClock clock = new ControlledClock(actualClock);
@ -232,13 +233,23 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
Assert.assertEquals(mta.getLaunchTime(), 10); Assert.assertEquals(mta.getLaunchTime(), 10);
Assert.assertEquals(rta.getFinishTime(), 11); Assert.assertEquals(rta.getFinishTime(), 11);
Assert.assertEquals(rta.getLaunchTime(), 10); Assert.assertEquals(rta.getLaunchTime(), 10);
Counters counters = job.getAllCounters();
Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize), Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS) counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
.getValue()); Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize),
Assert.assertEquals( counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
(int) Math.ceil((float) reduceMemMb / minContainerSize), job Assert.assertEquals(1,
.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES) counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
.getValue()); Assert.assertEquals(1,
counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
Assert.assertEquals(mapMemMb,
counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
Assert.assertEquals(reduceMemMb,
counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
} }
private TaskAttemptImpl createMapTaskAttemptImplForTest( private TaskAttemptImpl createMapTaskAttemptImplForTest(

View File

@ -45,5 +45,11 @@ public enum JobCounter {
TOTAL_LAUNCHED_UBERTASKS, TOTAL_LAUNCHED_UBERTASKS,
NUM_UBER_SUBMAPS, NUM_UBER_SUBMAPS,
NUM_UBER_SUBREDUCES, NUM_UBER_SUBREDUCES,
NUM_FAILED_UBERTASKS NUM_FAILED_UBERTASKS,
MILLIS_MAPS,
MILLIS_REDUCES,
VCORES_MILLIS_MAPS,
VCORES_MILLIS_REDUCES,
MB_MILLIS_MAPS,
MB_MILLIS_REDUCES
} }

View File

@ -25,5 +25,11 @@ DATA_LOCAL_MAPS.name= Data-local map tasks
RACK_LOCAL_MAPS.name= Rack-local map tasks RACK_LOCAL_MAPS.name= Rack-local map tasks
SLOTS_MILLIS_MAPS.name= Total time spent by all maps in occupied slots (ms) SLOTS_MILLIS_MAPS.name= Total time spent by all maps in occupied slots (ms)
SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms) SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms)
MILLIS_MAPS.name= Total time spent by all map tasks (ms)
MILLIS_REDUCES.name= Total time spent by all reduce tasks (ms)
MB_MILLIS_MAPS.name= Total megabyte-seconds taken by all map tasks
MB_MILLIS_REDUCES.name= Total megabyte-seconds taken by all reduce tasks
VCORES_MILLIS_MAPS.name= Total vcore-seconds taken by all map tasks
VCORES_MILLIS_REDUCES.name= Total vcore-seconds taken by all reduce tasks
FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)