HDFS-5256. Use guava LoadingCache to implement DFSClientCache. Contributed by Haohui Mai
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c5b49eeaf
commit
e4154fc83a
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -27,59 +28,81 @@ import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import com.google.common.cache.CacheLoader;
|
||||||
|
import com.google.common.cache.LoadingCache;
|
||||||
|
import com.google.common.cache.RemovalListener;
|
||||||
|
import com.google.common.cache.RemovalNotification;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A cache saves DFSClient objects for different users
|
* A cache saves DFSClient objects for different users
|
||||||
*/
|
*/
|
||||||
public class DFSClientCache {
|
class DFSClientCache {
|
||||||
static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
||||||
private final LruCache<String, DFSClient> lruTable;
|
/**
|
||||||
|
* Cache that maps User id to corresponding DFSClient.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
final LoadingCache<String, DFSClient> clientCache;
|
||||||
|
|
||||||
|
final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
|
||||||
|
|
||||||
private final Configuration config;
|
private final Configuration config;
|
||||||
|
|
||||||
public DFSClientCache(Configuration config) {
|
DFSClientCache(Configuration config) {
|
||||||
// By default, keep 256 DFSClient instance for 256 active users
|
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
||||||
this(config, 256);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public DFSClientCache(Configuration config, int size) {
|
DFSClientCache(Configuration config, int clientCache) {
|
||||||
lruTable = new LruCache<String, DFSClient>(size);
|
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.clientCache = CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(clientCache)
|
||||||
|
.removalListener(clientRemovealListener())
|
||||||
|
.build(clientLoader());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void put(String uname, DFSClient client) {
|
private CacheLoader<String, DFSClient> clientLoader() {
|
||||||
lruTable.put(uname, client);
|
return new CacheLoader<String, DFSClient>() {
|
||||||
}
|
@Override
|
||||||
|
public DFSClient load(String userName) throws Exception {
|
||||||
|
UserGroupInformation ugi = UserGroupInformation
|
||||||
|
.createRemoteUser(userName);
|
||||||
|
|
||||||
synchronized public DFSClient get(String uname) {
|
// Guava requires CacheLoader never returns null.
|
||||||
DFSClient client = lruTable.get(uname);
|
return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
||||||
if (client != null) {
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not in table, create one.
|
|
||||||
try {
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
|
|
||||||
client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
|
||||||
public DFSClient run() throws IOException {
|
public DFSClient run() throws IOException {
|
||||||
return new DFSClient(NameNode.getAddress(config), config);
|
return new DFSClient(NameNode.getAddress(config), config);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Create DFSClient failed for user:" + uname);
|
|
||||||
e.printStackTrace();
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
// Add new entry
|
};
|
||||||
lruTable.put(uname, client);
|
}
|
||||||
|
|
||||||
|
private RemovalListener<String, DFSClient> clientRemovealListener() {
|
||||||
|
return new RemovalListener<String, DFSClient>() {
|
||||||
|
@Override
|
||||||
|
public void onRemoval(RemovalNotification<String, DFSClient> notification) {
|
||||||
|
DFSClient client = notification.getValue();
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn(String.format(
|
||||||
|
"IOException when closing the DFSClient(%s), cause: %s", client,
|
||||||
|
e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
DFSClient get(String userName) {
|
||||||
|
DFSClient client = null;
|
||||||
|
try {
|
||||||
|
client = clientCache.get(userName);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
|
||||||
|
+ e);
|
||||||
|
}
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int usedSize() {
|
|
||||||
return lruTable.usedSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean containsKey(String key) {
|
|
||||||
return lruTable.containsKey(key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,60 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A thread-safe LRU table.
|
|
||||||
*/
|
|
||||||
public class LruCache<K, V> {
|
|
||||||
private final int maxSize;
|
|
||||||
private final LinkedHashMap<K, V> map;
|
|
||||||
private static final float hashTableLoadFactor = 0.75f;
|
|
||||||
|
|
||||||
public LruCache(int maxSize) {
|
|
||||||
this.maxSize = maxSize;
|
|
||||||
int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
|
|
||||||
map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
|
|
||||||
return size() > LruCache.this.maxSize;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// The found entry becomes the most recently used.
|
|
||||||
public synchronized V get(K key) {
|
|
||||||
return map.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void put(K key, V value) {
|
|
||||||
map.put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized int usedSize() {
|
|
||||||
return map.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean containsKey(K key) {
|
|
||||||
return map.containsKey(key);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -17,41 +17,44 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
public class TestDFSClientCache {
|
public class TestDFSClientCache {
|
||||||
@Test
|
@Test
|
||||||
public void testLruTable() throws IOException {
|
public void testEviction() throws IOException {
|
||||||
DFSClientCache cache = new DFSClientCache(new Configuration(), 3);
|
Configuration conf = new Configuration();
|
||||||
DFSClient client = Mockito.mock(DFSClient.class);
|
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
|
||||||
cache.put("a", client);
|
|
||||||
assertTrue(cache.containsKey("a"));
|
|
||||||
|
|
||||||
cache.put("b", client);
|
// Only one entry will be in the cache
|
||||||
cache.put("c", client);
|
final int MAX_CACHE_SIZE = 2;
|
||||||
cache.put("d", client);
|
|
||||||
assertTrue(cache.usedSize() == 3);
|
|
||||||
assertFalse(cache.containsKey("a"));
|
|
||||||
|
|
||||||
// Cache should have d,c,b in LRU order
|
DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
|
||||||
assertTrue(cache.containsKey("b"));
|
|
||||||
// Do a lookup to make b the most recently used
|
|
||||||
assertTrue(cache.get("b") != null);
|
|
||||||
|
|
||||||
cache.put("e", client);
|
DFSClient c1 = cache.get("test1");
|
||||||
assertTrue(cache.usedSize() == 3);
|
assertTrue(cache.get("test1").toString().contains("ugi=test1"));
|
||||||
// c should be replaced with e, and cache has e,b,d
|
assertEquals(c1, cache.get("test1"));
|
||||||
assertFalse(cache.containsKey("c"));
|
assertFalse(isDfsClientClose(c1));
|
||||||
assertTrue(cache.containsKey("e"));
|
|
||||||
assertTrue(cache.containsKey("b"));
|
cache.get("test2");
|
||||||
assertTrue(cache.containsKey("d"));
|
assertTrue(isDfsClientClose(c1));
|
||||||
|
assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isDfsClientClose(DFSClient c) {
|
||||||
|
try {
|
||||||
|
c.exists("");
|
||||||
|
} catch (IOException e) {
|
||||||
|
return e.getMessage().equals("Filesystem closed");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -347,11 +347,14 @@ Release 2.1.2 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
OPTIMIZATIONS
|
|
||||||
|
|
||||||
HDFS-5246. Make Hadoop nfs server port and mount daemon port
|
HDFS-5246. Make Hadoop nfs server port and mount daemon port
|
||||||
configurable. (Jinghui Wang via brandonli)
|
configurable. (Jinghui Wang via brandonli)
|
||||||
|
|
||||||
|
HDFS-5256. Use guava LoadingCache to implement DFSClientCache. (Haohui Mai
|
||||||
|
via brandonli)
|
||||||
|
|
||||||
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-5139. Remove redundant -R option from setrep.
|
HDFS-5139. Remove redundant -R option from setrep.
|
||||||
|
|
Loading…
Reference in New Issue