diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2ef2bf07a17..a3808331b51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1281,6 +1281,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + // HDFS Router-based federation mount table entries + /** Maximum number of cache entries to have. */ + public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE = + DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size"; + /** Remove cache entries if we have more than 10k. */ + public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000; + // HDFS Router-based federation admin public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY = FEDERATION_ROUTER_PREFIX + "admin.handler.count"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java index 24082d46d7c..374e3ba85f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hdfs.server.federation.resolver; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -30,9 +33,10 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -55,6 +59,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; /** * Mount table to map between global paths and remote locations. This allows the @@ -81,8 +87,7 @@ public class MountTableResolver /** Path -> Remote HDFS location. */ private final TreeMap tree = new TreeMap<>(); /** Path -> Remote location. */ - private final ConcurrentNavigableMap locationCache = - new ConcurrentSkipListMap<>(); + private final Cache locationCache; /** Default nameservice when no mount matches the math. */ private String defaultNameService = ""; @@ -99,20 +104,30 @@ public class MountTableResolver } public MountTableResolver(Configuration conf, Router routerService) { + this(conf, routerService, null); + } + + public MountTableResolver(Configuration conf, StateStoreService store) { + this(conf, null, store); + } + + public MountTableResolver(Configuration conf, Router routerService, + StateStoreService store) { this.router = routerService; - if (this.router != null) { + if (store != null) { + this.stateStore = store; + } else if (this.router != null) { this.stateStore = this.router.getStateStore(); } else { this.stateStore = null; } - registerCacheExternal(); - initDefaultNameService(conf); - } - - public MountTableResolver(Configuration conf, StateStoreService store) { - this.router = null; - this.stateStore = store; + int maxCacheSize = conf.getInt( + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE, + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT); + this.locationCache = CacheBuilder.newBuilder() + .maximumSize(maxCacheSize) + .build(); registerCacheExternal(); initDefaultNameService(conf); @@ -210,16 +225,26 @@ public class MountTableResolver * @param path Source path. */ private void invalidateLocationCache(final String path) { - if (locationCache.isEmpty()) { + LOG.debug("Invalidating {} from {}", path, locationCache); + if (locationCache.size() == 0) { return; } - // Determine next lexicographic entry after source path - String nextSrc = path + Character.MAX_VALUE; - ConcurrentNavigableMap subMap = - locationCache.subMap(path, nextSrc); - for (final String key : subMap.keySet()) { - locationCache.remove(key); + + // Go through the entries and remove the ones from the path to invalidate + ConcurrentMap map = locationCache.asMap(); + Set> entries = map.entrySet(); + Iterator> it = entries.iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + PathLocation loc = entry.getValue(); + String src = loc.getSourcePath(); + if (src.startsWith(path)) { + LOG.debug("Removing {}", src); + it.remove(); + } } + + LOG.debug("Location cache after invalidation: {}", locationCache); } /** @@ -312,7 +337,7 @@ public class MountTableResolver LOG.info("Clearing all mount location caches"); writeLock.lock(); try { - this.locationCache.clear(); + this.locationCache.invalidateAll(); this.tree.clear(); } finally { writeLock.unlock(); @@ -325,8 +350,15 @@ public class MountTableResolver verifyMountTable(); readLock.lock(); try { - return this.locationCache.computeIfAbsent( - path, this::lookupLocation); + Callable meh = new Callable() { + @Override + public PathLocation call() throws Exception { + return lookupLocation(path); + } + }; + return this.locationCache.get(path, meh); + } catch (ExecutionException e) { + throw new IOException(e); } finally { readLock.unlock(); } @@ -544,4 +576,12 @@ public class MountTableResolver } return ret; } + + /** + * Get the size of the cache. + * @return Size of the cache. + */ + protected long getCacheSize() { + return this.locationCache.size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 831cda88860..4ca7b58c396 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5117,4 +5117,13 @@ + + dfs.federation.router.mount-table.max-cache-size + 10000 + + Maximum number of mount table cache entries to have. + By default, remove cache entries if we have more than 10k. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java index 7a596c1f461..fa2f89c5d19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.resolver; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -48,6 +49,8 @@ public class TestMountTableResolver { private static final Logger LOG = LoggerFactory.getLogger(TestMountTableResolver.class); + private static final int TEST_MAX_CACHE_SIZE = 10; + private MountTableResolver mountTable; private Map getMountTableEntry( @@ -77,6 +80,8 @@ public class TestMountTableResolver { */ private void setupMountTable() throws IOException { Configuration conf = new Configuration(); + conf.setInt( + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE, TEST_MAX_CACHE_SIZE); mountTable = new MountTableResolver(conf); // Root mount point @@ -441,4 +446,14 @@ public class TestMountTableResolver { MountTable entry2 = mountTable.getMountPoint("/testupdate"); assertNull(entry2); } + + @Test + public void testCacheCleaning() throws Exception { + for (int i = 0; i < 1000; i++) { + String filename = String.format("/user/a/file-%04d.txt", i); + mountTable.getDestinationForPath(filename); + } + long cacheSize = mountTable.getCacheSize(); + assertTrue(cacheSize <= TEST_MAX_CACHE_SIZE); + } } \ No newline at end of file