MAPREDUCE-6566. Add retry support to mapreduce CLI tool. Contributed by Varun Vasudev

(cherry picked from commit fc470840a0)
This commit is contained in:
Xuan 2015-12-07 14:15:14 -08:00
parent c993327483
commit 3b10ff6a6f
3 changed files with 65 additions and 10 deletions

View File

@ -380,6 +380,7 @@ Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
@ -455,6 +456,8 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
strategy is dynamic (Kuhu Shukla via kihwal)
MAPREDUCE-6566. Add retry support to mapreduce CLI tool. (Varun Vasudev via xgong)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -26,6 +26,7 @@
import java.util.List;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -43,6 +44,7 @@
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
@ -268,7 +270,7 @@ public int run(String[] argv) throws Exception {
System.out.println("Created job " + job.getJobID());
exitCode = 0;
} else if (getStatus) {
Job job = cluster.getJob(JobID.forName(jobid));
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
@ -283,7 +285,7 @@ public int run(String[] argv) throws Exception {
exitCode = 0;
}
} else if (getCounter) {
Job job = cluster.getJob(JobID.forName(jobid));
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
@ -299,7 +301,7 @@ public int run(String[] argv) throws Exception {
}
}
} else if (killJob) {
Job job = cluster.getJob(JobID.forName(jobid));
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
@ -323,7 +325,7 @@ public int run(String[] argv) throws Exception {
}
}
} else if (setJobPriority) {
Job job = cluster.getJob(JobID.forName(jobid));
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
@ -339,7 +341,7 @@ public int run(String[] argv) throws Exception {
viewHistory(historyFile, viewAllHistory);
exitCode = 0;
} else if (listEvents) {
listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
exitCode = 0;
} else if (listJobs) {
listJobs(cluster);
@ -354,11 +356,11 @@ public int run(String[] argv) throws Exception {
listBlacklistedTrackers(cluster);
exitCode = 0;
} else if (displayTasks) {
displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
exitCode = 0;
} else if(killTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = cluster.getJob(taskID.getJobID());
Job job = getJob(taskID.getJobID());
if (job == null) {
System.out.println("Could not find job " + jobid);
} else if (job.killTask(taskID, false)) {
@ -370,7 +372,7 @@ public int run(String[] argv) throws Exception {
}
} else if(failTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = cluster.getJob(taskID.getJobID());
Job job = getJob(taskID.getJobID());
if (job == null) {
System.out.println("Could not find job " + jobid);
} else if(job.killTask(taskID, true)) {
@ -532,6 +534,29 @@ protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
}
@VisibleForTesting
Job getJob(JobID jobid) throws IOException, InterruptedException {
int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
long retryInterval = getConf()
.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
Job job = cluster.getJob(jobid);
for (int i = 0; i < maxRetry; ++i) {
if (job != null) {
return job;
}
LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
+ " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
+ " seconds and retrying.");
Thread.sleep(retryInterval);
job = cluster.getJob(jobid);
}
return job;
}
/**
* Dump a list of currently running jobs

View File

@ -20,14 +20,19 @@
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -44,7 +49,7 @@ public void testListAttemptIdsWithValidInput() throws Exception {
JobID jobId = JobID.forName(jobIdStr);
Cluster mockCluster = mock(Cluster.class);
Job job = mock(Job.class);
CLI cli = spy(new CLI());
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
when(job.getTaskReports(TaskType.MAP)).thenReturn(
@ -112,7 +117,7 @@ private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
@Test
public void testJobKIll() throws Exception {
Cluster mockCluster = mock(Cluster.class);
CLI cli = spy(new CLI());
CLI cli = spy(new CLI(new Configuration()));
doReturn(mockCluster).when(cli).createCluster();
String jobId1 = "job_1234654654_001";
String jobId2 = "job_1234654654_002";
@ -149,4 +154,26 @@ private Job mockJob(Cluster mockCluster, String jobId, State jobState)
when(mockJob.getStatus()).thenReturn(status);
return mockJob;
}
@Test
public void testGetJob() throws Exception {
Configuration conf = new Configuration();
long sleepTime = 100;
conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, sleepTime);
Cluster mockCluster = mock(Cluster.class);
JobID jobId1 = JobID.forName("job_1234654654_001");
when(mockCluster.getJob(jobId1)).thenReturn(null);
for (int i = 0; i < 2; ++i) {
conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, i);
CLI cli = spy(new CLI(conf));
cli.cluster = mockCluster;
doReturn(mockCluster).when(cli).createCluster();
long start = Time.monotonicNow();
cli.getJob(jobId1);
long end = Time.monotonicNow();
Assert.assertTrue(end - start > (i * sleepTime));
Assert.assertTrue(end - start < ((i + 1) * sleepTime));
}
}
}