From 5443f258fbfcf0c9958948e0d2e87fc625b4afd3 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Tue, 7 Feb 2012 23:07:56 +0000 Subject: [PATCH] merge from trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241693 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 74 +++++++++++++++---- .../hadoop/mapreduce/v2/app/TestMRApp.java | 41 ++++++++++ .../v2/app/job/impl/TestJobImpl.java | 67 ++++++++--------- 4 files changed, 133 insertions(+), 52 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 37017ce6e24..18323ee2c2a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -711,6 +711,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3826. Fixed a bug in RM web-ui which broke sorting. (Jonathan Eagles via acmurthy) + MAPREDUCE-3823. Ensure counters are calculated only once after a job + finishes. (Vinod Kumar Vavilapalli via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index c93696d4e05..e647dc31c97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -106,7 +107,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. */ -@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" }) +@SuppressWarnings({ "rawtypes", "unchecked" }) public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, EventHandler { @@ -153,6 +154,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private boolean lazyTasksCopyNeeded = false; volatile Map tasks = new LinkedHashMap(); private Counters jobCounters = new Counters(); + private Object fullCountersLock = new Object(); + private Counters fullCounters = null; + private Counters finalMapCounters = null; + private Counters finalReduceCounters = null; // FIXME: // // Can then replace task-level uber counters (MR-2424) with job-level ones @@ -473,11 +478,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, @Override public Counters getAllCounters() { - Counters counters = new Counters(); + readLock.lock(); + try { + JobState state = getState(); + if (state == JobState.ERROR || state == JobState.FAILED + || state == JobState.KILLED || state == JobState.SUCCEEDED) { + this.mayBeConstructFinalFullCounters(); + return fullCounters; + } + + Counters counters = new Counters(); counters.incrAllCounters(jobCounters); return incrTaskCounters(counters, tasks.values()); + } finally { readLock.unlock(); } @@ -525,17 +540,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, try { JobState state = getState(); + // jobFile can be null if the job is not yet inited. + String jobFile = + remoteJobConfFile == null ? "" : remoteJobConfFile.toString(); + if (getState() == JobState.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, - cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); + cleanupProgress, jobFile, amInfos, isUber); } computeProgress(); return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, this.mapProgress, this.reduceProgress, - cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); + cleanupProgress, jobFile, amInfos, isUber); } finally { readLock.unlock(); } @@ -1143,26 +1162,49 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // not be generated for KilledJobs, etc. private static JobFinishedEvent createJobFinishedEvent(JobImpl job) { - Counters mapCounters = new Counters(); - Counters reduceCounters = new Counters(); - for (Task t : job.tasks.values()) { - Counters counters = t.getCounters(); - switch (t.getType()) { - case MAP: mapCounters.incrAllCounters(counters); break; - case REDUCE: reduceCounters.incrAllCounters(counters); break; - } - } + job.mayBeConstructFinalFullCounters(); JobFinishedEvent jfe = new JobFinishedEvent( job.oldJobId, job.finishTime, job.succeededMapTaskCount, job.succeededReduceTaskCount, job.failedMapTaskCount, job.failedReduceTaskCount, - mapCounters, - reduceCounters, - job.getAllCounters()); + job.finalMapCounters, + job.finalReduceCounters, + job.fullCounters); return jfe; } + private void mayBeConstructFinalFullCounters() { + // Calculating full-counters. This should happen only once for the job. + synchronized (this.fullCountersLock) { + if (this.fullCounters != null) { + // Already constructed. Just return. + return; + } + this.constructFinalFullcounters(); + } + } + + @Private + public void constructFinalFullcounters() { + this.fullCounters = new Counters(); + this.finalMapCounters = new Counters(); + this.finalReduceCounters = new Counters(); + this.fullCounters.incrAllCounters(jobCounters); + for (Task t : this.tasks.values()) { + Counters counters = t.getCounters(); + switch (t.getType()) { + case MAP: + this.finalMapCounters.incrAllCounters(counters); + break; + case REDUCE: + this.finalReduceCounters.incrAllCounters(counters); + break; + } + this.fullCounters.incrAllCounters(counters); + } + } + // Task-start has been moved out of InitTransition, so this arc simply // hardcodes 0 for both map and reduce finished tasks. private static class KillNewJobTransition diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 1e9631696ae..9d1b8cc30bc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -18,6 +18,10 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; + import java.util.Iterator; import junit.framework.Assert; @@ -35,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.junit.Test; /** @@ -175,6 +180,41 @@ public class TestMRApp { app.waitForState(job, JobState.ERROR); } + private final class MRAppWithSpiedJob extends MRApp { + private JobImpl spiedJob; + + private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + } + + @Override + protected Job createJob(Configuration conf) { + spiedJob = spy((JobImpl) super.createJob(conf)); + ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob); + return spiedJob; + } + + JobImpl getSpiedJob() { + return this.spiedJob; + } + } + + @Test + public void testCountersOnJobFinish() throws Exception { + MRAppWithSpiedJob app = + new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true); + JobImpl job = (JobImpl)app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + System.out.println(job.getAllCounters()); + // Just call getCounters + job.getAllCounters(); + job.getAllCounters(); + // Should be called only once + verify(job, times(1)).constructFinalFullcounters(); + } + @Test public void checkJobStateTypeConversion() { //verify that all states can be converted without @@ -200,5 +240,6 @@ public class TestMRApp { t.testCommitPending(); t.testCompletedMapsForReduceSlowstart(); t.testJobError(); + t.testCountersOnJobFinish(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index acec4cb6c3d..2461760dd3d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -18,48 +18,40 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; -import java.io.IOException; -import java.util.Map; -import java.util.HashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; -import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.app.job.Task; -import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; -import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; -import org.apache.hadoop.mapreduce.v2.app.MRApp; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.event.EventHandler; -import org.junit.Test; -import org.junit.Assert; - +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.any; -import org.mockito.ArgumentMatcher; -import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.event.EventHandler; +import org.junit.Assert; +import org.junit.Test; /** * Tests various functions of the JobImpl class */ +@SuppressWarnings({"unchecked", "rawtypes"}) public class TestJobImpl { @Test @@ -106,7 +98,9 @@ public class TestJobImpl { "for successful job", JobImpl.checkJobCompleteSuccess(mockJob)); Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", - JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED); + JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED); + + } @Test @@ -139,6 +133,7 @@ public class TestJobImpl { t.testJobNoTasksTransition(); t.testCheckJobCompleteSuccess(); t.testCheckJobCompleteSuccessFailed(); + t.testCheckAccess(); } @Test