From a701c792f880c43ba807f00a92a99dadf89eab0c Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Fri, 4 May 2012 18:50:54 +0000
Subject: [PATCH] HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1334116 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |   3 +
 .../hdfs/server/datanode/DataXceiver.java      |  24 ++-
 .../server/datanode/DataXceiverServer.java     |   4 +-
 .../hdfs/TestDataTransferKeepalive.java        | 159 ++++++++++++++++++
 4 files changed, 183 insertions(+), 7 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7d53324acc8..fd629ae49fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -597,6 +597,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3350. In INode, add final to compareTo(..), equals(..) and hashCode(),
     and remove synchronized from updatePermissionStatus(..). (szetszwo)
 
+    HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout
+    (todd)
+
     BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
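The bug in one sentence: the DataXceiver waited for an operation on a newly accepted client socket with whatever timeout happened to already be in effect, which could be no timeout at all, so a client that connected and then went silent could pin an xceiver thread forever. The patch below makes both timeouts explicit: the normal socket timeout (dnConf.socketTimeout) while waiting for the first op, and the much shorter keepalive timeout (dnConf.socketKeepaliveTimeout) while an already-used connection idles in the client's reuse cache. The following is a minimal sketch of that discipline in plain java.net terms; the class name and constants are illustrative, not the HDFS defaults or the patched code itself:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;
    import java.net.SocketTimeoutException;

    class OpLoopSketch {
      static final int SOCKET_TIMEOUT_MS = 60000;   // generous: first op on a fresh connection
      static final int KEEPALIVE_TIMEOUT_MS = 1000; // short: connection idling for reuse

      static void serveOps(Socket s) throws IOException {
        InputStream in = s.getInputStream();
        int opsProcessed = 0;
        while (true) {
          // Before HDFS-3357 the first read could block with no timeout at all,
          // leaking the serving thread if the client never sent an op.
          s.setSoTimeout(opsProcessed == 0 ? SOCKET_TIMEOUT_MS : KEEPALIVE_TIMEOUT_MS);
          int opCode;
          try {
            opCode = in.read();
          } catch (SocketTimeoutException e) {
            return; // idle client: reclaim the thread instead of leaking it
          }
          if (opCode < 0) {
            return; // client closed the connection cleanly
          }
          // ... dispatch on opCode, then loop in case the client reuses the socket ...
          opsProcessed++;
        }
      }
    }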
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 995840066db..4114d7f9cc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -83,13 +84,24 @@ class DataXceiver extends Receiver implements Runnable {
   private final DataXceiverServer dataXceiverServer;
   private long opStartTime; //the start time of receiving an Op
+  private final SocketInputWrapper socketInputWrapper;
 
-  public DataXceiver(Socket s, DataNode datanode, 
+  public static DataXceiver create(Socket s, DataNode dn,
+      DataXceiverServer dataXceiverServer) throws IOException {
+
+    SocketInputWrapper iw = NetUtils.getInputStream(s);
+    return new DataXceiver(s, iw, dn, dataXceiverServer);
+  }
+
+  private DataXceiver(Socket s,
+      SocketInputWrapper socketInput,
+      DataNode datanode,
       DataXceiverServer dataXceiverServer) throws IOException {
     super(new DataInputStream(new BufferedInputStream(
-        NetUtils.getInputStream(s), HdfsConstants.SMALL_BUFFER_SIZE)));
+        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
     this.s = s;
+    this.socketInputWrapper = socketInput;
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dnConf = datanode.getDnConf();
@@ -128,8 +140,6 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
     dataXceiverServer.childSockets.add(s);
     try {
-      int stdTimeout = s.getSoTimeout();
-
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disables this behavior.
@@ -139,7 +149,9 @@ class DataXceiver extends Receiver implements Runnable {
       try {
         if (opsProcessed != 0) {
           assert dnConf.socketKeepaliveTimeout > 0;
-          s.setSoTimeout(dnConf.socketKeepaliveTimeout);
+          socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+        } else {
+          socketInputWrapper.setTimeout(dnConf.socketTimeout);
         }
         op = readOp();
       } catch (InterruptedIOException ignored) {
@@ -160,7 +172,7 @@ class DataXceiver extends Receiver implements Runnable {
 
       // restore normal timeout
       if (opsProcessed != 0) {
-        s.setSoTimeout(stdTimeout);
+        s.setSoTimeout(dnConf.socketTimeout);
       }
 
       opStartTime = now();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index f32b2968f52..bb0f7fd81b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -135,6 +135,7 @@ class DataXceiverServer implements Runnable {
       try {
         s = ss.accept();
         s.setTcpNoDelay(true);
+        // Timeouts are set within DataXceiver.run()
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -144,7 +145,8 @@ class DataXceiverServer implements Runnable {
               + maxXceiverCount);
         }
 
-        new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+        new Daemon(datanode.threadGroup,
+            DataXceiver.create(s, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
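Two details of the DataXceiver change are worth calling out. First, the public constructor becomes a private constructor behind a static create() factory, so that NetUtils.getInputStream(s) is called exactly once and the resulting SocketInputWrapper is retained; that wrapper's setTimeout() is what the op loop retunes between ops. Second, the retuning goes through the wrapper rather than through Socket.setSoTimeout() because, for the NIO-backed streams used here, the timeout is captured when the stream is created, so a later setSoTimeout() on the Socket does not affect reads through an already-created stream; the tunable wrapper was introduced alongside this fix (HADOOP-8350). A compact sketch of the create-once, retune-later shape, with illustrative names rather than the Hadoop ones:

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;

    class CreateOnceSketch {
      private final Socket s;
      private final InputStream tunableIn;  // stands in for SocketInputWrapper
      private final DataInputStream in;

      static CreateOnceSketch create(Socket s) throws IOException {
        // The stream is created here, once; the handle is kept so the read
        // timeout can be adjusted later without recreating the stream.
        InputStream raw = s.getInputStream();
        return new CreateOnceSketch(s, raw);
      }

      private CreateOnceSketch(Socket s, InputStream raw) {
        this.s = s;
        this.tunableIn = raw;
        this.in = new DataInputStream(new BufferedInputStream(raw, 512));
      }
    }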
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
new file mode 100644
index 00000000000..852f3c6801a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.junit.Assert.*;
+
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataTransferKeepalive {
+  Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private InetSocketAddress dnAddr;
+  private DataNode dn;
+  private DFSClient dfsClient;
+  private static Path TEST_FILE = new Path("/test");
+
+  private static final int KEEPALIVE_TIMEOUT = 1000;
+  private static final int WRITE_TIMEOUT = 3000;
+
+  @Before
+  public void setup() throws Exception {
+    conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        KEEPALIVE_TIMEOUT);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    fs = cluster.getFileSystem();
+    dfsClient = ((DistributedFileSystem)fs).dfs;
+
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    dn = cluster.getDataNodes().get(0);
+    DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+        dn, poolId);
+    dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
+  }
+
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+
+  /**
+   * Regression test for HDFS-3357. Check that the datanode is respecting
+   * its configured keepalive timeout.
+   */
+  @Test(timeout=30000)
+  public void testKeepaliveTimeouts() throws Exception {
+    DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+    // Clients that write aren't currently re-used.
+    assertEquals(0, dfsClient.socketCache.size());
+    assertXceiverCount(0);
+
+    // Read the file, so we should get a
+    // cached socket, and should have an xceiver on the other side.
+    DFSTestUtil.readFile(fs, TEST_FILE);
+    assertEquals(1, dfsClient.socketCache.size());
+    assertXceiverCount(1);
+
+    // Sleep for a bit longer than the keepalive timeout
+    // and make sure the xceiver died.
+    Thread.sleep(KEEPALIVE_TIMEOUT * 2);
+    assertXceiverCount(0);
+
+    // The socket is still in the cache, because we don't
+    // notice that it's closed until we try to read
+    // from it again.
+    assertEquals(1, dfsClient.socketCache.size());
+
+    // Take it out of the cache - reading should
+    // give an EOF.
+    Socket s = dfsClient.socketCache.get(dnAddr);
+    assertNotNull(s);
+    assertEquals(-1, NetUtils.getInputStream(s).read());
+  }
+
+  /**
+   * Test for the case where the client begins to read a long block, but doesn't
+   * read bytes off the stream quickly. The datanode should time out sending the
+   * chunks and the transceiver should die, even if it has a long keepalive.
+   */
+  @Test(timeout=30000)
+  public void testSlowReader() throws Exception {
+    // Restart the DN with a shorter write timeout.
+    DataNodeProperties props = cluster.stopDataNode(0);
+    props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+        WRITE_TIMEOUT);
+    props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        120000);
+    assertTrue(cluster.restartDataNode(props, true));
+    // Wait for heartbeats to avoid a startup race where we
+    // try to write the block while the DN is still starting.
+    cluster.triggerHeartbeats();
+
+    dn = cluster.getDataNodes().get(0);
+
+    DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
+    FSDataInputStream stm = fs.open(TEST_FILE);
+    try {
+      stm.read();
+      assertXceiverCount(1);
+
+      Thread.sleep(WRITE_TIMEOUT + 1000);
+      // DN should time out in sendChunks, and this should force
+      // the xceiver to exit.
+      assertXceiverCount(0);
+    } finally {
+      IOUtils.closeStream(stm);
+    }
+  }
+
+  private void assertXceiverCount(int expected) {
+    // Subtract 1, since the DataXceiverServer
+    // counts as one
+    int count = dn.getXceiverCount() - 1;
+    if (count != expected) {
+      ReflectionUtils.printThreadInfo(
+          new PrintWriter(System.err),
+          "Thread dumps");
+      fail("Expected " + expected + " xceivers, found " +
+          count);
+    }
+  }
+}
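Taken together, the two tests pin down the two distinct timeouts involved: testKeepaliveTimeouts verifies that an idle cached connection releases its xceiver once the keepalive timeout (DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY) expires, and testSlowReader verifies that a reader who stalls mid-block is cut off by the write timeout (DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY) even when the keepalive is long. A sketch of setting the same knobs programmatically, reusing the test's values (chosen for fast test turnaround, not as production recommendations):

    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class KeepaliveConfSketch {
      static Configuration example() {
        Configuration conf = new HdfsConfiguration();
        // Idle reused connections give up their xceiver thread after 1s.
        conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1000);
        // A reader that stops pulling data is disconnected after ~3s in sendChunks.
        conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 3000);
        return conf;
      }
    }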