diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e29cb0f6caa..b1441774411 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -21,6 +21,9 @@ Release 2.7.1 - UNRELEASED HADOOP-11730. Regression: s3n read failure recovery broken. (Takenori Sato via stevel) + HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there + is an I/O error during requestShortCircuitShm (cmccabe) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java index 03b52e0f8c4..5648ae12f00 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java @@ -512,8 +512,8 @@ public final class DomainSocketWatcher implements Closeable { } } catch (InterruptedException e) { LOG.info(toString() + " terminating on InterruptedException"); - } catch (IOException e) { - LOG.error(toString() + " terminating on IOException", e); + } catch (Throwable e) { + LOG.error(toString() + " terminating on exception", e); } finally { lock.lock(); try { diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c index dbaa4fefed6..596601b88c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c @@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd) pollfd = &sd->pollfd[sd->used_size]; sd->used_size++; pollfd->fd = fd; - pollfd->events = POLLIN; + pollfd->events = POLLIN | POLLHUP; pollfd->revents = 0; } @@ -162,7 +162,10 @@ JNIEnv *env, jobject obj) GetLongField(env, obj, fd_set_data_fid); used_size = sd->used_size; for (i = 0; i < used_size; i++) { - if (sd->pollfd[i].revents & POLLIN) { + // We check for both POLLIN and POLLHUP, because on some OSes, when a socket + // is shutdown(), it sends POLLHUP rather than POLLIN. + if ((sd->pollfd[i].revents & POLLIN) || + (sd->pollfd[i].revents & POLLHUP)) { num_readable++; } else { sd->pollfd[i].revents = 0; @@ -177,7 +180,8 @@ JNIEnv *env, jobject obj) } j = 0; for (i = 0; ((i < used_size) && (j < num_readable)); i++) { - if (sd->pollfd[i].revents & POLLIN) { + if ((sd->pollfd[i].revents & POLLIN) || + (sd->pollfd[i].revents & POLLHUP)) { carr[j] = sd->pollfd[i].fd; j++; sd->pollfd[i].revents = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 478099deee9..65f0506a38c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -39,4 +39,6 @@ public class DataNodeFaultInjector { public void getHdfsBlocksMetadata() {} public void writeBlockAfterFlush() throws IOException {} + + public void sendShortCircuitShmResponse() throws IOException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index cf1b6bebeab..01ff32daffa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -421,6 +421,7 @@ class DataXceiver extends Receiver implements Runnable { private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo) throws IOException { + DataNodeFaultInjector.get().sendShortCircuitShmResponse(); ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). setId(PBHelper.convert(shmInfo.shmId)).build(). writeDelimitedTo(socketOut); @@ -479,10 +480,19 @@ class DataXceiver extends Receiver implements Runnable { } } if ((!success) && (peer == null)) { - // If we failed to pass the shared memory segment to the client, - // close the UNIX domain socket now. This will trigger the - // DomainSocketWatcher callback, cleaning up the segment. - IOUtils.cleanup(null, sock); + // The socket is now managed by the DomainSocketWatcher. However, + // we failed to pass it to the client. We call shutdown() on the + // UNIX domain socket now. This will trigger the DomainSocketWatcher + // callback. The callback will close the domain socket. + // We don't want to close the socket here, since that might lead to + // bad behavior inside the poll() call. See HADOOP-11802 for details. + try { + LOG.warn("Failed to send success response back to the client. " + + "Shutting down socket for " + shmInfo.shmId + "."); + sock.shutdown(); + } catch (IOException e) { + LOG.warn("Failed to shut down socket in error handler", e); + } } IOUtils.cleanup(null, shmInfo); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java index 9092bc531f1..062539a0721 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java @@ -216,10 +216,11 @@ public class DfsClientShmManager implements Closeable { * Must be called with the EndpointShmManager lock held. * * @param peer The peer to use to talk to the DataNode. - * @param clientName The client name. * @param usedPeer (out param) Will be set to true if we used the peer. * When a peer is used * + * @param clientName The client name. + * @param blockId The block ID to use. * @return null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java index 5fd31a920cc..60adb021592 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -185,4 +186,9 @@ public class DomainSocketFactory { public void disableDomainSocketPath(String path) { pathMap.put(path, PathState.UNUSABLE); } + + @VisibleForTesting + public void clearPathMap() { + pathMap.invalidateAll(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 7daabd0c4c8..efd975415cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; @@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -622,6 +626,18 @@ public class TestShortCircuitCache { sockDir.close(); } + static private void checkNumberOfSegmentsAndSlots(final int expectedSegments, + final int expectedSlots, ShortCircuitRegistry registry) { + registry.visit(new ShortCircuitRegistry.Visitor() { + @Override + public void accept(HashMap segments, + HashMultimap slots) { + Assert.assertEquals(expectedSegments, segments.size()); + Assert.assertEquals(expectedSlots, slots.size()); + } + }); + } + public static class TestCleanupFailureInjector extends BlockReaderFactory.FailureInjector { @Override @@ -665,16 +681,67 @@ public class TestShortCircuitCache { GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + "testing, but we failed to do a non-TCP read.", t); } - ShortCircuitRegistry registry = - cluster.getDataNodes().get(0).getShortCircuitRegistry(); - registry.visit(new ShortCircuitRegistry.Visitor() { + checkNumberOfSegmentsAndSlots(1, 1, + cluster.getDataNodes().get(0).getShortCircuitRegistry()); + cluster.shutdown(); + sockDir.close(); + } + + // Regression test for HADOOP-11802 + @Test(timeout=60000) + public void testDataXceiverHandlesRequestShortCircuitShmFailure() + throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir); + conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, + 1000000000L); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final Path TEST_PATH1 = new Path("/test_file1"); + DFSTestUtil.createFile(fs, TEST_PATH1, 4096, + (short)1, 0xFADE1); + LOG.info("Setting failure injector and performing a read which " + + "should fail..."); + DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class); + Mockito.doAnswer(new Answer() { @Override - public void accept(HashMap segments, - HashMultimap slots) { - Assert.assertEquals(1, segments.size()); - Assert.assertEquals(1, slots.size()); + public Void answer(InvocationOnMock invocation) throws Throwable { + throw new IOException("injected error into sendShmResponse"); } - }); + }).when(failureInjector).sendShortCircuitShmResponse(); + DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; + DataNodeFaultInjector.instance = failureInjector; + + try { + // The first read will try to allocate a shared memory segment and slot. + // The shared memory segment allocation will fail because of the failure + // injector. + DFSTestUtil.readFileBuffer(fs, TEST_PATH1); + Assert.fail("expected readFileBuffer to fail, but it succeeded."); + } catch (Throwable t) { + GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + + "testing, but we failed to do a non-TCP read.", t); + } + + checkNumberOfSegmentsAndSlots(0, 0, + cluster.getDataNodes().get(0).getShortCircuitRegistry()); + + LOG.info("Clearing failure injector and performing another read..."); + DataNodeFaultInjector.instance = prevInjector; + + fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap(); + + // The second read should succeed. + DFSTestUtil.readFileBuffer(fs, TEST_PATH1); + + // We should have added a new short-circuit shared memory segment and slot. + checkNumberOfSegmentsAndSlots(1, 1, + cluster.getDataNodes().get(0).getShortCircuitRegistry()); + cluster.shutdown(); sockDir.close(); }