MAPREDUCE-6647. MR usage counters use the resources requested instead of the resources allocated (haibochen via rkanter)

(cherry picked from commit 3be1ab485f)
This commit is contained in:
Robert Kanter 2016-04-06 17:15:43 -07:00
parent 42bc565630
commit 3584cbc355
4 changed files with 74 additions and 43 deletions

View File

@ -1422,29 +1422,36 @@ public abstract class TaskAttemptImpl implements
private static void updateMillisCounters(JobCounterUpdateEvent jce, private static void updateMillisCounters(JobCounterUpdateEvent jce,
TaskAttemptImpl taskAttempt) { TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); // if container/resource if not allocated, do not update
if (null == taskAttempt.container ||
null == taskAttempt.container.getResource()) {
return;
}
long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
int mbRequired = Resource allocatedResource = taskAttempt.container.getResource();
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); int mbAllocated = allocatedResource.getMemory();
int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType); int vcoresAllocated = allocatedResource.getVirtualCores();
int minSlotMemSize = taskAttempt.conf.getInt( int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int simSlotsAllocated = minSlotMemSize == 0 ? 0 :
int simSlotsRequired = (int) Math.ceil((float) mbAllocated / minSlotMemSize);
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired
/ minSlotMemSize);
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
if (taskType == TaskType.MAP) { if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration); jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS,
jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired); simSlotsAllocated * duration);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired); jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbAllocated);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS,
duration * vcoresAllocated);
jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration); jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
} else { } else {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration); jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES,
jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired); simSlotsAllocated * duration);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired); jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES,
duration * mbAllocated);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES,
duration * vcoresAllocated);
jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration); jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
} }
} }

View File

@ -110,6 +110,10 @@ import org.junit.Assert;
public class MRApp extends MRAppMaster { public class MRApp extends MRAppMaster {
private static final Log LOG = LogFactory.getLog(MRApp.class); private static final Log LOG = LogFactory.getLog(MRApp.class);
/**
* The available resource of each container allocated.
*/
private Resource resource;
int maps; int maps;
int reduces; int reduces;
@ -249,6 +253,7 @@ public class MRApp extends MRAppMaster {
// the job can reaches the final state when MRAppMaster shuts down. // the job can reaches the final state when MRAppMaster shuts down.
this.successfullyUnregistered.set(unregistered); this.successfullyUnregistered.set(unregistered);
this.assignedQueue = assignedQueue; this.assignedQueue = assignedQueue;
this.resource = Resource.newInstance(1234, 2);
} }
@Override @Override
@ -587,7 +592,6 @@ public class MRApp extends MRAppMaster {
ContainerId.newContainerId(getContext().getApplicationAttemptId(), ContainerId.newContainerId(getContext().getApplicationAttemptId(),
containerCount++); containerCount++);
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT); NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
Resource resource = Resource.newInstance(1234, 2);
ContainerTokenIdentifier containerTokenIdentifier = ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), "user", new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
resource, System.currentTimeMillis() + 10000, 42, 42, resource, System.currentTimeMillis() + 10000, 42, 42,
@ -710,6 +714,10 @@ public class MRApp extends MRAppMaster {
} }
} }
public void setAllocatedContainerResource(Resource resource) {
this.resource = resource;
}
class TestJob extends JobImpl { class TestJob extends JobImpl {
//override the init transition //override the init transition
private final TestInitTransition initTransition = new TestInitTransition( private final TestInitTransition initTransition = new TestInitTransition(

View File

@ -1744,19 +1744,24 @@ public class TestRecovery {
expectedJobHistoryEvents.remove(0); expectedJobHistoryEvents.remove(0);
} else if (current instanceof JobCounterUpdateEvent) { } else if (current instanceof JobCounterUpdateEvent) {
JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current; JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
boolean containsUpdates = jcue.getCounterUpdates().size() > 0;
LOG.info("JobCounterUpdateEvent " // there is no updates in a JobCounterUpdateEvent emitted on
+ jcue.getCounterUpdates().get(0).getCounterKey() // TaskAttempt recovery. Check that first.
+ " " + jcue.getCounterUpdates().get(0).getIncrementValue()); if(containsUpdates) {
if (jcue.getCounterUpdates().get(0).getCounterKey() == LOG.info("JobCounterUpdateEvent "
JobCounter.NUM_FAILED_MAPS) { + jcue.getCounterUpdates().get(0).getCounterKey()
totalFailedMaps += jcue.getCounterUpdates().get(0) + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
.getIncrementValue(); if (jcue.getCounterUpdates().get(0).getCounterKey() ==
} else if (jcue.getCounterUpdates().get(0).getCounterKey() == JobCounter.NUM_FAILED_MAPS) {
JobCounter.TOTAL_LAUNCHED_MAPS) { totalFailedMaps += jcue.getCounterUpdates().get(0)
totalLaunchedMaps += jcue.getCounterUpdates().get(0) .getIncrementValue();
.getIncrementValue(); } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
JobCounter.TOTAL_LAUNCHED_MAPS) {
totalLaunchedMaps += jcue.getCounterUpdates().get(0)
.getIncrementValue();
}
} }
} else if (current instanceof JobTaskEvent) { } else if (current instanceof JobTaskEvent) {
JobTaskEvent jte = (JobTaskEvent) current; JobTaskEvent jte = (JobTaskEvent) current;
assertEquals(jte.getState(), finalState); assertEquals(jte.getState(), finalState);

View File

@ -33,6 +33,12 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -41,9 +47,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -82,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
@ -250,22 +259,21 @@ public class TestTaskAttempt{
@Test @Test
public void testMillisCountersUpdate() throws Exception { public void testMillisCountersUpdate() throws Exception {
verifyMillisCounters(2048, 2048, 1024); verifyMillisCounters(Resource.newInstance(1024, 1), 512);
verifyMillisCounters(2048, 1024, 1024); verifyMillisCounters(Resource.newInstance(2048, 4), 1024);
verifyMillisCounters(10240, 1024, 2048); verifyMillisCounters(Resource.newInstance(10240, 8), 2048);
} }
public void verifyMillisCounters(int mapMemMb, int reduceMemMb, public void verifyMillisCounters(Resource containerResource,
int minContainerSize) throws Exception { int minContainerSize) throws Exception {
Clock actualClock = SystemClock.getInstance(); Clock actualClock = SystemClock.getInstance();
ControlledClock clock = new ControlledClock(actualClock); ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(10); clock.setTime(10);
MRApp app = MRApp app =
new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock); new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
app.setAllocatedContainerResource(containerResource);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
minContainerSize); minContainerSize);
app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1))); app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1)));
@ -300,21 +308,24 @@ public class TestTaskAttempt{
Assert.assertEquals(rta.getFinishTime(), 11); Assert.assertEquals(rta.getFinishTime(), 11);
Assert.assertEquals(rta.getLaunchTime(), 10); Assert.assertEquals(rta.getLaunchTime(), 10);
Counters counters = job.getAllCounters(); Counters counters = job.getAllCounters();
Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
int memoryMb = containerResource.getMemory();
int vcores = containerResource.getVirtualCores();
Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue()); counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize), Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue()); counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
Assert.assertEquals(1, Assert.assertEquals(1,
counters.findCounter(JobCounter.MILLIS_MAPS).getValue()); counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
Assert.assertEquals(1, Assert.assertEquals(1,
counters.findCounter(JobCounter.MILLIS_REDUCES).getValue()); counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
Assert.assertEquals(mapMemMb, Assert.assertEquals(memoryMb,
counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue()); counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
Assert.assertEquals(reduceMemMb, Assert.assertEquals(memoryMb,
counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue()); counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
Assert.assertEquals(1, Assert.assertEquals(vcores,
counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue()); counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
Assert.assertEquals(1, Assert.assertEquals(vcores,
counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue()); counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
} }