diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java index 46da9218428..ed229b45f89 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java @@ -25,6 +25,11 @@ import java.util.concurrent.TimeoutException; final class UpdateCounter { + /** + * Max {@link Phaser}'s phase, specified in it's javadoc. Then it wraps to zero. + */ + private static final int MAX_PHASE = Integer.MAX_VALUE; + private final Phaser phaser = new Phaser(1); void update() @@ -34,14 +39,39 @@ final class UpdateCounter void awaitTotalUpdates(int totalUpdates) throws InterruptedException { + totalUpdates &= MAX_PHASE; int currentUpdates = phaser.getPhase(); - while (totalUpdates - currentUpdates > 0) { // overflow-aware + checkNotTerminated(currentUpdates); + while (comparePhases(totalUpdates, currentUpdates) > 0) { currentUpdates = phaser.awaitAdvanceInterruptibly(currentUpdates); + checkNotTerminated(currentUpdates); + } + } + + private static int comparePhases(int phase1, int phase2) + { + int diff = (phase1 - phase2) & MAX_PHASE; + if (diff == 0) { + return 0; + } + return diff < MAX_PHASE / 2 ? 1 : -1; + } + + private void checkNotTerminated(int phase) + { + if (phase < 0) { + throw new IllegalStateException("Phaser[" + phaser + "] unexpectedly terminated."); } } void awaitNextUpdates(int nextUpdates) throws InterruptedException { + if (nextUpdates <= 0) { + throw new IllegalArgumentException("nextUpdates is not positive: " + nextUpdates); + } + if (nextUpdates > MAX_PHASE / 4) { + throw new UnsupportedOperationException("Couldn't wait for so many updates: " + nextUpdates); + } awaitTotalUpdates(phaser.getPhase() + nextUpdates); }