From d6849805ea681cebc2debf8f4b01da2fba0dba70 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 8 Sep 2015 14:00:17 -0700 Subject: [PATCH] Add some basic latching to concurrency testing in IncrementalIndexTest --- .../segment/data/IncrementalIndexTest.java | 138 ++++++++++-------- 1 file changed, 81 insertions(+), 57 deletions(-) diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 1b1cbb01750..8e0ae0d2d27 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -292,68 +292,92 @@ public class IncrementalIndexTest final AtomicInteger currentlyRunning = new AtomicInteger(0); final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); final AtomicInteger someoneRan = new AtomicInteger(0); - for (int j = 0; j < taskCount; j++) { - indexFutures.add( - indexExecutor.submit( - new Runnable() - { - @Override - public void run() + final CountDownLatch startLatch = new CountDownLatch(1); + try { + for (int j = 0; j < taskCount; j++) { + indexFutures.add( + indexExecutor.submit( + new Runnable() { - currentlyRunning.incrementAndGet(); - try { - for (int i = 0; i < elementsPerThread; i++) { - someoneRan.incrementAndGet(); - index.add(getLongRow(timestamp + i, i, dimensionCount)); + @Override + public void run() + { + try { + startLatch.await(); } - } - catch (IndexSizeExceededException e) { - throw Throwables.propagate(e); - } - currentlyRunning.decrementAndGet(); - } - } - ) - ); - queryFutures.add( - queryExecutor.submit( - new Runnable() - { - @Override - public void run() - { - QueryRunner> runner = new FinalizeResultsQueryRunner>( - factory.createRunner(incrementalIndexSegment), - factory.getToolchest() - ); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() - .dataSource("xxx") - .granularity(QueryGranularity.ALL) - .intervals(ImmutableList.of(queryInterval)) - .aggregators(queryAggregatorFactories) - .build(); - Map context = new HashMap(); - for (Result result : - Sequences.toList( - runner.run(query, context), - new LinkedList>() - ) - ) { - final Integer ranCount = someoneRan.get(); - if (ranCount > 0) { - final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0"); - // Eventually consistent, but should be somewhere in that range - // Actual result is validated after all writes are guaranteed done. - Assert.assertTrue(String.format("%d >= %g >= 0 violated", ranCount, sumResult), sumResult >= 0 && sumResult <= ranCount); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); } - } - if (currentlyRunning.get() > 0) { - concurrentlyRan.set(true); + currentlyRunning.incrementAndGet(); + try { + for (int i = 0; i < elementsPerThread; i++) { + someoneRan.incrementAndGet(); + index.add(getLongRow(timestamp + i, i, dimensionCount)); + } + } + catch (IndexSizeExceededException e) { + throw Throwables.propagate(e); + } + currentlyRunning.decrementAndGet(); } } - } - ) - ); + ) + ); + + final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("xxx") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(queryInterval)) + .aggregators(queryAggregatorFactories) + .build(); + queryFutures.add( + queryExecutor.submit( + new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + QueryRunner> runner = new FinalizeResultsQueryRunner>( + factory.createRunner(incrementalIndexSegment), + factory.getToolchest() + ); + Map context = new HashMap(); + for (Result result : + Sequences.toList( + runner.run(query, context), + new LinkedList>() + ) + ) { + final Integer ranCount = someoneRan.get(); + if (ranCount > 0) { + final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0"); + // Eventually consistent, but should be somewhere in that range + // Actual result is validated after all writes are guaranteed done. + Assert.assertTrue( + String.format("%d >= %g >= 0 violated", ranCount, sumResult), + sumResult >= 0 && sumResult <= ranCount + ); + } + if (currentlyRunning.get() > 0) { + concurrentlyRan.set(true); + } + } + } + } + ) + ); + } + } + finally { + startLatch.countDown(); } List> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size()); allFutures.addAll(queryFutures);