HDFS-7005. DFS input streams do not timeout. Contributed by Daryn Sharp.

(cherry picked from commit 6a84f88c11)
This commit is contained in:
Kihwal Lee 2014-09-08 14:44:47 -05:00
parent d510cefd14
commit d20047edda
3 changed files with 40 additions and 0 deletions

View File

@ -360,6 +360,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7025. HDFS Credential Provider related Unit Test Failure. HDFS-7025. HDFS Credential Provider related Unit Test Failure.
(Xiaoyu Yao via cnauroth) (Xiaoyu Yao via cnauroth)
HDFS-7005. DFS input streams do not timeout.
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -3015,6 +3015,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
dfsClientConf.socketTimeout); dfsClientConf.socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId); blockToken, datanodeId);
peer.setReadTimeout(dfsClientConf.socketTimeout);
success = true; success = true;
return peer; return peer;
} finally { } finally {

View File

@ -31,6 +31,9 @@ import static org.mockito.Mockito.mock;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
@ -60,6 +63,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.HftpFileSystem;
@ -70,6 +74,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -991,4 +996,36 @@ public class TestDistributedFileSystem {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=10000)
public void testDFSClientPeerTimeout() throws IOException {
final int timeout = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// use a dummy socket to ensure the read timesout
ServerSocket socket = new ServerSocket(0);
Peer peer = dfs.getClient().newConnectedPeer(
(InetSocketAddress) socket.getLocalSocketAddress(), null, null);
long start = Time.now();
try {
peer.getInputStream().read();
Assert.fail("should timeout");
} catch (SocketTimeoutException ste) {
long delta = Time.now() - start;
Assert.assertTrue("timedout too soon", delta >= timeout*0.9);
Assert.assertTrue("timedout too late", delta <= timeout*1.1);
} catch (Throwable t) {
Assert.fail("wrong exception:"+t);
}
} finally {
cluster.shutdown();
}
}
} }