HADOOP-13375. o.a.h.security.TestGroupsCaching.testBackgroundRefreshCounters seems flaky. (Contributed by Weiwei Yang)

(cherry picked from commit dcd21d083a)
This commit is contained in:
Mingliang Liu 2016-09-01 11:03:06 -07:00
parent 34f9330651
commit 4d709be8c9
2 changed files with 95 additions and 29 deletions

View File

@ -43,6 +43,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -355,21 +357,22 @@ public class Groups {
executorService.submit(new Callable<List<String>>() { executorService.submit(new Callable<List<String>>() {
@Override @Override
public List<String> call() throws Exception { public List<String> call() throws Exception {
boolean success = false;
try {
backgroundRefreshQueued.decrementAndGet(); backgroundRefreshQueued.decrementAndGet();
backgroundRefreshRunning.incrementAndGet(); backgroundRefreshRunning.incrementAndGet();
List<String> results = load(key); List<String> results = load(key);
success = true;
return results; return results;
} finally { }
backgroundRefreshRunning.decrementAndGet(); });
if (success) { Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
backgroundRefreshSuccess.incrementAndGet(); backgroundRefreshSuccess.incrementAndGet();
} else { backgroundRefreshRunning.decrementAndGet();
}
@Override
public void onFailure(Throwable t) {
backgroundRefreshException.incrementAndGet(); backgroundRefreshException.incrementAndGet();
} backgroundRefreshRunning.decrementAndGet();
}
} }
}); });
return listenableFuture; return listenableFuture;

View File

@ -25,11 +25,16 @@ import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.FakeTimer;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -67,6 +72,7 @@ public class TestGroupsCaching {
private static int requestCount = 0; private static int requestCount = 0;
private static long getGroupsDelayMs = 0; private static long getGroupsDelayMs = 0;
private static boolean throwException; private static boolean throwException;
private static volatile CountDownLatch latch = null;
@Override @Override
public List<String> getGroups(String user) throws IOException { public List<String> getGroups(String user) throws IOException {
@ -85,7 +91,19 @@ public class TestGroupsCaching {
return new LinkedList<String>(allGroups); return new LinkedList<String>(allGroups);
} }
/**
* Delay returning on a latch or a specific amount of time.
*/
private void delayIfNecessary() { private void delayIfNecessary() {
// cause current method to pause
// resume until get notified
if (latch != null) {
try {
latch.await();
return;
} catch (InterruptedException e) {}
}
if (getGroupsDelayMs > 0) { if (getGroupsDelayMs > 0) {
try { try {
Thread.sleep(getGroupsDelayMs); Thread.sleep(getGroupsDelayMs);
@ -114,6 +132,7 @@ public class TestGroupsCaching {
requestCount = 0; requestCount = 0;
getGroupsDelayMs = 0; getGroupsDelayMs = 0;
throwException = false; throwException = false;
latch = null;
} }
@Override @Override
@ -142,6 +161,31 @@ public class TestGroupsCaching {
public static void setThrowException(boolean throwIfTrue) { public static void setThrowException(boolean throwIfTrue) {
throwException = throwIfTrue; throwException = throwIfTrue;
} }
/**
* Hold on returning the group names unless being notified,
* ensure this method is called before {@link #getGroups(String)}.
* Call {@link #resume()} will resume the process.
*/
public static void pause() {
// Set a static latch, multiple background refresh threads
// share this instance. So when await is called, all the
// threads will pause until the it decreases the count of
// the latch.
latch = new CountDownLatch(1);
}
/**
* Resume the background refresh thread and return the value
* of group names.
*/
public static void resume() {
// if latch is null, it means pause was not called and it is
// safe to ignore.
if (latch != null) {
latch.countDown();
}
}
} }
public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping { public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
@ -610,22 +654,18 @@ public class TestGroupsCaching {
// expire the cache // expire the cache
timer.advance(2*1000); timer.advance(2*1000);
FakeGroupMapping.setGetGroupsDelayMs(40); FakeGroupMapping.pause();
// Request all groups again, as there are 2 threads to process them // Request all groups again, as there are 2 threads to process them
// 3 should get queued and 2 should be running // 3 should get queued and 2 should be running
for (String g: grps) { for (String g: grps) {
groups.getGroups(g); groups.getGroups(g);
} }
Thread.sleep(20); waitForGroupCounters(groups, 3, 2, 0, 0);
assertEquals(groups.getBackgroundRefreshQueued(), 3); FakeGroupMapping.resume();
assertEquals(groups.getBackgroundRefreshRunning(), 2);
// After 120ms all should have completed running // Once resumed, all results should be returned immediately
Thread.sleep(120); waitForGroupCounters(groups, 0, 0, 5, 0);
assertEquals(groups.getBackgroundRefreshQueued(), 0);
assertEquals(groups.getBackgroundRefreshRunning(), 0);
assertEquals(groups.getBackgroundRefreshSuccess(), 5);
// Now run again, this time throwing exceptions but no delay // Now run again, this time throwing exceptions but no delay
timer.advance(2*1000); timer.advance(2*1000);
@ -634,11 +674,34 @@ public class TestGroupsCaching {
for (String g: grps) { for (String g: grps) {
groups.getGroups(g); groups.getGroups(g);
} }
Thread.sleep(20); waitForGroupCounters(groups, 0, 0, 5, 5);
assertEquals(groups.getBackgroundRefreshQueued(), 0); }
assertEquals(groups.getBackgroundRefreshRunning(), 0);
assertEquals(groups.getBackgroundRefreshSuccess(), 5); private void waitForGroupCounters(final Groups groups, long expectedQueued,
assertEquals(groups.getBackgroundRefreshException(), 5); long expectedRunning, long expectedSuccess, long expectedExpection)
throws InterruptedException {
final long[] expected = {expectedQueued, expectedRunning,
expectedSuccess, expectedExpection};
final long[] actual = new long[expected.length];
// wait for a certain time until the counters reach
// to expected values. Check values in 20 ms interval.
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
actual[0] = groups.getBackgroundRefreshQueued();
actual[1] = groups.getBackgroundRefreshRunning();
actual[2] = groups.getBackgroundRefreshSuccess();
actual[3] = groups.getBackgroundRefreshException();
return Arrays.equals(actual, expected);
}
}, 20, 1000);
} catch (TimeoutException e) {
fail("Excepted group counter values are not reached in given time,"
+ " expecting (Queued, Running, Success, Exception) : "
+ Arrays.toString(expected) + " but actual : "
+ Arrays.toString(actual));
}
} }
@Test @Test