diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 731bc90d411..3bd3d53911f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -572,6 +572,9 @@ Release 2.7.0 - UNRELEASED HADOOP-9087. Queue size metric for metric sinks isn't actually maintained (Akira AJISAKA via jlowe) + HADOOP-11604. Prevent ConcurrentModificationException while closing domain + sockets during shutdown of DomainSocketWatcher thread. (cnauroth) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java index 0172f6bfae7..8c617dc8c82 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java @@ -246,6 +246,13 @@ public final class DomainSocketWatcher implements Closeable { this.interruptCheckPeriodMs = interruptCheckPeriodMs; notificationSockets = DomainSocket.socketpair(); watcherThread.setDaemon(true); + watcherThread.setUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable t) { + LOG.error(thread + " terminating on unexpected exception", t); + } + }); watcherThread.start(); } @@ -372,7 +379,17 @@ public final class DomainSocketWatcher implements Closeable { } } - private void sendCallback(String caller, TreeMap entries, + /** + * Send callback and return whether or not the domain socket was closed as a + * result of processing. + * + * @param caller reason for call + * @param entries mapping of file descriptor to entry + * @param fdSet set of file descriptors + * @param fd file descriptor + * @return true if the domain socket was closed as a result of processing + */ + private boolean sendCallback(String caller, TreeMap entries, FdSet fdSet, int fd) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd); @@ -401,13 +418,30 @@ public final class DomainSocketWatcher implements Closeable { "still in the poll(2) loop."); } IOUtils.cleanup(LOG, sock); - entries.remove(fd); fdSet.remove(fd); + return true; } else { if (LOG.isTraceEnabled()) { LOG.trace(this + ": " + caller + ": sendCallback not " + "closing fd " + fd); } + return false; + } + } + + /** + * Send callback, and if the domain socket was closed as a result of + * processing, then also remove the entry for the file descriptor. + * + * @param caller reason for call + * @param entries mapping of file descriptor to entry + * @param fdSet set of file descriptors + * @param fd file descriptor + */ + private void sendCallbackAndRemove(String caller, + TreeMap entries, FdSet fdSet, int fd) { + if (sendCallback(caller, entries, fdSet, fd)) { + entries.remove(fd); } } @@ -427,7 +461,8 @@ public final class DomainSocketWatcher implements Closeable { lock.lock(); try { for (int fd : fdSet.getAndClearReadableFds()) { - sendCallback("getAndClearReadableFds", entries, fdSet, fd); + sendCallbackAndRemove("getAndClearReadableFds", entries, fdSet, + fd); } if (!(toAdd.isEmpty() && toRemove.isEmpty())) { // Handle pending additions (before pending removes). @@ -448,7 +483,7 @@ public final class DomainSocketWatcher implements Closeable { while (true) { Map.Entry entry = toRemove.firstEntry(); if (entry == null) break; - sendCallback("handlePendingRemovals", + sendCallbackAndRemove("handlePendingRemovals", entries, fdSet, entry.getValue().fd); } processedCond.signalAll(); @@ -482,6 +517,8 @@ public final class DomainSocketWatcher implements Closeable { try { kick(); // allow the handler for notificationSockets[0] to read a byte for (Entry entry : entries.values()) { + // We do not remove from entries as we iterate, because that can + // cause a ConcurrentModificationException. sendCallback("close", entries, fdSet, entry.getDomainSocket().fd); } entries.clear(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java index 7c5b42de46a..e85e4141354 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.net.unix; +import static org.junit.Assert.assertFalse; + import java.util.ArrayList; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -25,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.junit.After; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -34,17 +37,28 @@ import com.google.common.util.concurrent.Uninterruptibles; public class TestDomainSocketWatcher { static final Log LOG = LogFactory.getLog(TestDomainSocketWatcher.class); + private Throwable trappedException = null; + @Before public void before() { Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); } + @After + public void after() { + if (trappedException != null) { + throw new IllegalStateException( + "DomainSocketWatcher thread terminated with unexpected exception.", + trappedException); + } + } + /** * Test that we can create a DomainSocketWatcher and then shut it down. */ @Test(timeout=60000) public void testCreateShutdown() throws Exception { - DomainSocketWatcher watcher = new DomainSocketWatcher(10000000); + DomainSocketWatcher watcher = newDomainSocketWatcher(10000000); watcher.close(); } @@ -53,7 +67,7 @@ public class TestDomainSocketWatcher { */ @Test(timeout=180000) public void testDeliverNotifications() throws Exception { - DomainSocketWatcher watcher = new DomainSocketWatcher(10000000); + DomainSocketWatcher watcher = newDomainSocketWatcher(10000000); DomainSocket pair[] = DomainSocket.socketpair(); final CountDownLatch latch = new CountDownLatch(1); watcher.add(pair[1], new DomainSocketWatcher.Handler() { @@ -73,17 +87,35 @@ public class TestDomainSocketWatcher { */ @Test(timeout=60000) public void testInterruption() throws Exception { - final DomainSocketWatcher watcher = new DomainSocketWatcher(10); + final DomainSocketWatcher watcher = newDomainSocketWatcher(10); watcher.watcherThread.interrupt(); Uninterruptibles.joinUninterruptibly(watcher.watcherThread); watcher.close(); } + + /** + * Test that domain sockets are closed when the watcher is closed. + */ + @Test(timeout=300000) + public void testCloseSocketOnWatcherClose() throws Exception { + final DomainSocketWatcher watcher = newDomainSocketWatcher(10000000); + DomainSocket pair[] = DomainSocket.socketpair(); + watcher.add(pair[1], new DomainSocketWatcher.Handler() { + @Override + public boolean handle(DomainSocket sock) { + return true; + } + }); + watcher.close(); + Uninterruptibles.joinUninterruptibly(watcher.watcherThread); + assertFalse(pair[1].isOpen()); + } @Test(timeout=300000) public void testStress() throws Exception { final int SOCKET_NUM = 250; final ReentrantLock lock = new ReentrantLock(); - final DomainSocketWatcher watcher = new DomainSocketWatcher(10000000); + final DomainSocketWatcher watcher = newDomainSocketWatcher(10000000); final ArrayList pairs = new ArrayList(); final AtomicInteger handled = new AtomicInteger(0); @@ -148,4 +180,29 @@ public class TestDomainSocketWatcher { Uninterruptibles.joinUninterruptibly(removerThread); watcher.close(); } + + /** + * Creates a new DomainSocketWatcher and tracks its thread for termination due + * to an unexpected exception. At the end of each test, if there was an + * unexpected exception, then that exception is thrown to force a failure of + * the test. + * + * @param interruptCheckPeriodMs interrupt check period passed to + * DomainSocketWatcher + * @return new DomainSocketWatcher + * @throws Exception if there is any failure + */ + private DomainSocketWatcher newDomainSocketWatcher(int interruptCheckPeriodMs) + throws Exception { + DomainSocketWatcher watcher = new DomainSocketWatcher( + interruptCheckPeriodMs); + watcher.watcherThread.setUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable t) { + trappedException = t; + } + }); + return watcher; + } }