From a7bcdcc0518595b7d94383606ab8e9aa711292b0 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 26 Sep 2012 13:23:21 +0000 Subject: [PATCH] HDFS-3373. Change DFSClient input stream socket cache to global static and add a thread to cleanup expired cache entries. Contributed by John George git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1390466 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSClient.java | 9 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../org/apache/hadoop/hdfs/SocketCache.java | 196 +++++++++++++----- .../org/apache/hadoop/hdfs/TestConnCache.java | 89 +++++--- .../hdfs/TestDistributedFileSystem.java | 3 - .../apache/hadoop/hdfs/TestSocketCache.java | 171 +++++++++++++++ 7 files changed, 384 insertions(+), 89 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e7a4ab80868..c1918a3565d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -235,6 +235,9 @@ Release 2.0.3-alpha - Unreleased HDFS-3939. NN RPC address cleanup. (eli) + HDFS-3373. Change DFSClient input stream socket cache to global static and add + a thread to cleanup expired cache entries. (John George via szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 92179fe99dd..d29ab1d0a89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; @@ -209,6 +211,7 @@ public class DFSClient implements java.io.Closeable { final int writePacketSize; final int socketTimeout; final int socketCacheCapacity; + final long socketCacheExpiry; /** Wait time window (in msec) if BlockMissingException is caught */ final int timeWindow; final int nCachedConnRetry; @@ -257,6 +260,8 @@ public class DFSClient implements java.io.Closeable { taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); + socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 10 * defaultBlockSize); timeWindow = conf @@ -427,7 +432,7 @@ public class DFSClient implements java.io.Closeable { Joiner.on(',').join(localInterfaceAddrs) + "]"); } - this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); + this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); } /** @@ -641,7 +646,6 @@ public class DFSClient implements java.io.Closeable { void abort() { clientRunning = false; closeAllFilesBeingWritten(true); - socketCache.clear(); try { // remove reference to this client and stop the renewer, @@ -688,7 +692,6 @@ public class DFSClient implements java.io.Closeable { public synchronized void close() throws IOException { if(clientRunning) { closeAllFilesBeingWritten(false); - socketCache.clear(); clientRunning = false; getLeaseRenewer().closeClient(this); // close connections to the namenode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 509fc80e850..441a9056a47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -74,6 +74,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts"; public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0; + public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec"; + public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java index 2fa7b55d440..06d2a2baeb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java @@ -26,33 +26,112 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.io.IOException; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; /** - * A cache of sockets. + * A cache of input stream sockets to Data Node. */ class SocketCache { - static final Log LOG = LogFactory.getLog(SocketCache.class); + private static final Log LOG = LogFactory.getLog(SocketCache.class); - private final LinkedListMultimap multimap; - private final int capacity; - - /** - * Create a SocketCache with the given capacity. - * @param capacity Max cache size. - */ - public SocketCache(int capacity) { - multimap = LinkedListMultimap.create(); - this.capacity = capacity; - if (capacity <= 0) { - LOG.debug("SocketCache disabled in configuration."); + @InterfaceAudience.Private + static class SocketAndStreams implements Closeable { + public final Socket sock; + public final IOStreamPair ioStreams; + long createTime; + + public SocketAndStreams(Socket s, IOStreamPair ioStreams) { + this.sock = s; + this.ioStreams = ioStreams; + this.createTime = System.currentTimeMillis(); } + + @Override + public void close() { + if (ioStreams != null) { + IOUtils.closeStream(ioStreams.in); + IOUtils.closeStream(ioStreams.out); + } + IOUtils.closeSocket(sock); + } + + public long getCreateTime() { + return this.createTime; + } + } + + private Daemon daemon; + /** A map for per user per datanode. */ + private static LinkedListMultimap multimap = + LinkedListMultimap.create(); + private static int capacity; + private static long expiryPeriod; + private static SocketCache scInstance = new SocketCache(); + private static boolean isInitedOnce = false; + + public static synchronized SocketCache getInstance(int c, long e) { + // capacity is only initialized once + if (isInitedOnce == false) { + capacity = c; + expiryPeriod = e; + + if (capacity == 0 ) { + LOG.info("SocketCache disabled."); + } + else if (expiryPeriod == 0) { + throw new IllegalStateException("Cannot initialize expiryPeriod to " + + expiryPeriod + "when cache is enabled."); + } + isInitedOnce = true; + } else { //already initialized once + if (capacity != c || expiryPeriod != e) { + LOG.info("capacity and expiry periods already set to " + capacity + + " and " + expiryPeriod + " respectively. Cannot set it to " + c + + " and " + e); + } + } + + return scInstance; + } + + private boolean isDaemonStarted() { + return (daemon == null)? false: true; + } + + private synchronized void startExpiryDaemon() { + // start daemon only if not already started + if (isDaemonStarted() == true) { + return; + } + + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + SocketCache.this.run(); + } catch(InterruptedException e) { + //noop + } finally { + SocketCache.this.clear(); + } + } + + @Override + public String toString() { + return String.valueOf(SocketCache.this); + } + }); + daemon.start(); } /** @@ -61,16 +140,17 @@ class SocketCache { * @return A socket with unknown state, possibly closed underneath. Or null. */ public synchronized SocketAndStreams get(SocketAddress remote) { + if (capacity <= 0) { // disabled return null; } - - List socklist = multimap.get(remote); - if (socklist == null) { + + List sockStreamList = multimap.get(remote); + if (sockStreamList == null) { return null; } - Iterator iter = socklist.iterator(); + Iterator iter = sockStreamList.iterator(); while (iter.hasNext()) { SocketAndStreams candidate = iter.next(); iter.remove(); @@ -86,14 +166,16 @@ class SocketCache { * @param sock socket not used by anyone. */ public synchronized void put(Socket sock, IOStreamPair ioStreams) { + + Preconditions.checkNotNull(sock); SocketAndStreams s = new SocketAndStreams(sock, ioStreams); if (capacity <= 0) { // Cache disabled. s.close(); return; } - - Preconditions.checkNotNull(sock); + + startExpiryDaemon(); SocketAddress remoteAddr = sock.getRemoteSocketAddress(); if (remoteAddr == null) { @@ -106,13 +188,33 @@ class SocketCache { if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams)); + multimap.put(remoteAddr, s); } public synchronized int size() { return multimap.size(); } + /** + * Evict and close sockets older than expiry period from the cache. + */ + private synchronized void evictExpired(long expiryPeriod) { + while (multimap.size() != 0) { + Iterator> iter = + multimap.entries().iterator(); + Entry entry = iter.next(); + // if oldest socket expired, remove it + if (entry == null || + System.currentTimeMillis() - entry.getValue().getCreateTime() < + expiryPeriod) { + break; + } + iter.remove(); + SocketAndStreams s = entry.getValue(); + s.close(); + } + } + /** * Evict the oldest entry in the cache. */ @@ -120,7 +222,8 @@ class SocketCache { Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { - throw new IllegalStateException("Cannot evict from empty cache!"); + throw new IllegalStateException("Cannot evict from empty cache! " + + "capacity: " + capacity); } Entry entry = iter.next(); iter.remove(); @@ -128,39 +231,32 @@ class SocketCache { s.close(); } + /** + * Periodically check in the cache and expire the entries + * older than expiryPeriod minutes + */ + private void run() throws InterruptedException { + for(long lastExpiryTime = System.currentTimeMillis(); + !Thread.interrupted(); + Thread.sleep(expiryPeriod)) { + final long elapsed = System.currentTimeMillis() - lastExpiryTime; + if (elapsed >= expiryPeriod) { + evictExpired(expiryPeriod); + lastExpiryTime = System.currentTimeMillis(); + } + } + clear(); + throw new InterruptedException("Daemon Interrupted"); + } + /** * Empty the cache, and close all sockets. */ - public synchronized void clear() { - for (SocketAndStreams s : multimap.values()) { - s.close(); + private synchronized void clear() { + for (SocketAndStreams sockAndStream : multimap.values()) { + sockAndStream.close(); } multimap.clear(); } - @Override - protected void finalize() { - clear(); - } - - @InterfaceAudience.Private - static class SocketAndStreams implements Closeable { - public final Socket sock; - public final IOStreamPair ioStreams; - - public SocketAndStreams(Socket s, IOStreamPair ioStreams) { - this.sock = s; - this.ioStreams = ioStreams; - } - - @Override - public void close() { - if (ioStreams != null) { - IOUtils.closeStream(ioStreams.in); - IOUtils.closeStream(ioStreams.out); - } - IOUtils.closeSocket(sock); - } - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index 327aa8fa22d..d9020e0bd0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,10 +55,12 @@ public class TestConnCache { static final int BLOCK_SIZE = 4096; static final int FILE_SIZE = 3 * BLOCK_SIZE; - + final static int CACHE_SIZE = 4; + final static long CACHE_EXPIRY_MS = 200; static Configuration conf = null; static MiniDFSCluster cluster = null; static FileSystem fs = null; + static SocketCache cache; static final Path testFile = new Path("/testConnCache.dat"); static byte authenticData[] = null; @@ -93,6 +96,9 @@ public class TestConnCache { public static void setupCluster() throws Exception { final int REPLICATION_FACTOR = 1; + /* create a socket cache. There is only one socket cache per jvm */ + cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS); + util = new BlockReaderTestUtil(REPLICATION_FACTOR); cluster = util.getCluster(); conf = util.getConf(); @@ -142,10 +148,7 @@ public class TestConnCache { * Test the SocketCache itself. */ @Test - public void testSocketCache() throws IOException { - final int CACHE_SIZE = 4; - SocketCache cache = new SocketCache(CACHE_SIZE); - + public void testSocketCache() throws Exception { // Make a client InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); @@ -159,6 +162,7 @@ public class TestConnCache { DataNode dn = util.getDataNode(block); InetSocketAddress dnAddr = dn.getXferAddress(); + // Make some sockets to the DN Socket[] dnSockets = new Socket[CACHE_SIZE]; for (int i = 0; i < dnSockets.length; ++i) { @@ -166,6 +170,7 @@ public class TestConnCache { dnAddr.getAddress(), dnAddr.getPort()); } + // Insert a socket to the NN Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); cache.put(nnSock, null); @@ -179,7 +184,7 @@ public class TestConnCache { assertEquals("NN socket evicted", null, cache.get(nnAddr)); assertTrue("Evicted socket closed", nnSock.isClosed()); - + // Lookup the DN socks for (Socket dnSock : dnSockets) { assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock); @@ -189,6 +194,51 @@ public class TestConnCache { assertEquals("Cache is empty", 0, cache.size()); } + + /** + * Test the SocketCache expiry. + * Verify that socket cache entries expire after the set + * expiry time. + */ + @Test + public void testSocketCacheExpiry() throws Exception { + // Make a client + InetSocketAddress nnAddr = + new InetSocketAddress("localhost", cluster.getNameNodePort()); + DFSClient client = new DFSClient(nnAddr, conf); + + // Find out the DN addr + LocatedBlock block = + client.getNamenode().getBlockLocations( + testFile.toString(), 0, FILE_SIZE) + .getLocatedBlocks().get(0); + DataNode dn = util.getDataNode(block); + InetSocketAddress dnAddr = dn.getXferAddress(); + + + // Make some sockets to the DN and put in cache + Socket[] dnSockets = new Socket[CACHE_SIZE]; + for (int i = 0; i < dnSockets.length; ++i) { + dnSockets[i] = client.socketFactory.createSocket( + dnAddr.getAddress(), dnAddr.getPort()); + cache.put(dnSockets[i], null); + } + + // Client side still has the sockets cached + assertEquals(CACHE_SIZE, client.socketCache.size()); + + //sleep for a second and see if it expired + Thread.sleep(CACHE_EXPIRY_MS + 1000); + + // Client side has no sockets cached + assertEquals(0, client.socketCache.size()); + + //sleep for another second and see if + //the daemon thread runs fine on empty cache + Thread.sleep(CACHE_EXPIRY_MS + 1000); + } + + /** * Read a file served entirely from one DN. Seek around and read from * different offsets. And verify that they all use the same socket. @@ -229,33 +279,6 @@ public class TestConnCache { in.close(); } - - /** - * Test that the socket cache can be disabled by setting the capacity to - * 0. Regression test for HDFS-3365. - */ - @Test - public void testDisableCache() throws IOException { - LOG.info("Starting testDisableCache()"); - - // Reading with the normally configured filesystem should - // cache a socket. - DFSTestUtil.readFile(fs, testFile); - assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size()); - - // Configure a new instance with no caching, ensure that it doesn't - // cache anything - Configuration confWithoutCache = new Configuration(fs.getConf()); - confWithoutCache.setInt( - DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0); - FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache); - try { - DFSTestUtil.readFile(fsWithoutCache, testFile); - assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size()); - } finally { - fsWithoutCache.close(); - } - } @AfterClass public static void teardownCluster() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index b6687792343..9d2edc65832 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -120,12 +120,9 @@ public class TestDistributedFileSystem { DFSTestUtil.readFile(fileSys, p); DFSClient client = ((DistributedFileSystem)fileSys).dfs; - SocketCache cache = client.socketCache; - assertEquals(1, cache.size()); fileSys.close(); - assertEquals(0, cache.size()); } finally { if (cluster != null) {cluster.shutdown();} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java new file mode 100644 index 00000000000..255d408f824 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.security.token.Token; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * This class tests the client connection caching in a single node + * mini-cluster. + */ +public class TestSocketCache { + static final Log LOG = LogFactory.getLog(TestSocketCache.class); + + static final int BLOCK_SIZE = 4096; + static final int FILE_SIZE = 3 * BLOCK_SIZE; + final static int CACHE_SIZE = 4; + final static long CACHE_EXPIRY_MS = 200; + static Configuration conf = null; + static MiniDFSCluster cluster = null; + static FileSystem fs = null; + static SocketCache cache; + + static final Path testFile = new Path("/testConnCache.dat"); + static byte authenticData[] = null; + + static BlockReaderTestUtil util = null; + + + /** + * A mock Answer to remember the BlockReader used. + * + * It verifies that all invocation to DFSInputStream.getBlockReader() + * use the same socket. + */ + private class MockGetBlockReader implements Answer { + public RemoteBlockReader2 reader = null; + private Socket sock = null; + + @Override + public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { + RemoteBlockReader2 prevReader = reader; + reader = (RemoteBlockReader2) invocation.callRealMethod(); + if (sock == null) { + sock = reader.dnSock; + } else if (prevReader != null) { + assertSame("DFSInputStream should use the same socket", + sock, reader.dnSock); + } + return reader; + } + } + + @BeforeClass + public static void setupCluster() throws Exception { + final int REPLICATION_FACTOR = 1; + + HdfsConfiguration confWithoutCache = new HdfsConfiguration(); + confWithoutCache.setInt( + DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0); + util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache); + cluster = util.getCluster(); + conf = util.getConf(); + + authenticData = util.writeFile(testFile, FILE_SIZE / 1024); + } + + + /** + * (Optionally) seek to position, read and verify data. + * + * Seek to specified position if pos is non-negative. + */ + private void pread(DFSInputStream in, + long pos, + byte[] buffer, + int offset, + int length) + throws IOException { + assertTrue("Test buffer too small", buffer.length >= offset + length); + + if (pos >= 0) + in.seek(pos); + + LOG.info("Reading from file of size " + in.getFileLength() + + " at offset " + in.getPos()); + + while (length > 0) { + int cnt = in.read(buffer, offset, length); + assertTrue("Error in read", cnt > 0); + offset += cnt; + length -= cnt; + } + + // Verify + for (int i = 0; i < length; ++i) { + byte actual = buffer[i]; + byte expect = authenticData[(int)pos + i]; + assertEquals("Read data mismatch at file offset " + (pos + i) + + ". Expects " + expect + "; got " + actual, + actual, expect); + } + } + + + /** + * Test that the socket cache can be disabled by setting the capacity to + * 0. Regression test for HDFS-3365. + */ + @Test + public void testDisableCache() throws IOException { + LOG.info("Starting testDisableCache()"); + + // Configure a new instance with no caching, ensure that it doesn't + // cache anything + + FileSystem fsWithoutCache = FileSystem.newInstance(conf); + try { + DFSTestUtil.readFile(fsWithoutCache, testFile); + assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size()); + } finally { + fsWithoutCache.close(); + } + } + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdown(); + } +}