HDFS-5256. Merging change r1527452 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1527453 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-09-30 06:13:27 +00:00
parent 579cf8f612
commit e26ac59ca0
4 changed files with 91 additions and 122 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -27,59 +28,81 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
/** /**
* A cache saves DFSClient objects for different users * A cache saves DFSClient objects for different users
*/ */
public class DFSClientCache { class DFSClientCache {
static final Log LOG = LogFactory.getLog(DFSClientCache.class); private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
private final LruCache<String, DFSClient> lruTable; /**
* Cache that maps User id to corresponding DFSClient.
*/
@VisibleForTesting
final LoadingCache<String, DFSClient> clientCache;
final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
private final Configuration config; private final Configuration config;
public DFSClientCache(Configuration config) { DFSClientCache(Configuration config) {
// By default, keep 256 DFSClient instance for 256 active users this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
this(config, 256);
} }
public DFSClientCache(Configuration config, int size) { DFSClientCache(Configuration config, int clientCache) {
lruTable = new LruCache<String, DFSClient>(size);
this.config = config; this.config = config;
this.clientCache = CacheBuilder.newBuilder()
.maximumSize(clientCache)
.removalListener(clientRemovealListener())
.build(clientLoader());
} }
public void put(String uname, DFSClient client) { private CacheLoader<String, DFSClient> clientLoader() {
lruTable.put(uname, client); return new CacheLoader<String, DFSClient>() {
} @Override
public DFSClient load(String userName) throws Exception {
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(userName);
synchronized public DFSClient get(String uname) { // Guava requires CacheLoader never returns null.
DFSClient client = lruTable.get(uname); return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
if (client != null) {
return client;
}
// Not in table, create one.
try {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
public DFSClient run() throws IOException { public DFSClient run() throws IOException {
return new DFSClient(NameNode.getAddress(config), config); return new DFSClient(NameNode.getAddress(config), config);
} }
}); });
} catch (IOException e) {
LOG.error("Create DFSClient failed for user:" + uname);
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} }
// Add new entry };
lruTable.put(uname, client); }
private RemovalListener<String, DFSClient> clientRemovealListener() {
return new RemovalListener<String, DFSClient>() {
@Override
public void onRemoval(RemovalNotification<String, DFSClient> notification) {
DFSClient client = notification.getValue();
try {
client.close();
} catch (IOException e) {
LOG.warn(String.format(
"IOException when closing the DFSClient(%s), cause: %s", client,
e));
}
}
};
}
DFSClient get(String userName) {
DFSClient client = null;
try {
client = clientCache.get(userName);
} catch (ExecutionException e) {
LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
+ e);
}
return client; return client;
} }
public int usedSize() {
return lruTable.usedSize();
}
public boolean containsKey(String key) {
return lruTable.containsKey(key);
}
} }

View File

@ -1,60 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A thread-safe LRU table.
*/
public class LruCache<K, V> {
private final int maxSize;
private final LinkedHashMap<K, V> map;
private static final float hashTableLoadFactor = 0.75f;
public LruCache(int maxSize) {
this.maxSize = maxSize;
int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > LruCache.this.maxSize;
}
};
}
// The found entry becomes the most recently used.
public synchronized V get(K key) {
return map.get(key);
}
public synchronized void put(K key, V value) {
map.put(key, value);
}
public synchronized int usedSize() {
return map.size();
}
public synchronized boolean containsKey(K key) {
return map.containsKey(key);
}
}

View File

@ -17,41 +17,44 @@
*/ */
package org.apache.hadoop.hdfs.nfs.nfs3; package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestDFSClientCache { public class TestDFSClientCache {
@Test @Test
public void testLruTable() throws IOException { public void testEviction() throws IOException {
DFSClientCache cache = new DFSClientCache(new Configuration(), 3); Configuration conf = new Configuration();
DFSClient client = Mockito.mock(DFSClient.class); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
cache.put("a", client);
assertTrue(cache.containsKey("a"));
cache.put("b", client); // Only one entry will be in the cache
cache.put("c", client); final int MAX_CACHE_SIZE = 2;
cache.put("d", client);
assertTrue(cache.usedSize() == 3);
assertFalse(cache.containsKey("a"));
// Cache should have d,c,b in LRU order DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
assertTrue(cache.containsKey("b"));
// Do a lookup to make b the most recently used
assertTrue(cache.get("b") != null);
cache.put("e", client); DFSClient c1 = cache.get("test1");
assertTrue(cache.usedSize() == 3); assertTrue(cache.get("test1").toString().contains("ugi=test1"));
// c should be replaced with e, and cache has e,b,d assertEquals(c1, cache.get("test1"));
assertFalse(cache.containsKey("c")); assertFalse(isDfsClientClose(c1));
assertTrue(cache.containsKey("e"));
assertTrue(cache.containsKey("b")); cache.get("test2");
assertTrue(cache.containsKey("d")); assertTrue(isDfsClientClose(c1));
assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
}
private static boolean isDfsClientClose(DFSClient c) {
try {
c.exists("");
} catch (IOException e) {
return e.getMessage().equals("Filesystem closed");
}
return false;
} }
} }

View File

@ -103,11 +103,14 @@ Release 2.1.2 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
OPTIMIZATIONS
HDFS-5246. Make Hadoop nfs server port and mount daemon port HDFS-5246. Make Hadoop nfs server port and mount daemon port
configurable. (Jinghui Wang via brandonli) configurable. (Jinghui Wang via brandonli)
HDFS-5256. Use guava LoadingCache to implement DFSClientCache. (Haohui Mai
via brandonli)
OPTIMIZATIONS
BUG FIXES BUG FIXES
HDFS-5139. Remove redundant -R option from setrep. HDFS-5139. Remove redundant -R option from setrep.