Merge -c 1232314 from trunk to branch-0.23 to fix MAPREDUCE-2450. MAPREDUCE-2450. Fixed a corner case with interrupted communication threads leading to a long timeout in Task.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1232315 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9722f68619
commit
d36e002fea
|
@ -449,6 +449,9 @@ Release 0.23.1 - Unreleased
|
|||
MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe
|
||||
via mahadev)
|
||||
|
||||
MAPREDUCE-2450. Fixed a corner case with interrupted communication threads
|
||||
leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -552,6 +552,8 @@ abstract public class Task implements Writable, Configurable {
|
|||
private InputSplit split = null;
|
||||
private Progress taskProgress;
|
||||
private Thread pingThread = null;
|
||||
private boolean done = true;
|
||||
private Object lock = new Object();
|
||||
|
||||
/**
|
||||
* flag that indicates whether progress update needs to be sent to parent.
|
||||
|
@ -648,6 +650,9 @@ abstract public class Task implements Writable, Configurable {
|
|||
// get current flag value and reset it as well
|
||||
boolean sendProgress = resetProgressFlag();
|
||||
while (!taskDone.get()) {
|
||||
synchronized (lock) {
|
||||
done = false;
|
||||
}
|
||||
try {
|
||||
boolean taskFound = true; // whether TT knows about this task
|
||||
// sleep for a bit
|
||||
|
@ -680,6 +685,7 @@ abstract public class Task implements Writable, Configurable {
|
|||
// came back up), kill ourselves
|
||||
if (!taskFound) {
|
||||
LOG.warn("Parent died. Exiting "+taskId);
|
||||
resetDoneFlag();
|
||||
System.exit(66);
|
||||
}
|
||||
|
||||
|
@ -692,10 +698,19 @@ abstract public class Task implements Writable, Configurable {
|
|||
if (remainingRetries == 0) {
|
||||
ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
|
||||
LOG.warn("Last retry, killing "+taskId);
|
||||
resetDoneFlag();
|
||||
System.exit(65);
|
||||
}
|
||||
}
|
||||
}
|
||||
//Notify that we are done with the work
|
||||
resetDoneFlag();
|
||||
}
|
||||
void resetDoneFlag() {
|
||||
synchronized (lock) {
|
||||
done = true;
|
||||
lock.notify();
|
||||
}
|
||||
}
|
||||
public void startCommunicationThread() {
|
||||
if (pingThread == null) {
|
||||
|
@ -706,6 +721,11 @@ abstract public class Task implements Writable, Configurable {
|
|||
}
|
||||
public void stopCommunicationThread() throws InterruptedException {
|
||||
if (pingThread != null) {
|
||||
synchronized (lock) {
|
||||
while (!done) {
|
||||
lock.wait();
|
||||
}
|
||||
}
|
||||
pingThread.interrupt();
|
||||
pingThread.join();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue