Merge -r 1164254:1164255 from trunk to branch-0.23 to fix MAPREDUCE-2756.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1164258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-01 20:26:12 +00:00
parent 8019d90630
commit e5e45c4f1b
4 changed files with 137 additions and 111 deletions

View File

@ -1182,6 +1182,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2917. Fixed corner case in container reservation which led to MAPREDUCE-2917. Fixed corner case in container reservation which led to
starvation and hung jobs. (acmurthy) starvation and hung jobs. (acmurthy)
MAPREDUCE-2756. Better error handling in JobControl for failed jobs.
(Robert Evans via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -47,6 +49,7 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ControlledJob { public class ControlledJob {
private static final Log LOG = LogFactory.getLog(ControlledJob.class);
// A job will be in one of the following states // A job will be in one of the following states
public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED, public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
@ -235,6 +238,17 @@ public class ControlledJob {
job.killJob(); job.killJob();
} }
public synchronized void failJob(String message) throws IOException, InterruptedException {
try {
if(job != null && this.state == State.RUNNING) {
job.killJob();
}
} finally {
this.state = State.FAILED;
this.message = message;
}
}
/** /**
* Check the state of this running job. The state may * Check the state of this running job. The state may
* remain the same, become SUCCESS or FAILED. * remain the same, become SUCCESS or FAILED.
@ -322,6 +336,7 @@ public class ControlledJob {
job.submit(); job.submit();
this.state = State.RUNNING; this.state = State.RUNNING;
} catch (Exception ioe) { } catch (Exception ioe) {
LOG.info(getJobName()+" got an error while submitting ",ioe);
this.state = State.FAILED; this.state = State.FAILED;
this.message = StringUtils.stringifyException(ioe); this.message = StringUtils.stringifyException(ioe);
} }

View File

@ -21,13 +21,16 @@ package org.apache.hadoop.mapreduce.lib.jobcontrol;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Hashtable; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
import org.apache.hadoop.util.StringUtils;
/** /**
* This class encapsulates a set of MapReduce jobs and its dependency. * This class encapsulates a set of MapReduce jobs and its dependency.
@ -49,17 +52,16 @@ import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class JobControl implements Runnable { public class JobControl implements Runnable {
private static final Log LOG = LogFactory.getLog(JobControl.class);
// The thread can be in one of the following state // The thread can be in one of the following state
public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY}; public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
private ThreadState runnerState; // the thread state private ThreadState runnerState; // the thread state
private Map<String, ControlledJob> waitingJobs; private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
private Map<String, ControlledJob> readyJobs; private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
private Map<String, ControlledJob> runningJobs; private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
private Map<String, ControlledJob> successfulJobs;
private Map<String, ControlledJob> failedJobs;
private long nextJobID; private long nextJobID;
private String groupName; private String groupName;
@ -69,46 +71,51 @@ public class JobControl implements Runnable {
* @param groupName a name identifying this group * @param groupName a name identifying this group
*/ */
public JobControl(String groupName) { public JobControl(String groupName) {
this.waitingJobs = new Hashtable<String, ControlledJob>();
this.readyJobs = new Hashtable<String, ControlledJob>();
this.runningJobs = new Hashtable<String, ControlledJob>();
this.successfulJobs = new Hashtable<String, ControlledJob>();
this.failedJobs = new Hashtable<String, ControlledJob>();
this.nextJobID = -1; this.nextJobID = -1;
this.groupName = groupName; this.groupName = groupName;
this.runnerState = ThreadState.READY; this.runnerState = ThreadState.READY;
} }
private static List<ControlledJob> toList( private static List<ControlledJob> toList(
Map<String, ControlledJob> jobs) { LinkedList<ControlledJob> jobs) {
ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>(); ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
synchronized (jobs) { synchronized (jobs) {
for (ControlledJob job : jobs.values()) { for (ControlledJob job : jobs) {
retv.add(job); retv.add(job);
} }
} }
return retv; return retv;
} }
synchronized private List<ControlledJob> getJobsIn(State state) {
LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
for(ControlledJob j: jobsInProgress) {
if(j.getJobState() == state) {
l.add(j);
}
}
return l;
}
/** /**
* @return the jobs in the waiting state * @return the jobs in the waiting state
*/ */
public List<ControlledJob> getWaitingJobList() { public List<ControlledJob> getWaitingJobList() {
return toList(this.waitingJobs); return getJobsIn(State.WAITING);
} }
/** /**
* @return the jobs in the running state * @return the jobs in the running state
*/ */
public List<ControlledJob> getRunningJobList() { public List<ControlledJob> getRunningJobList() {
return toList(this.runningJobs); return getJobsIn(State.RUNNING);
} }
/** /**
* @return the jobs in the ready state * @return the jobs in the ready state
*/ */
public List<ControlledJob> getReadyJobsList() { public List<ControlledJob> getReadyJobsList() {
return toList(this.readyJobs); return getJobsIn(State.READY);
} }
/** /**
@ -127,34 +134,6 @@ public class JobControl implements Runnable {
return this.groupName + this.nextJobID; return this.groupName + this.nextJobID;
} }
private static void addToQueue(ControlledJob aJob,
Map<String, ControlledJob> queue) {
synchronized(queue) {
queue.put(aJob.getJobID(), aJob);
}
}
private void addToQueue(ControlledJob aJob) {
Map<String, ControlledJob> queue = getQueue(aJob.getJobState());
addToQueue(aJob, queue);
}
private Map<String, ControlledJob> getQueue(State state) {
Map<String, ControlledJob> retv = null;
if (state == State.WAITING) {
retv = this.waitingJobs;
} else if (state == State.READY) {
retv = this.readyJobs;
} else if (state == State.RUNNING) {
retv = this.runningJobs;
} else if (state == State.SUCCESS) {
retv = this.successfulJobs;
} else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
retv = this.failedJobs;
}
return retv;
}
/** /**
* Add a new job. * Add a new job.
* @param aJob the new job * @param aJob the new job
@ -163,7 +142,7 @@ public class JobControl implements Runnable {
String id = this.getNextJobID(); String id = this.getNextJobID();
aJob.setJobID(id); aJob.setJobID(id);
aJob.setJobState(State.WAITING); aJob.setJobState(State.WAITING);
this.addToQueue(aJob); jobsInProgress.add(aJob);
return id; return id;
} }
@ -211,47 +190,8 @@ public class JobControl implements Runnable {
} }
} }
synchronized private void checkRunningJobs()
throws IOException, InterruptedException {
Map<String, ControlledJob> oldJobs = null;
oldJobs = this.runningJobs;
this.runningJobs = new Hashtable<String, ControlledJob>();
for (ControlledJob nextJob : oldJobs.values()) {
nextJob.checkState();
this.addToQueue(nextJob);
}
}
synchronized private void checkWaitingJobs()
throws IOException, InterruptedException {
Map<String, ControlledJob> oldJobs = null;
oldJobs = this.waitingJobs;
this.waitingJobs = new Hashtable<String, ControlledJob>();
for (ControlledJob nextJob : oldJobs.values()) {
nextJob.checkState();
this.addToQueue(nextJob);
}
}
synchronized private void startReadyJobs() {
Map<String, ControlledJob> oldJobs = null;
oldJobs = this.readyJobs;
this.readyJobs = new Hashtable<String, ControlledJob>();
for (ControlledJob nextJob : oldJobs.values()) {
//Submitting Job to Hadoop
nextJob.submit();
this.addToQueue(nextJob);
}
}
synchronized public boolean allFinished() { synchronized public boolean allFinished() {
return this.waitingJobs.size() == 0 && return jobsInProgress.isEmpty();
this.readyJobs.size() == 0 &&
this.runningJobs.size() == 0;
} }
/** /**
@ -262,6 +202,7 @@ public class JobControl implements Runnable {
* Submit the jobs in ready state * Submit the jobs in ready state
*/ */
public void run() { public void run() {
try {
this.runnerState = ThreadState.RUNNING; this.runnerState = ThreadState.RUNNING;
while (true) { while (true) {
while (this.runnerState == ThreadState.SUSPENDED) { while (this.runnerState == ThreadState.SUSPENDED) {
@ -269,16 +210,36 @@ public class JobControl implements Runnable {
Thread.sleep(5000); Thread.sleep(5000);
} }
catch (Exception e) { catch (Exception e) {
//TODO the thread was interrupted, do something!!!
}
}
synchronized(this) {
Iterator<ControlledJob> it = jobsInProgress.iterator();
while(it.hasNext()) {
ControlledJob j = it.next();
LOG.debug("Checking state of job "+j);
switch(j.checkState()) {
case SUCCESS:
successfulJobs.add(j);
it.remove();
break;
case FAILED:
case DEPENDENT_FAILED:
failedJobs.add(j);
it.remove();
break;
case READY:
j.submit();
break;
case RUNNING:
case WAITING:
//Do Nothing
break;
} }
} }
try {
checkRunningJobs();
checkWaitingJobs();
startReadyJobs();
} catch (Exception e) {
this.runnerState = ThreadState.STOPPED;
} }
if (this.runnerState != ThreadState.RUNNING && if (this.runnerState != ThreadState.RUNNING &&
this.runnerState != ThreadState.SUSPENDED) { this.runnerState != ThreadState.SUSPENDED) {
break; break;
@ -287,14 +248,37 @@ public class JobControl implements Runnable {
Thread.sleep(5000); Thread.sleep(5000);
} }
catch (Exception e) { catch (Exception e) {
//TODO the thread was interrupted, do something!!!
} }
if (this.runnerState != ThreadState.RUNNING && if (this.runnerState != ThreadState.RUNNING &&
this.runnerState != ThreadState.SUSPENDED) { this.runnerState != ThreadState.SUSPENDED) {
break; break;
} }
} }
}catch(Throwable t) {
LOG.error("Error while trying to run jobs.",t);
//Mark all jobs as failed because we got something bad.
failAllJobs(t);
}
this.runnerState = ThreadState.STOPPED; this.runnerState = ThreadState.STOPPED;
} }
synchronized private void failAllJobs(Throwable t) {
String message = "Unexpected System Error Occured: "+
StringUtils.stringifyException(t);
Iterator<ControlledJob> it = jobsInProgress.iterator();
while(it.hasNext()) {
ControlledJob j = it.next();
try {
j.failJob(message);
} catch (IOException e) {
LOG.error("Error while tyring to clean up "+j.getJobName(), e);
} catch (InterruptedException e) {
LOG.error("Error while tyring to clean up "+j.getJobName(), e);
} finally {
failedJobs.add(j);
it.remove();
}
}
}
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -81,6 +82,29 @@ public class TestMapReduceJobControlWithMocks {
jobControl.stop(); jobControl.stop();
} }
@Test
public void testErrorWhileSubmitting() throws Exception {
JobControl jobControl = new JobControl("Test");
Job mockJob = mock(Job.class);
ControlledJob job1 = new ControlledJob(mockJob, null);
when(mockJob.getConfiguration()).thenReturn(new Configuration());
doThrow(new IncompatibleClassChangeError("This is a test")).when(mockJob).submit();
jobControl.addJob(job1);
runJobControl(jobControl);
try {
assertEquals("Success list", 0, jobControl.getSuccessfulJobList().size());
assertEquals("Failed list", 1, jobControl.getFailedJobList().size());
assertTrue(job1.getJobState() == ControlledJob.State.FAILED);
} finally {
jobControl.stop();
}
}
@Test @Test
public void testKillJob() throws Exception { public void testKillJob() throws Exception {
JobControl jobControl = new JobControl("Test"); JobControl jobControl = new JobControl("Test");