MAPREDUCE-5304. mapreduce.Job killTask/failTask/getTaskCompletionEvents methods have incompatible signature changes. (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1492360 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 2013-06-12 18:47:33 +00:00
parent c5821cf38b
commit c9fce677b3
4 changed files with 46 additions and 33 deletions
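
In effect, the diff restores the 2.0.x-compatible public surface of org.apache.hadoop.mapreduce.Job: killTask and failTask return void again, getTaskCompletionEvents(int) again returns the stable org.apache.hadoop.mapred.TaskCompletionEvent type, and the boolean-returning two-argument killTask survives as an @Private helper. The sketch below is not part of the commit; it only illustrates how client code compiles against the restored signatures, with placeholder job and attempt IDs:

// Usage sketch only (not part of this commit). Assumes a reachable cluster;
// the job and attempt IDs are hypothetical placeholders.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class JobApiSketch {
  public static void main(String[] args)
      throws IOException, InterruptedException {
    Cluster cluster = new Cluster(new Configuration());
    Job job = cluster.getJob(JobID.forName("job_201306121847_0001")); // placeholder
    if (job == null) {
      return; // job not found
    }
    TaskAttemptID attempt =
        TaskAttemptID.forName("attempt_201306121847_0001_m_000000_0"); // placeholder

    // Kill the attempt without counting it against the job's failure limits.
    // Returns void again, as in the 2.0.x line.
    job.killTask(attempt);

    // Or fail it, adding it to the failed tasks list. Also void again.
    job.failTask(attempt);

    // Returns the stable org.apache.hadoop.mapred type again.
    TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
    for (TaskCompletionEvent event : events) {
      System.out.println(event.getTaskAttemptId() + " " + event.getTaskStatus());
    }
    cluster.close();
  }
}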

hadoop-mapreduce-project/CHANGES.txt

@@ -540,6 +540,9 @@ Release 2.1.0-beta - UNRELEASED
     MAPREDUCE-5312. TestRMNMInfo is failing. (sandyr via tucu)
 
+    MAPREDUCE-5304. mapreduce.Job killTask/failTask/getTaskCompletionEvents
+    methods have incompatible signature changes. (kkambatl via tucu)
+
 Release 2.0.5-alpha - 06/06/2013
 
   INCOMPATIBLE CHANGES

org/apache/hadoop/mapred/TaskCompletionEvent.java

@@ -62,8 +62,9 @@ public class TaskCompletionEvent
     super(eventId, taskId, idWithinJob, isMap, org.apache.hadoop.mapreduce.
         TaskCompletionEvent.Status.valueOf(status.name()), taskTrackerHttp);
   }
 
-  static TaskCompletionEvent downgrade(
+  @Private
+  public static TaskCompletionEvent downgrade(
       org.apache.hadoop.mapreduce.TaskCompletionEvent event) {
     return new TaskCompletionEvent(event.getEventId(),
         TaskAttemptID.downgrade(event.getTaskAttemptId()),event.idWithinJob(),

org/apache/hadoop/mapreduce/Job.java

@@ -675,37 +675,57 @@ public class Job extends JobContextImpl implements JobContext {
    * Get events indicating completion (success/failure) of component tasks.
    *
    * @param startFrom index to start fetching events from
-   * @return an array of {@link TaskCompletionEvent}s
+   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
    * @throws IOException
    */
-  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom)
-      throws IOException {
+  public org.apache.hadoop.mapred.TaskCompletionEvent[]
+      getTaskCompletionEvents(final int startFrom) throws IOException {
     try {
-      return getTaskCompletionEvents(startFrom, 10);
+      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
+      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
+          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
+      for (int i = 0; i < events.length; i++) {
+        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
+            (events[i]);
+      }
+      return retEvents;
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     }
   }
 
+  /**
+   * Kill indicated task attempt.
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if <code>true</code> the task is failed and added
+   *                   to failed tasks list, otherwise it is just killed,
+   *                   w/o affecting job failure status.
+   */
+  @Private
+  public boolean killTask(final TaskAttemptID taskId,
+      final boolean shouldFail) throws IOException {
+    ensureState(JobState.RUNNING);
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, shouldFail);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
   /**
    * Kill indicated task attempt.
    *
    * @param taskId the id of the task to be terminated.
    * @throws IOException
    */
-  public boolean killTask(final TaskAttemptID taskId)
+  public void killTask(final TaskAttemptID taskId)
       throws IOException {
-    ensureState(JobState.RUNNING);
-    try {
-      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-        public Boolean run() throws IOException, InterruptedException {
-          return cluster.getClient().killTask(taskId, false);
-        }
-      });
-    }
-    catch (InterruptedException ie) {
-      throw new IOException(ie);
-    }
+    killTask(taskId, false);
   }
 
   /**
@@ -714,20 +734,9 @@ public class Job extends JobContextImpl implements JobContext {
    * @param taskId the id of the task to be terminated.
    * @throws IOException
    */
-  public boolean failTask(final TaskAttemptID taskId)
+  public void failTask(final TaskAttemptID taskId)
       throws IOException {
-    ensureState(JobState.RUNNING);
-    try {
-      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-        @Override
-        public Boolean run() throws IOException, InterruptedException {
-          return cluster.getClient().killTask(taskId, true);
-        }
-      });
-    }
-    catch (InterruptedException ie) {
-      throw new IOException(ie);
-    }
+    killTask(taskId, true);
   }
 
   /**

org/apache/hadoop/mapreduce/tools/CLI.java

@@ -322,7 +322,7 @@ public class CLI extends Configured implements Tool {
       Job job = cluster.getJob(taskID.getJobID());
       if (job == null) {
         System.out.println("Could not find job " + jobid);
-      } else if (job.killTask(taskID)) {
+      } else if (job.killTask(taskID, false)) {
         System.out.println("Killed task " + taskid);
         exitCode = 0;
       } else {
@@ -334,7 +334,7 @@ public class CLI extends Configured implements Tool {
       Job job = cluster.getJob(taskID.getJobID());
       if (job == null) {
         System.out.println("Could not find job " + jobid);
-      } else if(job.failTask(taskID)) {
+      } else if(job.killTask(taskID, true)) {
         System.out.println("Killed task " + taskID + " by failing it");
         exitCode = 0;
       } else {