From f24452d14e9ba48cdb82e5e6e5c10ce5b1407308 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 12 May 2015 12:11:42 -0700 Subject: [PATCH] MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch. --- hadoop-mapreduce-project/CHANGES.txt | 5 ++ .../org/apache/hadoop/mapred/JobClient.java | 51 +++++++++++---- .../apache/hadoop/mapreduce/MRJobConfig.java | 15 +++++ .../src/main/resources/mapred-default.xml | 17 +++++ .../hadoop/mapred/JobClientUnitTest.java | 65 +++++++++++++++++++ 5 files changed, 142 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 15cdf90a324..fc983761567 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -328,6 +328,7 @@ Release 2.8.0 - UNRELEASED OPTIMIZATIONS BUG FIXES + MAPREDUCE-6314. TestPipeApplication fails on trunk. (Varun Vasudev via harsh) @@ -450,6 +451,10 @@ Release 2.7.1 - UNRELEASED MAPREDUCE-6259. IllegalArgumentException due to missing job submit time (zhihai xu via jlowe) + MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so + that they don't fail on history-server backed by DFSes with not so strong + guarantees. (Craig Welch via vinodkv) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index e91fbfed37c..cf123c7b4ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; @@ -154,6 +155,10 @@ public class JobClient extends CLI { public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; + private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES; + private long retryInterval = + MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL; + static{ ConfigUtil.loadResources(); } @@ -469,6 +474,14 @@ public class JobClient extends CLI { setConf(conf); cluster = new Cluster(conf); clientUgi = UserGroupInformation.getCurrentUser(); + + maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, + MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES); + + retryInterval = + conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, + MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL); + } /** @@ -581,16 +594,8 @@ public class JobClient extends CLI { } }); } - /** - * Get an {@link RunningJob} object to track an ongoing job. Returns - * null if the id does not correspond to any known job. - * - * @param jobid the jobid of the job. - * @return the {@link RunningJob} handle to track the job, null if the - * jobid doesn't correspond to any known job. - * @throws IOException - */ - public RunningJob getJob(final JobID jobid) throws IOException { + + protected RunningJob getJobInner(final JobID jobid) throws IOException { try { Job job = getJobUsingCluster(jobid); @@ -607,7 +612,31 @@ public class JobClient extends CLI { return null; } - /**@deprecated Applications should rather use {@link #getJob(JobID)}. + /** + * Get an {@link RunningJob} object to track an ongoing job. Returns + * null if the id does not correspond to any known job. + * + * @param jobid the jobid of the job. + * @return the {@link RunningJob} handle to track the job, null if the + * jobid doesn't correspond to any known job. + * @throws IOException + */ + public RunningJob getJob(final JobID jobid) throws IOException { + for (int i = 0;i <= maxRetry;i++) { + if (i > 0) { + try { + Thread.sleep(retryInterval); + } catch (Exception e) { } + } + RunningJob job = getJobInner(jobid); + if (job != null) { + return job; + } + } + return null; + } + + /**@deprecated Applications should rather use {@link #getJob(JobID)}. */ @Deprecated public RunningJob getJob(String jobid) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 49345cd50f5..fb4064c2270 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -469,6 +469,21 @@ public interface MRJobConfig { MR_PREFIX + "client.max-retries"; public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3; + /** + * How many times to retry jobclient calls (via getjob) + */ + public static final String MR_CLIENT_JOB_MAX_RETRIES = + MR_PREFIX + "client.job.max-retries"; + public static final int DEFAULT_MR_CLIENT_JOB_MAX_RETRIES = 0; + + /** + * How long to wait between jobclient retries on failure + */ + public static final String MR_CLIENT_JOB_RETRY_INTERVAL = + MR_PREFIX + "client.job.retry-interval"; + public static final long DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL = + 2000; + /** The staging directory for map reduce.*/ public static final String MR_AM_STAGING_DIR = MR_AM_PREFIX+"staging-dir"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index a5e76b34080..5daf66df078 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1390,6 +1390,23 @@ + + yarn.app.mapreduce.client.job.max-retries + 0 + The number of retries the client will make for getJob and + dependent calls. The default is 0 as this is generally only needed for + non-HDFS DFS where additional, high level retries are required to avoid + spurious failures during the getJob call. 30 is a good value for + WASB + + + + yarn.app.mapreduce.client.job.retry-interval + 2000 + The delay between getJob retries in ms for retries configured + with yarn.app.mapreduce.client.job.max-retries. + + CLASSPATH for MR applications. A comma-separated list of CLASSPATH entries. If mapreduce.application.framework is set then this diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java index 8dfac895e0b..84b76bfbcf6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobClientUnitTest.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapred; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; import org.junit.Assert; @@ -52,6 +54,42 @@ public class JobClientUnitTest { void setCluster(Cluster cluster) { this.cluster = cluster; } + + } + + public class TestJobClientGetJob extends TestJobClient { + + int lastGetJobRetriesCounter = 0; + int getJobRetriesCounter = 0; + int getJobRetries = 0; + RunningJob runningJob; + + TestJobClientGetJob(JobConf jobConf) throws IOException { + super(jobConf); + } + + public int getLastGetJobRetriesCounter() { + return lastGetJobRetriesCounter; + } + + public void setGetJobRetries(int getJobRetries) { + this.getJobRetries = getJobRetries; + } + + public void setRunningJob(RunningJob runningJob) { + this.runningJob = runningJob; + } + + protected RunningJob getJobInner(final JobID jobid) throws IOException { + if (getJobRetriesCounter >= getJobRetries) { + lastGetJobRetriesCounter = getJobRetriesCounter; + getJobRetriesCounter = 0; + return runningJob; + } + getJobRetriesCounter++; + return null; + } + } @Test @@ -124,6 +162,7 @@ public class JobClientUnitTest { JobStatus mockJobStatus = mock(JobStatus.class); when(mockJobStatus.getJobID()).thenReturn(jobID); + when(mockJobStatus.getJobName()).thenReturn(jobID.toString()); when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING); when(mockJobStatus.getStartTime()).thenReturn(startTime); when(mockJobStatus.getUsername()).thenReturn("mockuser"); @@ -181,4 +220,30 @@ public class JobClientUnitTest { assertNull(client.getJob(id)); } + @Test + public void testGetJobRetry() throws Exception { + + //To prevent the test from running for a very long time, lower the retry + JobConf conf = new JobConf(); + conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3"); + + TestJobClientGetJob client = new TestJobClientGetJob(conf); + JobID id = new JobID("ajob",1); + RunningJob rj = mock(RunningJob.class); + client.setRunningJob(rj); + + //no retry + assertNotNull(client.getJob(id)); + assertEquals(client.getLastGetJobRetriesCounter(), 0); + + //3 retry + client.setGetJobRetries(3); + assertNotNull(client.getJob(id)); + assertEquals(client.getLastGetJobRetriesCounter(), 3); + + //beyond MAPREDUCE_JOBCLIENT_GETJOB_MAX_RETRY_KEY, will get null + client.setGetJobRetries(5); + assertNull(client.getJob(id)); + } + }