MAPREDUCE-3862 Nodemanager can appear to hang on shutdown due to lingering DeletionService threads (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1245781 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dd732d5a24
commit
ff33b38734
|
@ -105,6 +105,8 @@ Release 0.23.2 - UNRELEASED
|
|||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
|
||||
DeletionService threads (Jason Lowe via bobby)
|
||||
|
||||
MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid
|
||||
JSON. (B Anil Kumar via tgraves)
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import static java.util.concurrent.TimeUnit.*;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
|
@ -85,6 +86,7 @@ public class DeletionService extends AbstractService {
|
|||
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
|
||||
tf);
|
||||
}
|
||||
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
sched.setKeepAliveTime(60L, SECONDS);
|
||||
super.init(conf);
|
||||
}
|
||||
|
@ -92,14 +94,27 @@ public class DeletionService extends AbstractService {
|
|||
@Override
|
||||
public void stop() {
|
||||
sched.shutdown();
|
||||
boolean terminated = false;
|
||||
try {
|
||||
sched.awaitTermination(10, SECONDS);
|
||||
terminated = sched.awaitTermination(10, SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
if (terminated != true) {
|
||||
sched.shutdownNow();
|
||||
}
|
||||
super.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the service has completely stopped.
|
||||
* Used only by unit tests
|
||||
* @return true if service has completely stopped
|
||||
*/
|
||||
@Private
|
||||
public boolean isTerminated() {
|
||||
return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
||||
}
|
||||
|
||||
private class FileDeletion implements Runnable {
|
||||
final String user;
|
||||
final Path subDir;
|
||||
|
|
|
@ -27,12 +27,15 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestDeletionService {
|
||||
|
@ -107,12 +110,18 @@ public class TestDeletionService {
|
|||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
|
||||
p, null);
|
||||
}
|
||||
|
||||
int msecToWait = 20 * 1000;
|
||||
for (Path p : dirs) {
|
||||
while (msecToWait > 0 && lfs.util().exists(p)) {
|
||||
Thread.sleep(100);
|
||||
msecToWait -= 100;
|
||||
}
|
||||
assertFalse(lfs.util().exists(p));
|
||||
}
|
||||
} finally {
|
||||
del.stop();
|
||||
}
|
||||
for (Path p : dirs) {
|
||||
assertFalse(lfs.util().exists(p));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -137,14 +146,35 @@ public class TestDeletionService {
|
|||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
|
||||
p, baseDirs.toArray(new Path[4]));
|
||||
}
|
||||
|
||||
int msecToWait = 20 * 1000;
|
||||
for (Path p : baseDirs) {
|
||||
for (Path q : content) {
|
||||
Path fp = new Path(p, q);
|
||||
while (msecToWait > 0 && lfs.util().exists(fp)) {
|
||||
Thread.sleep(100);
|
||||
msecToWait -= 100;
|
||||
}
|
||||
assertFalse(lfs.util().exists(fp));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
del.stop();
|
||||
}
|
||||
for (Path p : baseDirs) {
|
||||
for (Path q : content) {
|
||||
assertFalse(lfs.util().exists(new Path(p, q)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopWithDelayedTasks() throws Exception {
|
||||
DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
|
||||
del.init(conf);
|
||||
del.start();
|
||||
try {
|
||||
del.delete("dingo", new Path("/does/not/exist"));
|
||||
} finally {
|
||||
del.stop();
|
||||
}
|
||||
assertTrue(del.isTerminated());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue