Merge pull request #1049 from metamx/SlowIncrementalIndexTest

Account for very slow writer threads in IncrementalIndexTest
This commit is contained in:
Xavier Léauté 2015-01-19 10:14:30 -08:00
commit a550dd1f45
1 changed files with 8 additions and 4 deletions

View File

@ -291,7 +291,7 @@ public class IncrementalIndexTest
); );
final AtomicInteger currentlyRunning = new AtomicInteger(0); final AtomicInteger currentlyRunning = new AtomicInteger(0);
final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
final AtomicBoolean someoneRan = new AtomicBoolean(false); final AtomicInteger someoneRan = new AtomicInteger(0);
for (int j = 0; j < taskCount; j++) { for (int j = 0; j < taskCount; j++) {
indexFutures.add( indexFutures.add(
indexExecutor.submit( indexExecutor.submit(
@ -303,6 +303,7 @@ public class IncrementalIndexTest
currentlyRunning.incrementAndGet(); currentlyRunning.incrementAndGet();
try { try {
for (int i = 0; i < elementsPerThread; i++) { for (int i = 0; i < elementsPerThread; i++) {
someoneRan.incrementAndGet();
index.add(getLongRow(timestamp + i, i, dimensionCount)); index.add(getLongRow(timestamp + i, i, dimensionCount));
} }
} }
@ -310,7 +311,6 @@ public class IncrementalIndexTest
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
currentlyRunning.decrementAndGet(); currentlyRunning.decrementAndGet();
someoneRan.set(true);
} }
} }
) )
@ -339,8 +339,12 @@ public class IncrementalIndexTest
new LinkedList<Result<TimeseriesResultValue>>() new LinkedList<Result<TimeseriesResultValue>>()
) )
) { ) {
if (someoneRan.get()) { final Integer ranCount = someoneRan.get();
Assert.assertTrue(result.getValue().getDoubleMetric("doubleSumResult0") > 0); if (ranCount > 0) {
final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0");
// Eventually consistent, but should be somewhere in that range
// Actual result is validated after all writes are guaranteed done.
Assert.assertTrue(String.format("%d >= %g >= 0 violated", ranCount, sumResult), sumResult >= 0 && sumResult <= ranCount);
} }
} }
if (currentlyRunning.get() > 0) { if (currentlyRunning.get() > 0) {