svn merge -c 1390466 from trunk for HDFS-3373. Change DFSClient input stream socket cache to global static and add
a thread to cleanup expired cache entries. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1390470 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6667d885bb
commit
ff5f50532a
|
@ -20,6 +20,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
HDFS-3939. NN RPC address cleanup. (eli)
|
||||
|
||||
HDFS-3373. Change DFSClient input stream socket cache to global static and add
|
||||
a thread to cleanup expired cache entries. (John George via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
|
||||
|
@ -210,6 +212,7 @@ public class DFSClient implements java.io.Closeable {
|
|||
final int writePacketSize;
|
||||
final int socketTimeout;
|
||||
final int socketCacheCapacity;
|
||||
final long socketCacheExpiry;
|
||||
/** Wait time window (in msec) if BlockMissingException is caught */
|
||||
final int timeWindow;
|
||||
final int nCachedConnRetry;
|
||||
|
@ -258,6 +261,8 @@ public class DFSClient implements java.io.Closeable {
|
|||
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
||||
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
|
||||
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
|
||||
socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
|
||||
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
|
||||
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
||||
10 * defaultBlockSize);
|
||||
timeWindow = conf
|
||||
|
@ -428,7 +433,7 @@ public class DFSClient implements java.io.Closeable {
|
|||
Joiner.on(',').join(localInterfaceAddrs) + "]");
|
||||
}
|
||||
|
||||
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
|
||||
this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -642,7 +647,6 @@ public class DFSClient implements java.io.Closeable {
|
|||
void abort() {
|
||||
clientRunning = false;
|
||||
closeAllFilesBeingWritten(true);
|
||||
socketCache.clear();
|
||||
|
||||
try {
|
||||
// remove reference to this client and stop the renewer,
|
||||
|
@ -689,7 +693,6 @@ public class DFSClient implements java.io.Closeable {
|
|||
public synchronized void close() throws IOException {
|
||||
if(clientRunning) {
|
||||
closeAllFilesBeingWritten(false);
|
||||
socketCache.clear();
|
||||
clientRunning = false;
|
||||
getLeaseRenewer().closeClient(this);
|
||||
// close connections to the namenode
|
||||
|
|
|
@ -74,6 +74,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
|
||||
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
||||
|
||||
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
|
||||
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
|
||||
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
|
||||
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
|
||||
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
|
||||
|
|
|
@ -26,33 +26,112 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import java.io.IOException;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.LinkedListMultimap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* A cache of sockets.
|
||||
* A cache of input stream sockets to Data Node.
|
||||
*/
|
||||
class SocketCache {
|
||||
static final Log LOG = LogFactory.getLog(SocketCache.class);
|
||||
private static final Log LOG = LogFactory.getLog(SocketCache.class);
|
||||
|
||||
private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
|
||||
private final int capacity;
|
||||
@InterfaceAudience.Private
|
||||
static class SocketAndStreams implements Closeable {
|
||||
public final Socket sock;
|
||||
public final IOStreamPair ioStreams;
|
||||
long createTime;
|
||||
|
||||
/**
|
||||
* Create a SocketCache with the given capacity.
|
||||
* @param capacity Max cache size.
|
||||
*/
|
||||
public SocketCache(int capacity) {
|
||||
multimap = LinkedListMultimap.create();
|
||||
this.capacity = capacity;
|
||||
if (capacity <= 0) {
|
||||
LOG.debug("SocketCache disabled in configuration.");
|
||||
public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
|
||||
this.sock = s;
|
||||
this.ioStreams = ioStreams;
|
||||
this.createTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (ioStreams != null) {
|
||||
IOUtils.closeStream(ioStreams.in);
|
||||
IOUtils.closeStream(ioStreams.out);
|
||||
}
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
|
||||
public long getCreateTime() {
|
||||
return this.createTime;
|
||||
}
|
||||
}
|
||||
|
||||
private Daemon daemon;
|
||||
/** A map for per user per datanode. */
|
||||
private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
|
||||
LinkedListMultimap.create();
|
||||
private static int capacity;
|
||||
private static long expiryPeriod;
|
||||
private static SocketCache scInstance = new SocketCache();
|
||||
private static boolean isInitedOnce = false;
|
||||
|
||||
public static synchronized SocketCache getInstance(int c, long e) {
|
||||
// capacity is only initialized once
|
||||
if (isInitedOnce == false) {
|
||||
capacity = c;
|
||||
expiryPeriod = e;
|
||||
|
||||
if (capacity == 0 ) {
|
||||
LOG.info("SocketCache disabled.");
|
||||
}
|
||||
else if (expiryPeriod == 0) {
|
||||
throw new IllegalStateException("Cannot initialize expiryPeriod to " +
|
||||
expiryPeriod + "when cache is enabled.");
|
||||
}
|
||||
isInitedOnce = true;
|
||||
} else { //already initialized once
|
||||
if (capacity != c || expiryPeriod != e) {
|
||||
LOG.info("capacity and expiry periods already set to " + capacity +
|
||||
" and " + expiryPeriod + " respectively. Cannot set it to " + c +
|
||||
" and " + e);
|
||||
}
|
||||
}
|
||||
|
||||
return scInstance;
|
||||
}
|
||||
|
||||
private boolean isDaemonStarted() {
|
||||
return (daemon == null)? false: true;
|
||||
}
|
||||
|
||||
private synchronized void startExpiryDaemon() {
|
||||
// start daemon only if not already started
|
||||
if (isDaemonStarted() == true) {
|
||||
return;
|
||||
}
|
||||
|
||||
daemon = new Daemon(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SocketCache.this.run();
|
||||
} catch(InterruptedException e) {
|
||||
//noop
|
||||
} finally {
|
||||
SocketCache.this.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.valueOf(SocketCache.this);
|
||||
}
|
||||
});
|
||||
daemon.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -61,16 +140,17 @@ class SocketCache {
|
|||
* @return A socket with unknown state, possibly closed underneath. Or null.
|
||||
*/
|
||||
public synchronized SocketAndStreams get(SocketAddress remote) {
|
||||
|
||||
if (capacity <= 0) { // disabled
|
||||
return null;
|
||||
}
|
||||
|
||||
List<SocketAndStreams> socklist = multimap.get(remote);
|
||||
if (socklist == null) {
|
||||
List<SocketAndStreams> sockStreamList = multimap.get(remote);
|
||||
if (sockStreamList == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Iterator<SocketAndStreams> iter = socklist.iterator();
|
||||
Iterator<SocketAndStreams> iter = sockStreamList.iterator();
|
||||
while (iter.hasNext()) {
|
||||
SocketAndStreams candidate = iter.next();
|
||||
iter.remove();
|
||||
|
@ -86,6 +166,8 @@ class SocketCache {
|
|||
* @param sock socket not used by anyone.
|
||||
*/
|
||||
public synchronized void put(Socket sock, IOStreamPair ioStreams) {
|
||||
|
||||
Preconditions.checkNotNull(sock);
|
||||
SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
|
||||
if (capacity <= 0) {
|
||||
// Cache disabled.
|
||||
|
@ -93,7 +175,7 @@ class SocketCache {
|
|||
return;
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(sock);
|
||||
startExpiryDaemon();
|
||||
|
||||
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
|
||||
if (remoteAddr == null) {
|
||||
|
@ -106,13 +188,33 @@ class SocketCache {
|
|||
if (capacity == multimap.size()) {
|
||||
evictOldest();
|
||||
}
|
||||
multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
|
||||
multimap.put(remoteAddr, s);
|
||||
}
|
||||
|
||||
public synchronized int size() {
|
||||
return multimap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict and close sockets older than expiry period from the cache.
|
||||
*/
|
||||
private synchronized void evictExpired(long expiryPeriod) {
|
||||
while (multimap.size() != 0) {
|
||||
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
|
||||
multimap.entries().iterator();
|
||||
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
|
||||
// if oldest socket expired, remove it
|
||||
if (entry == null ||
|
||||
System.currentTimeMillis() - entry.getValue().getCreateTime() <
|
||||
expiryPeriod) {
|
||||
break;
|
||||
}
|
||||
iter.remove();
|
||||
SocketAndStreams s = entry.getValue();
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict the oldest entry in the cache.
|
||||
*/
|
||||
|
@ -120,7 +222,8 @@ class SocketCache {
|
|||
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
|
||||
multimap.entries().iterator();
|
||||
if (!iter.hasNext()) {
|
||||
throw new IllegalStateException("Cannot evict from empty cache!");
|
||||
throw new IllegalStateException("Cannot evict from empty cache! " +
|
||||
"capacity: " + capacity);
|
||||
}
|
||||
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
|
||||
iter.remove();
|
||||
|
@ -128,39 +231,32 @@ class SocketCache {
|
|||
s.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically check in the cache and expire the entries
|
||||
* older than expiryPeriod minutes
|
||||
*/
|
||||
private void run() throws InterruptedException {
|
||||
for(long lastExpiryTime = System.currentTimeMillis();
|
||||
!Thread.interrupted();
|
||||
Thread.sleep(expiryPeriod)) {
|
||||
final long elapsed = System.currentTimeMillis() - lastExpiryTime;
|
||||
if (elapsed >= expiryPeriod) {
|
||||
evictExpired(expiryPeriod);
|
||||
lastExpiryTime = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
clear();
|
||||
throw new InterruptedException("Daemon Interrupted");
|
||||
}
|
||||
|
||||
/**
|
||||
* Empty the cache, and close all sockets.
|
||||
*/
|
||||
public synchronized void clear() {
|
||||
for (SocketAndStreams s : multimap.values()) {
|
||||
s.close();
|
||||
private synchronized void clear() {
|
||||
for (SocketAndStreams sockAndStream : multimap.values()) {
|
||||
sockAndStream.close();
|
||||
}
|
||||
multimap.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() {
|
||||
clear();
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
static class SocketAndStreams implements Closeable {
|
||||
public final Socket sock;
|
||||
public final IOStreamPair ioStreams;
|
||||
|
||||
public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
|
||||
this.sock = s;
|
||||
this.ioStreams = ioStreams;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (ioStreams != null) {
|
||||
IOUtils.closeStream(ioStreams.in);
|
||||
IOUtils.closeStream(ioStreams.out);
|
||||
}
|
||||
IOUtils.closeSocket(sock);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -54,10 +55,12 @@ public class TestConnCache {
|
|||
|
||||
static final int BLOCK_SIZE = 4096;
|
||||
static final int FILE_SIZE = 3 * BLOCK_SIZE;
|
||||
|
||||
final static int CACHE_SIZE = 4;
|
||||
final static long CACHE_EXPIRY_MS = 200;
|
||||
static Configuration conf = null;
|
||||
static MiniDFSCluster cluster = null;
|
||||
static FileSystem fs = null;
|
||||
static SocketCache cache;
|
||||
|
||||
static final Path testFile = new Path("/testConnCache.dat");
|
||||
static byte authenticData[] = null;
|
||||
|
@ -93,6 +96,9 @@ public class TestConnCache {
|
|||
public static void setupCluster() throws Exception {
|
||||
final int REPLICATION_FACTOR = 1;
|
||||
|
||||
/* create a socket cache. There is only one socket cache per jvm */
|
||||
cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
|
||||
|
||||
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
|
||||
cluster = util.getCluster();
|
||||
conf = util.getConf();
|
||||
|
@ -142,10 +148,7 @@ public class TestConnCache {
|
|||
* Test the SocketCache itself.
|
||||
*/
|
||||
@Test
|
||||
public void testSocketCache() throws IOException {
|
||||
final int CACHE_SIZE = 4;
|
||||
SocketCache cache = new SocketCache(CACHE_SIZE);
|
||||
|
||||
public void testSocketCache() throws Exception {
|
||||
// Make a client
|
||||
InetSocketAddress nnAddr =
|
||||
new InetSocketAddress("localhost", cluster.getNameNodePort());
|
||||
|
@ -159,6 +162,7 @@ public class TestConnCache {
|
|||
DataNode dn = util.getDataNode(block);
|
||||
InetSocketAddress dnAddr = dn.getXferAddress();
|
||||
|
||||
|
||||
// Make some sockets to the DN
|
||||
Socket[] dnSockets = new Socket[CACHE_SIZE];
|
||||
for (int i = 0; i < dnSockets.length; ++i) {
|
||||
|
@ -166,6 +170,7 @@ public class TestConnCache {
|
|||
dnAddr.getAddress(), dnAddr.getPort());
|
||||
}
|
||||
|
||||
|
||||
// Insert a socket to the NN
|
||||
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
|
||||
cache.put(nnSock, null);
|
||||
|
@ -189,6 +194,51 @@ public class TestConnCache {
|
|||
assertEquals("Cache is empty", 0, cache.size());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test the SocketCache expiry.
|
||||
* Verify that socket cache entries expire after the set
|
||||
* expiry time.
|
||||
*/
|
||||
@Test
|
||||
public void testSocketCacheExpiry() throws Exception {
|
||||
// Make a client
|
||||
InetSocketAddress nnAddr =
|
||||
new InetSocketAddress("localhost", cluster.getNameNodePort());
|
||||
DFSClient client = new DFSClient(nnAddr, conf);
|
||||
|
||||
// Find out the DN addr
|
||||
LocatedBlock block =
|
||||
client.getNamenode().getBlockLocations(
|
||||
testFile.toString(), 0, FILE_SIZE)
|
||||
.getLocatedBlocks().get(0);
|
||||
DataNode dn = util.getDataNode(block);
|
||||
InetSocketAddress dnAddr = dn.getXferAddress();
|
||||
|
||||
|
||||
// Make some sockets to the DN and put in cache
|
||||
Socket[] dnSockets = new Socket[CACHE_SIZE];
|
||||
for (int i = 0; i < dnSockets.length; ++i) {
|
||||
dnSockets[i] = client.socketFactory.createSocket(
|
||||
dnAddr.getAddress(), dnAddr.getPort());
|
||||
cache.put(dnSockets[i], null);
|
||||
}
|
||||
|
||||
// Client side still has the sockets cached
|
||||
assertEquals(CACHE_SIZE, client.socketCache.size());
|
||||
|
||||
//sleep for a second and see if it expired
|
||||
Thread.sleep(CACHE_EXPIRY_MS + 1000);
|
||||
|
||||
// Client side has no sockets cached
|
||||
assertEquals(0, client.socketCache.size());
|
||||
|
||||
//sleep for another second and see if
|
||||
//the daemon thread runs fine on empty cache
|
||||
Thread.sleep(CACHE_EXPIRY_MS + 1000);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read a file served entirely from one DN. Seek around and read from
|
||||
* different offsets. And verify that they all use the same socket.
|
||||
|
@ -230,33 +280,6 @@ public class TestConnCache {
|
|||
in.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the socket cache can be disabled by setting the capacity to
|
||||
* 0. Regression test for HDFS-3365.
|
||||
*/
|
||||
@Test
|
||||
public void testDisableCache() throws IOException {
|
||||
LOG.info("Starting testDisableCache()");
|
||||
|
||||
// Reading with the normally configured filesystem should
|
||||
// cache a socket.
|
||||
DFSTestUtil.readFile(fs, testFile);
|
||||
assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size());
|
||||
|
||||
// Configure a new instance with no caching, ensure that it doesn't
|
||||
// cache anything
|
||||
Configuration confWithoutCache = new Configuration(fs.getConf());
|
||||
confWithoutCache.setInt(
|
||||
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
|
||||
FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache);
|
||||
try {
|
||||
DFSTestUtil.readFile(fsWithoutCache, testFile);
|
||||
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
|
||||
} finally {
|
||||
fsWithoutCache.close();
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws Exception {
|
||||
util.shutdown();
|
||||
|
|
|
@ -117,12 +117,9 @@ public class TestDistributedFileSystem {
|
|||
DFSTestUtil.readFile(fileSys, p);
|
||||
|
||||
DFSClient client = ((DistributedFileSystem)fileSys).dfs;
|
||||
SocketCache cache = client.socketCache;
|
||||
assertEquals(1, cache.size());
|
||||
|
||||
fileSys.close();
|
||||
|
||||
assertEquals(0, cache.size());
|
||||
} finally {
|
||||
if (cluster != null) {cluster.shutdown();}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* This class tests the client connection caching in a single node
|
||||
* mini-cluster.
|
||||
*/
|
||||
public class TestSocketCache {
|
||||
static final Log LOG = LogFactory.getLog(TestSocketCache.class);
|
||||
|
||||
static final int BLOCK_SIZE = 4096;
|
||||
static final int FILE_SIZE = 3 * BLOCK_SIZE;
|
||||
final static int CACHE_SIZE = 4;
|
||||
final static long CACHE_EXPIRY_MS = 200;
|
||||
static Configuration conf = null;
|
||||
static MiniDFSCluster cluster = null;
|
||||
static FileSystem fs = null;
|
||||
static SocketCache cache;
|
||||
|
||||
static final Path testFile = new Path("/testConnCache.dat");
|
||||
static byte authenticData[] = null;
|
||||
|
||||
static BlockReaderTestUtil util = null;
|
||||
|
||||
|
||||
/**
|
||||
* A mock Answer to remember the BlockReader used.
|
||||
*
|
||||
* It verifies that all invocation to DFSInputStream.getBlockReader()
|
||||
* use the same socket.
|
||||
*/
|
||||
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
|
||||
public RemoteBlockReader2 reader = null;
|
||||
private Socket sock = null;
|
||||
|
||||
@Override
|
||||
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
|
||||
RemoteBlockReader2 prevReader = reader;
|
||||
reader = (RemoteBlockReader2) invocation.callRealMethod();
|
||||
if (sock == null) {
|
||||
sock = reader.dnSock;
|
||||
} else if (prevReader != null) {
|
||||
assertSame("DFSInputStream should use the same socket",
|
||||
sock, reader.dnSock);
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
final int REPLICATION_FACTOR = 1;
|
||||
|
||||
HdfsConfiguration confWithoutCache = new HdfsConfiguration();
|
||||
confWithoutCache.setInt(
|
||||
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
|
||||
util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
|
||||
cluster = util.getCluster();
|
||||
conf = util.getConf();
|
||||
|
||||
authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* (Optionally) seek to position, read and verify data.
|
||||
*
|
||||
* Seek to specified position if pos is non-negative.
|
||||
*/
|
||||
private void pread(DFSInputStream in,
|
||||
long pos,
|
||||
byte[] buffer,
|
||||
int offset,
|
||||
int length)
|
||||
throws IOException {
|
||||
assertTrue("Test buffer too small", buffer.length >= offset + length);
|
||||
|
||||
if (pos >= 0)
|
||||
in.seek(pos);
|
||||
|
||||
LOG.info("Reading from file of size " + in.getFileLength() +
|
||||
" at offset " + in.getPos());
|
||||
|
||||
while (length > 0) {
|
||||
int cnt = in.read(buffer, offset, length);
|
||||
assertTrue("Error in read", cnt > 0);
|
||||
offset += cnt;
|
||||
length -= cnt;
|
||||
}
|
||||
|
||||
// Verify
|
||||
for (int i = 0; i < length; ++i) {
|
||||
byte actual = buffer[i];
|
||||
byte expect = authenticData[(int)pos + i];
|
||||
assertEquals("Read data mismatch at file offset " + (pos + i) +
|
||||
". Expects " + expect + "; got " + actual,
|
||||
actual, expect);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that the socket cache can be disabled by setting the capacity to
|
||||
* 0. Regression test for HDFS-3365.
|
||||
*/
|
||||
@Test
|
||||
public void testDisableCache() throws IOException {
|
||||
LOG.info("Starting testDisableCache()");
|
||||
|
||||
// Configure a new instance with no caching, ensure that it doesn't
|
||||
// cache anything
|
||||
|
||||
FileSystem fsWithoutCache = FileSystem.newInstance(conf);
|
||||
try {
|
||||
DFSTestUtil.readFile(fsWithoutCache, testFile);
|
||||
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
|
||||
} finally {
|
||||
fsWithoutCache.close();
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws Exception {
|
||||
util.shutdown();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue