Merge pull request #1694 from metamx/namespaceExtractionCacheManagerTestImprovements

Better timing and locking in NamespaceExtractionCacheManagerExecutorsTest
This commit is contained in:
Fangjin Yang 2015-09-04 15:06:22 -07:00
commit 07266d682a
2 changed files with 286 additions and 236 deletions

View File

@ -148,9 +148,9 @@ public abstract class NamespaceExtractionCacheManager
);
}
protected void waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedException
protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedException
{
listeningScheduledExecutorService.awaitTermination(time, unit);
return listeningScheduledExecutorService.awaitTermination(time, unit);
}

View File

@ -23,15 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.extraction.namespace.ExtractionNamespace;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.URIExtractionNamespace;
@ -39,13 +38,13 @@ import io.druid.query.extraction.namespace.URIExtractionNamespaceTest;
import io.druid.segment.loading.LocalFileTimestampVersionFinder;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.namespace.URIExtractionNamespaceFunctionFactory;
import org.apache.commons.io.FileUtils;
import org.joda.time.Period;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
@ -58,13 +57,14 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -72,73 +72,125 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class NamespaceExtractionCacheManagerExecutorsTest
{
private static final Logger log = new Logger(NamespaceExtractionCacheManagerExecutorsTest.class);
private static Path tmpDir;
private static final String KEY = "foo";
private static final String VALUE = "bar";
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private Lifecycle lifecycle;
private NamespaceExtractionCacheManager manager;
private File tmpFile;
private URIExtractionNamespaceFunctionFactory factory;
private final ConcurrentHashMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<String, Function<String, String>>();
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Object> cacheUpdateAlerts = new ConcurrentHashMap<>();
@BeforeClass
public static void setUpStatic() throws IOException
{
tmpDir = Files.createTempDirectory("TestNamespaceExtractionCacheManagerExecutors");
}
@AfterClass
public static void tearDownStatic() throws IOException
{
FileUtils.deleteDirectory(tmpDir.toFile());
}
private final AtomicLong numRuns = new AtomicLong(0L);
@Before
public void setUp() throws IOException
{
final Path tmpDir = temporaryFolder.newFolder().toPath();
lifecycle = new Lifecycle();
manager = new OnHeapNamespaceExtractionCacheManager(
lifecycle, fnCache, new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
URIExtractionNamespace.class,
new URIExtractionNamespaceFunctionFactory(
ImmutableMap.<String, SearchableVersionedDataFinder>of("file", new LocalFileTimestampVersionFinder())
)
)
);
tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile();
tmpFile.deleteOnExit();
final ObjectMapper mapper = new DefaultObjectMapper();
try (OutputStream ostream = new FileOutputStream(tmpFile)) {
try (OutputStreamWriter out = new OutputStreamWriter(ostream)) {
out.write(mapper.writeValueAsString(ImmutableMap.<String, String>of("foo", "bar")));
}
}
factory = new URIExtractionNamespaceFunctionFactory(
final URIExtractionNamespaceFunctionFactory factory = new URIExtractionNamespaceFunctionFactory(
ImmutableMap.<String, SearchableVersionedDataFinder>of("file", new LocalFileTimestampVersionFinder())
)
{
@Override
public Callable<String> getCachePopulator(
final URIExtractionNamespace extractionNamespace,
final String lastVersion,
final Map<String, String> cache
)
{
final Callable<String> superCallable = super.getCachePopulator(extractionNamespace, lastVersion, cache);
return new Callable<String>()
{
@Override
public String call() throws Exception
{
superCallable.call();
return String.format("%d", System.currentTimeMillis());
// Don't actually read off disk because TravisCI doesn't like that
cache.put(KEY, VALUE);
Thread.sleep(2);// To make absolutely sure there is a unique currentTimeMillis
return Long.toString(System.currentTimeMillis());
}
};
}
};
manager = new OnHeapNamespaceExtractionCacheManager(
lifecycle, fnCache, new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
URIExtractionNamespace.class,
factory
)
)
{
@Override
protected <T extends ExtractionNamespace> Runnable getPostRunnable(
final T namespace,
final ExtractionNamespaceFunctionFactory<T> factory,
final String cacheId
)
{
final Runnable runnable = super.getPostRunnable(namespace, factory, cacheId);
cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object());
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace());
return new Runnable()
{
@Override
public void run()
{
synchronized (cacheUpdateAlerter) {
try {
runnable.run();
numRuns.incrementAndGet();
}
finally {
cacheUpdateAlerter.notifyAll();
}
}
}
};
}
};
tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile();
try (OutputStream ostream = new FileOutputStream(tmpFile)) {
try (OutputStreamWriter out = new OutputStreamWriter(ostream)) {
// Since Travis sucks with disk related stuff, we override the disk reading part above.
// This is safe and should shake out any problem areas that accidentally read the file.
out.write("SHOULDN'T TRY TO PARSE");
out.flush();
}
}
}
@After
public void tearDown()
{
lifecycle.stop();
}
@Test(expected = IAE.class)
public void testDoubleSubmission()
{
URIExtractionNamespace namespace = new URIExtractionNamespace(
"ns",
tmpFile.toURI(),
new URIExtractionNamespace.ObjectMapperFlatDataParser(
URIExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(0),
null
);
final ListenableFuture<?> future = manager.schedule(namespace);
Assert.assertFalse(future.isDone());
Assert.assertFalse(future.isCancelled());
try {
manager.schedule(namespace).cancel(true);
}
finally {
future.cancel(true);
}
}
@Test(timeout = 50_000)
@Test(timeout = 60_000)
public void testSimpleSubmission() throws ExecutionException, InterruptedException
{
URIExtractionNamespace namespace = new URIExtractionNamespace(
@ -150,23 +202,16 @@ public class NamespaceExtractionCacheManagerExecutorsTest
new Period(0),
null
);
try {
NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace));
}
finally {
lifecycle.stop();
}
NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace));
}
@Test(timeout = 50_000)
@Test(timeout = 60_000)
public void testRepeatSubmission() throws ExecutionException, InterruptedException
{
final int repeatCount = 5;
final long delay = 5;
final AtomicLong ranCount = new AtomicLong(0l);
final long totalRunCount;
final long start;
final CountDownLatch latch = new CountDownLatch(repeatCount);
try {
final URIExtractionNamespace namespace = new URIExtractionNamespace(
"ns",
@ -177,27 +222,28 @@ public class NamespaceExtractionCacheManagerExecutorsTest
new Period(delay),
null
);
cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object());
start = System.currentTimeMillis();
final String cacheId = UUID.randomUUID().toString();
ListenableFuture<?> future = manager.schedule(
namespace, factory, new Runnable()
{
@Override
public void run()
{
try {
manager.getPostRunnable(namespace, factory, cacheId).run();
ranCount.incrementAndGet();
}
finally {
latch.countDown();
}
}
},
cacheId
);
latch.await();
ListenableFuture<?> future = manager.schedule(namespace);
Assert.assertFalse(future.isDone());
Assert.assertFalse(future.isCancelled());
final long preRunCount;
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace());
synchronized (cacheUpdateAlerter) {
preRunCount = numRuns.get();
}
for (; ; ) {
synchronized (cacheUpdateAlerter) {
if (numRuns.get() - preRunCount >= repeatCount) {
break;
} else {
cacheUpdateAlerter.wait();
}
}
}
long minEnd = start + ((repeatCount - 1) * delay);
long end = System.currentTimeMillis();
Assert.assertTrue(
@ -205,148 +251,175 @@ public class NamespaceExtractionCacheManagerExecutorsTest
"Didn't wait long enough between runs. Expected more than %d was %d",
minEnd - start,
end - start
), minEnd < end
), minEnd <= end
);
}
finally {
lifecycle.stop();
}
totalRunCount = ranCount.get();
Thread.sleep(50);
Assert.assertEquals(totalRunCount, ranCount.get(), 1);
totalRunCount = numRuns.get();
Thread.sleep(delay * 10);
Assert.assertEquals(totalRunCount, numRuns.get(), 1);
}
@Test(timeout = 50_000)
public void testConcurrentDelete() throws ExecutionException, InterruptedException
@Test(timeout = 600_000) // This is very fast when run locally. Speed on Travis completely depends on noisy neighbors.
public void testConcurrentAddDelete() throws ExecutionException, InterruptedException, TimeoutException
{
final int threads = 5;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
final int threads = 10;
final int deletesPerThread = 5;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
threads,
"concurrentTestingPool-%s"
)
);
final CountDownLatch latch = new CountDownLatch(threads);
Collection<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < threads; ++i) {
final int loopNum = i;
ListenableFuture<?> future = executorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
latch.countDown();
latch.await();
for (int j = 0; j < 10; ++j) {
testDelete(String.format("ns-%d", loopNum));
final int ii = i;
futures.add(
executorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
latch.countDown();
if (!latch.await(5, TimeUnit.SECONDS)) {
throw new RuntimeException(new TimeoutException("Took too long to wait for more tasks"));
}
for (int j = 0; j < deletesPerThread; ++j) {
testDelete(String.format("ns-%d-%d", ii, j));
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
)
);
}
Futures.allAsList(futures).get();
executorService.shutdown();
}
@Test(timeout = 50_000)
public void testDelete()
throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException
{
// Create an all-encompassing exception if any of them failed
final Collection<Exception> exceptions = new ArrayList<>();
try {
testDelete("ns");
for (ListenableFuture<?> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
catch (Exception e) {
exceptions.add(e);
}
}
if (!exceptions.isEmpty()) {
final RuntimeException e = new RuntimeException("Futures failed");
for (Exception ex : exceptions) {
e.addSuppressed(ex);
}
}
}
finally {
lifecycle.stop();
executorService.shutdownNow();
}
checkNoMoreRunning();
}
@Test(timeout = 60_000L)
public void testSimpleDelete() throws InterruptedException
{
testDelete("someNamespace");
}
public void testDelete(final String ns)
throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch latchMore = new CountDownLatch(10);
cacheUpdateAlerts.putIfAbsent(ns, new Object());
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(ns);
final AtomicLong runs = new AtomicLong(0);
long prior = 0;
final long period = 1_000L;// Give it some time between attempts to update
final URIExtractionNamespace namespace = new URIExtractionNamespace(
ns,
tmpFile.toURI(),
new URIExtractionNamespace.ObjectMapperFlatDataParser(
URIExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(1l),
new Period(period),
null
);
final String cacheId = UUID.randomUUID().toString();
final CountDownLatch latchBeforeMore = new CountDownLatch(1);
ListenableFuture<?> future =
manager.schedule(
namespace, factory, new Runnable()
{
@Override
public void run()
{
try {
if (!Thread.interrupted()) {
manager.getPostRunnable(namespace, factory, cacheId).run();
} else {
Thread.currentThread().interrupt();
}
if (!Thread.interrupted()) {
runs.incrementAndGet();
} else {
Thread.currentThread().interrupt();
}
}
finally {
latch.countDown();
try {
if (latch.getCount() == 0) {
latchBeforeMore.await();
}
}
catch (InterruptedException e) {
log.debug("Interrupted");
Thread.currentThread().interrupt();
}
finally {
latchMore.countDown();
}
}
}
},
cacheId
);
latch.await();
prior = runs.get();
latchBeforeMore.countDown();
final ListenableFuture<?> future = manager.schedule(namespace);
Assert.assertFalse(future.isCancelled());
Assert.assertFalse(future.isDone());
Assert.assertTrue(fnCache.containsKey(ns));
latchMore.await();
Assert.assertTrue(runs.get() > prior);
long start = 0L;
final long timeout = 45_000L;
do {
synchronized (cacheUpdateAlerter) {
if (!fnCache.containsKey(ns)) {
cacheUpdateAlerter.wait(10_000);
}
}
if (future.isDone()) {
try {
// Bubble up the exception
Assert.assertNull(future.get());
Assert.fail("Task finished");
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
}
if (!fnCache.containsKey(ns) && System.currentTimeMillis() - start > timeout) {
throw new RuntimeException(
new TimeoutException(
String.format(
"Namespace took too long to appear in cache for %s",
namespace
)
)
);
}
} while (!fnCache.containsKey(ns));
Assert.assertEquals(VALUE, manager.getCacheMap(ns).get(KEY));
Assert.assertTrue(fnCache.containsKey(ns));
Assert.assertTrue(manager.implData.containsKey(ns));
manager.delete("ns");
Assert.assertTrue(manager.delete(ns));
try {
Assert.assertNull(future.get());
}
catch (CancellationException e) {
// Ignore
}
catch (ExecutionException e) {
if (!future.isCancelled()) {
throw Throwables.propagate(e);
}
}
Assert.assertFalse(manager.implData.containsKey(ns));
Assert.assertFalse(fnCache.containsKey(ns));
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(future.isDone());
prior = runs.get();
Thread.sleep(20);
Assert.assertEquals(prior, runs.get());
}
@Test(timeout = 50_000)
@Test(timeout = 60_000)
public void testShutdown()
throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException
{
final CountDownLatch latch = new CountDownLatch(1);
final long period = 5L;
final ListenableFuture future;
final AtomicLong runs = new AtomicLong(0);
long prior = 0;
try {
@ -356,43 +429,34 @@ public class NamespaceExtractionCacheManagerExecutorsTest
new URIExtractionNamespace.ObjectMapperFlatDataParser(
URIExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(1l),
new Period(period),
null
);
final String cacheId = UUID.randomUUID().toString();
final Runnable runnable = manager.getPostRunnable(namespace, factory, cacheId);
future =
manager.schedule(
namespace, factory, new Runnable()
{
@Override
public void run()
{
runnable.run();
latch.countDown();
runs.incrementAndGet();
}
},
cacheId
);
cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object());
future = manager.schedule(namespace);
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace());
synchronized (cacheUpdateAlerter) {
cacheUpdateAlerter.wait();
}
latch.await();
Assert.assertFalse(future.isCancelled());
Assert.assertFalse(future.isDone());
prior = runs.get();
while (runs.get() <= prior) {
Thread.sleep(50);
synchronized (cacheUpdateAlerter) {
prior = numRuns.get();
cacheUpdateAlerter.wait();
}
Assert.assertTrue(runs.get() > prior);
Assert.assertTrue(numRuns.get() > prior);
}
finally {
lifecycle.stop();
}
manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS);
while (!manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS)) {
}
prior = runs.get();
Thread.sleep(50);
Assert.assertEquals(prior, runs.get());
checkNoMoreRunning();
Field execField = NamespaceExtractionCacheManager.class.getDeclaredField("listeningScheduledExecutorService");
execField.setAccessible(true);
@ -400,62 +464,48 @@ public class NamespaceExtractionCacheManagerExecutorsTest
Assert.assertTrue(((ListeningScheduledExecutorService) execField.get(manager)).isTerminated());
}
@Test(timeout = 50_000)
@Test(timeout = 60_000)
public void testRunCount()
throws InterruptedException, ExecutionException
{
final Lifecycle lifecycle = new Lifecycle();
final NamespaceExtractionCacheManager onHeap;
final AtomicLong runCount = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(1);
final long numWaits = 5;
final ListenableFuture<?> future;
try {
onHeap = new OnHeapNamespaceExtractionCacheManager(
lifecycle,
new ConcurrentHashMap<String, Function<String, String>>(),
new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
URIExtractionNamespace.class,
new URIExtractionNamespaceFunctionFactory(
ImmutableMap.<String, SearchableVersionedDataFinder>of(
"file",
new LocalFileTimestampVersionFinder()
)
)
)
);
final URIExtractionNamespace namespace = new URIExtractionNamespace(
"ns",
tmpFile.toURI(),
new URIExtractionNamespace.ObjectMapperFlatDataParser(
URIExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(1l),
new Period(5l),
null
);
final String cacheId = UUID.randomUUID().toString();
ListenableFuture<?> future =
onHeap.schedule(
namespace, factory, new Runnable()
{
@Override
public void run()
{
manager.getPostRunnable(namespace, factory, cacheId).run();
latch.countDown();
runCount.incrementAndGet();
}
},
cacheId
);
latch.await();
Thread.sleep(20);
cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object());
future = manager.schedule(namespace);
Assert.assertFalse(future.isDone());
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace());
for (int i = 0; i < numWaits; ++i) {
synchronized (cacheUpdateAlerter) {
cacheUpdateAlerter.wait();
}
}
Assert.assertFalse(future.isDone());
}
finally {
lifecycle.stop();
}
onHeap.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(runCount.get() > 5);
while (!manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS)) {
}
Assert.assertTrue(numRuns.get() >= numWaits);
checkNoMoreRunning();
}
private void checkNoMoreRunning() throws InterruptedException
{
final long pre = numRuns.get();
Thread.sleep(100L);
Assert.assertEquals(pre, numRuns.get(), 1); // since we don't synchronize here we might have an extra increment
}
}