MAPREDUCE-2756. Better error handling in JobControl for failed jobs. Contributed by Robert Evans.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1164255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c5ef34a1e7
commit
a9bf33445e
|
@ -1184,6 +1184,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-2917. Fixed corner case in container reservation which led to
|
||||
starvation and hung jobs. (acmurthy)
|
||||
|
||||
MAPREDUCE-2756. Better error handling in JobControl for failed jobs.
|
||||
(Robert Evans via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -47,6 +49,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ControlledJob {
|
||||
private static final Log LOG = LogFactory.getLog(ControlledJob.class);
|
||||
|
||||
// A job will be in one of the following states
|
||||
public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
|
||||
|
@ -235,6 +238,17 @@ public class ControlledJob {
|
|||
job.killJob();
|
||||
}
|
||||
|
||||
public synchronized void failJob(String message) throws IOException, InterruptedException {
|
||||
try {
|
||||
if(job != null && this.state == State.RUNNING) {
|
||||
job.killJob();
|
||||
}
|
||||
} finally {
|
||||
this.state = State.FAILED;
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the state of this running job. The state may
|
||||
* remain the same, become SUCCESS or FAILED.
|
||||
|
@ -322,6 +336,7 @@ public class ControlledJob {
|
|||
job.submit();
|
||||
this.state = State.RUNNING;
|
||||
} catch (Exception ioe) {
|
||||
LOG.info(getJobName()+" got an error while submitting ",ioe);
|
||||
this.state = State.FAILED;
|
||||
this.message = StringUtils.stringifyException(ioe);
|
||||
}
|
||||
|
|
|
@ -21,13 +21,16 @@ package org.apache.hadoop.mapreduce.lib.jobcontrol;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* This class encapsulates a set of MapReduce jobs and its dependency.
|
||||
|
@ -49,17 +52,16 @@ import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class JobControl implements Runnable {
|
||||
private static final Log LOG = LogFactory.getLog(JobControl.class);
|
||||
|
||||
// The thread can be in one of the following state
|
||||
public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
|
||||
|
||||
private ThreadState runnerState; // the thread state
|
||||
|
||||
private Map<String, ControlledJob> waitingJobs;
|
||||
private Map<String, ControlledJob> readyJobs;
|
||||
private Map<String, ControlledJob> runningJobs;
|
||||
private Map<String, ControlledJob> successfulJobs;
|
||||
private Map<String, ControlledJob> failedJobs;
|
||||
private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
|
||||
private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
|
||||
private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
|
||||
|
||||
private long nextJobID;
|
||||
private String groupName;
|
||||
|
@ -69,46 +71,51 @@ public class JobControl implements Runnable {
|
|||
* @param groupName a name identifying this group
|
||||
*/
|
||||
public JobControl(String groupName) {
|
||||
this.waitingJobs = new Hashtable<String, ControlledJob>();
|
||||
this.readyJobs = new Hashtable<String, ControlledJob>();
|
||||
this.runningJobs = new Hashtable<String, ControlledJob>();
|
||||
this.successfulJobs = new Hashtable<String, ControlledJob>();
|
||||
this.failedJobs = new Hashtable<String, ControlledJob>();
|
||||
this.nextJobID = -1;
|
||||
this.groupName = groupName;
|
||||
this.runnerState = ThreadState.READY;
|
||||
}
|
||||
|
||||
private static List<ControlledJob> toList(
|
||||
Map<String, ControlledJob> jobs) {
|
||||
LinkedList<ControlledJob> jobs) {
|
||||
ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
|
||||
synchronized (jobs) {
|
||||
for (ControlledJob job : jobs.values()) {
|
||||
for (ControlledJob job : jobs) {
|
||||
retv.add(job);
|
||||
}
|
||||
}
|
||||
return retv;
|
||||
}
|
||||
|
||||
synchronized private List<ControlledJob> getJobsIn(State state) {
|
||||
LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
|
||||
for(ControlledJob j: jobsInProgress) {
|
||||
if(j.getJobState() == state) {
|
||||
l.add(j);
|
||||
}
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the jobs in the waiting state
|
||||
*/
|
||||
public List<ControlledJob> getWaitingJobList() {
|
||||
return toList(this.waitingJobs);
|
||||
return getJobsIn(State.WAITING);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the jobs in the running state
|
||||
*/
|
||||
public List<ControlledJob> getRunningJobList() {
|
||||
return toList(this.runningJobs);
|
||||
return getJobsIn(State.RUNNING);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the jobs in the ready state
|
||||
*/
|
||||
public List<ControlledJob> getReadyJobsList() {
|
||||
return toList(this.readyJobs);
|
||||
return getJobsIn(State.READY);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -127,34 +134,6 @@ public class JobControl implements Runnable {
|
|||
return this.groupName + this.nextJobID;
|
||||
}
|
||||
|
||||
private static void addToQueue(ControlledJob aJob,
|
||||
Map<String, ControlledJob> queue) {
|
||||
synchronized(queue) {
|
||||
queue.put(aJob.getJobID(), aJob);
|
||||
}
|
||||
}
|
||||
|
||||
private void addToQueue(ControlledJob aJob) {
|
||||
Map<String, ControlledJob> queue = getQueue(aJob.getJobState());
|
||||
addToQueue(aJob, queue);
|
||||
}
|
||||
|
||||
private Map<String, ControlledJob> getQueue(State state) {
|
||||
Map<String, ControlledJob> retv = null;
|
||||
if (state == State.WAITING) {
|
||||
retv = this.waitingJobs;
|
||||
} else if (state == State.READY) {
|
||||
retv = this.readyJobs;
|
||||
} else if (state == State.RUNNING) {
|
||||
retv = this.runningJobs;
|
||||
} else if (state == State.SUCCESS) {
|
||||
retv = this.successfulJobs;
|
||||
} else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
|
||||
retv = this.failedJobs;
|
||||
}
|
||||
return retv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new job.
|
||||
* @param aJob the new job
|
||||
|
@ -163,7 +142,7 @@ public class JobControl implements Runnable {
|
|||
String id = this.getNextJobID();
|
||||
aJob.setJobID(id);
|
||||
aJob.setJobState(State.WAITING);
|
||||
this.addToQueue(aJob);
|
||||
jobsInProgress.add(aJob);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
@ -211,47 +190,8 @@ public class JobControl implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
synchronized private void checkRunningJobs()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
Map<String, ControlledJob> oldJobs = null;
|
||||
oldJobs = this.runningJobs;
|
||||
this.runningJobs = new Hashtable<String, ControlledJob>();
|
||||
|
||||
for (ControlledJob nextJob : oldJobs.values()) {
|
||||
nextJob.checkState();
|
||||
this.addToQueue(nextJob);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized private void checkWaitingJobs()
|
||||
throws IOException, InterruptedException {
|
||||
Map<String, ControlledJob> oldJobs = null;
|
||||
oldJobs = this.waitingJobs;
|
||||
this.waitingJobs = new Hashtable<String, ControlledJob>();
|
||||
|
||||
for (ControlledJob nextJob : oldJobs.values()) {
|
||||
nextJob.checkState();
|
||||
this.addToQueue(nextJob);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized private void startReadyJobs() {
|
||||
Map<String, ControlledJob> oldJobs = null;
|
||||
oldJobs = this.readyJobs;
|
||||
this.readyJobs = new Hashtable<String, ControlledJob>();
|
||||
|
||||
for (ControlledJob nextJob : oldJobs.values()) {
|
||||
//Submitting Job to Hadoop
|
||||
nextJob.submit();
|
||||
this.addToQueue(nextJob);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized public boolean allFinished() {
|
||||
return this.waitingJobs.size() == 0 &&
|
||||
this.readyJobs.size() == 0 &&
|
||||
this.runningJobs.size() == 0;
|
||||
return jobsInProgress.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -262,39 +202,83 @@ public class JobControl implements Runnable {
|
|||
* Submit the jobs in ready state
|
||||
*/
|
||||
public void run() {
|
||||
this.runnerState = ThreadState.RUNNING;
|
||||
while (true) {
|
||||
while (this.runnerState == ThreadState.SUSPENDED) {
|
||||
try {
|
||||
this.runnerState = ThreadState.RUNNING;
|
||||
while (true) {
|
||||
while (this.runnerState == ThreadState.SUSPENDED) {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
catch (Exception e) {
|
||||
//TODO the thread was interrupted, do something!!!
|
||||
}
|
||||
}
|
||||
|
||||
synchronized(this) {
|
||||
Iterator<ControlledJob> it = jobsInProgress.iterator();
|
||||
while(it.hasNext()) {
|
||||
ControlledJob j = it.next();
|
||||
LOG.debug("Checking state of job "+j);
|
||||
switch(j.checkState()) {
|
||||
case SUCCESS:
|
||||
successfulJobs.add(j);
|
||||
it.remove();
|
||||
break;
|
||||
case FAILED:
|
||||
case DEPENDENT_FAILED:
|
||||
failedJobs.add(j);
|
||||
it.remove();
|
||||
break;
|
||||
case READY:
|
||||
j.submit();
|
||||
break;
|
||||
case RUNNING:
|
||||
case WAITING:
|
||||
//Do Nothing
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.runnerState != ThreadState.RUNNING &&
|
||||
this.runnerState != ThreadState.SUSPENDED) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
||||
//TODO the thread was interrupted, do something!!!
|
||||
}
|
||||
if (this.runnerState != ThreadState.RUNNING &&
|
||||
this.runnerState != ThreadState.SUSPENDED) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
try {
|
||||
checkRunningJobs();
|
||||
checkWaitingJobs();
|
||||
startReadyJobs();
|
||||
} catch (Exception e) {
|
||||
this.runnerState = ThreadState.STOPPED;
|
||||
}
|
||||
if (this.runnerState != ThreadState.RUNNING &&
|
||||
this.runnerState != ThreadState.SUSPENDED) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
||||
}
|
||||
if (this.runnerState != ThreadState.RUNNING &&
|
||||
this.runnerState != ThreadState.SUSPENDED) {
|
||||
break;
|
||||
}
|
||||
}catch(Throwable t) {
|
||||
LOG.error("Error while trying to run jobs.",t);
|
||||
//Mark all jobs as failed because we got something bad.
|
||||
failAllJobs(t);
|
||||
}
|
||||
this.runnerState = ThreadState.STOPPED;
|
||||
}
|
||||
|
||||
synchronized private void failAllJobs(Throwable t) {
|
||||
String message = "Unexpected System Error Occured: "+
|
||||
StringUtils.stringifyException(t);
|
||||
Iterator<ControlledJob> it = jobsInProgress.iterator();
|
||||
while(it.hasNext()) {
|
||||
ControlledJob j = it.next();
|
||||
try {
|
||||
j.failJob(message);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while tyring to clean up "+j.getJobName(), e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Error while tyring to clean up "+j.getJobName(), e);
|
||||
} finally {
|
||||
failedJobs.add(j);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -81,6 +82,29 @@ public class TestMapReduceJobControlWithMocks {
|
|||
jobControl.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorWhileSubmitting() throws Exception {
|
||||
JobControl jobControl = new JobControl("Test");
|
||||
|
||||
Job mockJob = mock(Job.class);
|
||||
|
||||
ControlledJob job1 = new ControlledJob(mockJob, null);
|
||||
when(mockJob.getConfiguration()).thenReturn(new Configuration());
|
||||
doThrow(new IncompatibleClassChangeError("This is a test")).when(mockJob).submit();
|
||||
|
||||
jobControl.addJob(job1);
|
||||
|
||||
runJobControl(jobControl);
|
||||
try {
|
||||
assertEquals("Success list", 0, jobControl.getSuccessfulJobList().size());
|
||||
assertEquals("Failed list", 1, jobControl.getFailedJobList().size());
|
||||
|
||||
assertTrue(job1.getJobState() == ControlledJob.State.FAILED);
|
||||
} finally {
|
||||
jobControl.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillJob() throws Exception {
|
||||
JobControl jobControl = new JobControl("Test");
|
||||
|
|
Loading…
Reference in New Issue