svn merge -c 1245781 from trunk to branch 0.23 FIXES MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering DeletionService threads (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1245794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-02-17 23:01:05 +00:00
parent 62f1fec1a3
commit 0286c5fe05
3 changed files with 56 additions and 9 deletions

View File

@ -16,6 +16,8 @@ Release 0.23.2 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
DeletionService threads (Jason Lowe via bobby)
MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid
JSON. (B Anil Kumar via tgraves)

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
@ -85,6 +86,7 @@ public class DeletionService extends AbstractService {
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
tf); tf);
} }
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS); sched.setKeepAliveTime(60L, SECONDS);
super.init(conf); super.init(conf);
} }
@ -92,14 +94,27 @@ public class DeletionService extends AbstractService {
@Override @Override
public void stop() { public void stop() {
sched.shutdown(); sched.shutdown();
boolean terminated = false;
try { try {
sched.awaitTermination(10, SECONDS); terminated = sched.awaitTermination(10, SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
}
if (terminated != true) {
sched.shutdownNow(); sched.shutdownNow();
} }
super.stop(); super.stop();
} }
/**
* Determine if the service has completely stopped.
* Used only by unit tests
* @return true if service has completely stopped
*/
@Private
public boolean isTerminated() {
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
private class FileDeletion implements Runnable { private class FileDeletion implements Runnable {
final String user; final String user;
final Path subDir; final Path subDir;

View File

@ -27,12 +27,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class TestDeletionService { public class TestDeletionService {
@ -107,12 +110,18 @@ public class TestDeletionService {
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
p, null); p, null);
} }
int msecToWait = 20 * 1000;
for (Path p : dirs) {
while (msecToWait > 0 && lfs.util().exists(p)) {
Thread.sleep(100);
msecToWait -= 100;
}
assertFalse(lfs.util().exists(p));
}
} finally { } finally {
del.stop(); del.stop();
} }
for (Path p : dirs) {
assertFalse(lfs.util().exists(p));
}
} }
@Test @Test
@ -137,14 +146,35 @@ public class TestDeletionService {
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
p, baseDirs.toArray(new Path[4])); p, baseDirs.toArray(new Path[4]));
} }
int msecToWait = 20 * 1000;
for (Path p : baseDirs) {
for (Path q : content) {
Path fp = new Path(p, q);
while (msecToWait > 0 && lfs.util().exists(fp)) {
Thread.sleep(100);
msecToWait -= 100;
}
assertFalse(lfs.util().exists(fp));
}
}
} finally { } finally {
del.stop(); del.stop();
} }
for (Path p : baseDirs) {
for (Path q : content) {
assertFalse(lfs.util().exists(new Path(p, q)));
}
}
} }
@Test
public void testStopWithDelayedTasks() throws Exception {
DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
del.init(conf);
del.start();
try {
del.delete("dingo", new Path("/does/not/exist"));
} finally {
del.stop();
}
assertTrue(del.isTerminated());
}
} }