Merge pull request #1741 from metamx/nicerConcurrencyTesting

Add better concurrency testing to IncrementalIndexTest
Fangjin Yang 2015-09-17 08:35:40 -07:00
commit aab8c627c6
1 changed file with 90 additions and 74 deletions
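
The reworked test coordinates its writer and reader threads with two CountDownLatches: each worker counts down a ready latch and then blocks on a shared start latch, and the test thread releases the start latch only once the ready latch has reached zero, so ingestion and querying genuinely overlap instead of racing thread-pool startup. Below is a minimal, self-contained sketch of that pattern; it is not the Druid test itself, and the class name, worker count, and timeout are illustrative only.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchCoordinationSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    final int workers = 4; // illustrative count, not the test's value
    final CountDownLatch readyLatch = new CountDownLatch(workers);
    final CountDownLatch startLatch = new CountDownLatch(1);
    final ExecutorService executor = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      executor.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              readyLatch.countDown(); // signal that this worker is ready
              try {
                startLatch.await();   // block until every worker has checked in
              }
              catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
              }
              // ... the concurrent work (adds or queries) would go here ...
            }
          }
      );
    }
    readyLatch.await();     // wait until all workers are parked on startLatch
    startLatch.countDown(); // release them at (almost) the same instant
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
  }
}

In the diff this shows up as readyLatch, sized to concurrentThreads * 2 because each loop iteration submits one indexing task and one query task, together with the existing startLatch.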


@@ -20,11 +20,14 @@ package io.druid.segment.data;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.Row;
@@ -74,7 +77,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

 /**
@@ -217,7 +219,7 @@ public class IncrementalIndexTest
     Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
   }

-  @Test(timeout = 60000)
+  @Test(timeout = 60_000L)
   public void testConcurrentAddRead() throws InterruptedException, ExecutionException
   {
     final int dimensionCount = 5;
@@ -257,9 +259,8 @@ public class IncrementalIndexTest
     final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]));
-    final int taskCount = 30;
-    final int concurrentThreads = 3;
-    final int elementsPerThread = 100;
+    final int concurrentThreads = 2;
+    final int elementsPerThread = 10_000;
     final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
         Executors.newFixedThreadPool(
             concurrentThreads,
@@ -281,8 +282,8 @@ public class IncrementalIndexTest
     );
     final long timestamp = System.currentTimeMillis();
     final Interval queryInterval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
-    final List<ListenableFuture<?>> indexFutures = new LinkedList<>();
-    final List<ListenableFuture<?>> queryFutures = new LinkedList<>();
+    final List<ListenableFuture<?>> indexFutures = Lists.newArrayListWithExpectedSize(concurrentThreads);
+    final List<ListenableFuture<?>> queryFutures = Lists.newArrayListWithExpectedSize(concurrentThreads);
     final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
     final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
         new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
@@ -290,100 +291,115 @@ public class IncrementalIndexTest
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     );
     final AtomicInteger currentlyRunning = new AtomicInteger(0);
-    final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
+    final AtomicInteger concurrentlyRan = new AtomicInteger(0);
     final AtomicInteger someoneRan = new AtomicInteger(0);
     final CountDownLatch startLatch = new CountDownLatch(1);
-    try {
-      for (int j = 0; j < taskCount; j++) {
-        indexFutures.add(
-            indexExecutor.submit(
-                new Runnable()
-                {
-                  @Override
-                  public void run()
-                  {
-                    try {
-                      startLatch.await();
-                    }
-                    catch (InterruptedException e) {
-                      Thread.currentThread().interrupt();
-                      throw Throwables.propagate(e);
-                    }
-                    currentlyRunning.incrementAndGet();
-                    try {
-                      for (int i = 0; i < elementsPerThread; i++) {
-                        someoneRan.incrementAndGet();
-                        index.add(getLongRow(timestamp + i, i, dimensionCount));
-                      }
-                    }
-                    catch (IndexSizeExceededException e) {
-                      throw Throwables.propagate(e);
-                    }
-                    currentlyRunning.decrementAndGet();
-                  }
-                }
-            )
-        );
-        final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
-            .dataSource("xxx")
-            .granularity(QueryGranularity.ALL)
-            .intervals(ImmutableList.of(queryInterval))
-            .aggregators(queryAggregatorFactories)
-            .build();
-        queryFutures.add(
-            queryExecutor.submit(
-                new Runnable()
-                {
-                  @Override
-                  public void run()
-                  {
-                    try {
-                      startLatch.await();
-                    }
-                    catch (InterruptedException e) {
-                      Thread.currentThread().interrupt();
-                      throw Throwables.propagate(e);
-                    }
-                    QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
-                        factory.createRunner(incrementalIndexSegment),
-                        factory.getToolchest()
-                    );
-                    Map<String, Object> context = new HashMap<String, Object>();
-                    for (Result<TimeseriesResultValue> result :
-                        Sequences.toList(
-                            runner.run(query, context),
-                            new LinkedList<Result<TimeseriesResultValue>>()
-                        )
-                        ) {
-                      final Integer ranCount = someoneRan.get();
-                      if (ranCount > 0) {
-                        final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
-                        // Eventually consistent, but should be somewhere in that range
-                        // Actual result is validated after all writes are guaranteed done.
-                        Assert.assertTrue(
-                            String.format("%d >= %g >= 0 violated", ranCount, sumResult),
-                            sumResult >= 0 && sumResult <= ranCount
-                        );
-                      }
-                      if (currentlyRunning.get() > 0) {
-                        concurrentlyRan.set(true);
-                      }
-                    }
-                  }
-                }
-            )
-        );
-      }
-    }
-    finally {
-      startLatch.countDown();
-    }
+    final CountDownLatch readyLatch = new CountDownLatch(concurrentThreads * 2);
+    final AtomicInteger queriesAccumualted = new AtomicInteger(0);
+    for (int j = 0; j < concurrentThreads; j++) {
+      indexFutures.add(
+          indexExecutor.submit(
+              new Runnable()
+              {
+                @Override
+                public void run()
+                {
+                  readyLatch.countDown();
+                  try {
+                    startLatch.await();
+                  }
+                  catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw Throwables.propagate(e);
+                  }
+                  currentlyRunning.incrementAndGet();
+                  try {
+                    for (int i = 0; i < elementsPerThread; i++) {
+                      index.add(getLongRow(timestamp + i, i, dimensionCount));
+                      someoneRan.incrementAndGet();
+                    }
+                  }
+                  catch (IndexSizeExceededException e) {
+                    throw Throwables.propagate(e);
+                  }
+                  currentlyRunning.decrementAndGet();
+                }
+              }
+          )
+      );
+      final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+          .dataSource("xxx")
+          .granularity(QueryGranularity.ALL)
+          .intervals(ImmutableList.of(queryInterval))
+          .aggregators(queryAggregatorFactories)
+          .build();
+      queryFutures.add(
+          queryExecutor.submit(
+              new Runnable()
+              {
+                @Override
+                public void run()
+                {
+                  readyLatch.countDown();
+                  try {
+                    startLatch.await();
+                  }
+                  catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw Throwables.propagate(e);
+                  }
+                  while (concurrentlyRan.get() == 0) {
+                    QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
+                        factory.createRunner(incrementalIndexSegment),
+                        factory.getToolchest()
+                    );
+                    Map<String, Object> context = new HashMap<String, Object>();
+                    Sequence<Result<TimeseriesResultValue>> sequence = runner.run(query, context);
+                    for (Double result :
+                        sequence.accumulate(
+                            new Double[0], new Accumulator<Double[], Result<TimeseriesResultValue>>()
+                            {
+                              @Override
+                              public Double[] accumulate(
+                                  Double[] accumulated, Result<TimeseriesResultValue> in
+                              )
+                              {
+                                if (currentlyRunning.get() > 0) {
+                                  concurrentlyRan.incrementAndGet();
+                                }
+                                queriesAccumualted.incrementAndGet();
+                                return Lists.asList(in.getValue().getDoubleMetric("doubleSumResult0"), accumulated)
+                                            .toArray(new Double[accumulated.length + 1]);
+                              }
+                            }
+                        )
+                        ) {
+                      final Integer ranCount = someoneRan.get();
+                      if (ranCount > 0) {
+                        // Eventually consistent, but should be somewhere in that range
+                        // Actual result is validated after all writes are guaranteed done.
+                        Assert.assertTrue(
+                            String.format("%d >= %g >= 0 violated", ranCount, result),
+                            result >= 0 && result <= ranCount
+                        );
+                      }
+                    }
+                  }
+                }
+              }
+          )
+      );
+    }
+    readyLatch.await();
+    startLatch.countDown();
     List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
     allFutures.addAll(queryFutures);
     allFutures.addAll(indexFutures);
     Futures.allAsList(allFutures).get();
-    Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get());
+    Assert.assertTrue("Queries ran too fast", queriesAccumualted.get() > 0);
+    Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get() > 0);
     queryExecutor.shutdown();
     indexExecutor.shutdown();
     QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
@@ -406,12 +422,12 @@ public class IncrementalIndexTest
     for (int i = 0; i < dimensionCount; ++i) {
       Assert.assertEquals(
           String.format("Failed long sum on dimension %d", i),
-          elementsPerThread * taskCount,
+          elementsPerThread * concurrentThreads,
          result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue()
       );
       Assert.assertEquals(
           String.format("Failed double sum on dimension %d", i),
-          elementsPerThread * taskCount,
+          elementsPerThread * concurrentThreads,
          result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
       );
     }
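
One further pattern worth calling out: instead of materializing results with Sequences.toList, the query threads now drive the result Sequence through an Accumulator, so the concurrency check (currentlyRunning.get() > 0) and the queriesAccumualted counter are updated per result while the writers may still be adding rows. A small, self-contained sketch of that Sequence#accumulate usage follows; it assumes com.metamx.common.guava is on the classpath, and the class name and values are illustrative rather than part of the commit.

import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

import java.util.Arrays;

public class AccumulateSketch
{
  public static void main(String[] args)
  {
    // A Sequence is consumed lazily; accumulate() visits each element in turn,
    // threading an accumulated value through the callback.
    Sequence<Integer> sequence = Sequences.simple(Arrays.asList(1, 2, 3, 4));
    Integer sum = sequence.accumulate(
        0,
        new Accumulator<Integer, Integer>()
        {
          @Override
          public Integer accumulate(Integer accumulated, Integer in)
          {
            // Per-element work happens here while the sequence is being produced;
            // this is where the test checks currentlyRunning and bumps its counters.
            return accumulated + in;
          }
        }
    );
    System.out.println(sum); // prints 10
  }
}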