MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch.

(cherry picked from commit f24452d14e)
Vinod Kumar Vavilapalli committed 2015-05-12 12:11:42 -07:00
parent 9d6c63f8be
commit d5755ba241
5 changed files with 142 additions and 11 deletions
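For context, a minimal client-side sketch (not part of the commit) of how the new knobs are intended to be used once this change is in. It assumes a client with a valid cluster configuration on its classpath; the job id and the retry values are illustrative only, and the constants are the ones this commit adds to MRJobConfig.

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class GetJobWithRetries {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        // Retry getJob up to 30 times, sleeping 2000 ms between attempts
        // (the shipped defaults are 0 retries and 2000 ms).
        conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 30);
        conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, 2000L);

        JobClient client = new JobClient(conf);
        // Hypothetical job id; in practice it comes from a submitted job.
        JobID jobId = JobID.forName("job_1431456789012_0001");
        RunningJob job = client.getJob(jobId);
        if (job == null) {
          System.err.println("No such job after retries: " + jobId);
        } else {
          System.out.println("Job state: " + job.getJobState());
        }
        client.close();
      }
    }

With the default of 0 retries the old single-attempt behaviour is preserved; only clients that opt in pay the sleep-and-retry cost inside getJob.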

CHANGES.txt (hadoop-mapreduce-project)

@@ -73,6 +73,7 @@ Release 2.8.0 - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
 
     MAPREDUCE-6314. TestPipeApplication fails on trunk.
     (Varun Vasudev via harsh)
@@ -195,6 +196,10 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
     (zhihai xu via jlowe)
 
+    MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so
+    that they don't fail on history-server backed by DFSes with not so strong
+    guarantees. (Craig Welch via vinodkv)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

JobClient.java (org.apache.hadoop.mapred)

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -154,6 +155,10 @@ public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
 
+  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
+  private long retryInterval =
+      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
+
   static{
     ConfigUtil.loadResources();
   }
@@ -469,6 +474,14 @@
     setConf(conf);
     cluster = new Cluster(conf);
     clientUgi = UserGroupInformation.getCurrentUser();
+
+    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
+    retryInterval =
+        conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
+            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
   }
 
   /**
@@ -581,16 +594,8 @@
       }
     });
   }
 
-  /**
-   * Get an {@link RunningJob} object to track an ongoing job. Returns
-   * null if the id does not correspond to any known job.
-   *
-   * @param jobid the jobid of the job.
-   * @return the {@link RunningJob} handle to track the job, null if the
-   *         <code>jobid</code> doesn't correspond to any known job.
-   * @throws IOException
-   */
-  public RunningJob getJob(final JobID jobid) throws IOException {
+  protected RunningJob getJobInner(final JobID jobid) throws IOException {
     try {
       Job job = getJobUsingCluster(jobid);
@@ -607,6 +612,30 @@
     return null;
   }
 
+  /**
+   * Get an {@link RunningJob} object to track an ongoing job. Returns
+   * null if the id does not correspond to any known job.
+   *
+   * @param jobid the jobid of the job.
+   * @return the {@link RunningJob} handle to track the job, null if the
+   *         <code>jobid</code> doesn't correspond to any known job.
+   * @throws IOException
+   */
+  public RunningJob getJob(final JobID jobid) throws IOException {
+    for (int i = 0; i <= maxRetry; i++) {
+      if (i > 0) {
+        try {
+          Thread.sleep(retryInterval);
+        } catch (Exception e) { }
+      }
+      RunningJob job = getJobInner(jobid);
+      if (job != null) {
+        return job;
+      }
+    }
+    return null;
+  }
 
   /**@deprecated Applications should rather use {@link #getJob(JobID)}.
    */
   @Deprecated

MRJobConfig.java (org.apache.hadoop.mapreduce)

@@ -461,6 +461,21 @@ public interface MRJobConfig {
     MR_PREFIX + "client.max-retries";
   public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
 
+  /**
+   * How many times to retry jobclient calls (via getjob)
+   */
+  public static final String MR_CLIENT_JOB_MAX_RETRIES =
+      MR_PREFIX + "client.job.max-retries";
+  public static final int DEFAULT_MR_CLIENT_JOB_MAX_RETRIES = 0;
+
+  /**
+   * How long to wait between jobclient retries on failure
+   */
+  public static final String MR_CLIENT_JOB_RETRY_INTERVAL =
+      MR_PREFIX + "client.job.retry-interval";
+  public static final long DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL =
+      2000;
 
   /** The staging directory for map reduce.*/
   public static final String MR_AM_STAGING_DIR =
       MR_AM_PREFIX+"staging-dir";

mapred-default.xml

@@ -1400,6 +1400,23 @@
   </description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.client.job.max-retries</name>
+  <value>0</value>
+  <description>The number of retries the client will make for getJob and
+  dependent calls. The default is 0 as this is generally only needed for
+  non-HDFS DFS where additional, high level retries are required to avoid
+  spurious failures during the getJob call. 30 is a good value for
+  WASB</description>
+</property>
+
+<property>
+  <name>yarn.app.mapreduce.client.job.retry-interval</name>
+  <value>2000</value>
+  <description>The delay between getJob retries in ms for retries configured
+  with yarn.app.mapreduce.client.job.max-retries.</description>
+</property>
+
 <property>
   <description>CLASSPATH for MR applications. A comma-separated list
   of CLASSPATH entries. If mapreduce.application.framework is set then this
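The defaults above keep retries off. On a deployment that actually needs them (for example a history server backed by WASB, the case the description calls out), the override would normally go into the client's mapred-site.xml rather than mapred-default.xml. A hedged sketch, using the 30-retry value suggested in the description and the default 2000 ms interval:

    <!-- mapred-site.xml on the submitting client; illustrative values only -->
    <configuration>
      <property>
        <name>yarn.app.mapreduce.client.job.max-retries</name>
        <value>30</value>
      </property>
      <property>
        <name>yarn.app.mapreduce.client.job.retry-interval</name>
        <value>2000</value>
      </property>
    </configuration>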

JobClientUnitTest.java (org.apache.hadoop.mapred)

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.Assert;
@@ -52,6 +54,42 @@ public class JobClientUnitTest {
     void setCluster(Cluster cluster) {
       this.cluster = cluster;
     }
+  }
+
+  public class TestJobClientGetJob extends TestJobClient {
+
+    int lastGetJobRetriesCounter = 0;
+    int getJobRetriesCounter = 0;
+    int getJobRetries = 0;
+    RunningJob runningJob;
+
+    TestJobClientGetJob(JobConf jobConf) throws IOException {
+      super(jobConf);
+    }
+
+    public int getLastGetJobRetriesCounter() {
+      return lastGetJobRetriesCounter;
+    }
+
+    public void setGetJobRetries(int getJobRetries) {
+      this.getJobRetries = getJobRetries;
+    }
+
+    public void setRunningJob(RunningJob runningJob) {
+      this.runningJob = runningJob;
+    }
+
+    protected RunningJob getJobInner(final JobID jobid) throws IOException {
+      if (getJobRetriesCounter >= getJobRetries) {
+        lastGetJobRetriesCounter = getJobRetriesCounter;
+        getJobRetriesCounter = 0;
+        return runningJob;
+      }
+      getJobRetriesCounter++;
+      return null;
+    }
+
   }
 
   @Test
@@ -124,6 +162,7 @@
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobStatus.getJobID()).thenReturn(jobID);
+    when(mockJobStatus.getJobName()).thenReturn(jobID.toString());
     when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
     when(mockJobStatus.getStartTime()).thenReturn(startTime);
     when(mockJobStatus.getUsername()).thenReturn("mockuser");
@@ -181,4 +220,30 @@
     assertNull(client.getJob(id));
   }
 
+  @Test
+  public void testGetJobRetry() throws Exception {
+
+    //To prevent the test from running for a very long time, lower the retry
+    JobConf conf = new JobConf();
+    conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3");
+
+    TestJobClientGetJob client = new TestJobClientGetJob(conf);
+    JobID id = new JobID("ajob",1);
+    RunningJob rj = mock(RunningJob.class);
+    client.setRunningJob(rj);
+
+    //no retry
+    assertNotNull(client.getJob(id));
+    assertEquals(client.getLastGetJobRetriesCounter(), 0);
+
+    //3 retry
+    client.setGetJobRetries(3);
+    assertNotNull(client.getJob(id));
+    assertEquals(client.getLastGetJobRetriesCounter(), 3);
+
+    //beyond MAPREDUCE_JOBCLIENT_GETJOB_MAX_RETRY_KEY, will get null
+    client.setGetJobRetries(5);
+    assertNull(client.getJob(id));
+  }
 }