diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index e469febceb0..cfab3fe0486 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -291,7 +291,7 @@ public class IncrementalIndexTest ); final AtomicInteger currentlyRunning = new AtomicInteger(0); final AtomicBoolean concurrentlyRan = new AtomicBoolean(false); - final AtomicBoolean someoneRan = new AtomicBoolean(false); + final AtomicInteger someoneRan = new AtomicInteger(0); for (int j = 0; j < taskCount; j++) { indexFutures.add( indexExecutor.submit( @@ -303,6 +303,7 @@ public class IncrementalIndexTest currentlyRunning.incrementAndGet(); try { for (int i = 0; i < elementsPerThread; i++) { + someoneRan.incrementAndGet(); index.add(getLongRow(timestamp + i, i, dimensionCount)); } } @@ -310,7 +311,6 @@ public class IncrementalIndexTest throw Throwables.propagate(e); } currentlyRunning.decrementAndGet(); - someoneRan.set(true); } } ) @@ -339,8 +339,12 @@ public class IncrementalIndexTest new LinkedList>() ) ) { - if (someoneRan.get()) { - Assert.assertTrue(result.getValue().getDoubleMetric("doubleSumResult0") > 0); + final Integer ranCount = someoneRan.get(); + if (ranCount > 0) { + final Double sumResult = result.getValue().getDoubleMetric("doubleSumResult0"); + // Eventually consistent, but should be somewhere in that range + // Actual result is validated after all writes are guaranteed done. + Assert.assertTrue(String.format("%d >= %g >= 0 violated", ranCount, sumResult), sumResult >= 0 && sumResult <= ranCount); } } if (currentlyRunning.get() > 0) {