Merge pull request #1710 from metamx/incrementalIndexConcurrentTestLatching

Add some basic latching to concurrency testing in IncrementalIndexTest
This commit is contained in:
Fangjin Yang 2015-09-15 13:55:52 -07:00
commit 8b071a7230
1 changed files with 81 additions and 57 deletions

View File

@ -292,68 +292,92 @@ public class IncrementalIndexTest
final AtomicInteger currentlyRunning = new AtomicInteger(0); final AtomicInteger currentlyRunning = new AtomicInteger(0);
final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
final AtomicInteger someoneRan = new AtomicInteger(0); final AtomicInteger someoneRan = new AtomicInteger(0);
for (int j = 0; j < taskCount; j++) { final CountDownLatch startLatch = new CountDownLatch(1);
indexFutures.add( try {
indexExecutor.submit( for (int j = 0; j < taskCount; j++) {
new Runnable() indexFutures.add(
{ indexExecutor.submit(
@Override new Runnable()
public void run()
{ {
currentlyRunning.incrementAndGet(); @Override
try { public void run()
for (int i = 0; i < elementsPerThread; i++) { {
someoneRan.incrementAndGet(); try {
index.add(getLongRow(timestamp + i, i, dimensionCount)); startLatch.await();
} }
} catch (InterruptedException e) {
catch (IndexSizeExceededException e) { Thread.currentThread().interrupt();
throw Throwables.propagate(e); throw Throwables.propagate(e);
}
currentlyRunning.decrementAndGet();
}
}
)
);
queryFutures.add(
queryExecutor.submit(
new Runnable()
{
@Override
public void run()
{
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
factory.createRunner(incrementalIndexSegment),
factory.getToolchest()
);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("xxx")
.granularity(QueryGranularity.ALL)
.intervals(ImmutableList.of(queryInterval))
.aggregators(queryAggregatorFactories)
.build();
Map<String, Object> context = new HashMap<String, Object>();
for (Result<TimeseriesResultValue> result :
Sequences.toList(
runner.run(query, context),
new LinkedList<Result<TimeseriesResultValue>>()
)
) {
final Integer ranCount = someoneRan.get();
if (ranCount > 0) {
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
// Eventually consistent, but should be somewhere in that range
// Actual result is validated after all writes are guaranteed done.
Assert.assertTrue(String.format("%d >= %g >= 0 violated", ranCount, sumResult), sumResult >= 0 && sumResult <= ranCount);
} }
} currentlyRunning.incrementAndGet();
if (currentlyRunning.get() > 0) { try {
concurrentlyRan.set(true); for (int i = 0; i < elementsPerThread; i++) {
someoneRan.incrementAndGet();
index.add(getLongRow(timestamp + i, i, dimensionCount));
}
}
catch (IndexSizeExceededException e) {
throw Throwables.propagate(e);
}
currentlyRunning.decrementAndGet();
} }
} }
} )
) );
);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("xxx")
.granularity(QueryGranularity.ALL)
.intervals(ImmutableList.of(queryInterval))
.aggregators(queryAggregatorFactories)
.build();
queryFutures.add(
queryExecutor.submit(
new Runnable()
{
@Override
public void run()
{
try {
startLatch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
factory.createRunner(incrementalIndexSegment),
factory.getToolchest()
);
Map<String, Object> context = new HashMap<String, Object>();
for (Result<TimeseriesResultValue> result :
Sequences.toList(
runner.run(query, context),
new LinkedList<Result<TimeseriesResultValue>>()
)
) {
final Integer ranCount = someoneRan.get();
if (ranCount > 0) {
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
// Eventually consistent, but should be somewhere in that range
// Actual result is validated after all writes are guaranteed done.
Assert.assertTrue(
String.format("%d >= %g >= 0 violated", ranCount, sumResult),
sumResult >= 0 && sumResult <= ranCount
);
}
if (currentlyRunning.get() > 0) {
concurrentlyRan.set(true);
}
}
}
}
)
);
}
}
finally {
startLatch.countDown();
} }
List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size()); List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
allFutures.addAll(queryFutures); allFutures.addAll(queryFutures);