mirror of https://github.com/apache/druid.git
Add better concurrency testing to IncrementalIndexTest
This commit is contained in:
parent
9705c5139b
commit
6e1eb3b7fe
|
@ -20,11 +20,14 @@ package io.druid.segment.data;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.metamx.common.guava.Accumulator;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.data.input.MapBasedInputRow;
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
|
@ -74,7 +77,6 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,7 +219,7 @@ public class IncrementalIndexTest
|
||||||
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
|
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60_000L)
|
||||||
public void testConcurrentAddRead() throws InterruptedException, ExecutionException
|
public void testConcurrentAddRead() throws InterruptedException, ExecutionException
|
||||||
{
|
{
|
||||||
final int dimensionCount = 5;
|
final int dimensionCount = 5;
|
||||||
|
@ -257,9 +259,8 @@ public class IncrementalIndexTest
|
||||||
|
|
||||||
|
|
||||||
final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]));
|
final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]));
|
||||||
final int taskCount = 30;
|
final int concurrentThreads = 2;
|
||||||
final int concurrentThreads = 3;
|
final int elementsPerThread = 10_000;
|
||||||
final int elementsPerThread = 100;
|
|
||||||
final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
|
final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
|
||||||
Executors.newFixedThreadPool(
|
Executors.newFixedThreadPool(
|
||||||
concurrentThreads,
|
concurrentThreads,
|
||||||
|
@ -281,8 +282,8 @@ public class IncrementalIndexTest
|
||||||
);
|
);
|
||||||
final long timestamp = System.currentTimeMillis();
|
final long timestamp = System.currentTimeMillis();
|
||||||
final Interval queryInterval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
|
final Interval queryInterval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
|
||||||
final List<ListenableFuture<?>> indexFutures = new LinkedList<>();
|
final List<ListenableFuture<?>> indexFutures = Lists.newArrayListWithExpectedSize(concurrentThreads);
|
||||||
final List<ListenableFuture<?>> queryFutures = new LinkedList<>();
|
final List<ListenableFuture<?>> queryFutures = Lists.newArrayListWithExpectedSize(concurrentThreads);
|
||||||
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
|
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
|
||||||
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
|
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
|
||||||
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
|
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
|
||||||
|
@ -290,100 +291,115 @@ public class IncrementalIndexTest
|
||||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||||
);
|
);
|
||||||
final AtomicInteger currentlyRunning = new AtomicInteger(0);
|
final AtomicInteger currentlyRunning = new AtomicInteger(0);
|
||||||
final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
|
final AtomicInteger concurrentlyRan = new AtomicInteger(0);
|
||||||
final AtomicInteger someoneRan = new AtomicInteger(0);
|
final AtomicInteger someoneRan = new AtomicInteger(0);
|
||||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||||
try {
|
final CountDownLatch readyLatch = new CountDownLatch(concurrentThreads * 2);
|
||||||
for (int j = 0; j < taskCount; j++) {
|
final AtomicInteger queriesAccumualted = new AtomicInteger(0);
|
||||||
indexFutures.add(
|
for (int j = 0; j < concurrentThreads; j++) {
|
||||||
indexExecutor.submit(
|
indexFutures.add(
|
||||||
new Runnable()
|
indexExecutor.submit(
|
||||||
|
new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
{
|
{
|
||||||
@Override
|
readyLatch.countDown();
|
||||||
public void run()
|
try {
|
||||||
{
|
startLatch.await();
|
||||||
try {
|
|
||||||
startLatch.await();
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
currentlyRunning.incrementAndGet();
|
|
||||||
try {
|
|
||||||
for (int i = 0; i < elementsPerThread; i++) {
|
|
||||||
someoneRan.incrementAndGet();
|
|
||||||
index.add(getLongRow(timestamp + i, i, dimensionCount));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (IndexSizeExceededException e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
currentlyRunning.decrementAndGet();
|
|
||||||
}
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
currentlyRunning.incrementAndGet();
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < elementsPerThread; i++) {
|
||||||
|
index.add(getLongRow(timestamp + i, i, dimensionCount));
|
||||||
|
someoneRan.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IndexSizeExceededException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
currentlyRunning.decrementAndGet();
|
||||||
}
|
}
|
||||||
)
|
}
|
||||||
);
|
)
|
||||||
|
);
|
||||||
|
|
||||||
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||||
.dataSource("xxx")
|
.dataSource("xxx")
|
||||||
.granularity(QueryGranularity.ALL)
|
.granularity(QueryGranularity.ALL)
|
||||||
.intervals(ImmutableList.of(queryInterval))
|
.intervals(ImmutableList.of(queryInterval))
|
||||||
.aggregators(queryAggregatorFactories)
|
.aggregators(queryAggregatorFactories)
|
||||||
.build();
|
.build();
|
||||||
queryFutures.add(
|
queryFutures.add(
|
||||||
queryExecutor.submit(
|
queryExecutor.submit(
|
||||||
new Runnable()
|
new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
{
|
{
|
||||||
@Override
|
readyLatch.countDown();
|
||||||
public void run()
|
try {
|
||||||
{
|
startLatch.await();
|
||||||
try {
|
}
|
||||||
startLatch.await();
|
catch (InterruptedException e) {
|
||||||
}
|
Thread.currentThread().interrupt();
|
||||||
catch (InterruptedException e) {
|
throw Throwables.propagate(e);
|
||||||
Thread.currentThread().interrupt();
|
}
|
||||||
throw Throwables.propagate(e);
|
while (concurrentlyRan.get() == 0) {
|
||||||
}
|
|
||||||
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
||||||
factory.createRunner(incrementalIndexSegment),
|
factory.createRunner(incrementalIndexSegment),
|
||||||
factory.getToolchest()
|
factory.getToolchest()
|
||||||
);
|
);
|
||||||
Map<String, Object> context = new HashMap<String, Object>();
|
Map<String, Object> context = new HashMap<String, Object>();
|
||||||
for (Result<TimeseriesResultValue> result :
|
Sequence<Result<TimeseriesResultValue>> sequence = runner.run(query, context);
|
||||||
Sequences.toList(
|
|
||||||
runner.run(query, context),
|
for (Double result :
|
||||||
new LinkedList<Result<TimeseriesResultValue>>()
|
sequence.accumulate(
|
||||||
|
new Double[0], new Accumulator<Double[], Result<TimeseriesResultValue>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Double[] accumulate(
|
||||||
|
Double[] accumulated, Result<TimeseriesResultValue> in
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (currentlyRunning.get() > 0) {
|
||||||
|
concurrentlyRan.incrementAndGet();
|
||||||
|
}
|
||||||
|
queriesAccumualted.incrementAndGet();
|
||||||
|
return Lists.asList(in.getValue().getDoubleMetric("doubleSumResult0"), accumulated)
|
||||||
|
.toArray(new Double[accumulated.length + 1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
final Integer ranCount = someoneRan.get();
|
final Integer ranCount = someoneRan.get();
|
||||||
if (ranCount > 0) {
|
if (ranCount > 0) {
|
||||||
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
|
|
||||||
// Eventually consistent, but should be somewhere in that range
|
// Eventually consistent, but should be somewhere in that range
|
||||||
// Actual result is validated after all writes are guaranteed done.
|
// Actual result is validated after all writes are guaranteed done.
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
String.format("%d >= %g >= 0 violated", ranCount, sumResult),
|
String.format("%d >= %g >= 0 violated", ranCount, result),
|
||||||
sumResult >= 0 && sumResult <= ranCount
|
result >= 0 && result <= ranCount
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if (currentlyRunning.get() > 0) {
|
|
||||||
concurrentlyRan.set(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
}
|
||||||
);
|
)
|
||||||
}
|
);
|
||||||
}
|
|
||||||
finally {
|
|
||||||
startLatch.countDown();
|
|
||||||
}
|
}
|
||||||
|
readyLatch.await();
|
||||||
|
startLatch.countDown();
|
||||||
List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
|
List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
|
||||||
allFutures.addAll(queryFutures);
|
allFutures.addAll(queryFutures);
|
||||||
allFutures.addAll(indexFutures);
|
allFutures.addAll(indexFutures);
|
||||||
Futures.allAsList(allFutures).get();
|
Futures.allAsList(allFutures).get();
|
||||||
Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get());
|
Assert.assertTrue("Queries ran too fast", queriesAccumualted.get() > 0);
|
||||||
|
Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get() > 0);
|
||||||
queryExecutor.shutdown();
|
queryExecutor.shutdown();
|
||||||
indexExecutor.shutdown();
|
indexExecutor.shutdown();
|
||||||
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
||||||
|
@ -406,12 +422,12 @@ public class IncrementalIndexTest
|
||||||
for (int i = 0; i < dimensionCount; ++i) {
|
for (int i = 0; i < dimensionCount; ++i) {
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
String.format("Failed long sum on dimension %d", i),
|
String.format("Failed long sum on dimension %d", i),
|
||||||
elementsPerThread * taskCount,
|
elementsPerThread * concurrentThreads,
|
||||||
result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue()
|
result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue()
|
||||||
);
|
);
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
String.format("Failed double sum on dimension %d", i),
|
String.format("Failed double sum on dimension %d", i),
|
||||||
elementsPerThread * taskCount,
|
elementsPerThread * concurrentThreads,
|
||||||
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
|
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue