diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 8e0ae0d2d27..af3af0d6569 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -20,11 +20,14 @@ package io.druid.segment.data; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; @@ -74,7 +77,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -217,7 +219,7 @@ public class IncrementalIndexTest Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); } - @Test(timeout = 60000) + @Test(timeout = 60_000L) public void testConcurrentAddRead() throws InterruptedException, ExecutionException { final int dimensionCount = 5; @@ -257,9 +259,8 @@ public class IncrementalIndexTest final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount])); - final int taskCount = 30; - final int concurrentThreads = 3; - final int elementsPerThread = 100; + final int concurrentThreads = 2; + final int elementsPerThread = 10_000; final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( concurrentThreads, @@ -281,8 +282,8 @@ public class IncrementalIndexTest ); final long timestamp = System.currentTimeMillis(); final Interval queryInterval = new Interval("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z"); - final List> indexFutures = new LinkedList<>(); - final List> queryFutures = new LinkedList<>(); + final List> indexFutures = Lists.newArrayListWithExpectedSize(concurrentThreads); + final List> queryFutures = Lists.newArrayListWithExpectedSize(concurrentThreads); final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null); final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), @@ -290,100 +291,115 @@ public class IncrementalIndexTest QueryRunnerTestHelper.NOOP_QUERYWATCHER ); final AtomicInteger currentlyRunning = new AtomicInteger(0); - final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); + final AtomicInteger concurrentlyRan = new AtomicInteger(0); final AtomicInteger someoneRan = new AtomicInteger(0); final CountDownLatch startLatch = new CountDownLatch(1); - try { - for (int j = 0; j < taskCount; j++) { - indexFutures.add( - indexExecutor.submit( - new Runnable() + final CountDownLatch readyLatch = new CountDownLatch(concurrentThreads * 2); + final AtomicInteger queriesAccumualted = new AtomicInteger(0); + for (int j = 0; j < concurrentThreads; j++) { + indexFutures.add( + indexExecutor.submit( + new Runnable() + { + @Override + public void run() { - @Override - public void run() - { - try { - startLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - currentlyRunning.incrementAndGet(); - try { - for (int i = 0; i < elementsPerThread; i++) { - someoneRan.incrementAndGet(); - index.add(getLongRow(timestamp + i, i, dimensionCount)); - } - } - catch (IndexSizeExceededException e) { - throw Throwables.propagate(e); - } - currentlyRunning.decrementAndGet(); + readyLatch.countDown(); + try { + startLatch.await(); } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + currentlyRunning.incrementAndGet(); + try { + for (int i = 0; i < elementsPerThread; i++) { + index.add(getLongRow(timestamp + i, i, dimensionCount)); + someoneRan.incrementAndGet(); + } + } + catch (IndexSizeExceededException e) { + throw Throwables.propagate(e); + } + currentlyRunning.decrementAndGet(); } - ) - ); + } + ) + ); - final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() - .dataSource("xxx") - .granularity(QueryGranularity.ALL) - .intervals(ImmutableList.of(queryInterval)) - .aggregators(queryAggregatorFactories) - .build(); - queryFutures.add( - queryExecutor.submit( - new Runnable() + final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("xxx") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(queryInterval)) + .aggregators(queryAggregatorFactories) + .build(); + queryFutures.add( + queryExecutor.submit( + new Runnable() + { + @Override + public void run() { - @Override - public void run() - { - try { - startLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } + readyLatch.countDown(); + try { + startLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + while (concurrentlyRan.get() == 0) { QueryRunner> runner = new FinalizeResultsQueryRunner>( factory.createRunner(incrementalIndexSegment), factory.getToolchest() ); Map context = new HashMap(); - for (Result result : - Sequences.toList( - runner.run(query, context), - new LinkedList>() + Sequence> sequence = runner.run(query, context); + + for (Double result : + sequence.accumulate( + new Double[0], new Accumulator>() + { + @Override + public Double[] accumulate( + Double[] accumulated, Result in + ) + { + if (currentlyRunning.get() > 0) { + concurrentlyRan.incrementAndGet(); + } + queriesAccumualted.incrementAndGet(); + return Lists.asList(in.getValue().getDoubleMetric("doubleSumResult0"), accumulated) + .toArray(new Double[accumulated.length + 1]); + } + } ) ) { final Integer ranCount = someoneRan.get(); if (ranCount > 0) { - final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0"); // Eventually consistent, but should be somewhere in that range // Actual result is validated after all writes are guaranteed done. Assert.assertTrue( - String.format("%d >= %g >= 0 violated", ranCount, sumResult), - sumResult >= 0 && sumResult <= ranCount + String.format("%d >= %g >= 0 violated", ranCount, result), + result >= 0 && result <= ranCount ); } - if (currentlyRunning.get() > 0) { - concurrentlyRan.set(true); - } } } } - ) - ); - } - } - finally { - startLatch.countDown(); + } + ) + ); } + readyLatch.await(); + startLatch.countDown(); List> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size()); allFutures.addAll(queryFutures); allFutures.addAll(indexFutures); Futures.allAsList(allFutures).get(); - Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get()); + Assert.assertTrue("Queries ran too fast", queriesAccumualted.get() > 0); + Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get() > 0); queryExecutor.shutdown(); indexExecutor.shutdown(); QueryRunner> runner = new FinalizeResultsQueryRunner>( @@ -406,12 +422,12 @@ public class IncrementalIndexTest for (int i = 0; i < dimensionCount; ++i) { Assert.assertEquals( String.format("Failed long sum on dimension %d", i), - elementsPerThread * taskCount, + elementsPerThread * concurrentThreads, result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue() ); Assert.assertEquals( String.format("Failed double sum on dimension %d", i), - elementsPerThread * taskCount, + elementsPerThread * concurrentThreads, result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue() ); }