HDFS-4911. Reduce PeerCache timeout to be commensurate with dfs.datanode.socket.reuse.keepalive (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1565438 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
89fbc0ce18
commit
a0d87e3f0c
|
@ -22,6 +22,9 @@ Release 2.4.0 - UNRELEASED
|
|||
|
||||
HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
|
||||
|
||||
HDFS-4911. Reduce PeerCache timeout to be commensurate with
|
||||
dfs.datanode.socket.reuse.keepalive (cmccabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||
|
|
|
@ -86,7 +86,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final int DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
|
||||
|
||||
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
|
||||
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
|
||||
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
|
||||
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
|
||||
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
|
||||
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
|
||||
|
@ -215,7 +215,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
|
||||
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
|
||||
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
|
||||
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
|
||||
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 4000;
|
||||
|
||||
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
|
||||
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -25,6 +26,7 @@ import java.util.Map.Entry;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.LinkedListMultimap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
|
@ -118,6 +120,11 @@ class PeerCache {
|
|||
return instance;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static synchronized void setInstance(int c, long e) {
|
||||
instance = new PeerCache(c, e);
|
||||
}
|
||||
|
||||
private boolean isDaemonStarted() {
|
||||
return (daemon == null)? false: true;
|
||||
}
|
||||
|
@ -171,8 +178,17 @@ class PeerCache {
|
|||
while (iter.hasNext()) {
|
||||
Value candidate = iter.next();
|
||||
iter.remove();
|
||||
if (!candidate.getPeer().isClosed()) {
|
||||
return candidate.getPeer();
|
||||
long ageMs = Time.monotonicNow() - candidate.getTime();
|
||||
Peer peer = candidate.getPeer();
|
||||
if (ageMs >= expiryPeriod) {
|
||||
try {
|
||||
peer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("got IOException closing stale peer " + peer +
|
||||
", which is " + ageMs + " ms old");
|
||||
}
|
||||
} else if (!peer.isClosed()) {
|
||||
return peer;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -19,16 +19,19 @@ package org.apache.hadoop.hdfs;
|
|||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -37,10 +40,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -51,10 +52,7 @@ import com.google.common.io.NullOutputStream;
|
|||
public class TestDataTransferKeepalive {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
private MiniDFSCluster cluster;
|
||||
private FileSystem fs;
|
||||
private InetSocketAddress dnAddr;
|
||||
private DataNode dn;
|
||||
private DFSClient dfsClient;
|
||||
private static Path TEST_FILE = new Path("/test");
|
||||
|
||||
private static final int KEEPALIVE_TIMEOUT = 1000;
|
||||
|
@ -69,15 +67,7 @@ public class TestDataTransferKeepalive {
|
|||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build();
|
||||
fs = cluster.getFileSystem();
|
||||
dfsClient = ((DistributedFileSystem)fs).dfs;
|
||||
dfsClient.peerCache.clear();
|
||||
|
||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||
dn = cluster.getDataNodes().get(0);
|
||||
DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
|
||||
dn, poolId);
|
||||
dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -90,34 +80,86 @@ public class TestDataTransferKeepalive {
|
|||
* its configured keepalive timeout.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testKeepaliveTimeouts() throws Exception {
|
||||
public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
|
||||
Configuration clientConf = new Configuration(conf);
|
||||
// Set a client socket cache expiry time much longer than
|
||||
// the datanode-side expiration time.
|
||||
final long CLIENT_EXPIRY_MS = 60000L;
|
||||
clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
|
||||
PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
|
||||
DistributedFileSystem fs =
|
||||
(DistributedFileSystem)FileSystem.get(cluster.getURI(),
|
||||
clientConf);
|
||||
|
||||
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
|
||||
|
||||
// Clients that write aren't currently re-used.
|
||||
assertEquals(0, dfsClient.peerCache.size());
|
||||
assertEquals(0, fs.dfs.peerCache.size());
|
||||
assertXceiverCount(0);
|
||||
|
||||
// Reads the file, so we should get a
|
||||
// cached socket, and should have an xceiver on the other side.
|
||||
DFSTestUtil.readFile(fs, TEST_FILE);
|
||||
assertEquals(1, dfsClient.peerCache.size());
|
||||
assertEquals(1, fs.dfs.peerCache.size());
|
||||
assertXceiverCount(1);
|
||||
|
||||
// Sleep for a bit longer than the keepalive timeout
|
||||
// and make sure the xceiver died.
|
||||
Thread.sleep(KEEPALIVE_TIMEOUT * 2);
|
||||
Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 1);
|
||||
assertXceiverCount(0);
|
||||
|
||||
// The socket is still in the cache, because we don't
|
||||
// notice that it's closed until we try to read
|
||||
// from it again.
|
||||
assertEquals(1, dfsClient.peerCache.size());
|
||||
assertEquals(1, fs.dfs.peerCache.size());
|
||||
|
||||
// Take it out of the cache - reading should
|
||||
// give an EOF.
|
||||
Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
|
||||
Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
|
||||
assertNotNull(peer);
|
||||
assertEquals(-1, peer.getInputStream().read());
|
||||
PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
|
||||
DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the client respects its keepalive timeout.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testClientResponsesKeepAliveTimeout() throws Exception {
|
||||
Configuration clientConf = new Configuration(conf);
|
||||
// Set a client socket cache expiry time much shorter than
|
||||
// the datanode-side expiration time.
|
||||
final long CLIENT_EXPIRY_MS = 10L;
|
||||
clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
|
||||
PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
|
||||
DistributedFileSystem fs =
|
||||
(DistributedFileSystem)FileSystem.get(cluster.getURI(),
|
||||
clientConf);
|
||||
|
||||
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
|
||||
|
||||
// Clients that write aren't currently re-used.
|
||||
assertEquals(0, fs.dfs.peerCache.size());
|
||||
assertXceiverCount(0);
|
||||
|
||||
// Reads the file, so we should get a
|
||||
// cached socket, and should have an xceiver on the other side.
|
||||
DFSTestUtil.readFile(fs, TEST_FILE);
|
||||
assertEquals(1, fs.dfs.peerCache.size());
|
||||
assertXceiverCount(1);
|
||||
|
||||
// Sleep for a bit longer than the client keepalive timeout.
|
||||
Thread.sleep(CLIENT_EXPIRY_MS + 1);
|
||||
|
||||
// Taking out a peer which is expired should give a null.
|
||||
Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
|
||||
assertTrue(peer == null);
|
||||
|
||||
// The socket cache is now empty.
|
||||
assertEquals(0, fs.dfs.peerCache.size());
|
||||
PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
|
||||
DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,8 +167,17 @@ public class TestDataTransferKeepalive {
|
|||
* read bytes off the stream quickly. The datanode should time out sending the
|
||||
* chunks and the transceiver should die, even if it has a long keepalive.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
@Test(timeout=300000)
|
||||
public void testSlowReader() throws Exception {
|
||||
// Set a client socket cache expiry time much longer than
|
||||
// the datanode-side expiration time.
|
||||
final long CLIENT_EXPIRY_MS = 600000L;
|
||||
Configuration clientConf = new Configuration(conf);
|
||||
clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
|
||||
PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
|
||||
DistributedFileSystem fs =
|
||||
(DistributedFileSystem)FileSystem.get(cluster.getURI(),
|
||||
clientConf);
|
||||
// Restart the DN with a shorter write timeout.
|
||||
DataNodeProperties props = cluster.stopDataNode(0);
|
||||
props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
||||
|
@ -134,38 +185,31 @@ public class TestDataTransferKeepalive {
|
|||
props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
||||
120000);
|
||||
assertTrue(cluster.restartDataNode(props, true));
|
||||
dn = cluster.getDataNodes().get(0);
|
||||
// Wait for heartbeats to avoid a startup race where we
|
||||
// try to write the block while the DN is still starting.
|
||||
cluster.triggerHeartbeats();
|
||||
|
||||
dn = cluster.getDataNodes().get(0);
|
||||
|
||||
DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
|
||||
FSDataInputStream stm = fs.open(TEST_FILE);
|
||||
try {
|
||||
stm.read();
|
||||
assertXceiverCount(1);
|
||||
|
||||
// Poll for 0 running xceivers. Allow up to 5 seconds for some slack.
|
||||
long totalSleepTime = 0;
|
||||
long sleepTime = WRITE_TIMEOUT + 100;
|
||||
while (getXceiverCountWithoutServer() > 0 && totalSleepTime < 5000) {
|
||||
Thread.sleep(sleepTime);
|
||||
totalSleepTime += sleepTime;
|
||||
sleepTime = 100;
|
||||
}
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
// DN should time out in sendChunks, and this should force
|
||||
// the xceiver to exit.
|
||||
assertXceiverCount(0);
|
||||
} finally {
|
||||
IOUtils.closeStream(stm);
|
||||
return getXceiverCountWithoutServer() == 0;
|
||||
}
|
||||
}, 500, 50000);
|
||||
|
||||
IOUtils.closeStream(stm);
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testManyClosedSocketsInCache() throws Exception {
|
||||
// Make a small file
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
|
||||
|
||||
// Insert a bunch of dead sockets in the cache, by opening
|
||||
|
|
Loading…
Reference in New Issue