mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Make UpdateCounter proof to update count overflow (#4138)
* Make UpdateCounter proof to update count overflow. * Fix
This commit is contained in:
parent
0c464f4a84
commit
0bc18e7906
@ -25,6 +25,11 @@ import java.util.concurrent.TimeoutException;
|
|||||||
|
|
||||||
final class UpdateCounter
|
final class UpdateCounter
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Max {@link Phaser}'s phase, specified in it's javadoc. Then it wraps to zero.
|
||||||
|
*/
|
||||||
|
private static final int MAX_PHASE = Integer.MAX_VALUE;
|
||||||
|
|
||||||
private final Phaser phaser = new Phaser(1);
|
private final Phaser phaser = new Phaser(1);
|
||||||
|
|
||||||
void update()
|
void update()
|
||||||
@ -34,14 +39,39 @@ final class UpdateCounter
|
|||||||
|
|
||||||
void awaitTotalUpdates(int totalUpdates) throws InterruptedException
|
void awaitTotalUpdates(int totalUpdates) throws InterruptedException
|
||||||
{
|
{
|
||||||
|
totalUpdates &= MAX_PHASE;
|
||||||
int currentUpdates = phaser.getPhase();
|
int currentUpdates = phaser.getPhase();
|
||||||
while (totalUpdates - currentUpdates > 0) { // overflow-aware
|
checkNotTerminated(currentUpdates);
|
||||||
|
while (comparePhases(totalUpdates, currentUpdates) > 0) {
|
||||||
currentUpdates = phaser.awaitAdvanceInterruptibly(currentUpdates);
|
currentUpdates = phaser.awaitAdvanceInterruptibly(currentUpdates);
|
||||||
|
checkNotTerminated(currentUpdates);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int comparePhases(int phase1, int phase2)
|
||||||
|
{
|
||||||
|
int diff = (phase1 - phase2) & MAX_PHASE;
|
||||||
|
if (diff == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return diff < MAX_PHASE / 2 ? 1 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkNotTerminated(int phase)
|
||||||
|
{
|
||||||
|
if (phase < 0) {
|
||||||
|
throw new IllegalStateException("Phaser[" + phaser + "] unexpectedly terminated.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void awaitNextUpdates(int nextUpdates) throws InterruptedException
|
void awaitNextUpdates(int nextUpdates) throws InterruptedException
|
||||||
{
|
{
|
||||||
|
if (nextUpdates <= 0) {
|
||||||
|
throw new IllegalArgumentException("nextUpdates is not positive: " + nextUpdates);
|
||||||
|
}
|
||||||
|
if (nextUpdates > MAX_PHASE / 4) {
|
||||||
|
throw new UnsupportedOperationException("Couldn't wait for so many updates: " + nextUpdates);
|
||||||
|
}
|
||||||
awaitTotalUpdates(phaser.getPhase() + nextUpdates);
|
awaitTotalUpdates(phaser.getPhase() + nextUpdates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user