From 95e08b38ea6e77e2d6895678ea53cbdf8f5aaae0 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 16 Sep 2016 11:54:23 -0700 Subject: [PATCH] [QTL] Reduced Locking Lookups (#3071) * Lockless lookups * Fix compile problem * Make stack trace throw instead * Remove non-germane change * * Add better naming to cache keys. Makes logging nicer * Fix #3459 * Move start/stop lock to non-interruptable for readability purposes --- .../lookup/KafkaLookupExtractorFactory.java | 3 +- .../NamespaceLookupExtractorFactory.java | 29 +- .../NamespaceExtractionCacheManager.java | 2 +- ...ffHeapNamespaceExtractionCacheManager.java | 28 +- ...apNamespaceExtractionCacheManagerTest.java | 80 ++++ .../query/lookup/LookupReferencesManager.java | 140 ++++++- .../io/druid/query/lookup/LookupModule.java | 23 +- .../lookup/LookupReferencesManagerTest.java | 388 +++++++++++++++++- 8 files changed, 627 insertions(+), 66 deletions(-) diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java index 2649794f3fc..b97a275cb71 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -78,7 +78,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory private final ListeningExecutorService executorService; private final AtomicLong doubleEventCount = new AtomicLong(0L); private final NamespaceExtractionCacheManager cacheManager; - private final String factoryId = UUID.randomUUID().toString(); + private final String factoryId; private final AtomicReference> mapRef = new AtomicReference<>(null); private final AtomicBoolean started = new AtomicBoolean(false); @@ -114,6 +114,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory this.cacheManager = cacheManager; this.connectTimeout = connectTimeout; this.injective = injective; + this.factoryId = "kafka-factory-" + kafkaTopic + UUID.randomUUID().toString(); } public KafkaLookupExtractorFactory( diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java index fa94dab5e97..233728e0c20 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.metamx.common.ISE; import com.metamx.common.StringUtils; import com.metamx.common.logger.Logger; @@ -78,7 +79,7 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory Preconditions.checkArgument(this.firstCacheTimeout >= 0); this.injective = injective; this.manager = manager; - this.extractorID = buildID(); + this.extractorID = String.format("namespace-factory-%s-%s", extractionNamespace, UUID.randomUUID().toString()); this.lookupIntrospectHandler = new NamespaceLookupIntrospectHandler(this, manager, extractorID); } @@ -95,7 +96,12 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory public boolean start() { final Lock writeLock = startStopSync.writeLock(); - writeLock.lock(); + try { + writeLock.lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } try { if (started) { LOG.warn("Already started! [%s]", extractorID); @@ -125,7 +131,12 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory public boolean close() { final Lock writeLock = startStopSync.writeLock(); - writeLock.lock(); + try { + writeLock.lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } try { if (!started) { LOG.warn("Not started! [%s]", extractorID); @@ -179,17 +190,17 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory return injective; } - private String buildID() - { - return UUID.randomUUID().toString(); - } - // Grab the latest snapshot from the cache manager @Override public LookupExtractor get() { final Lock readLock = startStopSync.readLock(); - readLock.lock(); + try { + readLock.lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } try { if (!started) { throw new ISE("Factory [%s] not started", extractorID); diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java index d9badf8a1c7..41f2ed536d9 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java @@ -319,7 +319,7 @@ public abstract class NamespaceExtractionCacheManager if (factory == null) { throw new ISE("Cannot find factory for namespace [%s]", namespace); } - final String cacheId = UUID.randomUUID().toString(); + final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString()); return schedule(id, namespace, factory, getPostRunnable(id, namespace, factory, cacheId), cacheId); } diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java index f33f25949f9..672702e3576 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java @@ -48,7 +48,7 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC private static final Logger log = new Logger(OffHeapNamespaceExtractionCacheManager.class); private final DB mmapDB; private ConcurrentMap currentNamespaceCache = new ConcurrentHashMap<>(); - private Striped nsLocks = Striped.lock(32); // Needed to make sure delete() doesn't do weird things + private Striped nsLocks = Striped.lazyWeakLock(1024); // Needed to make sure delete() doesn't do weird things private final File tmpFile; @Inject @@ -137,7 +137,7 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC lock.lock(); try { if (super.delete(namespaceKey)) { - final String mmapDBkey = currentNamespaceCache.get(namespaceKey); + final String mmapDBkey = currentNamespaceCache.remove(namespaceKey); if (mmapDBkey != null) { final long pre = tmpFile.length(); mmapDB.delete(mmapDBkey); @@ -156,27 +156,17 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC } @Override - public ConcurrentMap getCacheMap(String namespaceOrCacheKey) + public ConcurrentMap getCacheMap(String namespaceKey) { - final Lock lock = nsLocks.get(namespaceOrCacheKey); + final Lock lock = nsLocks.get(namespaceKey); lock.lock(); try { - String realKey = currentNamespaceCache.get(namespaceOrCacheKey); - if (realKey == null) { - realKey = namespaceOrCacheKey; - } - final Lock nsLock = nsLocks.get(realKey); - if (lock != nsLock) { - nsLock.lock(); - } - try { - return mmapDB.createHashMap(realKey).makeOrGet(); - } - finally { - if (lock != nsLock) { - nsLock.unlock(); - } + String mapDBKey = currentNamespaceCache.get(namespaceKey); + if (mapDBKey == null) { + // Not something created by swapAndClearCache + mapDBKey = namespaceKey; } + return mmapDB.createHashMap(mapDBKey).makeOrGet(); } finally { lock.unlock(); diff --git a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManagerTest.java b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManagerTest.java index 0af60778d9f..98793b4b2f1 100644 --- a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManagerTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManagerTest.java @@ -19,21 +19,38 @@ package io.druid.server.lookup.namespace.cache; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.concurrent.Execs; import io.druid.guice.GuiceInjectors; import io.druid.guice.JsonConfigProvider; import io.druid.guice.annotations.Self; import io.druid.initialization.Initialization; +import io.druid.query.lookup.namespace.ExtractionNamespace; +import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory; import io.druid.server.DruidNode; import io.druid.server.lookup.namespace.NamespaceExtractionModule; +import io.druid.server.metrics.NoopServiceEmitter; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; public class OffHeapNamespaceExtractionCacheManagerTest { @@ -61,4 +78,67 @@ public class OffHeapNamespaceExtractionCacheManagerTest final NamespaceExtractionCacheManager manager = injector.getInstance(NamespaceExtractionCacheManager.class); Assert.assertEquals(OffHeapNamespaceExtractionCacheManager.class, manager.getClass()); } + + @Test(timeout = 30000L) + public void testRacyCreation() throws Exception + { + final int concurrentThreads = 100; + final Lifecycle lifecycle = new Lifecycle(); + final ServiceEmitter emitter = new NoopServiceEmitter(); + final OffHeapNamespaceExtractionCacheManager manager = new OffHeapNamespaceExtractionCacheManager( + lifecycle, + emitter, + ImmutableMap., ExtractionNamespaceCacheFactory>of() + ); + final ListeningExecutorService service = MoreExecutors.listeningDecorator(Execs.multiThreaded( + concurrentThreads, + "offheaptest-%s" + )); + final List> futures = new ArrayList<>(); + final CountDownLatch thunder = new CountDownLatch(1); + final List namespaceIds = new ArrayList<>(); + for (int i = 0; i < 5; ++i) { + final String namespace = "namespace-" + UUID.randomUUID().toString(); + final String cacheKey = "initial-cache-" + namespace; + namespaceIds.add(namespace); + manager.getCacheMap(cacheKey).put("foo", "bar"); + Assert.assertFalse(manager.swapAndClearCache(namespace, cacheKey)); + } + final Random random = new Random(3748218904L); + try { + for (int i = 0; i < concurrentThreads; ++i) { + final int j = i; + final String namespace = namespaceIds.get(random.nextInt(namespaceIds.size())); + futures.add(service.submit( + new Runnable() + { + @Override + public void run() + { + try { + thunder.await(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + for (int i = 0; i < 1000; ++i) { + final String cacheKey = String.format("%s-%d-key-%d", namespace, j, i); + manager.getCacheMap(cacheKey).put("foo", "bar" + Integer.toString(i)); + Assert.assertTrue(manager.swapAndClearCache(namespace, cacheKey)); + } + } + } + )); + } + thunder.countDown(); + Futures.allAsList(futures).get(); + } + finally { + service.shutdownNow(); + } + + for (final String namespace : namespaceIds) { + Assert.assertEquals(ImmutableMap.of("foo", "bar999"), manager.getCacheMap(namespace)); + } + } } diff --git a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java b/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java index 66c21e5f1fb..5699e1817e8 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java +++ b/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java @@ -23,6 +23,7 @@ package io.druid.query.lookup; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,6 +42,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class provide a basic {@link LookupExtractorFactory} references manager. @@ -54,8 +57,9 @@ import java.util.concurrent.atomic.AtomicBoolean; public class LookupReferencesManager { private static final Logger LOGGER = new Logger(LookupReferencesManager.class); - private final ConcurrentMap lookupMap = new ConcurrentHashMap(); - private final Object lock = new Object(); + private final ConcurrentMap lookupMap = new ConcurrentHashMap<>(); + // This is a lock against the state of the REFERENCE MANAGER (aka start/stop state), NOT of the lookup itself. + private final ReadWriteLock startStopLock = new ReentrantReadWriteLock(true); private final AtomicBoolean started = new AtomicBoolean(false); private final LookupSnapshotTaker lookupSnapshotTaker; @@ -73,7 +77,8 @@ public class LookupReferencesManager @LifecycleStart public void start() { - synchronized (lock) { + startStopLock.writeLock().lock(); + try { if (!started.getAndSet(true)) { if (lookupSnapshotTaker != null) { final List lookupBeanList = lookupSnapshotTaker.pullExistingSnapshot(); @@ -84,12 +89,16 @@ public class LookupReferencesManager LOGGER.info("Started lookup factory references manager"); } } + finally { + startStopLock.writeLock().unlock(); + } } @LifecycleStop public void stop() { - synchronized (lock) { + startStopLock.writeLock().lock(); + try { if (started.getAndSet(false)) { if (lookupSnapshotTaker != null) { lookupSnapshotTaker.takeSnapshot(getAllAsList()); @@ -100,6 +109,9 @@ public class LookupReferencesManager } } } + finally { + startStopLock.writeLock().unlock(); + } } /** @@ -112,7 +124,13 @@ public class LookupReferencesManager */ public boolean put(String lookupName, final LookupExtractorFactory lookupExtractorFactory) { - synchronized (lock) { + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { assertStarted(); if (lookupMap.containsKey(lookupName)) { LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName); @@ -123,7 +141,7 @@ public class LookupReferencesManager } final boolean noPrior = null == lookupMap.putIfAbsent(lookupName, lookupExtractorFactory); if (noPrior) { - if(lookupSnapshotTaker != null) { + if (lookupSnapshotTaker != null) { lookupSnapshotTaker.takeSnapshot(getAllAsList()); } } else { @@ -133,6 +151,9 @@ public class LookupReferencesManager } return noPrior; } + finally { + startStopLock.readLock().unlock(); + } } /** @@ -143,12 +164,19 @@ public class LookupReferencesManager public void put(Map lookups) { Map failedExtractorFactoryMap = new HashMap<>(); - synchronized (lock) { + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { assertStarted(); for (Map.Entry entry : lookups.entrySet()) { final String lookupName = entry.getKey(); final LookupExtractorFactory lookupExtractorFactory = entry.getValue(); if (lookupMap.containsKey(lookupName)) { + // Fail early without bothering to start LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName); continue; } @@ -156,7 +184,14 @@ public class LookupReferencesManager failedExtractorFactoryMap.put(lookupName, lookupExtractorFactory); continue; } - lookupMap.put(lookupName, lookupExtractorFactory); + if (null != lookupMap.putIfAbsent(lookupName, lookupExtractorFactory)) { + // handle race + LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName); + if (!lookupExtractorFactory.close()) { + LOGGER.error("Failed to properly close stale lookup [%s]", lookupExtractorFactory); + } + continue; + } if (lookupSnapshotTaker != null) { lookupSnapshotTaker.takeSnapshot(getAllAsList()); } @@ -168,6 +203,9 @@ public class LookupReferencesManager ); } } + finally { + startStopLock.readLock().unlock(); + } } /** @@ -182,23 +220,52 @@ public class LookupReferencesManager */ public boolean updateIfNew(String lookupName, final LookupExtractorFactory lookupExtractorFactory) { - final boolean update; - synchronized (lock) { + boolean update = false; + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { assertStarted(); - final LookupExtractorFactory prior = lookupMap.get(lookupName); + LookupExtractorFactory prior = lookupMap.get(lookupName); update = lookupExtractorFactory.replaces(prior); if (update) { if (!lookupExtractorFactory.start()) { throw new ISE("Could not start [%s]", lookupName); } - lookupMap.put(lookupName, lookupExtractorFactory); - if (prior != null) { + boolean racy; + do { + if (prior == null) { + racy = null != lookupMap.putIfAbsent(lookupName, lookupExtractorFactory); + } else { + racy = !lookupMap.replace(lookupName, prior, lookupExtractorFactory); + } + + if (racy) { + prior = lookupMap.get(lookupName); + update = lookupExtractorFactory.replaces(prior); + } + } while (racy && update); + + if (prior != null && update) { if (!prior.close()) { - LOGGER.error("Error closing [%s]:[%s]", lookupName, lookupExtractorFactory); + LOGGER.error("Error closing [%s]:[%s]", lookupName, prior); + } + } + + if (!update) { + // We started the lookup, failed a race, and now need to cleanup + if (!lookupExtractorFactory.close()) { + LOGGER.error("Error closing [%s]:[%s]", lookupExtractorFactory); } } } } + finally { + startStopLock.readLock().unlock(); + } return update; } @@ -210,7 +277,13 @@ public class LookupReferencesManager */ public boolean remove(String lookupName) { - synchronized (lock) { + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { final LookupExtractorFactory lookupExtractorFactory = lookupMap.remove(lookupName); if (lookupExtractorFactory != null) { LOGGER.debug("Removed lookup [%s]", lookupName); @@ -220,6 +293,9 @@ public class LookupReferencesManager return lookupExtractorFactory.close(); } } + finally { + startStopLock.readLock().unlock(); + } return false; } @@ -233,9 +309,20 @@ public class LookupReferencesManager @Nullable public LookupExtractorFactory get(String lookupName) { - final LookupExtractorFactory lookupExtractorFactory = lookupMap.get(lookupName); - assertStarted(); - return lookupExtractorFactory; + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { + final LookupExtractorFactory lookupExtractorFactory = lookupMap.get(lookupName); + assertStarted(); + return lookupExtractorFactory; + } + finally { + startStopLock.readLock().unlock(); + } } /** @@ -245,8 +332,19 @@ public class LookupReferencesManager */ public Map getAll() { - assertStarted(); - return Maps.newHashMap(lookupMap); + try { + startStopLock.readLock().lockInterruptibly(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + try { + assertStarted(); + return Maps.newHashMap(lookupMap); + } + finally { + startStopLock.readLock().unlock(); + } } private void assertStarted() throws ISE @@ -272,7 +370,7 @@ public class LookupReferencesManager @Override public LookupBean apply( @Nullable - Map.Entry input + Map.Entry input ) { final LookupBean lookupBean = new LookupBean(); diff --git a/server/src/main/java/io/druid/query/lookup/LookupModule.java b/server/src/main/java/io/druid/query/lookup/LookupModule.java index cfd98a40923..016dce6d353 100644 --- a/server/src/main/java/io/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/io/druid/query/lookup/LookupModule.java @@ -54,11 +54,12 @@ import io.druid.server.listener.resource.AbstractListenerHandler; import io.druid.server.listener.resource.ListenerResource; import io.druid.server.lookup.cache.LookupCoordinatorManager; import io.druid.server.metrics.DataSourceTaskIdHolder; +import org.apache.curator.utils.ZKPaths; + +import javax.ws.rs.Path; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.ws.rs.Path; -import org.apache.curator.utils.ZKPaths; public class LookupModule implements DruidModule { @@ -116,10 +117,8 @@ class LookupListeningResource extends ListenerResource { }) { - private final Object deleteLock = new Object(); - @Override - public synchronized Object post(final Map lookups) + public Object post(final Map lookups) throws Exception { final Map failedUpdates = new HashMap<>(); @@ -154,17 +153,17 @@ class LookupListeningResource extends ListenerResource @Override public Object delete(String id) { - // Prevent races to 404 vs 500 between concurrent delete requests - synchronized (deleteLock) { + if (manager.get(id) == null) { + return null; + } + if (!manager.remove(id)) { if (manager.get(id) == null) { return null; } - if (!manager.remove(id)) { - // We don't have more information at this point. - throw new RE("Could not remove lookup [%s]", id); - } - return id; + // We don't have more information at this point. + throw new RE("Could not remove lookup [%s]", id); } + return id; } } ); diff --git a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java index 9ee103c1ee5..187592e0d16 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java @@ -20,9 +20,16 @@ package io.druid.query.lookup; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; +import com.metamx.common.StringUtils; +import io.druid.concurrent.Execs; import io.druid.jackson.DefaultObjectMapper; import org.easymock.EasyMock; import org.junit.After; @@ -32,20 +39,40 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; public class LookupReferencesManagerTest { + private static final int CONCURRENT_THREADS = 16; LookupReferencesManager lookupReferencesManager; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); ObjectMapper mapper = new DefaultObjectMapper(); + private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Execs.multiThreaded( + CONCURRENT_THREADS, + "hammer-time-%s" + )); @Before public void setUp() throws IOException { mapper.registerSubtypes(MapLookupExtractorFactory.class); - lookupReferencesManager = new LookupReferencesManager(new LookupConfig(Files.createTempDir().getAbsolutePath()), mapper); + lookupReferencesManager = new LookupReferencesManager( + new LookupConfig(Files.createTempDir().getAbsolutePath()), + mapper + ); Assert.assertTrue("must be closed before start call", lookupReferencesManager.isClosed()); lookupReferencesManager.start(); Assert.assertFalse("must start after start call", lookupReferencesManager.isClosed()); @@ -56,6 +83,7 @@ public class LookupReferencesManagerTest { lookupReferencesManager.stop(); Assert.assertTrue("stop call should close it", lookupReferencesManager.isClosed()); + executorService.shutdownNow(); } @Test(expected = ISE.class) @@ -253,11 +281,365 @@ public class LookupReferencesManagerTest @Test public void testBootstrapFromFile() throws IOException { - LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory(ImmutableMap.of("key", "value"), true); - lookupReferencesManager.put("testMockForBootstrap",lookupExtractorFactory); + LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory(ImmutableMap.of( + "key", + "value" + ), true); + lookupReferencesManager.put("testMockForBootstrap", lookupExtractorFactory); lookupReferencesManager.stop(); lookupReferencesManager.start(); Assert.assertEquals(lookupExtractorFactory, lookupReferencesManager.get("testMockForBootstrap")); } + + @Test + public void testConcurrencyStaaaaaaaaaaartStop() throws Exception + { + lookupReferencesManager.stop(); + final CyclicBarrier cyclicBarrier = new CyclicBarrier(CONCURRENT_THREADS); + final Runnable start = new Runnable() + { + @Override + public void run() + { + try { + cyclicBarrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + throw Throwables.propagate(e); + } + lookupReferencesManager.start(); + } + }; + final Collection> futures = new ArrayList<>(CONCURRENT_THREADS); + for (int i = 0; i < CONCURRENT_THREADS; ++i) { + futures.add(executorService.submit(start)); + } + lookupReferencesManager.stop(); + Futures.allAsList(futures).get(100, TimeUnit.MILLISECONDS); + for (ListenableFuture future : futures) { + Assert.assertNull(future.get()); + } + } + + @Test + public void testConcurrencyStartStoooooooooop() throws Exception + { + lookupReferencesManager.stop(); + lookupReferencesManager.start(); + final CyclicBarrier cyclicBarrier = new CyclicBarrier(CONCURRENT_THREADS); + final Runnable start = new Runnable() + { + @Override + public void run() + { + try { + cyclicBarrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + throw Throwables.propagate(e); + } + lookupReferencesManager.stop(); + } + }; + final Collection> futures = new ArrayList<>(CONCURRENT_THREADS); + for (int i = 0; i < CONCURRENT_THREADS; ++i) { + futures.add(executorService.submit(start)); + } + Futures.allAsList(futures).get(100, TimeUnit.MILLISECONDS); + for (ListenableFuture future : futures) { + Assert.assertNull(future.get()); + } + } + + @Test(timeout = 10000L) + public void testConcurrencySequentialChaos() throws Exception + { + final CountDownLatch runnableStartBarrier = new CountDownLatch(1); + final Random random = new Random(478137498L); + final int numUpdates = 100000; + final int numNamespaces = 100; + final CountDownLatch runnablesFinishedBarrier = new CountDownLatch(numUpdates); + final List runnables = new ArrayList<>(numUpdates); + final Map maxNumber = new HashMap<>(); + for (int i = 1; i <= numUpdates; ++i) { + final boolean shouldStart = random.nextInt(10) == 1; + final boolean shouldClose = random.nextInt(10) == 1; + final String name = Integer.toString(random.nextInt(numNamespaces)); + final int position = i; + + final LookupExtractorFactory lookupExtractorFactory = new LookupExtractorFactory() + { + @Override + public boolean start() + { + return shouldStart; + } + + @Override + public boolean close() + { + return shouldClose; + } + + @Override + public boolean replaces(@Nullable LookupExtractorFactory other) + { + if (other == null) { + return true; + } + final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) other.getIntrospectHandler(); + return position > introspectionHandler.position; + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + return new NamedIntrospectionHandler(position); + } + + @Override + public String toString() + { + return String.format("TestFactroy position %d", position); + } + + @Override + public LookupExtractor get() + { + return null; + } + }; + + if (shouldStart && (!maxNumber.containsKey(name) || maxNumber.get(name) < position)) { + maxNumber.put(name, position); + } + runnables.add(new LookupUpdatingRunnable( + name, + lookupExtractorFactory, + runnableStartBarrier, + lookupReferencesManager + )); + } + ////// Add some CHAOS! + Collections.shuffle(runnables, random); + final Runnable decrementFinished = new Runnable() + { + @Override + public void run() + { + runnablesFinishedBarrier.countDown(); + } + }; + for (Runnable runnable : runnables) { + executorService.submit(runnable).addListener(decrementFinished, MoreExecutors.sameThreadExecutor()); + } + + runnableStartBarrier.countDown(); + do { + for (String name : maxNumber.keySet()) { + final LookupExtractorFactory factory; + try { + factory = lookupReferencesManager.get(name); + } + catch (ISE e) { + continue; + } + if (null == factory) { + continue; + } + final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler(); + Assert.assertTrue(introspectionHandler.position >= 0); + } + } while (runnablesFinishedBarrier.getCount() > 0); + + lookupReferencesManager.start(); + + for (String name : maxNumber.keySet()) { + final LookupExtractorFactory factory = lookupReferencesManager.get(name); + if (null == factory) { + continue; + } + final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler(); + Assert.assertNotNull(introspectionHandler); + Assert.assertEquals( + StringUtils.safeFormat("Named position %s failed", name), + maxNumber.get(name), + Integer.valueOf(introspectionHandler.position) + ); + } + Assert.assertEquals(maxNumber.size(), lookupReferencesManager.getAll().size()); + } + + @Test(timeout = 10000L) + public void testConcurrencyStartStopChaos() throws Exception + { + // Don't want to exercise snapshot here + final LookupReferencesManager manager = new LookupReferencesManager(new LookupConfig(null), mapper); + final Runnable chaosStart = new Runnable() + { + @Override + public void run() + { + manager.start(); + } + }; + final Runnable chaosStop = new Runnable() + { + @Override + public void run() + { + manager.stop(); + } + }; + final CountDownLatch runnableStartBarrier = new CountDownLatch(1); + final Random random = new Random(478137498L); + final int numUpdates = 100000; + final int numNamespaces = 100; + final CountDownLatch runnablesFinishedBarrier = new CountDownLatch(numUpdates); + final List runnables = new ArrayList<>(numUpdates); + final Map maxNumber = new HashMap<>(); + for (int i = 1; i <= numUpdates; ++i) { + final boolean shouldStart = random.nextInt(10) == 1; + final boolean shouldClose = random.nextInt(10) == 1; + final String name = Integer.toString(random.nextInt(numNamespaces)); + final int position = i; + + final LookupExtractorFactory lookupExtractorFactory = new LookupExtractorFactory() + { + @Override + public boolean start() + { + return shouldStart; + } + + @Override + public boolean close() + { + return shouldClose; + } + + @Override + public boolean replaces(@Nullable LookupExtractorFactory other) + { + if (other == null) { + return true; + } + final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) other.getIntrospectHandler(); + return position > introspectionHandler.position; + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + return new NamedIntrospectionHandler(position); + } + + @Override + public String toString() + { + return String.format("TestFactroy position %d", position); + } + + @Override + public LookupExtractor get() + { + return null; + } + }; + if (random.nextFloat() < 0.001) { + if (random.nextBoolean()) { + runnables.add(chaosStart); + } else { + runnables.add(chaosStop); + } + } else { + if (shouldStart && (!maxNumber.containsKey(name) || maxNumber.get(name) < position)) { + maxNumber.put(name, position); + } + runnables.add(new LookupUpdatingRunnable( + name, + lookupExtractorFactory, + runnableStartBarrier, + manager + )); + } + } + ////// Add some CHAOS! + Collections.shuffle(runnables, random); + final Runnable decrementFinished = new Runnable() + { + @Override + public void run() + { + runnablesFinishedBarrier.countDown(); + } + }; + for (Runnable runnable : runnables) { + executorService.submit(runnable).addListener(decrementFinished, MoreExecutors.sameThreadExecutor()); + } + + runnableStartBarrier.countDown(); + do { + for (String name : maxNumber.keySet()) { + final LookupExtractorFactory factory; + try { + factory = manager.get(name); + } + catch (ISE e) { + continue; + } + if (null == factory) { + continue; + } + final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler(); + Assert.assertTrue(introspectionHandler.position >= 0); + } + } while (runnablesFinishedBarrier.getCount() > 0); + } +} + +class LookupUpdatingRunnable implements Runnable +{ + final String name; + final LookupExtractorFactory factory; + final CountDownLatch startLatch; + final LookupReferencesManager lookupReferencesManager; + + LookupUpdatingRunnable( + String name, + LookupExtractorFactory factory, + CountDownLatch startLatch, + LookupReferencesManager lookupReferencesManager + ) + { + this.name = name; + this.factory = factory; + this.startLatch = startLatch; + this.lookupReferencesManager = lookupReferencesManager; + } + + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + lookupReferencesManager.updateIfNew(name, factory); + } +} + +class NamedIntrospectionHandler implements LookupIntrospectHandler +{ + final int position; + + NamedIntrospectionHandler(final int position) + { + this.position = position; + } }