HDFS-4911. Reduce PeerCache timeout to be commensurate with dfs.datanode.socket.reuse.keepalive (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565435 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2014-02-06 21:08:09 +00:00
parent b527a975a4
commit e0cda48959
4 changed files with 108 additions and 45 deletions

CHANGES.txt (hadoop-hdfs)

@@ -308,6 +308,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
 
+    HDFS-4911. Reduce PeerCache timeout to be commensurate with
+    dfs.datanode.socket.reuse.keepalive (cmccabe)
+
   OPTIMIZATIONS
 
   HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -86,7 +86,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
   public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
-  public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
+  public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
   public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
   public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
   public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
@@ -215,7 +215,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
-  public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+  public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 4000;
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
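
The two defaults above are meant to work together: the client-side cache expiry (3000 ms) now sits just below the datanode keepalive (4000 ms), so a client gives up on an idle cached connection before the datanode's side of it times out. A minimal, hypothetical sketch of checking that relationship with the stock org.apache.hadoop.conf.Configuration API follows; the key strings are the ones defined above, while the class name and fallback values are illustrative only, not part of this patch:

    import org.apache.hadoop.conf.Configuration;

    public class KeepaliveDefaultsCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Client side: how long an idle connection may sit in the client's PeerCache.
        long clientExpiryMs =
            conf.getLong("dfs.client.socketcache.expiryMsec", 3000);
        // Datanode side: how long an idle DataXceiver waits for the next request.
        int dnKeepaliveMs =
            conf.getInt("dfs.datanode.socket.reuse.keepalive", 4000);
        // The client should stop reusing a cached socket before the datanode closes it.
        System.out.println("client expiry < datanode keepalive: "
            + (clientExpiryMs < dnKeepaliveMs));
      }
    }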

org/apache/hadoop/hdfs/PeerCache.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -25,6 +26,7 @@ import java.util.Map.Entry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -118,6 +120,11 @@ class PeerCache {
     return instance;
   }
 
+  @VisibleForTesting
+  public static synchronized void setInstance(int c, long e) {
+    instance = new PeerCache(c, e);
+  }
+
   private boolean isDaemonStarted() {
     return (daemon == null)? false: true;
   }
@@ -171,8 +178,17 @@ class PeerCache {
     while (iter.hasNext()) {
       Value candidate = iter.next();
       iter.remove();
-      if (!candidate.getPeer().isClosed()) {
-        return candidate.getPeer();
+      long ageMs = Time.monotonicNow() - candidate.getTime();
+      Peer peer = candidate.getPeer();
+      if (ageMs >= expiryPeriod) {
+        try {
+          peer.close();
+        } catch (IOException e) {
+          LOG.warn("got IOException closing stale peer " + peer +
+              ", which is " + ageMs + " ms old");
+        }
+      } else if (!peer.isClosed()) {
+        return peer;
       }
     }
     return null;
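
The @VisibleForTesting setInstance(int, long) hook added above is what the updated tests below use to swap in a PeerCache with a non-default capacity and expiry before any client is created. A rough usage sketch is shown here; it is a hypothetical test fragment, not code from this patch, and the capacity and expiry values are illustrative:

    package org.apache.hadoop.hdfs;

    // PeerCache is package-private, so this fragment must live in
    // org.apache.hadoop.hdfs, as the real tests do.
    public class PeerCacheExpiryExample {
      public static void installShortLivedCache() {
        final int cacheCapacity = 16;   // illustrative value
        final long expiryMs = 10L;      // illustrative value
        // Replace the process-wide singleton before opening any DFS client.
        PeerCache.setInstance(cacheCapacity, expiryMs);
        // From now on, getInternal() closes and skips any cached peer whose age
        // (Time.monotonicNow() - candidate.getTime()) reaches expiryMs, rather
        // than handing a stale connection back to the caller.
      }
    }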

org/apache/hadoop/hdfs/TestDataTransferKeepalive.java

@@ -19,16 +19,19 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.base.Supplier;
+
 import java.io.InputStream;
 import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -37,10 +40,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -51,10 +52,7 @@ import com.google.common.io.NullOutputStream;
 public class TestDataTransferKeepalive {
   Configuration conf = new HdfsConfiguration();
   private MiniDFSCluster cluster;
-  private FileSystem fs;
-  private InetSocketAddress dnAddr;
   private DataNode dn;
-  private DFSClient dfsClient;
   private static Path TEST_FILE = new Path("/test");
 
   private static final int KEEPALIVE_TIMEOUT = 1000;
@@ -69,15 +67,7 @@ public class TestDataTransferKeepalive {
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(1).build();
-    fs = cluster.getFileSystem();
-    dfsClient = ((DistributedFileSystem)fs).dfs;
-    dfsClient.peerCache.clear();
-    String poolId = cluster.getNamesystem().getBlockPoolId();
     dn = cluster.getDataNodes().get(0);
-    DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
-        dn, poolId);
-    dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
   }
 
   @After
@@ -90,34 +80,86 @@ public class TestDataTransferKeepalive {
    * its configured keepalive timeout.
    */
   @Test(timeout=30000)
-  public void testKeepaliveTimeouts() throws Exception {
+  public void testDatanodeRespectsKeepAliveTimeout() throws Exception {
+    Configuration clientConf = new Configuration(conf);
+    // Set a client socket cache expiry time much longer than
+    // the datanode-side expiration time.
+    final long CLIENT_EXPIRY_MS = 60000L;
+    clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
+    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(),
+            clientConf);
+
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, dfsClient.peerCache.size());
+    assertEquals(0, fs.dfs.peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, dfsClient.peerCache.size());
+    assertEquals(1, fs.dfs.peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
     // and make sure the xceiver died.
-    Thread.sleep(KEEPALIVE_TIMEOUT * 2);
+    Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 1);
     assertXceiverCount(0);
 
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, dfsClient.peerCache.size());
+    assertEquals(1, fs.dfs.peerCache.size());
 
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
+    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
     assertNotNull(peer);
     assertEquals(-1, peer.getInputStream().read());
+    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
+        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+  }
+
+  /**
+   * Test that the client respects its keepalive timeout.
+   */
+  @Test(timeout=30000)
+  public void testClientResponsesKeepAliveTimeout() throws Exception {
+    Configuration clientConf = new Configuration(conf);
+    // Set a client socket cache expiry time much shorter than
+    // the datanode-side expiration time.
+    final long CLIENT_EXPIRY_MS = 10L;
+    clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
+    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(),
+            clientConf);
+
+    DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+    // Clients that write aren't currently re-used.
+    assertEquals(0, fs.dfs.peerCache.size());
+    assertXceiverCount(0);
+
+    // Reads the file, so we should get a
+    // cached socket, and should have an xceiver on the other side.
+    DFSTestUtil.readFile(fs, TEST_FILE);
+    assertEquals(1, fs.dfs.peerCache.size());
+    assertXceiverCount(1);
+
+    // Sleep for a bit longer than the client keepalive timeout.
+    Thread.sleep(CLIENT_EXPIRY_MS + 1);
+
+    // Taking out a peer which is expired should give a null.
+    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
+    assertTrue(peer == null);
+
+    // The socket cache is now empty.
+    assertEquals(0, fs.dfs.peerCache.size());
+    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
+        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
   }
 
   /**
@@ -125,8 +167,17 @@ public class TestDataTransferKeepalive {
    * read bytes off the stream quickly. The datanode should time out sending the
    * chunks and the transceiver should die, even if it has a long keepalive.
    */
-  @Test(timeout=30000)
+  @Test(timeout=300000)
   public void testSlowReader() throws Exception {
+    // Set a client socket cache expiry time much longer than
+    // the datanode-side expiration time.
+    final long CLIENT_EXPIRY_MS = 600000L;
+    Configuration clientConf = new Configuration(conf);
+    clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
+    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(),
+            clientConf);
     // Restart the DN with a shorter write timeout.
     DataNodeProperties props = cluster.stopDataNode(0);
     props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
@@ -134,38 +185,31 @@ public class TestDataTransferKeepalive {
     props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
         120000);
     assertTrue(cluster.restartDataNode(props, true));
-    dn = cluster.getDataNodes().get(0);
 
     // Wait for heartbeats to avoid a startup race where we
     // try to write the block while the DN is still starting.
     cluster.triggerHeartbeats();
+    dn = cluster.getDataNodes().get(0);
 
     DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
     FSDataInputStream stm = fs.open(TEST_FILE);
-    try {
-      stm.read();
-      assertXceiverCount(1);
-
-      // Poll for 0 running xceivers. Allow up to 5 seconds for some slack.
-      long totalSleepTime = 0;
-      long sleepTime = WRITE_TIMEOUT + 100;
-      while (getXceiverCountWithoutServer() > 0 && totalSleepTime < 5000) {
-        Thread.sleep(sleepTime);
-        totalSleepTime += sleepTime;
-        sleepTime = 100;
-      }
-
-      // DN should time out in sendChunks, and this should force
-      // the xceiver to exit.
-      assertXceiverCount(0);
-    } finally {
-      IOUtils.closeStream(stm);
-    }
+    stm.read();
+    assertXceiverCount(1);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        // DN should time out in sendChunks, and this should force
+        // the xceiver to exit.
+        return getXceiverCountWithoutServer() == 0;
+      }
+    }, 500, 50000);
+    IOUtils.closeStream(stm);
   }
 
   @Test(timeout=30000)
   public void testManyClosedSocketsInCache() throws Exception {
     // Make a small file
+    DistributedFileSystem fs = cluster.getFileSystem();
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Insert a bunch of dead sockets in the cache, by opening