MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so that they don't fail on history-server backed by DFSes with not so strong guarantees. Contributed by Craig Welch.
This commit is contained in:
parent
fe0df59627
commit
f24452d14e
|
@ -328,6 +328,7 @@ Release 2.8.0 - UNRELEASED
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-6314. TestPipeApplication fails on trunk.
|
MAPREDUCE-6314. TestPipeApplication fails on trunk.
|
||||||
(Varun Vasudev via harsh)
|
(Varun Vasudev via harsh)
|
||||||
|
|
||||||
|
@ -450,6 +451,10 @@ Release 2.7.1 - UNRELEASED
|
||||||
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
|
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
|
||||||
(zhihai xu via jlowe)
|
(zhihai xu via jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-6251. Added a new config for JobClient to retry JobStatus calls so
|
||||||
|
that they don't fail on history-server backed by DFSes with not so strong
|
||||||
|
guarantees. (Craig Welch via vinodkv)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
|
||||||
import org.apache.hadoop.mapreduce.Cluster;
|
import org.apache.hadoop.mapreduce.Cluster;
|
||||||
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.QueueInfo;
|
import org.apache.hadoop.mapreduce.QueueInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
@ -154,6 +155,10 @@ public class JobClient extends CLI {
|
||||||
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
|
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
|
||||||
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
|
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
|
||||||
|
|
||||||
|
private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
|
||||||
|
private long retryInterval =
|
||||||
|
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
|
||||||
|
|
||||||
static{
|
static{
|
||||||
ConfigUtil.loadResources();
|
ConfigUtil.loadResources();
|
||||||
}
|
}
|
||||||
|
@ -469,6 +474,14 @@ public class JobClient extends CLI {
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
cluster = new Cluster(conf);
|
cluster = new Cluster(conf);
|
||||||
clientUgi = UserGroupInformation.getCurrentUser();
|
clientUgi = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
|
maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
|
||||||
|
MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
|
||||||
|
|
||||||
|
retryInterval =
|
||||||
|
conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
|
||||||
|
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -581,16 +594,8 @@ public class JobClient extends CLI {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Get an {@link RunningJob} object to track an ongoing job. Returns
|
protected RunningJob getJobInner(final JobID jobid) throws IOException {
|
||||||
* null if the id does not correspond to any known job.
|
|
||||||
*
|
|
||||||
* @param jobid the jobid of the job.
|
|
||||||
* @return the {@link RunningJob} handle to track the job, null if the
|
|
||||||
* <code>jobid</code> doesn't correspond to any known job.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public RunningJob getJob(final JobID jobid) throws IOException {
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
Job job = getJobUsingCluster(jobid);
|
Job job = getJobUsingCluster(jobid);
|
||||||
|
@ -607,7 +612,31 @@ public class JobClient extends CLI {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**@deprecated Applications should rather use {@link #getJob(JobID)}.
|
/**
|
||||||
|
* Get an {@link RunningJob} object to track an ongoing job. Returns
|
||||||
|
* null if the id does not correspond to any known job.
|
||||||
|
*
|
||||||
|
* @param jobid the jobid of the job.
|
||||||
|
* @return the {@link RunningJob} handle to track the job, null if the
|
||||||
|
* <code>jobid</code> doesn't correspond to any known job.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public RunningJob getJob(final JobID jobid) throws IOException {
|
||||||
|
for (int i = 0;i <= maxRetry;i++) {
|
||||||
|
if (i > 0) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(retryInterval);
|
||||||
|
} catch (Exception e) { }
|
||||||
|
}
|
||||||
|
RunningJob job = getJobInner(jobid);
|
||||||
|
if (job != null) {
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**@deprecated Applications should rather use {@link #getJob(JobID)}.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public RunningJob getJob(String jobid) throws IOException {
|
public RunningJob getJob(String jobid) throws IOException {
|
||||||
|
|
|
@ -469,6 +469,21 @@ public interface MRJobConfig {
|
||||||
MR_PREFIX + "client.max-retries";
|
MR_PREFIX + "client.max-retries";
|
||||||
public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
|
public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many times to retry jobclient calls (via getjob)
|
||||||
|
*/
|
||||||
|
public static final String MR_CLIENT_JOB_MAX_RETRIES =
|
||||||
|
MR_PREFIX + "client.job.max-retries";
|
||||||
|
public static final int DEFAULT_MR_CLIENT_JOB_MAX_RETRIES = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How long to wait between jobclient retries on failure
|
||||||
|
*/
|
||||||
|
public static final String MR_CLIENT_JOB_RETRY_INTERVAL =
|
||||||
|
MR_PREFIX + "client.job.retry-interval";
|
||||||
|
public static final long DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL =
|
||||||
|
2000;
|
||||||
|
|
||||||
/** The staging directory for map reduce.*/
|
/** The staging directory for map reduce.*/
|
||||||
public static final String MR_AM_STAGING_DIR =
|
public static final String MR_AM_STAGING_DIR =
|
||||||
MR_AM_PREFIX+"staging-dir";
|
MR_AM_PREFIX+"staging-dir";
|
||||||
|
|
|
@ -1390,6 +1390,23 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.client.job.max-retries</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>The number of retries the client will make for getJob and
|
||||||
|
dependent calls. The default is 0 as this is generally only needed for
|
||||||
|
non-HDFS DFS where additional, high level retries are required to avoid
|
||||||
|
spurious failures during the getJob call. 30 is a good value for
|
||||||
|
WASB</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.client.job.retry-interval</name>
|
||||||
|
<value>2000</value>
|
||||||
|
<description>The delay between getJob retries in ms for retries configured
|
||||||
|
with yarn.app.mapreduce.client.job.max-retries.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>CLASSPATH for MR applications. A comma-separated list
|
<description>CLASSPATH for MR applications. A comma-separated list
|
||||||
of CLASSPATH entries. If mapreduce.application.framework is set then this
|
of CLASSPATH entries. If mapreduce.application.framework is set then this
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobPriority;
|
import org.apache.hadoop.mapreduce.JobPriority;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskReport;
|
import org.apache.hadoop.mapreduce.TaskReport;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -52,6 +54,42 @@ public class JobClientUnitTest {
|
||||||
void setCluster(Cluster cluster) {
|
void setCluster(Cluster cluster) {
|
||||||
this.cluster = cluster;
|
this.cluster = cluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class TestJobClientGetJob extends TestJobClient {
|
||||||
|
|
||||||
|
int lastGetJobRetriesCounter = 0;
|
||||||
|
int getJobRetriesCounter = 0;
|
||||||
|
int getJobRetries = 0;
|
||||||
|
RunningJob runningJob;
|
||||||
|
|
||||||
|
TestJobClientGetJob(JobConf jobConf) throws IOException {
|
||||||
|
super(jobConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getLastGetJobRetriesCounter() {
|
||||||
|
return lastGetJobRetriesCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGetJobRetries(int getJobRetries) {
|
||||||
|
this.getJobRetries = getJobRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRunningJob(RunningJob runningJob) {
|
||||||
|
this.runningJob = runningJob;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RunningJob getJobInner(final JobID jobid) throws IOException {
|
||||||
|
if (getJobRetriesCounter >= getJobRetries) {
|
||||||
|
lastGetJobRetriesCounter = getJobRetriesCounter;
|
||||||
|
getJobRetriesCounter = 0;
|
||||||
|
return runningJob;
|
||||||
|
}
|
||||||
|
getJobRetriesCounter++;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -124,6 +162,7 @@ public class JobClientUnitTest {
|
||||||
|
|
||||||
JobStatus mockJobStatus = mock(JobStatus.class);
|
JobStatus mockJobStatus = mock(JobStatus.class);
|
||||||
when(mockJobStatus.getJobID()).thenReturn(jobID);
|
when(mockJobStatus.getJobID()).thenReturn(jobID);
|
||||||
|
when(mockJobStatus.getJobName()).thenReturn(jobID.toString());
|
||||||
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
|
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
|
||||||
when(mockJobStatus.getStartTime()).thenReturn(startTime);
|
when(mockJobStatus.getStartTime()).thenReturn(startTime);
|
||||||
when(mockJobStatus.getUsername()).thenReturn("mockuser");
|
when(mockJobStatus.getUsername()).thenReturn("mockuser");
|
||||||
|
@ -181,4 +220,30 @@ public class JobClientUnitTest {
|
||||||
assertNull(client.getJob(id));
|
assertNull(client.getJob(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetJobRetry() throws Exception {
|
||||||
|
|
||||||
|
//To prevent the test from running for a very long time, lower the retry
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3");
|
||||||
|
|
||||||
|
TestJobClientGetJob client = new TestJobClientGetJob(conf);
|
||||||
|
JobID id = new JobID("ajob",1);
|
||||||
|
RunningJob rj = mock(RunningJob.class);
|
||||||
|
client.setRunningJob(rj);
|
||||||
|
|
||||||
|
//no retry
|
||||||
|
assertNotNull(client.getJob(id));
|
||||||
|
assertEquals(client.getLastGetJobRetriesCounter(), 0);
|
||||||
|
|
||||||
|
//3 retry
|
||||||
|
client.setGetJobRetries(3);
|
||||||
|
assertNotNull(client.getJob(id));
|
||||||
|
assertEquals(client.getLastGetJobRetriesCounter(), 3);
|
||||||
|
|
||||||
|
//beyond MAPREDUCE_JOBCLIENT_GETJOB_MAX_RETRY_KEY, will get null
|
||||||
|
client.setGetJobRetries(5);
|
||||||
|
assertNull(client.getJob(id));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue