HDFS-7608: hdfs dfsclient newConnectedPeer has no write timeout (Xiaoyu Yao via Colin P. McCabe)
(cherry picked from commit 1d74ccececaefffaa90c0c18b40a3645dbc819d9)
This commit is contained in:
parent
64059d8884
commit
471037883f
@ -702,6 +702,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-2956. calling fetchdt without a --renewer argument throws NPE
|
HDFS-2956. calling fetchdt without a --renewer argument throws NPE
|
||||||
(vinayakumarb)
|
(vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-7608. hdfs dfsclient newConnectedPeer has no write timeout (Xiaoyu Yao
|
||||||
|
via Colin P. McCabe)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -3101,6 +3101,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
|
|||||||
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
|
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
|
||||||
blockToken, datanodeId);
|
blockToken, datanodeId);
|
||||||
peer.setReadTimeout(socketTimeout);
|
peer.setReadTimeout(socketTimeout);
|
||||||
|
peer.setWriteTimeout(socketTimeout);
|
||||||
success = true;
|
success = true;
|
||||||
return peer;
|
return peer;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1009,10 +1009,9 @@ public void testListFiles() throws IOException {
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testDFSClientPeerTimeout() throws IOException {
|
public void testDFSClientPeerReadTimeout() throws IOException {
|
||||||
final int timeout = 1000;
|
final int timeout = 1000;
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
|
||||||
@ -1029,11 +1028,11 @@ public void testDFSClientPeerTimeout() throws IOException {
|
|||||||
long start = Time.now();
|
long start = Time.now();
|
||||||
try {
|
try {
|
||||||
peer.getInputStream().read();
|
peer.getInputStream().read();
|
||||||
Assert.fail("should timeout");
|
Assert.fail("read should timeout");
|
||||||
} catch (SocketTimeoutException ste) {
|
} catch (SocketTimeoutException ste) {
|
||||||
long delta = Time.now() - start;
|
long delta = Time.now() - start;
|
||||||
Assert.assertTrue("timedout too soon", delta >= timeout*0.9);
|
Assert.assertTrue("read timedout too soon", delta >= timeout*0.9);
|
||||||
Assert.assertTrue("timedout too late", delta <= timeout*1.1);
|
Assert.assertTrue("read timedout too late", delta <= timeout*1.1);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
Assert.fail("wrong exception:"+t);
|
Assert.fail("wrong exception:"+t);
|
||||||
}
|
}
|
||||||
@ -1055,4 +1054,36 @@ public void testGetServerDefaults() throws IOException {
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testDFSClientPeerWriteTimeout() throws IOException {
|
||||||
|
final int timeout = 1000;
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
|
||||||
|
|
||||||
|
// only need cluster to create a dfs client to get a peer
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
// Write 1 MB to a dummy socket to ensure the write times out
|
||||||
|
ServerSocket socket = new ServerSocket(0);
|
||||||
|
Peer peer = dfs.getClient().newConnectedPeer(
|
||||||
|
(InetSocketAddress) socket.getLocalSocketAddress(), null, null);
|
||||||
|
long start = Time.now();
|
||||||
|
try {
|
||||||
|
byte[] buf = new byte[1024 * 1024];
|
||||||
|
peer.getOutputStream().write(buf);
|
||||||
|
Assert.fail("write should timeout");
|
||||||
|
} catch (SocketTimeoutException ste) {
|
||||||
|
long delta = Time.now() - start;
|
||||||
|
Assert.assertTrue("write timedout too soon", delta >= timeout * 0.9);
|
||||||
|
Assert.assertTrue("write timedout too late", delta <= timeout * 1.1);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
Assert.fail("wrong exception:" + t);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user