diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 925a85e1068..434f4872908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -702,6 +702,9 @@ Release 2.8.0 - UNRELEASED HDFS-2956. calling fetchdt without a --renewer argument throws NPE (vinayakumarb) + HDFS-7608. hdfs dfsclient newConnectedPeer has no write timeout (Xiaoyu Yao + via Colin P. McCabe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 922c0658423..0ebe4884c45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3101,6 +3101,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(socketTimeout); + peer.setWriteTimeout(socketTimeout); success = true; return peer; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 5c4fd8c9cff..ce519388ce5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -1009,10 +1009,9 @@ public class TestDistributedFileSystem { cluster.shutdown(); } } - - + @Test(timeout=10000) - public void testDFSClientPeerTimeout() throws IOException { + public void testDFSClientPeerReadTimeout() throws IOException { final int timeout = 1000; final Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); @@ -1029,11 +1028,11 @@ public class TestDistributedFileSystem { long start = Time.now(); try { peer.getInputStream().read(); - Assert.fail("should timeout"); + Assert.fail("read should timeout"); } catch (SocketTimeoutException ste) { long delta = Time.now() - start; - Assert.assertTrue("timedout too soon", delta >= timeout*0.9); - Assert.assertTrue("timedout too late", delta <= timeout*1.1); + Assert.assertTrue("read timedout too soon", delta >= timeout*0.9); + Assert.assertTrue("read timedout too late", delta <= timeout*1.1); } catch (Throwable t) { Assert.fail("wrong exception:"+t); } @@ -1055,4 +1054,36 @@ public class TestDistributedFileSystem { cluster.shutdown(); } } + + @Test(timeout=10000) + public void testDFSClientPeerWriteTimeout() throws IOException { + final int timeout = 1000; + final Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); + + // only need cluster to create a dfs client to get a peer + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + DistributedFileSystem dfs = cluster.getFileSystem(); + // Write 1 MB to a dummy socket to ensure the write times out + ServerSocket socket = new ServerSocket(0); + Peer peer = dfs.getClient().newConnectedPeer( + (InetSocketAddress) socket.getLocalSocketAddress(), null, null); + long start = Time.now(); + try { + byte[] buf = new byte[1024 * 1024]; + peer.getOutputStream().write(buf); + Assert.fail("write should timeout"); + } catch (SocketTimeoutException ste) { + long delta = Time.now() - start; + Assert.assertTrue("write timedout too soon", delta >= timeout * 0.9); + Assert.assertTrue("write timedout too late", delta <= timeout * 1.1); + } catch (Throwable t) { + Assert.fail("wrong exception:" + t); + } + } finally { + cluster.shutdown(); + } + } }