HADOOP-11238. Update the NameNode's Group Cache in the background when possible (Chris Li via Colin P. McCabe)

(cherry picked from commit e5a6925199)

(cherry picked from commit b521d91c0f)
This commit is contained in:
Colin Patrick Mccabe 2014-12-12 16:30:52 -08:00 committed by Vinod Kumar Vavilapalli
parent 3d5627b563
commit 1698110acc
3 changed files with 345 additions and 93 deletions

View File

@ -10,6 +10,9 @@ Release 2.6.1 - UNRELEASED
OPTIMIZATIONS
HADOOP-11238. Update the NameNode's Group Cache in the background when
possible (Chris Li via Colin P. McCabe)
BUG FIXES
HADOOP-11466: FastByteComparisons: do not use UNSAFE_COMPARER on the SPARC

View File

@ -24,7 +24,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -52,10 +58,11 @@ public class Groups {
private static final Log LOG = LogFactory.getLog(Groups.class);
private final GroupMappingServiceProvider impl;
private final Map<String, CachedGroups> userToGroupsMap =
new ConcurrentHashMap<String, CachedGroups>();
private final Map<String, List<String>> staticUserToGroupsMap =
private final LoadingCache<String, List<String>> cache;
private final ConcurrentHashMap<String, Long> negativeCacheMask =
new ConcurrentHashMap<String, Long>();
private final Map<String, List<String>> staticUserToGroupsMap =
new HashMap<String, List<String>>();
private final long cacheTimeout;
private final long negativeCacheTimeout;
@ -66,7 +73,7 @@ public class Groups {
this(conf, new Timer());
}
public Groups(Configuration conf, Timer timer) {
public Groups(Configuration conf, final Timer timer) {
impl =
ReflectionUtils.newInstance(
conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
@ -86,6 +93,11 @@ public class Groups {
parseStaticMapping(conf);
this.timer = timer;
this.cache = CacheBuilder.newBuilder()
.refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
.build(new GroupCacheLoader());
if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
@ -123,78 +135,112 @@ public class Groups {
}
}
/**
 * Decide whether a cached entry is too old to be served.
 *
 * @param groups cached groups for one user; {@code null} counts as expired.
 * @param startMs the time, in milliseconds, against which the entry's
 *                timestamp is compared.
 * @return true if there is no entry, or if the entry's timestamp plus the
 *         applicable timeout is not later than {@code startMs}.
 */
private boolean hasExpired(CachedGroups groups, long startMs) {
  if (groups == null) {
    return true;
  }
  // An empty group list marks a negative-cache entry; when negative caching
  // is enabled such entries are aged with the negative-cache timeout instead
  // of the regular one.
  final boolean negativeEntry =
      isNegativeCacheEnabled() && groups.getGroups().isEmpty();
  final long timeout = negativeEntry ? negativeCacheTimeout : cacheTimeout;
  return groups.getTimestamp() + timeout <= startMs;
}
/**
 * Tell whether negative caching is switched on.
 *
 * @return true if negativeCacheTimeout is greater than zero.
 */
private boolean isNegativeCacheEnabled() {
return negativeCacheTimeout > 0;
}
/**
 * Build (but do not throw) the exception used to report that a user has no
 * groups; callers are expected to throw the returned instance themselves.
 *
 * @param user name of the user that has no groups
 * @return a new IOException whose message names the user
 */
private IOException noGroupsForUser(String user) {
return new IOException("No groups found for user " + user);
}
/**
* Get the group memberships of a given user.
* If the user's group is not cached, this method may block.
* @param user User's name
* @return the group memberships of the user
* @throws IOException
* @throws IOException if user does not exist
*/
public List<String> getGroups(String user) throws IOException {
public List<String> getGroups(final String user) throws IOException {
// No need to lookup for groups of static users
List<String> staticMapping = staticUserToGroupsMap.get(user);
if (staticMapping != null) {
return staticMapping;
}
// Return cached value if available
CachedGroups groups = userToGroupsMap.get(user);
long startMs = timer.monotonicNow();
if (!hasExpired(groups, startMs)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Returning cached groups for '" + user + "'");
// Check the negative cache first
if (isNegativeCacheEnabled()) {
Long expirationTime = negativeCacheMask.get(user);
if (expirationTime != null) {
if (timer.monotonicNow() < expirationTime) {
throw noGroupsForUser(user);
} else {
negativeCacheMask.remove(user, expirationTime);
}
}
if (groups.getGroups().isEmpty()) {
// Even with enabling negative cache, getGroups() has the same behavior
// that throws IOException if the groups for the user is empty.
throw new IOException("No groups found for user " + user);
}
return groups.getGroups();
}
// Create and cache user's groups
List<String> groupList = impl.getGroups(user);
long endMs = timer.monotonicNow();
long deltaMs = endMs - startMs ;
UserGroupInformation.metrics.addGetGroups(deltaMs);
if (deltaMs > warningDeltaMs) {
LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
"took " + deltaMs + " milliseconds.");
try {
return cache.get(user);
} catch (ExecutionException e) {
throw (IOException)e.getCause();
}
groups = new CachedGroups(groupList, endMs);
if (groups.getGroups().isEmpty()) {
if (isNegativeCacheEnabled()) {
userToGroupsMap.put(user, groups);
}
throw new IOException("No groups found for user " + user);
}
userToGroupsMap.put(user, groups);
if(LOG.isDebugEnabled()) {
LOG.debug("Returning fetched groups for '" + user + "'");
}
return groups.getGroups();
}
/**
 * Convert millisecond times from hadoop's timer to guava's nanosecond ticker.
 */
private static class TimerToTickerAdapter extends Ticker {
  /** Number of nanoseconds in one millisecond. */
  private static final long NANOSECONDS_PER_MS = 1000000;

  /** Millisecond time source being adapted; assigned once at construction. */
  private final Timer timer;

  /**
   * @param timer the timer whose monotonic millisecond clock is exposed as
   *              a nanosecond ticker
   */
  public TimerToTickerAdapter(Timer timer) {
    this.timer = timer;
  }

  /**
   * @return the timer's current monotonic time scaled from milliseconds to
   *         nanoseconds
   */
  @Override
  public long read() {
    return timer.monotonicNow() * NANOSECONDS_PER_MS;
  }
}
/**
 * Cache loader that fills the group cache from the configured group mapping
 * implementation.
 */
private class GroupCacheLoader extends CacheLoader<String, List<String>> {
  /**
   * Look up the groups of one user on behalf of the cache.
   *
   * An empty result is never returned: it is reported by throwing, so that
   * the cache does not retain an empty group list. When negative caching is
   * enabled, the user is first recorded in the negative cache mask together
   * with the time at which that record stops being valid.
   *
   * NOTE(review): how callers block while this runs (first load versus
   * refresh of an existing entry) is governed by the Guava cache, not by
   * this method - confirm against the Guava LoadingCache documentation.
   *
   * @param user key of cache
   * @return non-empty list of groups belonging to user
   * @throws IOException if the user has no groups, or if the underlying
   *         lookup fails
   */
  @Override
  public List<String> load(String user) throws Exception {
    final List<String> fetched = fetchGroupList(user);
    if (!fetched.isEmpty()) {
      return fetched;
    }

    if (isNegativeCacheEnabled()) {
      // Remember the miss until this point in time.
      final long validUntil = timer.monotonicNow() + negativeCacheTimeout;
      negativeCacheMask.put(user, validUntil);
    }
    // Throwing (rather than returning the empty list) keeps the empty
    // result out of the cache.
    throw noGroupsForUser(user);
  }

  /**
   * Ask impl for the groups of the user, recording how long the call took
   * in the getGroups metric and logging a warning when the duration exceeds
   * warningDeltaMs. The lookup may involve I/O and take a while.
   *
   * @param user name of the user to look up
   * @return whatever list impl returned for the user
   * @throws IOException if impl fails
   */
  private List<String> fetchGroupList(String user) throws IOException {
    final long before = timer.monotonicNow();
    final List<String> result = impl.getGroups(user);
    final long elapsedMs = timer.monotonicNow() - before;

    UserGroupInformation.metrics.addGetGroups(elapsedMs);
    if (elapsedMs > warningDeltaMs) {
      LOG.warn("Potential performance problem: getGroups(user=" + user +") " +
          "took " + elapsedMs + " milliseconds.");
    }
    return result;
  }
}
/**
* Refresh all user-to-groups mappings.
*/
@ -205,7 +251,8 @@ public class Groups {
} catch (IOException e) {
LOG.warn("Error refreshing groups cache", e);
}
userToGroupsMap.clear();
cache.invalidateAll();
negativeCacheMask.clear();
}
/**
@ -221,40 +268,6 @@ public class Groups {
}
}
/**
 * Holder pairing a user's group list with the time at which it was cached.
 * Both fields are final, but the list is kept by reference (it is not
 * copied), so the holder is only as immutable as the list passed in.
 */
private static class CachedGroups {
// Time of the last cache update, as supplied by the creator.
final long timestamp;
// Group list exactly as supplied by the creator; may be empty.
final List<String> groups;
/**
 * Create a cache entry from a group list and the time it was obtained.
 *
 * @param groups groups to cache; stored as-is, without copying
 * @param timestamp time of this cache update
 */
CachedGroups(List<String> groups, long timestamp) {
this.groups = groups;
this.timestamp = timestamp;
}
/**
 * Returns time of last cache update.
 *
 * @return time of last cache update
 */
public long getTimestamp() {
return timestamp;
}
/**
 * Get list of cached groups.
 *
 * @return the same list instance that was passed to the constructor
 */
public List<String> getGroups() {
return groups;
}
}
private static Groups GROUPS = null;
/**

View File

@ -51,6 +51,9 @@ public class TestGroupsCaching {
@Before
public void setup() {
FakeGroupMapping.resetRequestCount();
ExceptionalGroupMapping.resetRequestCount();
conf = new Configuration();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
FakeGroupMapping.class,
@ -61,16 +64,32 @@ public class TestGroupsCaching {
// any to n mapping
private static Set<String> allGroups = new HashSet<String>();
private static Set<String> blackList = new HashSet<String>();
private static int requestCount = 0;
private static long getGroupsDelayMs = 0;
/**
 * Fake lookup: counts the request, applies the configured artificial delay,
 * then answers with an empty list for blacklisted users and with a copy of
 * all known groups for everyone else.
 */
@Override
public List<String> getGroups(String user) throws IOException {
  LOG.info("Getting groups for " + user);
  requestCount++;
  delayIfNecessary();

  final boolean blacklisted = blackList.contains(user);
  return blacklisted
      ? new LinkedList<String>()
      : new LinkedList<String>(allGroups);
}
/**
 * Sleep for getGroupsDelayMs milliseconds when a positive delay has been
 * configured; otherwise return immediately.
 *
 * @throws RuntimeException wrapping the InterruptedException if the sleep
 *         is interrupted
 */
private void delayIfNecessary() {
  if (getGroupsDelayMs <= 0) {
    return;
  }
  try {
    Thread.sleep(getGroupsDelayMs);
  } catch (InterruptedException e) {
    // Thread.sleep clears the interrupt flag when it throws; set it again
    // so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}
@Override
public void cacheGroupsRefresh() throws IOException {
LOG.info("Cache is being refreshed.");
@ -93,6 +112,36 @@ public class TestGroupsCaching {
LOG.info("Adding " + user + " to the blacklist");
blackList.add(user);
}
/**
 * @return the current value of the static requestCount counter, which
 *         getGroups() increments on every call
 */
public static int getRequestCount() {
return requestCount;
}
/**
 * Set the static requestCount counter back to zero.
 */
public static void resetRequestCount() {
requestCount = 0;
}
/**
 * Configure the artificial delay applied by getGroups(). The value is held
 * in a static field, so it stays in effect until it is set again.
 *
 * @param delayMs delay in milliseconds; values of zero or less disable it
 */
public static void setGetGroupsDelayMs(long delayMs) {
getGroupsDelayMs = delayMs;
}
}
/**
 * Group mapping whose lookups always fail with an IOException, while
 * counting how many lookups were attempted.
 */
public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
// Number of getGroups() calls since the last reset; static, so shared by
// every instance of this class.
private static int requestCount = 0;
/**
 * Count the request, then fail unconditionally.
 *
 * @throws IOException always
 */
@Override
public List<String> getGroups(String user) throws IOException {
requestCount++;
throw new IOException("For test");
}
/**
 * @return number of getGroups() calls since the last reset
 */
public static int getRequestCount() {
return requestCount;
}
/**
 * Set the request counter back to zero.
 */
public static void resetRequestCount() {
requestCount = 0;
}
}
@Test
@ -219,4 +268,191 @@ public class TestGroupsCaching {
// groups for the user is fetched.
assertEquals(Arrays.asList(myGroups), groups.getGroups(user));
}
/**
 * A second lookup for the same user must be answered from the cache
 * without another request to the group mapping implementation.
 */
@Test
public void testCachePreventsImplRequest() throws Exception {
  // Disable negative cache.
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
  Groups groups = new Groups(conf);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  assertEquals(0, FakeGroupMapping.getRequestCount());

  // First call hits the wire
  assertEquals(2, groups.getGroups("me").size());
  assertEquals(1, FakeGroupMapping.getRequestCount());

  // Second call hits cache
  assertEquals(2, groups.getGroups("me").size());
  assertEquals(1, FakeGroupMapping.getRequestCount());
}
/**
 * A lookup that fails with an exception from the implementation must not be
 * remembered by the negative cache: every call has to reach the
 * implementation again.
 */
@Test
public void testExceptionsFromImplNotCachedInNegativeCache() {
  conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
      ExceptionalGroupMapping.class,
      ShellBasedUnixGroupsMapping.class);
  conf.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 10000);
  Groups groups = new Groups(conf);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();

  assertEquals(0, ExceptionalGroupMapping.getRequestCount());

  // Attempt 1 hits the wire; attempt 2 must hit the wire as well, because
  // the failure of attempt 1 is not negatively cached.
  for (int attempt = 1; attempt <= 2; attempt++) {
    try {
      groups.getGroups("anything");
      fail("Should have thrown");
    } catch (IOException expected) {
      // okay
    }
    assertEquals(attempt, ExceptionalGroupMapping.getRequestCount());
  }
}
/**
 * Ten threads ask for the same uncached user at once; only one request may
 * reach the group mapping implementation.
 */
@Test
public void testOnlyOneRequestWhenNoEntryIsCached() throws Exception {
  // Disable negative cache.
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
  final Groups groups = new Groups(conf);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // An assertion that fails on a worker thread only terminates that thread
  // and is invisible to JUnit, so failures are collected and checked on the
  // test thread after the workers have been joined.
  final List<Throwable> failures =
      java.util.Collections.synchronizedList(new ArrayList<Throwable>());

  FakeGroupMapping.setGetGroupsDelayMs(100);
  try {
    ArrayList<Thread> threads = new ArrayList<Thread>();
    for (int i = 0; i < 10; i++) {
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            assertEquals(2, groups.getGroups("me").size());
          } catch (Throwable t) {
            failures.add(t);
          }
        }
      });
    }

    // We start a bunch of threads who all see no cached value
    for (Thread t : threads) {
      t.start();
    }

    for (Thread t : threads) {
      t.join();
    }
  } finally {
    // The delay lives in a static field; reset it so that it does not slow
    // down tests that run after this one.
    FakeGroupMapping.setGetGroupsDelayMs(0);
  }

  assertTrue("Worker threads failed: " + failures, failures.isEmpty());

  // But only one thread should have made the request
  assertEquals(1, FakeGroupMapping.getRequestCount());
}
/**
 * Ten threads ask for a user whose cache entry has aged out; exactly one
 * additional request may reach the group mapping implementation.
 */
@Test
public void testOnlyOneRequestWhenExpiredEntryExists() throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  FakeTimer timer = new FakeTimer();
  final Groups groups = new Groups(conf, timer);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // An assertion that fails on a worker thread only terminates that thread
  // and is invisible to JUnit, so failures are collected and checked on the
  // test thread after the workers have been joined.
  final List<Throwable> failures =
      java.util.Collections.synchronizedList(new ArrayList<Throwable>());

  FakeGroupMapping.setGetGroupsDelayMs(100);
  int startingRequestCount;
  try {
    // We make an initial request to populate the cache
    groups.getGroups("me");
    startingRequestCount = FakeGroupMapping.getRequestCount();

    // Then expire that entry
    timer.advance(400 * 1000);
    Thread.sleep(100);

    ArrayList<Thread> threads = new ArrayList<Thread>();
    for (int i = 0; i < 10; i++) {
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            assertEquals(2, groups.getGroups("me").size());
          } catch (Throwable t) {
            failures.add(t);
          }
        }
      });
    }

    // We start a bunch of threads who all see the cached value
    for (Thread t : threads) {
      t.start();
    }

    for (Thread t : threads) {
      t.join();
    }
  } finally {
    // The delay lives in a static field; reset it so that it does not slow
    // down tests that run after this one.
    FakeGroupMapping.setGetGroupsDelayMs(0);
  }

  assertTrue("Worker threads failed: " + failures, failures.isEmpty());

  // Only one extra request is made
  assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
/**
 * Once the fake clock has moved 20 seconds past a 1-second cache timeout,
 * the next lookup must trigger exactly one new request to the
 * implementation.
 */
@Test
public void testCacheEntriesExpire() throws Exception {
  final FakeTimer fakeClock = new FakeTimer();
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
  final Groups groups = new Groups(conf, fakeClock);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();

  // Populate the cache and note how many requests that took so far.
  groups.getGroups("me");
  final int requestsBefore = FakeGroupMapping.getRequestCount();

  fakeClock.advance(20 * 1000);

  // Cache entry has expired so it results in a new fetch
  groups.getGroups("me");
  final int requestsAfter = FakeGroupMapping.getRequestCount();
  assertEquals(requestsBefore + 1, requestsAfter);
}
/**
 * After refresh(), a user that was previously recorded as having no groups
 * must be looked up in the implementation again instead of being rejected
 * from the negative cache.
 */
@Test
public void testNegativeCacheClearedOnRefresh() throws Exception {
  conf.setLong(
      CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 100);
  final Groups groups = new Groups(conf);
  groups.cacheGroupsAdd(Arrays.asList(myGroups));
  groups.refresh();
  FakeGroupMapping.clearBlackList();
  FakeGroupMapping.addToBlackList("dne");

  // First lookup fails and is eligible for negative caching.
  try {
    groups.getGroups("dne");
    fail("Should have failed to find this group");
  } catch (IOException e) {
    // pass
  }

  int startingRequestCount = FakeGroupMapping.getRequestCount();

  groups.refresh();
  FakeGroupMapping.addToBlackList("dne");

  // The lookup still fails, but it must have gone to the implementation.
  try {
    groups.getGroups("dne");
    fail("Should have failed to find this group");
  } catch (IOException e) {
    // pass
  }

  assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
}
}