[QTL] Reduced Locking Lookups (#3071)

* Lockless lookups

* Fix compile problem

* Make stack trace throw instead

* Remove non-germane change

* * Add better naming to cache keys. Makes logging nicer
* Fix #3459

* Move start/stop lock to non-interruptable for readability purposes
This commit is contained in:
Charles Allen 2016-09-16 11:54:23 -07:00 committed by Gian Merlino
parent 76fcbd8fc5
commit 95e08b38ea
8 changed files with 627 additions and 66 deletions

View File

@ -78,7 +78,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
private final ListeningExecutorService executorService; private final ListeningExecutorService executorService;
private final AtomicLong doubleEventCount = new AtomicLong(0L); private final AtomicLong doubleEventCount = new AtomicLong(0L);
private final NamespaceExtractionCacheManager cacheManager; private final NamespaceExtractionCacheManager cacheManager;
private final String factoryId = UUID.randomUUID().toString(); private final String factoryId;
private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null); private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
@ -114,6 +114,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.connectTimeout = connectTimeout; this.connectTimeout = connectTimeout;
this.injective = injective; this.injective = injective;
this.factoryId = "kafka-factory-" + kafkaTopic + UUID.randomUUID().toString();
} }
public KafkaLookupExtractorFactory( public KafkaLookupExtractorFactory(

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.StringUtils; import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -78,7 +79,7 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
Preconditions.checkArgument(this.firstCacheTimeout >= 0); Preconditions.checkArgument(this.firstCacheTimeout >= 0);
this.injective = injective; this.injective = injective;
this.manager = manager; this.manager = manager;
this.extractorID = buildID(); this.extractorID = String.format("namespace-factory-%s-%s", extractionNamespace, UUID.randomUUID().toString());
this.lookupIntrospectHandler = new NamespaceLookupIntrospectHandler(this, manager, extractorID); this.lookupIntrospectHandler = new NamespaceLookupIntrospectHandler(this, manager, extractorID);
} }
@ -95,7 +96,12 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
public boolean start() public boolean start()
{ {
final Lock writeLock = startStopSync.writeLock(); final Lock writeLock = startStopSync.writeLock();
writeLock.lock(); try {
writeLock.lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try { try {
if (started) { if (started) {
LOG.warn("Already started! [%s]", extractorID); LOG.warn("Already started! [%s]", extractorID);
@ -125,7 +131,12 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
public boolean close() public boolean close()
{ {
final Lock writeLock = startStopSync.writeLock(); final Lock writeLock = startStopSync.writeLock();
writeLock.lock(); try {
writeLock.lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try { try {
if (!started) { if (!started) {
LOG.warn("Not started! [%s]", extractorID); LOG.warn("Not started! [%s]", extractorID);
@ -179,17 +190,17 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
return injective; return injective;
} }
private String buildID()
{
return UUID.randomUUID().toString();
}
// Grab the latest snapshot from the cache manager // Grab the latest snapshot from the cache manager
@Override @Override
public LookupExtractor get() public LookupExtractor get()
{ {
final Lock readLock = startStopSync.readLock(); final Lock readLock = startStopSync.readLock();
readLock.lock(); try {
readLock.lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try { try {
if (!started) { if (!started) {
throw new ISE("Factory [%s] not started", extractorID); throw new ISE("Factory [%s] not started", extractorID);

View File

@ -319,7 +319,7 @@ public abstract class NamespaceExtractionCacheManager
if (factory == null) { if (factory == null) {
throw new ISE("Cannot find factory for namespace [%s]", namespace); throw new ISE("Cannot find factory for namespace [%s]", namespace);
} }
final String cacheId = UUID.randomUUID().toString(); final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString());
return schedule(id, namespace, factory, getPostRunnable(id, namespace, factory, cacheId), cacheId); return schedule(id, namespace, factory, getPostRunnable(id, namespace, factory, cacheId), cacheId);
} }

View File

@ -48,7 +48,7 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
private static final Logger log = new Logger(OffHeapNamespaceExtractionCacheManager.class); private static final Logger log = new Logger(OffHeapNamespaceExtractionCacheManager.class);
private final DB mmapDB; private final DB mmapDB;
private ConcurrentMap<String, String> currentNamespaceCache = new ConcurrentHashMap<>(); private ConcurrentMap<String, String> currentNamespaceCache = new ConcurrentHashMap<>();
private Striped<Lock> nsLocks = Striped.lock(32); // Needed to make sure delete() doesn't do weird things private Striped<Lock> nsLocks = Striped.lazyWeakLock(1024); // Needed to make sure delete() doesn't do weird things
private final File tmpFile; private final File tmpFile;
@Inject @Inject
@ -137,7 +137,7 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
lock.lock(); lock.lock();
try { try {
if (super.delete(namespaceKey)) { if (super.delete(namespaceKey)) {
final String mmapDBkey = currentNamespaceCache.get(namespaceKey); final String mmapDBkey = currentNamespaceCache.remove(namespaceKey);
if (mmapDBkey != null) { if (mmapDBkey != null) {
final long pre = tmpFile.length(); final long pre = tmpFile.length();
mmapDB.delete(mmapDBkey); mmapDB.delete(mmapDBkey);
@ -156,27 +156,17 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
} }
@Override @Override
public ConcurrentMap<String, String> getCacheMap(String namespaceOrCacheKey) public ConcurrentMap<String, String> getCacheMap(String namespaceKey)
{ {
final Lock lock = nsLocks.get(namespaceOrCacheKey); final Lock lock = nsLocks.get(namespaceKey);
lock.lock(); lock.lock();
try { try {
String realKey = currentNamespaceCache.get(namespaceOrCacheKey); String mapDBKey = currentNamespaceCache.get(namespaceKey);
if (realKey == null) { if (mapDBKey == null) {
realKey = namespaceOrCacheKey; // Not something created by swapAndClearCache
} mapDBKey = namespaceKey;
final Lock nsLock = nsLocks.get(realKey);
if (lock != nsLock) {
nsLock.lock();
}
try {
return mmapDB.createHashMap(realKey).makeOrGet();
}
finally {
if (lock != nsLock) {
nsLock.unlock();
}
} }
return mmapDB.createHashMap(mapDBKey).makeOrGet();
} }
finally { finally {
lock.unlock(); lock.unlock();

View File

@ -19,21 +19,38 @@
package io.druid.server.lookup.namespace.cache; package io.druid.server.lookup.namespace.cache;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.concurrent.Execs;
import io.druid.guice.GuiceInjectors; import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.initialization.Initialization; import io.druid.initialization.Initialization;
import io.druid.query.lookup.namespace.ExtractionNamespace;
import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.lookup.namespace.NamespaceExtractionModule; import io.druid.server.lookup.namespace.NamespaceExtractionModule;
import io.druid.server.metrics.NoopServiceEmitter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
public class OffHeapNamespaceExtractionCacheManagerTest public class OffHeapNamespaceExtractionCacheManagerTest
{ {
@ -61,4 +78,67 @@ public class OffHeapNamespaceExtractionCacheManagerTest
final NamespaceExtractionCacheManager manager = injector.getInstance(NamespaceExtractionCacheManager.class); final NamespaceExtractionCacheManager manager = injector.getInstance(NamespaceExtractionCacheManager.class);
Assert.assertEquals(OffHeapNamespaceExtractionCacheManager.class, manager.getClass()); Assert.assertEquals(OffHeapNamespaceExtractionCacheManager.class, manager.getClass());
} }
@Test(timeout = 30000L)
public void testRacyCreation() throws Exception
{
final int concurrentThreads = 100;
final Lifecycle lifecycle = new Lifecycle();
final ServiceEmitter emitter = new NoopServiceEmitter();
final OffHeapNamespaceExtractionCacheManager manager = new OffHeapNamespaceExtractionCacheManager(
lifecycle,
emitter,
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceCacheFactory<?>>of()
);
final ListeningExecutorService service = MoreExecutors.listeningDecorator(Execs.multiThreaded(
concurrentThreads,
"offheaptest-%s"
));
final List<ListenableFuture<?>> futures = new ArrayList<>();
final CountDownLatch thunder = new CountDownLatch(1);
final List<String> namespaceIds = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
final String namespace = "namespace-" + UUID.randomUUID().toString();
final String cacheKey = "initial-cache-" + namespace;
namespaceIds.add(namespace);
manager.getCacheMap(cacheKey).put("foo", "bar");
Assert.assertFalse(manager.swapAndClearCache(namespace, cacheKey));
}
final Random random = new Random(3748218904L);
try {
for (int i = 0; i < concurrentThreads; ++i) {
final int j = i;
final String namespace = namespaceIds.get(random.nextInt(namespaceIds.size()));
futures.add(service.submit(
new Runnable()
{
@Override
public void run()
{
try {
thunder.await();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
for (int i = 0; i < 1000; ++i) {
final String cacheKey = String.format("%s-%d-key-%d", namespace, j, i);
manager.getCacheMap(cacheKey).put("foo", "bar" + Integer.toString(i));
Assert.assertTrue(manager.swapAndClearCache(namespace, cacheKey));
}
}
}
));
}
thunder.countDown();
Futures.allAsList(futures).get();
}
finally {
service.shutdownNow();
}
for (final String namespace : namespaceIds) {
Assert.assertEquals(ImmutableMap.of("foo", "bar999"), manager.getCacheMap(namespace));
}
}
} }

View File

@ -23,6 +23,7 @@ package io.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2; import com.google.common.collect.Collections2;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -41,6 +42,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* This class provide a basic {@link LookupExtractorFactory} references manager. * This class provide a basic {@link LookupExtractorFactory} references manager.
@ -54,8 +57,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class LookupReferencesManager public class LookupReferencesManager
{ {
private static final Logger LOGGER = new Logger(LookupReferencesManager.class); private static final Logger LOGGER = new Logger(LookupReferencesManager.class);
private final ConcurrentMap<String, LookupExtractorFactory> lookupMap = new ConcurrentHashMap(); private final ConcurrentMap<String, LookupExtractorFactory> lookupMap = new ConcurrentHashMap<>();
private final Object lock = new Object(); // This is a lock against the state of the REFERENCE MANAGER (aka start/stop state), NOT of the lookup itself.
private final ReadWriteLock startStopLock = new ReentrantReadWriteLock(true);
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final LookupSnapshotTaker lookupSnapshotTaker; private final LookupSnapshotTaker lookupSnapshotTaker;
@ -73,7 +77,8 @@ public class LookupReferencesManager
@LifecycleStart @LifecycleStart
public void start() public void start()
{ {
synchronized (lock) { startStopLock.writeLock().lock();
try {
if (!started.getAndSet(true)) { if (!started.getAndSet(true)) {
if (lookupSnapshotTaker != null) { if (lookupSnapshotTaker != null) {
final List<LookupBean> lookupBeanList = lookupSnapshotTaker.pullExistingSnapshot(); final List<LookupBean> lookupBeanList = lookupSnapshotTaker.pullExistingSnapshot();
@ -84,12 +89,16 @@ public class LookupReferencesManager
LOGGER.info("Started lookup factory references manager"); LOGGER.info("Started lookup factory references manager");
} }
} }
finally {
startStopLock.writeLock().unlock();
}
} }
@LifecycleStop @LifecycleStop
public void stop() public void stop()
{ {
synchronized (lock) { startStopLock.writeLock().lock();
try {
if (started.getAndSet(false)) { if (started.getAndSet(false)) {
if (lookupSnapshotTaker != null) { if (lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getAllAsList()); lookupSnapshotTaker.takeSnapshot(getAllAsList());
@ -100,6 +109,9 @@ public class LookupReferencesManager
} }
} }
} }
finally {
startStopLock.writeLock().unlock();
}
} }
/** /**
@ -112,7 +124,13 @@ public class LookupReferencesManager
*/ */
public boolean put(String lookupName, final LookupExtractorFactory lookupExtractorFactory) public boolean put(String lookupName, final LookupExtractorFactory lookupExtractorFactory)
{ {
synchronized (lock) { try {
startStopLock.readLock().lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
assertStarted(); assertStarted();
if (lookupMap.containsKey(lookupName)) { if (lookupMap.containsKey(lookupName)) {
LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName); LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName);
@ -123,7 +141,7 @@ public class LookupReferencesManager
} }
final boolean noPrior = null == lookupMap.putIfAbsent(lookupName, lookupExtractorFactory); final boolean noPrior = null == lookupMap.putIfAbsent(lookupName, lookupExtractorFactory);
if (noPrior) { if (noPrior) {
if(lookupSnapshotTaker != null) { if (lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getAllAsList()); lookupSnapshotTaker.takeSnapshot(getAllAsList());
} }
} else { } else {
@ -133,6 +151,9 @@ public class LookupReferencesManager
} }
return noPrior; return noPrior;
} }
finally {
startStopLock.readLock().unlock();
}
} }
/** /**
@ -143,12 +164,19 @@ public class LookupReferencesManager
public void put(Map<String, LookupExtractorFactory> lookups) public void put(Map<String, LookupExtractorFactory> lookups)
{ {
Map<String, LookupExtractorFactory> failedExtractorFactoryMap = new HashMap<>(); Map<String, LookupExtractorFactory> failedExtractorFactoryMap = new HashMap<>();
synchronized (lock) { try {
startStopLock.readLock().lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
assertStarted(); assertStarted();
for (Map.Entry<String, LookupExtractorFactory> entry : lookups.entrySet()) { for (Map.Entry<String, LookupExtractorFactory> entry : lookups.entrySet()) {
final String lookupName = entry.getKey(); final String lookupName = entry.getKey();
final LookupExtractorFactory lookupExtractorFactory = entry.getValue(); final LookupExtractorFactory lookupExtractorFactory = entry.getValue();
if (lookupMap.containsKey(lookupName)) { if (lookupMap.containsKey(lookupName)) {
// Fail early without bothering to start
LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName); LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName);
continue; continue;
} }
@ -156,7 +184,14 @@ public class LookupReferencesManager
failedExtractorFactoryMap.put(lookupName, lookupExtractorFactory); failedExtractorFactoryMap.put(lookupName, lookupExtractorFactory);
continue; continue;
} }
lookupMap.put(lookupName, lookupExtractorFactory); if (null != lookupMap.putIfAbsent(lookupName, lookupExtractorFactory)) {
// handle race
LOGGER.warn("lookup [%s] is not add, another lookup with the same name already exist", lookupName);
if (!lookupExtractorFactory.close()) {
LOGGER.error("Failed to properly close stale lookup [%s]", lookupExtractorFactory);
}
continue;
}
if (lookupSnapshotTaker != null) { if (lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getAllAsList()); lookupSnapshotTaker.takeSnapshot(getAllAsList());
} }
@ -168,6 +203,9 @@ public class LookupReferencesManager
); );
} }
} }
finally {
startStopLock.readLock().unlock();
}
} }
/** /**
@ -182,23 +220,52 @@ public class LookupReferencesManager
*/ */
public boolean updateIfNew(String lookupName, final LookupExtractorFactory lookupExtractorFactory) public boolean updateIfNew(String lookupName, final LookupExtractorFactory lookupExtractorFactory)
{ {
final boolean update; boolean update = false;
synchronized (lock) { try {
startStopLock.readLock().lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
assertStarted(); assertStarted();
final LookupExtractorFactory prior = lookupMap.get(lookupName); LookupExtractorFactory prior = lookupMap.get(lookupName);
update = lookupExtractorFactory.replaces(prior); update = lookupExtractorFactory.replaces(prior);
if (update) { if (update) {
if (!lookupExtractorFactory.start()) { if (!lookupExtractorFactory.start()) {
throw new ISE("Could not start [%s]", lookupName); throw new ISE("Could not start [%s]", lookupName);
} }
lookupMap.put(lookupName, lookupExtractorFactory); boolean racy;
if (prior != null) { do {
if (prior == null) {
racy = null != lookupMap.putIfAbsent(lookupName, lookupExtractorFactory);
} else {
racy = !lookupMap.replace(lookupName, prior, lookupExtractorFactory);
}
if (racy) {
prior = lookupMap.get(lookupName);
update = lookupExtractorFactory.replaces(prior);
}
} while (racy && update);
if (prior != null && update) {
if (!prior.close()) { if (!prior.close()) {
LOGGER.error("Error closing [%s]:[%s]", lookupName, lookupExtractorFactory); LOGGER.error("Error closing [%s]:[%s]", lookupName, prior);
}
}
if (!update) {
// We started the lookup, failed a race, and now need to cleanup
if (!lookupExtractorFactory.close()) {
LOGGER.error("Error closing [%s]:[%s]", lookupExtractorFactory);
} }
} }
} }
} }
finally {
startStopLock.readLock().unlock();
}
return update; return update;
} }
@ -210,7 +277,13 @@ public class LookupReferencesManager
*/ */
public boolean remove(String lookupName) public boolean remove(String lookupName)
{ {
synchronized (lock) { try {
startStopLock.readLock().lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
final LookupExtractorFactory lookupExtractorFactory = lookupMap.remove(lookupName); final LookupExtractorFactory lookupExtractorFactory = lookupMap.remove(lookupName);
if (lookupExtractorFactory != null) { if (lookupExtractorFactory != null) {
LOGGER.debug("Removed lookup [%s]", lookupName); LOGGER.debug("Removed lookup [%s]", lookupName);
@ -220,6 +293,9 @@ public class LookupReferencesManager
return lookupExtractorFactory.close(); return lookupExtractorFactory.close();
} }
} }
finally {
startStopLock.readLock().unlock();
}
return false; return false;
} }
@ -233,9 +309,20 @@ public class LookupReferencesManager
@Nullable @Nullable
public LookupExtractorFactory get(String lookupName) public LookupExtractorFactory get(String lookupName)
{ {
final LookupExtractorFactory lookupExtractorFactory = lookupMap.get(lookupName); try {
assertStarted(); startStopLock.readLock().lockInterruptibly();
return lookupExtractorFactory; }
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
final LookupExtractorFactory lookupExtractorFactory = lookupMap.get(lookupName);
assertStarted();
return lookupExtractorFactory;
}
finally {
startStopLock.readLock().unlock();
}
} }
/** /**
@ -245,8 +332,19 @@ public class LookupReferencesManager
*/ */
public Map<String, LookupExtractorFactory> getAll() public Map<String, LookupExtractorFactory> getAll()
{ {
assertStarted(); try {
return Maps.newHashMap(lookupMap); startStopLock.readLock().lockInterruptibly();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
try {
assertStarted();
return Maps.newHashMap(lookupMap);
}
finally {
startStopLock.readLock().unlock();
}
} }
private void assertStarted() throws ISE private void assertStarted() throws ISE
@ -272,7 +370,7 @@ public class LookupReferencesManager
@Override @Override
public LookupBean apply( public LookupBean apply(
@Nullable @Nullable
Map.Entry<String, LookupExtractorFactory> input Map.Entry<String, LookupExtractorFactory> input
) )
{ {
final LookupBean lookupBean = new LookupBean(); final LookupBean lookupBean = new LookupBean();

View File

@ -54,11 +54,12 @@ import io.druid.server.listener.resource.AbstractListenerHandler;
import io.druid.server.listener.resource.ListenerResource; import io.druid.server.listener.resource.ListenerResource;
import io.druid.server.lookup.cache.LookupCoordinatorManager; import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.metrics.DataSourceTaskIdHolder; import io.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.curator.utils.ZKPaths;
import javax.ws.rs.Path;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.ws.rs.Path;
import org.apache.curator.utils.ZKPaths;
public class LookupModule implements DruidModule public class LookupModule implements DruidModule
{ {
@ -116,10 +117,8 @@ class LookupListeningResource extends ListenerResource
{ {
}) })
{ {
private final Object deleteLock = new Object();
@Override @Override
public synchronized Object post(final Map<String, LookupExtractorFactory> lookups) public Object post(final Map<String, LookupExtractorFactory> lookups)
throws Exception throws Exception
{ {
final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>(); final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
@ -154,17 +153,17 @@ class LookupListeningResource extends ListenerResource
@Override @Override
public Object delete(String id) public Object delete(String id)
{ {
// Prevent races to 404 vs 500 between concurrent delete requests if (manager.get(id) == null) {
synchronized (deleteLock) { return null;
}
if (!manager.remove(id)) {
if (manager.get(id) == null) { if (manager.get(id) == null) {
return null; return null;
} }
if (!manager.remove(id)) { // We don't have more information at this point.
// We don't have more information at this point. throw new RE("Could not remove lookup [%s]", id);
throw new RE("Could not remove lookup [%s]", id);
}
return id;
} }
return id;
} }
} }
); );

View File

@ -20,9 +20,16 @@
package io.druid.query.lookup; package io.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import io.druid.concurrent.Execs;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.After; import org.junit.After;
@ -32,20 +39,40 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class LookupReferencesManagerTest public class LookupReferencesManagerTest
{ {
private static final int CONCURRENT_THREADS = 16;
LookupReferencesManager lookupReferencesManager; LookupReferencesManager lookupReferencesManager;
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
ObjectMapper mapper = new DefaultObjectMapper(); ObjectMapper mapper = new DefaultObjectMapper();
private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Execs.multiThreaded(
CONCURRENT_THREADS,
"hammer-time-%s"
));
@Before @Before
public void setUp() throws IOException public void setUp() throws IOException
{ {
mapper.registerSubtypes(MapLookupExtractorFactory.class); mapper.registerSubtypes(MapLookupExtractorFactory.class);
lookupReferencesManager = new LookupReferencesManager(new LookupConfig(Files.createTempDir().getAbsolutePath()), mapper); lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(Files.createTempDir().getAbsolutePath()),
mapper
);
Assert.assertTrue("must be closed before start call", lookupReferencesManager.isClosed()); Assert.assertTrue("must be closed before start call", lookupReferencesManager.isClosed());
lookupReferencesManager.start(); lookupReferencesManager.start();
Assert.assertFalse("must start after start call", lookupReferencesManager.isClosed()); Assert.assertFalse("must start after start call", lookupReferencesManager.isClosed());
@ -56,6 +83,7 @@ public class LookupReferencesManagerTest
{ {
lookupReferencesManager.stop(); lookupReferencesManager.stop();
Assert.assertTrue("stop call should close it", lookupReferencesManager.isClosed()); Assert.assertTrue("stop call should close it", lookupReferencesManager.isClosed());
executorService.shutdownNow();
} }
@Test(expected = ISE.class) @Test(expected = ISE.class)
@ -253,11 +281,365 @@ public class LookupReferencesManagerTest
@Test @Test
public void testBootstrapFromFile() throws IOException public void testBootstrapFromFile() throws IOException
{ {
LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory(ImmutableMap.<String, String>of("key", "value"), true); LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory(ImmutableMap.<String, String>of(
lookupReferencesManager.put("testMockForBootstrap",lookupExtractorFactory); "key",
"value"
), true);
lookupReferencesManager.put("testMockForBootstrap", lookupExtractorFactory);
lookupReferencesManager.stop(); lookupReferencesManager.stop();
lookupReferencesManager.start(); lookupReferencesManager.start();
Assert.assertEquals(lookupExtractorFactory, lookupReferencesManager.get("testMockForBootstrap")); Assert.assertEquals(lookupExtractorFactory, lookupReferencesManager.get("testMockForBootstrap"));
} }
@Test
public void testConcurrencyStaaaaaaaaaaartStop() throws Exception
{
lookupReferencesManager.stop();
final CyclicBarrier cyclicBarrier = new CyclicBarrier(CONCURRENT_THREADS);
final Runnable start = new Runnable()
{
@Override
public void run()
{
try {
cyclicBarrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
throw Throwables.propagate(e);
}
lookupReferencesManager.start();
}
};
final Collection<ListenableFuture<?>> futures = new ArrayList<>(CONCURRENT_THREADS);
for (int i = 0; i < CONCURRENT_THREADS; ++i) {
futures.add(executorService.submit(start));
}
lookupReferencesManager.stop();
Futures.allAsList(futures).get(100, TimeUnit.MILLISECONDS);
for (ListenableFuture future : futures) {
Assert.assertNull(future.get());
}
}
@Test
public void testConcurrencyStartStoooooooooop() throws Exception
{
lookupReferencesManager.stop();
lookupReferencesManager.start();
final CyclicBarrier cyclicBarrier = new CyclicBarrier(CONCURRENT_THREADS);
final Runnable start = new Runnable()
{
@Override
public void run()
{
try {
cyclicBarrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
throw Throwables.propagate(e);
}
lookupReferencesManager.stop();
}
};
final Collection<ListenableFuture<?>> futures = new ArrayList<>(CONCURRENT_THREADS);
for (int i = 0; i < CONCURRENT_THREADS; ++i) {
futures.add(executorService.submit(start));
}
Futures.allAsList(futures).get(100, TimeUnit.MILLISECONDS);
for (ListenableFuture future : futures) {
Assert.assertNull(future.get());
}
}
@Test(timeout = 10000L)
public void testConcurrencySequentialChaos() throws Exception
{
final CountDownLatch runnableStartBarrier = new CountDownLatch(1);
final Random random = new Random(478137498L);
final int numUpdates = 100000;
final int numNamespaces = 100;
final CountDownLatch runnablesFinishedBarrier = new CountDownLatch(numUpdates);
final List<Runnable> runnables = new ArrayList<>(numUpdates);
final Map<String, Integer> maxNumber = new HashMap<>();
for (int i = 1; i <= numUpdates; ++i) {
final boolean shouldStart = random.nextInt(10) == 1;
final boolean shouldClose = random.nextInt(10) == 1;
final String name = Integer.toString(random.nextInt(numNamespaces));
final int position = i;
final LookupExtractorFactory lookupExtractorFactory = new LookupExtractorFactory()
{
@Override
public boolean start()
{
return shouldStart;
}
@Override
public boolean close()
{
return shouldClose;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
if (other == null) {
return true;
}
final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) other.getIntrospectHandler();
return position > introspectionHandler.position;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return new NamedIntrospectionHandler(position);
}
@Override
public String toString()
{
return String.format("TestFactroy position %d", position);
}
@Override
public LookupExtractor get()
{
return null;
}
};
if (shouldStart && (!maxNumber.containsKey(name) || maxNumber.get(name) < position)) {
maxNumber.put(name, position);
}
runnables.add(new LookupUpdatingRunnable(
name,
lookupExtractorFactory,
runnableStartBarrier,
lookupReferencesManager
));
}
////// Add some CHAOS!
Collections.shuffle(runnables, random);
final Runnable decrementFinished = new Runnable()
{
@Override
public void run()
{
runnablesFinishedBarrier.countDown();
}
};
for (Runnable runnable : runnables) {
executorService.submit(runnable).addListener(decrementFinished, MoreExecutors.sameThreadExecutor());
}
runnableStartBarrier.countDown();
do {
for (String name : maxNumber.keySet()) {
final LookupExtractorFactory factory;
try {
factory = lookupReferencesManager.get(name);
}
catch (ISE e) {
continue;
}
if (null == factory) {
continue;
}
final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler();
Assert.assertTrue(introspectionHandler.position >= 0);
}
} while (runnablesFinishedBarrier.getCount() > 0);
lookupReferencesManager.start();
for (String name : maxNumber.keySet()) {
final LookupExtractorFactory factory = lookupReferencesManager.get(name);
if (null == factory) {
continue;
}
final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler();
Assert.assertNotNull(introspectionHandler);
Assert.assertEquals(
StringUtils.safeFormat("Named position %s failed", name),
maxNumber.get(name),
Integer.valueOf(introspectionHandler.position)
);
}
Assert.assertEquals(maxNumber.size(), lookupReferencesManager.getAll().size());
}
@Test(timeout = 10000L)
public void testConcurrencyStartStopChaos() throws Exception
{
// Don't want to exercise snapshot here
final LookupReferencesManager manager = new LookupReferencesManager(new LookupConfig(null), mapper);
final Runnable chaosStart = new Runnable()
{
@Override
public void run()
{
manager.start();
}
};
final Runnable chaosStop = new Runnable()
{
@Override
public void run()
{
manager.stop();
}
};
final CountDownLatch runnableStartBarrier = new CountDownLatch(1);
final Random random = new Random(478137498L);
final int numUpdates = 100000;
final int numNamespaces = 100;
final CountDownLatch runnablesFinishedBarrier = new CountDownLatch(numUpdates);
final List<Runnable> runnables = new ArrayList<>(numUpdates);
final Map<String, Integer> maxNumber = new HashMap<>();
for (int i = 1; i <= numUpdates; ++i) {
final boolean shouldStart = random.nextInt(10) == 1;
final boolean shouldClose = random.nextInt(10) == 1;
final String name = Integer.toString(random.nextInt(numNamespaces));
final int position = i;
final LookupExtractorFactory lookupExtractorFactory = new LookupExtractorFactory()
{
@Override
public boolean start()
{
return shouldStart;
}
@Override
public boolean close()
{
return shouldClose;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
if (other == null) {
return true;
}
final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) other.getIntrospectHandler();
return position > introspectionHandler.position;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return new NamedIntrospectionHandler(position);
}
@Override
public String toString()
{
return String.format("TestFactroy position %d", position);
}
@Override
public LookupExtractor get()
{
return null;
}
};
if (random.nextFloat() < 0.001) {
if (random.nextBoolean()) {
runnables.add(chaosStart);
} else {
runnables.add(chaosStop);
}
} else {
if (shouldStart && (!maxNumber.containsKey(name) || maxNumber.get(name) < position)) {
maxNumber.put(name, position);
}
runnables.add(new LookupUpdatingRunnable(
name,
lookupExtractorFactory,
runnableStartBarrier,
manager
));
}
}
////// Add some CHAOS!
Collections.shuffle(runnables, random);
final Runnable decrementFinished = new Runnable()
{
@Override
public void run()
{
runnablesFinishedBarrier.countDown();
}
};
for (Runnable runnable : runnables) {
executorService.submit(runnable).addListener(decrementFinished, MoreExecutors.sameThreadExecutor());
}
runnableStartBarrier.countDown();
do {
for (String name : maxNumber.keySet()) {
final LookupExtractorFactory factory;
try {
factory = manager.get(name);
}
catch (ISE e) {
continue;
}
if (null == factory) {
continue;
}
final NamedIntrospectionHandler introspectionHandler = (NamedIntrospectionHandler) factory.getIntrospectHandler();
Assert.assertTrue(introspectionHandler.position >= 0);
}
} while (runnablesFinishedBarrier.getCount() > 0);
}
}
class LookupUpdatingRunnable implements Runnable
{
final String name;
final LookupExtractorFactory factory;
final CountDownLatch startLatch;
final LookupReferencesManager lookupReferencesManager;
LookupUpdatingRunnable(
String name,
LookupExtractorFactory factory,
CountDownLatch startLatch,
LookupReferencesManager lookupReferencesManager
)
{
this.name = name;
this.factory = factory;
this.startLatch = startLatch;
this.lookupReferencesManager = lookupReferencesManager;
}
@Override
public void run()
{
try {
startLatch.await();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
lookupReferencesManager.updateIfNew(name, factory);
}
}
class NamedIntrospectionHandler implements LookupIntrospectHandler
{
final int position;
NamedIntrospectionHandler(final int position)
{
this.position = position;
}
} }