MAPREDUCE-3823. Ensure counters are calculated only once after a job finishes. (Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-02-07 23:07:03 +00:00
parent d8e9f84fb1
commit ccc2807d86
4 changed files with 133 additions and 52 deletions

View File

@ -761,6 +761,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3826. Fixed a bug in RM web-ui which broke sorting. (Jonathan
Eagles via acmurthy)
MAPREDUCE-3823. Ensure counters are calculated only once after a job
finishes. (Vinod Kumar Vavilapalli via sseth)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -106,7 +107,7 @@
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
@ -153,6 +154,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = new Counters();
private Object fullCountersLock = new Object();
private Counters fullCounters = null;
private Counters finalMapCounters = null;
private Counters finalReduceCounters = null;
// FIXME:
//
// Can then replace task-level uber counters (MR-2424) with job-level ones
@ -473,11 +478,21 @@ public boolean isUber() {
@Override
public Counters getAllCounters() {
Counters counters = new Counters();
readLock.lock();
try {
JobState state = getState();
if (state == JobState.ERROR || state == JobState.FAILED
|| state == JobState.KILLED || state == JobState.SUCCEEDED) {
this.mayBeConstructFinalFullCounters();
return fullCounters;
}
Counters counters = new Counters();
counters.incrAllCounters(jobCounters);
return incrTaskCounters(counters, tasks.values());
} finally {
readLock.unlock();
}
@ -525,17 +540,21 @@ public JobReport getReport() {
try {
JobState state = getState();
// jobFile can be null if the job is not yet inited.
String jobFile =
remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
cleanupProgress, jobFile, amInfos, isUber);
}
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
cleanupProgress, jobFile, amInfos, isUber);
} finally {
readLock.unlock();
}
@ -1143,26 +1162,49 @@ private void abortJob(
// not be generated for KilledJobs, etc.
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
Counters mapCounters = new Counters();
Counters reduceCounters = new Counters();
for (Task t : job.tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP: mapCounters.incrAllCounters(counters); break;
case REDUCE: reduceCounters.incrAllCounters(counters); break;
}
}
job.mayBeConstructFinalFullCounters();
JobFinishedEvent jfe = new JobFinishedEvent(
job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount,
mapCounters,
reduceCounters,
job.getAllCounters());
job.finalMapCounters,
job.finalReduceCounters,
job.fullCounters);
return jfe;
}
private void mayBeConstructFinalFullCounters() {
// Calculating full-counters. This should happen only once for the job.
synchronized (this.fullCountersLock) {
if (this.fullCounters != null) {
// Already constructed. Just return.
return;
}
this.constructFinalFullcounters();
}
}
@Private
public void constructFinalFullcounters() {
this.fullCounters = new Counters();
this.finalMapCounters = new Counters();
this.finalReduceCounters = new Counters();
this.fullCounters.incrAllCounters(jobCounters);
for (Task t : this.tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP:
this.finalMapCounters.incrAllCounters(counters);
break;
case REDUCE:
this.finalReduceCounters.incrAllCounters(counters);
break;
}
this.fullCounters.incrAllCounters(counters);
}
}
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
private static class KillNewJobTransition

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import java.util.Iterator;
import junit.framework.Assert;
@ -35,6 +39,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.junit.Test;
/**
@ -175,6 +180,41 @@ public void testJobError() throws Exception {
app.waitForState(job, JobState.ERROR);
}
private final class MRAppWithSpiedJob extends MRApp {
private JobImpl spiedJob;
private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected Job createJob(Configuration conf) {
spiedJob = spy((JobImpl) super.createJob(conf));
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
return spiedJob;
}
JobImpl getSpiedJob() {
return this.spiedJob;
}
}
@Test
public void testCountersOnJobFinish() throws Exception {
MRAppWithSpiedJob app =
new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
JobImpl job = (JobImpl)app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
System.out.println(job.getAllCounters());
// Just call getCounters
job.getAllCounters();
job.getAllCounters();
// Should be called only once
verify(job, times(1)).constructFinalFullcounters();
}
@Test
public void checkJobStateTypeConversion() {
//verify that all states can be converted without
@ -200,5 +240,6 @@ public static void main(String[] args) throws Exception {
t.testCommitPending();
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
t.testCountersOnJobFinish();
}
}

View File

@ -18,48 +18,40 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
import org.junit.Assert;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.any;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests various functions of the JobImpl class
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestJobImpl {
@Test
@ -106,7 +98,9 @@ public void testCheckJobCompleteSuccess() {
"for successful job",
JobImpl.checkJobCompleteSuccess(mockJob));
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
}
@Test
@ -139,6 +133,7 @@ public static void main(String[] args) throws Exception {
t.testJobNoTasksTransition();
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
}
@Test