HDFS-3373. Change DFSClient input stream socket cache to global static and add a thread to cleanup expired cache entries. Contributed by John George

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1390466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-09-26 13:23:21 +00:00
parent 462ba6a3de
commit a7bcdcc051
7 changed files with 384 additions and 89 deletions

View File

@ -235,6 +235,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-3939. NN RPC address cleanup. (eli)
HDFS-3373. Change DFSClient input stream socket cache to global static and add
a thread to cleanup expired cache entries. (John George via szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@ -209,6 +211,7 @@ public class DFSClient implements java.io.Closeable {
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
final long socketCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
@ -257,6 +260,8 @@ public class DFSClient implements java.io.Closeable {
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
@ -427,7 +432,7 @@ public class DFSClient implements java.io.Closeable {
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
/**
@ -641,7 +646,6 @@ public class DFSClient implements java.io.Closeable {
void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
socketCache.clear();
try {
// remove reference to this client and stop the renewer,
@ -688,7 +692,6 @@ public class DFSClient implements java.io.Closeable {
public synchronized void close() throws IOException {
if(clientRunning) {
closeAllFilesBeingWritten(false);
socketCache.clear();
clientRunning = false;
getLeaseRenewer().closeClient(this);
// close connections to the namenode

View File

@ -74,6 +74,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";

View File

@ -26,33 +26,112 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
/**
* A cache of sockets.
* A cache of input stream sockets to Data Node.
*/
class SocketCache {
static final Log LOG = LogFactory.getLog(SocketCache.class);
private static final Log LOG = LogFactory.getLog(SocketCache.class);
private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
private final int capacity;
/**
* Create a SocketCache with the given capacity.
* @param capacity Max cache size.
*/
public SocketCache(int capacity) {
multimap = LinkedListMultimap.create();
this.capacity = capacity;
if (capacity <= 0) {
LOG.debug("SocketCache disabled in configuration.");
@InterfaceAudience.Private
static class SocketAndStreams implements Closeable {
public final Socket sock;
public final IOStreamPair ioStreams;
long createTime;
public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
this.sock = s;
this.ioStreams = ioStreams;
this.createTime = System.currentTimeMillis();
}
@Override
public void close() {
if (ioStreams != null) {
IOUtils.closeStream(ioStreams.in);
IOUtils.closeStream(ioStreams.out);
}
IOUtils.closeSocket(sock);
}
public long getCreateTime() {
return this.createTime;
}
}
private Daemon daemon;
/** A map for per user per datanode. */
private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
LinkedListMultimap.create();
private static int capacity;
private static long expiryPeriod;
private static SocketCache scInstance = new SocketCache();
private static boolean isInitedOnce = false;
public static synchronized SocketCache getInstance(int c, long e) {
// capacity is only initialized once
if (isInitedOnce == false) {
capacity = c;
expiryPeriod = e;
if (capacity == 0 ) {
LOG.info("SocketCache disabled.");
}
else if (expiryPeriod == 0) {
throw new IllegalStateException("Cannot initialize expiryPeriod to " +
expiryPeriod + "when cache is enabled.");
}
isInitedOnce = true;
} else { //already initialized once
if (capacity != c || expiryPeriod != e) {
LOG.info("capacity and expiry periods already set to " + capacity +
" and " + expiryPeriod + " respectively. Cannot set it to " + c +
" and " + e);
}
}
return scInstance;
}
private boolean isDaemonStarted() {
return (daemon == null)? false: true;
}
private synchronized void startExpiryDaemon() {
// start daemon only if not already started
if (isDaemonStarted() == true) {
return;
}
daemon = new Daemon(new Runnable() {
@Override
public void run() {
try {
SocketCache.this.run();
} catch(InterruptedException e) {
//noop
} finally {
SocketCache.this.clear();
}
}
@Override
public String toString() {
return String.valueOf(SocketCache.this);
}
});
daemon.start();
}
/**
@ -61,16 +140,17 @@ class SocketCache {
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
public synchronized SocketAndStreams get(SocketAddress remote) {
if (capacity <= 0) { // disabled
return null;
}
List<SocketAndStreams> socklist = multimap.get(remote);
if (socklist == null) {
List<SocketAndStreams> sockStreamList = multimap.get(remote);
if (sockStreamList == null) {
return null;
}
Iterator<SocketAndStreams> iter = socklist.iterator();
Iterator<SocketAndStreams> iter = sockStreamList.iterator();
while (iter.hasNext()) {
SocketAndStreams candidate = iter.next();
iter.remove();
@ -86,14 +166,16 @@ class SocketCache {
* @param sock socket not used by anyone.
*/
public synchronized void put(Socket sock, IOStreamPair ioStreams) {
Preconditions.checkNotNull(sock);
SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
s.close();
return;
}
Preconditions.checkNotNull(sock);
startExpiryDaemon();
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
if (remoteAddr == null) {
@ -106,13 +188,33 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
multimap.put(remoteAddr, s);
}
public synchronized int size() {
return multimap.size();
}
/**
* Evict and close sockets older than expiry period from the cache.
*/
private synchronized void evictExpired(long expiryPeriod) {
while (multimap.size() != 0) {
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
// if oldest socket expired, remove it
if (entry == null ||
System.currentTimeMillis() - entry.getValue().getCreateTime() <
expiryPeriod) {
break;
}
iter.remove();
SocketAndStreams s = entry.getValue();
s.close();
}
}
/**
* Evict the oldest entry in the cache.
*/
@ -120,7 +222,8 @@ class SocketCache {
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache!");
throw new IllegalStateException("Cannot evict from empty cache! " +
"capacity: " + capacity);
}
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
@ -128,39 +231,32 @@ class SocketCache {
s.close();
}
/**
* Periodically check in the cache and expire the entries
* older than expiryPeriod minutes
*/
private void run() throws InterruptedException {
for(long lastExpiryTime = System.currentTimeMillis();
!Thread.interrupted();
Thread.sleep(expiryPeriod)) {
final long elapsed = System.currentTimeMillis() - lastExpiryTime;
if (elapsed >= expiryPeriod) {
evictExpired(expiryPeriod);
lastExpiryTime = System.currentTimeMillis();
}
}
clear();
throw new InterruptedException("Daemon Interrupted");
}
/**
* Empty the cache, and close all sockets.
*/
public synchronized void clear() {
for (SocketAndStreams s : multimap.values()) {
s.close();
private synchronized void clear() {
for (SocketAndStreams sockAndStream : multimap.values()) {
sockAndStream.close();
}
multimap.clear();
}
@Override
protected void finalize() {
clear();
}
@InterfaceAudience.Private
static class SocketAndStreams implements Closeable {
public final Socket sock;
public final IOStreamPair ioStreams;
public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
this.sock = s;
this.ioStreams = ioStreams;
}
@Override
public void close() {
if (ioStreams != null) {
IOUtils.closeStream(ioStreams.in);
IOUtils.closeStream(ioStreams.out);
}
IOUtils.closeSocket(sock);
}
}
}

View File

@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -54,10 +55,12 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
final static int CACHE_SIZE = 4;
final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
@ -93,6 +96,9 @@ public class TestConnCache {
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
/* create a socket cache. There is only one socket cache per jvm */
cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
cluster = util.getCluster();
conf = util.getConf();
@ -142,10 +148,7 @@ public class TestConnCache {
* Test the SocketCache itself.
*/
@Test
public void testSocketCache() throws IOException {
final int CACHE_SIZE = 4;
SocketCache cache = new SocketCache(CACHE_SIZE);
public void testSocketCache() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
@ -159,6 +162,7 @@ public class TestConnCache {
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getXferAddress();
// Make some sockets to the DN
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
@ -166,6 +170,7 @@ public class TestConnCache {
dnAddr.getAddress(), dnAddr.getPort());
}
// Insert a socket to the NN
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
cache.put(nnSock, null);
@ -179,7 +184,7 @@ public class TestConnCache {
assertEquals("NN socket evicted", null, cache.get(nnAddr));
assertTrue("Evicted socket closed", nnSock.isClosed());
// Lookup the DN socks
for (Socket dnSock : dnSockets) {
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
@ -189,6 +194,51 @@ public class TestConnCache {
assertEquals("Cache is empty", 0, cache.size());
}
/**
* Test the SocketCache expiry.
* Verify that socket cache entries expire after the set
* expiry time.
*/
@Test
public void testSocketCacheExpiry() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
DFSClient client = new DFSClient(nnAddr, conf);
// Find out the DN addr
LocatedBlock block =
client.getNamenode().getBlockLocations(
testFile.toString(), 0, FILE_SIZE)
.getLocatedBlocks().get(0);
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getXferAddress();
// Make some sockets to the DN and put in cache
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
dnSockets[i] = client.socketFactory.createSocket(
dnAddr.getAddress(), dnAddr.getPort());
cache.put(dnSockets[i], null);
}
// Client side still has the sockets cached
assertEquals(CACHE_SIZE, client.socketCache.size());
//sleep for a second and see if it expired
Thread.sleep(CACHE_EXPIRY_MS + 1000);
// Client side has no sockets cached
assertEquals(0, client.socketCache.size());
//sleep for another second and see if
//the daemon thread runs fine on empty cache
Thread.sleep(CACHE_EXPIRY_MS + 1000);
}
/**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
@ -229,33 +279,6 @@ public class TestConnCache {
in.close();
}
/**
* Test that the socket cache can be disabled by setting the capacity to
* 0. Regression test for HDFS-3365.
*/
@Test
public void testDisableCache() throws IOException {
LOG.info("Starting testDisableCache()");
// Reading with the normally configured filesystem should
// cache a socket.
DFSTestUtil.readFile(fs, testFile);
assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size());
// Configure a new instance with no caching, ensure that it doesn't
// cache anything
Configuration confWithoutCache = new Configuration(fs.getConf());
confWithoutCache.setInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache);
try {
DFSTestUtil.readFile(fsWithoutCache, testFile);
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
} finally {
fsWithoutCache.close();
}
}
@AfterClass
public static void teardownCluster() throws Exception {

View File

@ -120,12 +120,9 @@ public class TestDistributedFileSystem {
DFSTestUtil.readFile(fileSys, p);
DFSClient client = ((DistributedFileSystem)fileSys).dfs;
SocketCache cache = client.socketCache;
assertEquals(1, cache.size());
fileSys.close();
assertEquals(0, cache.size());
} finally {
if (cluster != null) {cluster.shutdown();}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* This class tests the client connection caching in a single node
* mini-cluster.
*/
public class TestSocketCache {
static final Log LOG = LogFactory.getLog(TestSocketCache.class);
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
final static int CACHE_SIZE = 4;
final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
static BlockReaderTestUtil util = null;
/**
* A mock Answer to remember the BlockReader used.
*
* It verifies that all invocation to DFSInputStream.getBlockReader()
* use the same socket.
*/
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null;
private Socket sock = null;
@Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
} else if (prevReader != null) {
assertSame("DFSInputStream should use the same socket",
sock, reader.dnSock);
}
return reader;
}
}
@BeforeClass
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
HdfsConfiguration confWithoutCache = new HdfsConfiguration();
confWithoutCache.setInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
cluster = util.getCluster();
conf = util.getConf();
authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
}
/**
* (Optionally) seek to position, read and verify data.
*
* Seek to specified position if pos is non-negative.
*/
private void pread(DFSInputStream in,
long pos,
byte[] buffer,
int offset,
int length)
throws IOException {
assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0)
in.seek(pos);
LOG.info("Reading from file of size " + in.getFileLength() +
" at offset " + in.getPos());
while (length > 0) {
int cnt = in.read(buffer, offset, length);
assertTrue("Error in read", cnt > 0);
offset += cnt;
length -= cnt;
}
// Verify
for (int i = 0; i < length; ++i) {
byte actual = buffer[i];
byte expect = authenticData[(int)pos + i];
assertEquals("Read data mismatch at file offset " + (pos + i) +
". Expects " + expect + "; got " + actual,
actual, expect);
}
}
/**
* Test that the socket cache can be disabled by setting the capacity to
* 0. Regression test for HDFS-3365.
*/
@Test
public void testDisableCache() throws IOException {
LOG.info("Starting testDisableCache()");
// Configure a new instance with no caching, ensure that it doesn't
// cache anything
FileSystem fsWithoutCache = FileSystem.newInstance(conf);
try {
DFSTestUtil.readFile(fsWithoutCache, testFile);
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
} finally {
fsWithoutCache.close();
}
}
@AfterClass
public static void teardownCluster() throws Exception {
util.shutdown();
}
}