mirror of https://github.com/apache/druid.git
Add some basic latching to concurrency testing in IncrementalIndexTest
This commit is contained in:
parent
e5532367e1
commit
d6849805ea
|
@ -292,68 +292,92 @@ public class IncrementalIndexTest
|
|||
final AtomicInteger currentlyRunning = new AtomicInteger(0);
|
||||
final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
|
||||
final AtomicInteger someoneRan = new AtomicInteger(0);
|
||||
for (int j = 0; j < taskCount; j++) {
|
||||
indexFutures.add(
|
||||
indexExecutor.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
try {
|
||||
for (int j = 0; j < taskCount; j++) {
|
||||
indexFutures.add(
|
||||
indexExecutor.submit(
|
||||
new Runnable()
|
||||
{
|
||||
currentlyRunning.incrementAndGet();
|
||||
try {
|
||||
for (int i = 0; i < elementsPerThread; i++) {
|
||||
someoneRan.incrementAndGet();
|
||||
index.add(getLongRow(timestamp + i, i, dimensionCount));
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await();
|
||||
}
|
||||
}
|
||||
catch (IndexSizeExceededException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
currentlyRunning.decrementAndGet();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
queryFutures.add(
|
||||
queryExecutor.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
||||
factory.createRunner(incrementalIndexSegment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("xxx")
|
||||
.granularity(QueryGranularity.ALL)
|
||||
.intervals(ImmutableList.of(queryInterval))
|
||||
.aggregators(queryAggregatorFactories)
|
||||
.build();
|
||||
Map<String, Object> context = new HashMap<String, Object>();
|
||||
for (Result<TimeseriesResultValue> result :
|
||||
Sequences.toList(
|
||||
runner.run(query, context),
|
||||
new LinkedList<Result<TimeseriesResultValue>>()
|
||||
)
|
||||
) {
|
||||
final Integer ranCount = someoneRan.get();
|
||||
if (ranCount > 0) {
|
||||
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
|
||||
// Eventually consistent, but should be somewhere in that range
|
||||
// Actual result is validated after all writes are guaranteed done.
|
||||
Assert.assertTrue(String.format("%d >= %g >= 0 violated", ranCount, sumResult), sumResult >= 0 && sumResult <= ranCount);
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
if (currentlyRunning.get() > 0) {
|
||||
concurrentlyRan.set(true);
|
||||
currentlyRunning.incrementAndGet();
|
||||
try {
|
||||
for (int i = 0; i < elementsPerThread; i++) {
|
||||
someoneRan.incrementAndGet();
|
||||
index.add(getLongRow(timestamp + i, i, dimensionCount));
|
||||
}
|
||||
}
|
||||
catch (IndexSizeExceededException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
currentlyRunning.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
)
|
||||
);
|
||||
|
||||
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("xxx")
|
||||
.granularity(QueryGranularity.ALL)
|
||||
.intervals(ImmutableList.of(queryInterval))
|
||||
.aggregators(queryAggregatorFactories)
|
||||
.build();
|
||||
queryFutures.add(
|
||||
queryExecutor.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
|
||||
factory.createRunner(incrementalIndexSegment),
|
||||
factory.getToolchest()
|
||||
);
|
||||
Map<String, Object> context = new HashMap<String, Object>();
|
||||
for (Result<TimeseriesResultValue> result :
|
||||
Sequences.toList(
|
||||
runner.run(query, context),
|
||||
new LinkedList<Result<TimeseriesResultValue>>()
|
||||
)
|
||||
) {
|
||||
final Integer ranCount = someoneRan.get();
|
||||
if (ranCount > 0) {
|
||||
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
|
||||
// Eventually consistent, but should be somewhere in that range
|
||||
// Actual result is validated after all writes are guaranteed done.
|
||||
Assert.assertTrue(
|
||||
String.format("%d >= %g >= 0 violated", ranCount, sumResult),
|
||||
sumResult >= 0 && sumResult <= ranCount
|
||||
);
|
||||
}
|
||||
if (currentlyRunning.get() > 0) {
|
||||
concurrentlyRan.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
startLatch.countDown();
|
||||
}
|
||||
List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
|
||||
allFutures.addAll(queryFutures);
|
||||
|
|
Loading…
Reference in New Issue