From c5434d6a884f74c639aab7ee9e7dbbc3aea5024a Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Mon, 7 May 2012 16:42:08 +0000
Subject: [PATCH] HDFS-3376. DFSClient fails to make connection to DN if there
 are many unusable cached sockets. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1335114 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |  3 ++
 .../apache/hadoop/hdfs/DFSInputStream.java       |  8 +++-
 .../hdfs/TestDataTransferKeepalive.java          | 40 +++++++++++++++++++
 3 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 882f509c9b4..79af0404b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -485,6 +485,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout
     (todd)
 
+    HDFS-3376. DFSClient fails to make connection to DN if there are many
+    unusable cached sockets (todd)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7f36eb63bf2..9bb32d1fbd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -794,7 +794,13 @@ public class DFSInputStream extends FSInputStream {
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      Socket sock = socketCache.get(dnAddr);
+      Socket sock = null;
+      // Don't use the cache on the last attempt - it's possible that there
+      // are arbitrarily many unusable sockets in the cache, but we don't
+      // want to fail the read.
+      if (retries < nCachedConnRetry) {
+        sock = socketCache.get(dnAddr);
+      }
       if (sock == null) {
         fromCache = false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
index 852f3c6801a..1ef4eac997e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.junit.Assert.*;
 
+import java.io.InputStream;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -40,6 +42,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.io.NullOutputStream;
+
 public class TestDataTransferKeepalive {
   Configuration conf = new HdfsConfiguration();
   private MiniDFSCluster cluster;
@@ -56,6 +60,8 @@ public class TestDataTransferKeepalive {
   public void setup() throws Exception {
     conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
         KEEPALIVE_TIMEOUT);
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+        0);
 
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(1).build();
@@ -143,6 +149,40 @@ public class TestDataTransferKeepalive {
       IOUtils.closeStream(stm);
     }
   }
+
+  @Test(timeout=30000)
+  public void testManyClosedSocketsInCache() throws Exception {
+    // Make a small file
+    DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+    // Insert a bunch of dead sockets in the cache, by opening
+    // many streams concurrently, reading all of the data,
+    // and then closing them.
+    InputStream[] stms = new InputStream[5];
+    try {
+      for (int i = 0; i < stms.length; i++) {
+        stms[i] = fs.open(TEST_FILE);
+      }
+      for (InputStream stm : stms) {
+        IOUtils.copyBytes(stm, new NullOutputStream(), 1024);
+      }
+    } finally {
+      IOUtils.cleanup(null, stms);
+    }
+
+    DFSClient client = ((DistributedFileSystem)fs).dfs;
+    assertEquals(5, client.socketCache.size());
+
+    // Let all the xceivers timeout
+    Thread.sleep(1500);
+    assertXceiverCount(0);
+
+    // Client side still has the sockets cached
+    assertEquals(5, client.socketCache.size());
+
+    // Reading should not throw an exception.
+    DFSTestUtil.readFile(fs, TEST_FILE);
+  }
 
   private void assertXceiverCount(int expected) {
     // Subtract 1, since the DataXceiverServer
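
The gist of the DFSInputStream change above is a retry loop that draws from the
socket cache on every attempt except the last, so a backlog of stale cached
sockets can never consume all retries and fail the read. Below is a minimal,
self-contained sketch of that pattern, not the Hadoop source: the cache here is
a hypothetical stand-in (a plain deque), the isClosed() liveness check is a
simplification (the real client only discovers a dead socket by trying to use
it, which is why it retries at all), and names like MAX_CACHED_CONN_RETRIES and
the connect timeout are illustrative, not the Hadoop API.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayDeque;
import java.util.Deque;

public class CachedConnectSketch {
  // Hypothetical stand-in for the client's per-datanode socket cache.
  private final Deque<Socket> cache = new ArrayDeque<Socket>();
  // Analogous in spirit to nCachedConnRetry in the patch; value is illustrative.
  private static final int MAX_CACHED_CONN_RETRIES = 3;

  Socket connect(InetSocketAddress dnAddr) throws IOException {
    for (int retries = 0; retries <= MAX_CACHED_CONN_RETRIES; ++retries) {
      Socket sock = null;
      // Skip the cache on the last attempt: there may be arbitrarily many
      // unusable sockets cached, and the read must not fail because of them.
      if (retries < MAX_CACHED_CONN_RETRIES) {
        sock = cache.poll();
      }
      if (sock == null) {
        // Cache empty, or final attempt: dial a fresh connection.
        Socket fresh = new Socket();
        fresh.connect(dnAddr, 10000); // timeout is illustrative
        return fresh;
      }
      if (!sock.isClosed()) {
        // Simplified liveness check; a cached socket that proves unusable
        // sends us around the loop to try the next one.
        return sock;
      }
      // Stale cached socket: drop it and retry.
    }
    // Unreachable: the final iteration always dials fresh above.
    throw new AssertionError("final attempt always bypasses the cache");
  }
}

The invariant worth noting: because the final iteration always bypasses the
cache, the loop's only failure mode is a genuine connect error, never cache
pollution, which is exactly the scenario the testManyClosedSocketsInCache test
constructs by filling the cache with five dead sockets.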