MAPREDUCE-6566. Add retry support to mapreduce CLI tool. Contributed by Varun Vasudev
This commit is contained in:
parent
91828fef6b
commit
d4e766de93
|
@ -451,6 +451,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
MAPREDUCE-5870. Support for passing Job priority through Application
|
MAPREDUCE-5870. Support for passing Job priority through Application
|
||||||
Submission Context in Mapreduce Side (Sunil G via jlowe)
|
Submission Context in Mapreduce Side (Sunil G via jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-6566. Add retry support to mapreduce CLI tool.
|
||||||
|
(Varun Vasudev via xgong)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
|
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.JobPriority;
|
import org.apache.hadoop.mapreduce.JobPriority;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.TaskReport;
|
import org.apache.hadoop.mapreduce.TaskReport;
|
||||||
|
@ -268,7 +270,7 @@ public class CLI extends Configured implements Tool {
|
||||||
System.out.println("Created job " + job.getJobID());
|
System.out.println("Created job " + job.getJobID());
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
} else if (getStatus) {
|
} else if (getStatus) {
|
||||||
Job job = cluster.getJob(JobID.forName(jobid));
|
Job job = getJob(JobID.forName(jobid));
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -283,7 +285,7 @@ public class CLI extends Configured implements Tool {
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
}
|
}
|
||||||
} else if (getCounter) {
|
} else if (getCounter) {
|
||||||
Job job = cluster.getJob(JobID.forName(jobid));
|
Job job = getJob(JobID.forName(jobid));
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -299,7 +301,7 @@ public class CLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (killJob) {
|
} else if (killJob) {
|
||||||
Job job = cluster.getJob(JobID.forName(jobid));
|
Job job = getJob(JobID.forName(jobid));
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -323,7 +325,7 @@ public class CLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (setJobPriority) {
|
} else if (setJobPriority) {
|
||||||
Job job = cluster.getJob(JobID.forName(jobid));
|
Job job = getJob(JobID.forName(jobid));
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -339,7 +341,7 @@ public class CLI extends Configured implements Tool {
|
||||||
viewHistory(historyFile, viewAllHistory);
|
viewHistory(historyFile, viewAllHistory);
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
} else if (listEvents) {
|
} else if (listEvents) {
|
||||||
listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
|
listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
} else if (listJobs) {
|
} else if (listJobs) {
|
||||||
listJobs(cluster);
|
listJobs(cluster);
|
||||||
|
@ -354,11 +356,11 @@ public class CLI extends Configured implements Tool {
|
||||||
listBlacklistedTrackers(cluster);
|
listBlacklistedTrackers(cluster);
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
} else if (displayTasks) {
|
} else if (displayTasks) {
|
||||||
displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
|
displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
|
||||||
exitCode = 0;
|
exitCode = 0;
|
||||||
} else if(killTask) {
|
} else if(killTask) {
|
||||||
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
||||||
Job job = cluster.getJob(taskID.getJobID());
|
Job job = getJob(taskID.getJobID());
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else if (job.killTask(taskID, false)) {
|
} else if (job.killTask(taskID, false)) {
|
||||||
|
@ -370,7 +372,7 @@ public class CLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
} else if(failTask) {
|
} else if(failTask) {
|
||||||
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
||||||
Job job = cluster.getJob(taskID.getJobID());
|
Job job = getJob(taskID.getJobID());
|
||||||
if (job == null) {
|
if (job == null) {
|
||||||
System.out.println("Could not find job " + jobid);
|
System.out.println("Could not find job " + jobid);
|
||||||
} else if(job.killTask(taskID, true)) {
|
} else if(job.killTask(taskID, true)) {
|
||||||
|
@ -531,6 +533,29 @@ public class CLI extends Configured implements Tool {
|
||||||
protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
|
protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
|
||||||
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
|
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Job getJob(JobID jobid) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
|
||||||
|
MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
|
||||||
|
long retryInterval = getConf()
|
||||||
|
.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
|
||||||
|
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
|
||||||
|
Job job = cluster.getJob(jobid);
|
||||||
|
|
||||||
|
for (int i = 0; i < maxRetry; ++i) {
|
||||||
|
if (job != null) {
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
|
||||||
|
+ " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
|
||||||
|
+ " seconds and retrying.");
|
||||||
|
Thread.sleep(retryInterval);
|
||||||
|
job = cluster.getJob(jobid);
|
||||||
|
}
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -20,14 +20,19 @@ package org.apache.hadoop.mapreduce.tools;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.Cluster;
|
import org.apache.hadoop.mapreduce.Cluster;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskReport;
|
import org.apache.hadoop.mapreduce.TaskReport;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.JobPriority;
|
import org.apache.hadoop.mapreduce.JobPriority;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus.State;
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -44,7 +49,7 @@ public class TestCLI {
|
||||||
JobID jobId = JobID.forName(jobIdStr);
|
JobID jobId = JobID.forName(jobIdStr);
|
||||||
Cluster mockCluster = mock(Cluster.class);
|
Cluster mockCluster = mock(Cluster.class);
|
||||||
Job job = mock(Job.class);
|
Job job = mock(Job.class);
|
||||||
CLI cli = spy(new CLI());
|
CLI cli = spy(new CLI(new Configuration()));
|
||||||
|
|
||||||
doReturn(mockCluster).when(cli).createCluster();
|
doReturn(mockCluster).when(cli).createCluster();
|
||||||
when(job.getTaskReports(TaskType.MAP)).thenReturn(
|
when(job.getTaskReports(TaskType.MAP)).thenReturn(
|
||||||
|
@ -112,7 +117,7 @@ public class TestCLI {
|
||||||
@Test
|
@Test
|
||||||
public void testJobKIll() throws Exception {
|
public void testJobKIll() throws Exception {
|
||||||
Cluster mockCluster = mock(Cluster.class);
|
Cluster mockCluster = mock(Cluster.class);
|
||||||
CLI cli = spy(new CLI());
|
CLI cli = spy(new CLI(new Configuration()));
|
||||||
doReturn(mockCluster).when(cli).createCluster();
|
doReturn(mockCluster).when(cli).createCluster();
|
||||||
String jobId1 = "job_1234654654_001";
|
String jobId1 = "job_1234654654_001";
|
||||||
String jobId2 = "job_1234654654_002";
|
String jobId2 = "job_1234654654_002";
|
||||||
|
@ -149,4 +154,26 @@ public class TestCLI {
|
||||||
when(mockJob.getStatus()).thenReturn(status);
|
when(mockJob.getStatus()).thenReturn(status);
|
||||||
return mockJob;
|
return mockJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetJob() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
long sleepTime = 100;
|
||||||
|
conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, sleepTime);
|
||||||
|
Cluster mockCluster = mock(Cluster.class);
|
||||||
|
JobID jobId1 = JobID.forName("job_1234654654_001");
|
||||||
|
when(mockCluster.getJob(jobId1)).thenReturn(null);
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, i);
|
||||||
|
CLI cli = spy(new CLI(conf));
|
||||||
|
cli.cluster = mockCluster;
|
||||||
|
doReturn(mockCluster).when(cli).createCluster();
|
||||||
|
long start = Time.monotonicNow();
|
||||||
|
cli.getJob(jobId1);
|
||||||
|
long end = Time.monotonicNow();
|
||||||
|
Assert.assertTrue(end - start > (i * sleepTime));
|
||||||
|
Assert.assertTrue(end - start < ((i + 1) * sleepTime));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue