fix cache populate incorrect content when numBackgroundThreads>1 (#3943)

* fix cache populate incorrect content when numBackgroundThreads>1

* simplify code by using Futures.allAsList and use CountDownLatch in UT

* fix test code style and assert countDownLatch.await()
This commit is contained in:
kaijianding 2017-02-17 23:17:19 +08:00 committed by Nishant Bangarwa
parent 797488a677
commit a029b33499
2 changed files with 95 additions and 18 deletions

View File

@ -29,7 +29,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.guava.BaseSequence;
@ -47,7 +47,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -152,7 +151,6 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.<ListenableFuture<?>>newLinkedList());
if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final List<Object> cacheResults = Lists.newLinkedList();
return Sequences.withEffect(
Sequences.map(
@ -162,17 +160,23 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
@Override
public T apply(final T input)
{
cacheFutures.add(
final SettableFuture<Object> future = SettableFuture.create();
cacheFutures.add(future);
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
cacheResults.add(cacheFn.apply(input));
try {
future.set(cacheFn.apply(input));
}
catch (Exception e) {
// if there is exception, should setException to quit the caching processing
future.setException(e);
}
}
}
)
);
return input;
}
@ -184,8 +188,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
public void run()
{
try {
Futures.allAsList(cacheFutures).get();
CacheUtil.populate(cache, mapper, key, cacheResults);
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
}
catch (Exception e) {
log.error(e, "Error while getting future for cache task");

View File

@ -25,8 +25,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheStats;
import io.druid.client.cache.MapCache;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
@ -56,6 +58,8 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.Closeable;
import java.io.IOException;
@ -64,10 +68,21 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@RunWith(Parameterized.class)
public class CachingQueryRunnerTest
{
@Parameterized.Parameters(name = "numBackgroundThreads={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0));
}
private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
@ -83,6 +98,17 @@ public class CachingQueryRunnerTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};
private ExecutorService backgroundExecutorService;
public CachingQueryRunnerTest(int numBackgroundThreads)
{
if (numBackgroundThreads > 0) {
backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
} else {
backgroundExecutorService = MoreExecutors.sameThreadExecutor();
}
}
@Test
public void testCloseAndPopulate() throws Exception
{
@ -183,7 +209,52 @@ public class CachingQueryRunnerTest
}
);
Cache cache = MapCache.create(1024 * 1024);
final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1);
Cache cache = new Cache()
{
private final Map<NamedKey, byte[]> baseMap = new ConcurrentHashMap<>();
@Override
public byte[] get(NamedKey key)
{
return baseMap.get(key);
}
@Override
public void put(NamedKey key, byte[] value)
{
baseMap.put(key, value);
cacheMustBePutOnce.countDown();
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
return null;
}
@Override
public void close(String namespace)
{
}
@Override
public CacheStats getStats()
{
return null;
}
@Override
public boolean isLocal()
{
return true;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
}
};
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
@ -203,7 +274,7 @@ public class CachingQueryRunnerTest
return resultSeq;
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override
@ -237,6 +308,9 @@ public class CachingQueryRunnerTest
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes.toString(), results.toString());
// wait for background caching finish
// wait at most 10 seconds to fail the test to avoid block overall tests
Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10, TimeUnit.SECONDS));
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);
@ -293,7 +367,7 @@ public class CachingQueryRunnerTest
return Sequences.empty();
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override