HADOOP-11238. Update the NameNode's Group Cache in the background when possible (Chris Li via Colin P. McCabe)

(cherry picked from commit e5a6925199)
This commit is contained in:
Colin Patrick Mccabe 2014-12-12 16:30:52 -08:00
parent 614b4046ba
commit b521d91c0f
3 changed files with 345 additions and 93 deletions

View File

@ -62,6 +62,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
(Wilfred Spiegelenburg via wang)
HADOOP-11238. Update the NameNode's Group Cache in the background when
possible (Chris Li via Colin P. McCabe)
BUG FIXES
HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)

View File

@ -24,7 +24,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -52,10 +58,11 @@ public class Groups {
private static final Log LOG = LogFactory.getLog(Groups.class);
private final GroupMappingServiceProvider impl;
private final Map<String, CachedGroups> userToGroupsMap =
new ConcurrentHashMap<String, CachedGroups>();
private final Map<String, List<String>> staticUserToGroupsMap =
private final LoadingCache<String, List<String>> cache;
private final ConcurrentHashMap<String, Long> negativeCacheMask =
new ConcurrentHashMap<String, Long>();
private final Map<String, List<String>> staticUserToGroupsMap =
new HashMap<String, List<String>>();
private final long cacheTimeout;
private final long negativeCacheTimeout;
@ -66,7 +73,7 @@ public Groups(Configuration conf) {
this(conf, new Timer());
}
public Groups(Configuration conf, Timer timer) {
public Groups(Configuration conf, final Timer timer) {
impl =
ReflectionUtils.newInstance(
conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
@ -86,6 +93,11 @@ public Groups(Configuration conf, Timer timer) {
parseStaticMapping(conf);
this.timer = timer;
this.cache = CacheBuilder.newBuilder()
.refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
.build(new GroupCacheLoader());
if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
@ -123,78 +135,112 @@ private void parseStaticMapping(Configuration conf) {
}
}
/**
* Determine whether the CachedGroups is expired.
* @param groups cached groups for one user.
* @return true if groups is expired from useToGroupsMap.
*/
private boolean hasExpired(CachedGroups groups, long startMs) {
if (groups == null) {
return true;
}
long timeout = cacheTimeout;
if (isNegativeCacheEnabled() && groups.getGroups().isEmpty()) {
// This CachedGroups is in the negative cache, thus it should expire
// sooner.
timeout = negativeCacheTimeout;
}
return groups.getTimestamp() + timeout <= startMs;
}
private boolean isNegativeCacheEnabled() {
return negativeCacheTimeout > 0;
}
private IOException noGroupsForUser(String user) {
return new IOException("No groups found for user " + user);
}
/**
* Get the group memberships of a given user.
* If the user's group is not cached, this method may block.
* @param user User's name
* @return the group memberships of the user
* @throws IOException
* @throws IOException if user does not exist
*/
public List<String> getGroups(String user) throws IOException {
public List<String> getGroups(final String user) throws IOException {
// No need to lookup for groups of static users
List<String> staticMapping = staticUserToGroupsMap.get(user);
if (staticMapping != null) {
return staticMapping;
}
// Return cached value if available
CachedGroups groups = userToGroupsMap.get(user);
long startMs = timer.monotonicNow();
if (!hasExpired(groups, startMs)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Returning cached groups for '" + user + "'");
// Check the negative cache first
if (isNegativeCacheEnabled()) {
Long expirationTime = negativeCacheMask.get(user);
if (expirationTime != null) {
if (timer.monotonicNow() < expirationTime) {
throw noGroupsForUser(user);
} else {
negativeCacheMask.remove(user, expirationTime);
}
}
if (groups.getGroups().isEmpty()) {
// Even with enabling negative cache, getGroups() has the same behavior
// that throws IOException if the groups for the user is empty.
throw new IOException("No groups found for user " + user);
}
return groups.getGroups();
}
// Create and cache user's groups
List<String> groupList = impl.getGroups(user);
long endMs = timer.monotonicNow();
long deltaMs = endMs - startMs ;
UserGroupInformation.metrics.addGetGroups(deltaMs);
if (deltaMs > warningDeltaMs) {
LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
"took " + deltaMs + " milliseconds.");
try {
return cache.get(user);
} catch (ExecutionException e) {
throw (IOException)e.getCause();
}
groups = new CachedGroups(groupList, endMs);
if (groups.getGroups().isEmpty()) {
if (isNegativeCacheEnabled()) {
userToGroupsMap.put(user, groups);
}
throw new IOException("No groups found for user " + user);
}
userToGroupsMap.put(user, groups);
if(LOG.isDebugEnabled()) {
LOG.debug("Returning fetched groups for '" + user + "'");
}
return groups.getGroups();
}
/**
* Convert millisecond times from hadoop's timer to guava's nanosecond ticker.
*/
private static class TimerToTickerAdapter extends Ticker {
private Timer timer;
public TimerToTickerAdapter(Timer timer) {
this.timer = timer;
}
@Override
public long read() {
final long NANOSECONDS_PER_MS = 1000000;
return timer.monotonicNow() * NANOSECONDS_PER_MS;
}
}
/**
* Deals with loading data into the cache.
*/
private class GroupCacheLoader extends CacheLoader<String, List<String>> {
/**
* This method will block if a cache entry doesn't exist, and
* any subsequent requests for the same user will wait on this
* request to return. If a user already exists in the cache,
* this will be run in the background.
* @param user key of cache
* @return List of groups belonging to user
* @throws IOException to prevent caching negative entries
*/
@Override
public List<String> load(String user) throws Exception {
List<String> groups = fetchGroupList(user);
if (groups.isEmpty()) {
if (isNegativeCacheEnabled()) {
long expirationTime = timer.monotonicNow() + negativeCacheTimeout;
negativeCacheMask.put(user, expirationTime);
}
// We throw here to prevent Cache from retaining an empty group
throw noGroupsForUser(user);
}
return groups;
}
/**
* Queries impl for groups belonging to the user. This could involve I/O and take awhile.
*/
private List<String> fetchGroupList(String user) throws IOException {
long startMs = timer.monotonicNow();
List<String> groupList = impl.getGroups(user);
long endMs = timer.monotonicNow();
long deltaMs = endMs - startMs ;
UserGroupInformation.metrics.addGetGroups(deltaMs);
if (deltaMs > warningDeltaMs) {
LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
"took " + deltaMs + " milliseconds.");
}
return groupList;
}
}
/**
* Refresh all user-to-groups mappings.
*/
@ -205,7 +251,8 @@ public void refresh() {
} catch (IOException e) {
LOG.warn("Error refreshing groups cache", e);
}
userToGroupsMap.clear();
cache.invalidateAll();
negativeCacheMask.clear();
}
/**
@ -221,40 +268,6 @@ public void cacheGroupsAdd(List<String> groups) {
}
}
/**
* Class to hold the cached groups
*/
private static class CachedGroups {
final long timestamp;
final List<String> groups;
/**
* Create and initialize group cache
*/
CachedGroups(List<String> groups, long timestamp) {
this.groups = groups;
this.timestamp = timestamp;
}
/**
* Returns time of last cache update
*
* @return time of last cache update
*/
public long getTimestamp() {
return timestamp;
}
/**
* Get list of cached groups
*
* @return cached groups
*/
public List<String> getGroups() {
return groups;
}
}
private static Groups GROUPS = null;
/**

View File

@ -51,6 +51,9 @@ public class TestGroupsCaching {
@Before
public void setup() {
FakeGroupMapping.resetRequestCount();
ExceptionalGroupMapping.resetRequestCount();
conf = new Configuration();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
FakeGroupMapping.class,
@ -61,16 +64,32 @@ public static class FakeGroupMapping extends ShellBasedUnixGroupsMapping {
// any to n mapping
private static Set<String> allGroups = new HashSet<String>();
private static Set<String> blackList = new HashSet<String>();
private static int requestCount = 0;
private static long getGroupsDelayMs = 0;
@Override
public List<String> getGroups(String user) throws IOException {
LOG.info("Getting groups for " + user);
requestCount++;
delayIfNecessary();
if (blackList.contains(user)) {
return new LinkedList<String>();
}
return new LinkedList<String>(allGroups);
}
private void delayIfNecessary() {
if (getGroupsDelayMs > 0) {
try {
Thread.sleep(getGroupsDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void cacheGroupsRefresh() throws IOException {
LOG.info("Cache is being refreshed.");
@ -93,6 +112,36 @@ public static void addToBlackList(String user) throws IOException {
LOG.info("Adding " + user + " to the blacklist");
blackList.add(user);
}
public static int getRequestCount() {
return requestCount;
}
public static void resetRequestCount() {
requestCount = 0;
}
public static void setGetGroupsDelayMs(long delayMs) {
getGroupsDelayMs = delayMs;
}
}
public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
private static int requestCount = 0;
@Override
public List<String> getGroups(String user) throws IOException {
requestCount++;
throw new IOException("For test");
}
public static int getRequestCount() {
return requestCount;
}
public static void resetRequestCount() {
requestCount = 0;
}
}
@Test
@ -219,4 +268,191 @@ public void testNegativeGroupCaching() throws Exception {
// groups for the user is fetched.
assertEquals(Arrays.asList(myGroups), groups.getGroups(user));
}
@Test
public void testCachePreventsImplRequest() throws Exception {
// Disable negative cache.
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
Groups groups = new Groups(conf);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.clearBlackList();
assertEquals(0, FakeGroupMapping.getRequestCount());
// First call hits the wire
assertTrue(groups.getGroups("me").size() == 2);
assertEquals(1, FakeGroupMapping.getRequestCount());
// Second count hits cache
assertTrue(groups.getGroups("me").size() == 2);
assertEquals(1, FakeGroupMapping.getRequestCount());
}
@Test
public void testExceptionsFromImplNotCachedInNegativeCache() {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
ExceptionalGroupMapping.class,
ShellBasedUnixGroupsMapping.class);
conf.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 10000);
Groups groups = new Groups(conf);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
assertEquals(0, ExceptionalGroupMapping.getRequestCount());
// First call should hit the wire
try {
groups.getGroups("anything");
fail("Should have thrown");
} catch (IOException e) {
// okay
}
assertEquals(1, ExceptionalGroupMapping.getRequestCount());
// Second call should hit the wire (no negative caching)
try {
groups.getGroups("anything");
fail("Should have thrown");
} catch (IOException e) {
// okay
}
assertEquals(2, ExceptionalGroupMapping.getRequestCount());
}
@Test
public void testOnlyOneRequestWhenNoEntryIsCached() throws Exception {
// Disable negative cache.
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
final Groups groups = new Groups(conf);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.clearBlackList();
FakeGroupMapping.setGetGroupsDelayMs(100);
ArrayList<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread() {
public void run() {
try {
assertEquals(2, groups.getGroups("me").size());
} catch (IOException e) {
fail("Should not happen");
}
}
});
}
// We start a bunch of threads who all see no cached value
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
// But only one thread should have made the request
assertEquals(1, FakeGroupMapping.getRequestCount());
}
@Test
public void testOnlyOneRequestWhenExpiredEntryExists() throws Exception {
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
FakeTimer timer = new FakeTimer();
final Groups groups = new Groups(conf, timer);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.clearBlackList();
FakeGroupMapping.setGetGroupsDelayMs(100);
// We make an initial request to populate the cache
groups.getGroups("me");
int startingRequestCount = FakeGroupMapping.getRequestCount();
// Then expire that entry
timer.advance(400 * 1000);
Thread.sleep(100);
ArrayList<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread() {
public void run() {
try {
assertEquals(2, groups.getGroups("me").size());
} catch (IOException e) {
fail("Should not happen");
}
}
});
}
// We start a bunch of threads who all see the cached value
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
// Only one extra request is made
assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
@Test
public void testCacheEntriesExpire() throws Exception {
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
FakeTimer timer = new FakeTimer();
final Groups groups = new Groups(conf, timer);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.clearBlackList();
// We make an entry
groups.getGroups("me");
int startingRequestCount = FakeGroupMapping.getRequestCount();
timer.advance(20 * 1000);
// Cache entry has expired so it results in a new fetch
groups.getGroups("me");
assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
@Test
public void testNegativeCacheClearedOnRefresh() throws Exception {
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 100);
final Groups groups = new Groups(conf);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.clearBlackList();
FakeGroupMapping.addToBlackList("dne");
try {
groups.getGroups("dne");
fail("Should have failed to find this group");
} catch (IOException e) {
// pass
}
int startingRequestCount = FakeGroupMapping.getRequestCount();
groups.refresh();
FakeGroupMapping.addToBlackList("dne");
try {
List<String> g = groups.getGroups("dne");
fail("Should have failed to find this group");
} catch (IOException e) {
// pass
}
assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
}