diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java index df32bb3ebf6..a8fa7241b77 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java @@ -43,6 +43,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.Cache; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -355,23 +357,24 @@ public class Groups { executorService.submit(new Callable>() { @Override public List call() throws Exception { - boolean success = false; - try { - backgroundRefreshQueued.decrementAndGet(); - backgroundRefreshRunning.incrementAndGet(); - List results = load(key); - success = true; - return results; - } finally { - backgroundRefreshRunning.decrementAndGet(); - if (success) { - backgroundRefreshSuccess.incrementAndGet(); - } else { - backgroundRefreshException.incrementAndGet(); - } - } + backgroundRefreshQueued.decrementAndGet(); + backgroundRefreshRunning.incrementAndGet(); + List results = load(key); + return results; } }); + Futures.addCallback(listenableFuture, new FutureCallback>() { + @Override + public void onSuccess(List result) { + backgroundRefreshSuccess.incrementAndGet(); + backgroundRefreshRunning.decrementAndGet(); + } + @Override + public void onFailure(Throwable t) { + backgroundRefreshException.incrementAndGet(); + backgroundRefreshRunning.decrementAndGet(); + } + }); return listenableFuture; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java index 5a5596eaeb7..2b47d41a625 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java @@ -25,11 +25,16 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.FakeTimer; import org.junit.Before; import org.junit.Test; + +import com.google.common.base.Supplier; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -67,6 +72,7 @@ public class TestGroupsCaching { private static int requestCount = 0; private static long getGroupsDelayMs = 0; private static boolean throwException; + private static volatile CountDownLatch latch = null; @Override public List getGroups(String user) throws IOException { @@ -85,7 +91,19 @@ public class TestGroupsCaching { return new LinkedList(allGroups); } + /** + * Delay returning on a latch or a specific amount of time. + */ private void delayIfNecessary() { + // cause current method to pause + // resume until get notified + if (latch != null) { + try { + latch.await(); + return; + } catch (InterruptedException e) {} + } + if (getGroupsDelayMs > 0) { try { Thread.sleep(getGroupsDelayMs); @@ -114,6 +132,7 @@ public class TestGroupsCaching { requestCount = 0; getGroupsDelayMs = 0; throwException = false; + latch = null; } @Override @@ -142,6 +161,31 @@ public class TestGroupsCaching { public static void setThrowException(boolean throwIfTrue) { throwException = throwIfTrue; } + + /** + * Hold on returning the group names unless being notified, + * ensure this method is called before {@link #getGroups(String)}. + * Call {@link #resume()} will resume the process. + */ + public static void pause() { + // Set a static latch, multiple background refresh threads + // share this instance. So when await is called, all the + // threads will pause until the it decreases the count of + // the latch. + latch = new CountDownLatch(1); + } + + /** + * Resume the background refresh thread and return the value + * of group names. + */ + public static void resume() { + // if latch is null, it means pause was not called and it is + // safe to ignore. + if (latch != null) { + latch.countDown(); + } + } } public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping { @@ -610,22 +654,18 @@ public class TestGroupsCaching { // expire the cache timer.advance(2*1000); - FakeGroupMapping.setGetGroupsDelayMs(40); + FakeGroupMapping.pause(); // Request all groups again, as there are 2 threads to process them // 3 should get queued and 2 should be running for (String g: grps) { groups.getGroups(g); } - Thread.sleep(20); - assertEquals(groups.getBackgroundRefreshQueued(), 3); - assertEquals(groups.getBackgroundRefreshRunning(), 2); + waitForGroupCounters(groups, 3, 2, 0, 0); + FakeGroupMapping.resume(); - // After 120ms all should have completed running - Thread.sleep(120); - assertEquals(groups.getBackgroundRefreshQueued(), 0); - assertEquals(groups.getBackgroundRefreshRunning(), 0); - assertEquals(groups.getBackgroundRefreshSuccess(), 5); + // Once resumed, all results should be returned immediately + waitForGroupCounters(groups, 0, 0, 5, 0); // Now run again, this time throwing exceptions but no delay timer.advance(2*1000); @@ -634,11 +674,34 @@ public class TestGroupsCaching { for (String g: grps) { groups.getGroups(g); } - Thread.sleep(20); - assertEquals(groups.getBackgroundRefreshQueued(), 0); - assertEquals(groups.getBackgroundRefreshRunning(), 0); - assertEquals(groups.getBackgroundRefreshSuccess(), 5); - assertEquals(groups.getBackgroundRefreshException(), 5); + waitForGroupCounters(groups, 0, 0, 5, 5); + } + + private void waitForGroupCounters(final Groups groups, long expectedQueued, + long expectedRunning, long expectedSuccess, long expectedExpection) + throws InterruptedException { + final long[] expected = {expectedQueued, expectedRunning, + expectedSuccess, expectedExpection}; + final long[] actual = new long[expected.length]; + // wait for a certain time until the counters reach + // to expected values. Check values in 20 ms interval. + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + actual[0] = groups.getBackgroundRefreshQueued(); + actual[1] = groups.getBackgroundRefreshRunning(); + actual[2] = groups.getBackgroundRefreshSuccess(); + actual[3] = groups.getBackgroundRefreshException(); + return Arrays.equals(actual, expected); + } + }, 20, 1000); + } catch (TimeoutException e) { + fail("Excepted group counter values are not reached in given time," + + " expecting (Queued, Running, Success, Exception) : " + + Arrays.toString(expected) + " but actual : " + + Arrays.toString(actual)); + } } @Test