diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f24c114e2e8..154740055ed 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -201,6 +201,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-6123. TestCombineFileInputFormat incorrectly starts 2 MiniDFSCluster instances. (cnauroth) + MAPREDUCE-5875. Make Counter limits consistent across JobClient, + MRAppMaster, and YarnChild. (Gera Shegalov via kasha) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 55eb88b4103..7c882ef0068 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.EventReader; import org.apache.hadoop.mapreduce.jobhistory.EventType; @@ -1088,6 +1089,8 @@ public class MRAppMaster extends CompositeService { // finally set the job classloader MRApps.setClassLoader(jobClassLoader, getConfig()); + // initialize the counter Limits from this AM's configuration + Limits.init(getConfig()); if (initFailed) { JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index 2fcc0461c39..60ff715cb83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -182,15 +182,15 @@ public class Cluster { public Job getJob(JobID jobId) throws IOException, InterruptedException { JobStatus status = client.getJobStatus(jobId); if (status != null) { - JobConf conf; + final JobConf conf = new JobConf(); + final Path jobPath = new Path(client.getFilesystemName(), + status.getJobFile()); + final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf()); try { - conf = new JobConf(status.getJobFile()); - } catch (RuntimeException ex) { - // If job file doesn't exist it means we can't find the job - if (ex.getCause() instanceof FileNotFoundException) { - return null; - } else { - throw ex; + conf.addResource(fs.open(jobPath), jobPath.toString()); + } catch (FileNotFoundException fnf) { /* job file not found: log it and fall through, so the Job below is built with the empty JobConf created above instead of returning null */ + if (LOG.isWarnEnabled()) { + LOG.warn("Job conf missing on cluster", fnf); } } return Job.getInstance(this, status, conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index d80521c5944..59202ca1092 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -50,6 +50,7 @@ import org.apache.hadoop.mapred.QueueACL; import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName; +import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; @@ -437,6 +438,7 @@ class JobSubmitter { // Write job file to submit dir writeConf(conf, submitJobFile); + Limits.reset(conf); /* re-initialize Limits from the same conf that was just written to the submit dir */ // // Now, actually submit the job (using the submit name) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java index 34b0fae6e82..3821694b2fb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java @@ -123,4 +123,9 @@ public class Limits { public synchronized LimitExceededException violation() { return firstViolation; } + + public static synchronized void reset(Configuration conf) { + isInited = false; /* clear the flag before calling init(conf); presumably init() is a no-op while the flag is set - TODO confirm against init() */ + init(conf); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java index eaeadea6ff4..43b2df290d5 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapreduce.jobhistory; +import java.io.FileNotFoundException; import java.io.IOException; import java.text.DecimalFormat; import java.text.Format; @@ -29,6 +30,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -41,6 +44,7 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.util.HostUtil; @@ -54,7 +58,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @InterfaceAudience.Private @InterfaceStability.Unstable public class HistoryViewer { - private static SimpleDateFormat dateFormat = + private static final Log LOG = LogFactory.getLog(HistoryViewer.class); + private static final SimpleDateFormat dateFormat = /* NOTE(review): SimpleDateFormat instances are not synchronized; confirm this shared static is only ever used from one thread */ new SimpleDateFormat("d-MMM-yyyy HH:mm:ss"); private FileSystem fs; private JobInfo job; @@ -83,6 +88,17 @@ public class HistoryViewer { System.err.println("Ignore unrecognized file: " + jobFile.getName()); throw new IOException(errorMsg); } + final Path jobConfPath = new Path(jobFile.getParent(), jobDetails[0] + + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml"); + 
final Configuration jobConf = new Configuration(conf); + try { + jobConf.addResource(fs.open(jobConfPath), jobConfPath.toString()); + Limits.reset(jobConf); /* reset from jobConf (base conf plus the job conf file just added), not from the base conf alone */ + } catch (FileNotFoundException fnf) { + if (LOG.isWarnEnabled()) { + LOG.warn("Missing job conf in history", fnf); + } + } JobHistoryParser parser = new JobHistoryParser(fs, jobFile); job = parser.parse(); jobId = job.getJobId().toString(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index 79b9275467f..9e38316e068 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.hs; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.TaskCompletionEvent; @@ -41,6 +43,7 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import 
org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; @@ -331,9 +334,21 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job if (historyFileAbsolute != null) { JobHistoryParser parser = null; try { + final FileSystem fs = historyFileAbsolute.getFileSystem(conf); parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute); + final Path jobConfPath = new Path(historyFileAbsolute.getParent(), + JobHistoryUtils.getIntermediateConfFileName(jobId)); + final Configuration jobConf = new Configuration(); /* distinct name so the conf passed to getFileSystem(conf) above is not shadowed inside this block */ + try { + jobConf.addResource(fs.open(jobConfPath), jobConfPath.toString()); + Limits.reset(jobConf); + } catch (FileNotFoundException fnf) { + if (LOG.isWarnEnabled()) { + LOG.warn("Missing job conf in history", fnf); + } + } this.jobInfo = parser.parse(); } catch (IOException e) { throw new YarnRuntimeException("Could not load history file " diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index e27168d24f2..f32944d94ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -53,10 +53,14 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; 
+import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; @@ -105,6 +109,7 @@ public class TestMRJobs { EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED); private static final int NUM_NODE_MGRS = 3; private static final String TEST_IO_SORT_MB = "11"; + private static final String TEST_GROUP_MAX = "200"; /* value the test sets for MRJobConfig.COUNTER_GROUPS_MAX_KEY on the job conf and expects back from the RunningJob configuration */ protected static MiniMRYarnCluster mrCluster; protected static MiniDFSCluster dfsCluster; @@ -213,31 +218,58 @@ public class TestMRJobs { } @Test(timeout = 300000) - public void testJobClassloader() throws IOException, InterruptedException, - ClassNotFoundException { - testJobClassloader(false); + public void testConfVerificationWithClassloader() throws Exception { + testConfVerification(true, false, false, false); /* argument order: job classloader, custom classes, monitor via JobClient, local mode */ } @Test(timeout = 300000) - public void testJobClassloaderWithCustomClasses() throws IOException, - InterruptedException, ClassNotFoundException { - testJobClassloader(true); + public void testConfVerificationWithClassloaderCustomClasses() + throws Exception { + testConfVerification(true, true, false, false); } - private void testJobClassloader(boolean useCustomClasses) throws IOException, - InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testJobClassloader()" - + " useCustomClasses=" + useCustomClasses); + @Test(timeout = 300000) + public void testConfVerificationWithOutClassloader() throws Exception { + testConfVerification(false, false, false, false); + } + + @Test(timeout = 300000) + public void testConfVerificationWithJobClient() throws Exception { + testConfVerification(false, false, true, false); + } + + @Test(timeout = 300000) + public void testConfVerificationWithJobClientLocal() throws Exception { + testConfVerification(false, false, true, true); + } + + private void testConfVerification(boolean useJobClassLoader, + boolean useCustomClasses, boolean 
useJobClientForMonitoring, + boolean useLocal) throws Exception { + LOG.info("\n\n\nStarting testConfVerification()" + + " jobClassloader=" + useJobClassLoader + + " customClasses=" + useCustomClasses + + " jobClient=" + useJobClientForMonitoring + + " localMode=" + useLocal); if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test."); return; } - final Configuration sleepConf = new Configuration(mrCluster.getConfig()); + final Configuration clusterConfig; + if (useLocal) { + clusterConfig = new Configuration(); + clusterConfig.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); /* set on clusterConfig itself: it is the conf handed to JobClient and copied into sleepConf below */ + } else { + clusterConfig = mrCluster.getConfig(); + } + final JobClient jc = new JobClient(clusterConfig); + final Configuration sleepConf = new Configuration(clusterConfig); // set master address to local to test that local mode applied iff framework == local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); - sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); + sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, + useJobClassLoader); if (useCustomClasses) { // to test AM loading user classes such as output format class, we want // to blacklist them from the system classes (they need to be prepended @@ -255,6 +287,7 @@ public class TestMRJobs { sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString()); sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class"); + sleepConf.set(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TEST_GROUP_MAX); final SleepJob sleepJob = new SleepJob(); sleepJob.setConf(sleepConf); final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1); @@ -272,7 +305,26 @@ public class TestMRJobs { jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); } job.submit(); - boolean succeeded = job.waitForCompletion(true); + final boolean succeeded; + if (useJobClientForMonitoring && !useLocal) { + // We can't use getJobID 
in useLocal case because JobClient and Job + // point to different instances of LocalJobRunner + // + final JobID mapredJobID = JobID.downgrade(job.getJobID()); + RunningJob runningJob = null; + do { + Thread.sleep(10); + runningJob = jc.getJob(mapredJobID); + } while (runningJob == null); /* poll every 10 ms until jc.getJob returns a non-null RunningJob; the loop itself has no bound */ + Assert.assertEquals("Unexpected RunningJob's " + + MRJobConfig.COUNTER_GROUPS_MAX_KEY, + TEST_GROUP_MAX, runningJob.getConfiguration() + .get(MRJobConfig.COUNTER_GROUPS_MAX_KEY)); + runningJob.waitForCompletion(); + succeeded = runningJob.isSuccessful(); + } else { + succeeded = job.waitForCompletion(true); + } Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(), succeeded); } @@ -925,5 +977,14 @@ public class TestMRJobs { + ", actual: " + ioSortMb); } } + + @Override + public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException { + super.map(key, value, context); + for (int i = 0; i < 100; i++) { /* increments one counter in each of 100 distinctly named counter groups on every map() call */ + context.getCounter("testCounterGroup-" + i, + "testCounter").increment(1); + } + } } }