diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/timer/HashedWheelTimer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/timer/HashedWheelTimer.java index be7bfdbd4a0..4927b31afe4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/timer/HashedWheelTimer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/timer/HashedWheelTimer.java @@ -75,11 +75,6 @@ public class HashedWheelTimer implements Timer { private static final AtomicInteger id = new AtomicInteger(); - // I'd say 64 active timer threads are obvious misuse. - private static final int MISUSE_WARNING_THRESHOLD = 64; - private static final AtomicInteger activeInstances = new AtomicInteger(); - private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean(); - private final Worker worker = new Worker(); final Thread workerThread; final AtomicBoolean shutdown = new AtomicBoolean(); @@ -153,17 +148,6 @@ public class HashedWheelTimer implements Timer { workerThread = threadFactory.newThread(new ThreadRenamingRunnable( worker, "Hashed wheel timer #" + id.incrementAndGet())); - - // Misuse check - int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet(); - if (activeInstances >= MISUSE_WARNING_THRESHOLD && - loggedMisuseWarning.compareAndSet(false, true)) { - logger.debug( - "There are too many active " + - HashedWheelTimer.class.getSimpleName() + " instances (" + - activeInstances + ") - you should share the small number " + - "of instances to avoid excessive resource consumption."); - } } @SuppressWarnings("unchecked") @@ -239,8 +223,6 @@ public class HashedWheelTimer implements Timer { Thread.currentThread().interrupt(); } - activeInstances.decrementAndGet(); - Set unprocessedTimeouts = new HashSet(); for (Set bucket : wheel) { unprocessedTimeouts.addAll(bucket); @@ -269,13 +251,23 @@ public class HashedWheelTimer implements Timer { start(); } - // Prepare the required parameters to create the timeout object. - HashedWheelTimeout timeout; + HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); + scheduleTimeout(timeout, delay); + return timeout; + } + + void scheduleTimeout(HashedWheelTimeout timeout, long delay) { + // delay must be equal to or greater than tickDuration so that the + // worker thread never misses the timeout. + if (delay < tickDuration) { + delay = tickDuration; + } + + // Prepare the required parameters to schedule the timeout object. final long lastRoundDelay = delay % roundDuration; final long lastTickDelay = delay % tickDuration; final long relativeIndex = lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0); - final long deadline = currentTime + delay; final long remainingRounds = delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0); @@ -283,18 +275,14 @@ public class HashedWheelTimer implements Timer { // Add the timeout to the wheel. lock.readLock().lock(); try { - timeout = - new HashedWheelTimeout( - task, deadline, - (int) (wheelCursor + relativeIndex & mask), - remainingRounds); + int stopIndex = (int) (wheelCursor + relativeIndex & mask); + timeout.stopIndex = stopIndex; + timeout.remainingRounds = remainingRounds; - wheel[timeout.stopIndex].add(timeout); + wheel[stopIndex].add(timeout); } finally { lock.readLock().unlock(); } - - return timeout; } private final class Worker implements Runnable { @@ -306,6 +294,7 @@ public class HashedWheelTimer implements Timer { super(); } + @Override public void run() { List expiredTimeouts = new ArrayList(); @@ -314,14 +303,16 @@ public class HashedWheelTimer implements Timer { tick = 1; while (!shutdown.get()) { - waitForNextTick(); - fetchExpiredTimeouts(expiredTimeouts); - notifyExpiredTimeouts(expiredTimeouts); + final long deadline = waitForNextTick(); + if (deadline > 0) { + fetchExpiredTimeouts(expiredTimeouts, deadline); + notifyExpiredTimeouts(expiredTimeouts); + } } } private void fetchExpiredTimeouts( - List expiredTimeouts) { + List expiredTimeouts, long deadline) { // Find the expired timeouts and decrease the round counter // if necessary. Note that we don't send the notification @@ -329,12 +320,9 @@ public class HashedWheelTimer implements Timer { // an exclusive lock. lock.writeLock().lock(); try { - int oldBucketHead = wheelCursor; - int newBucketHead = oldBucketHead + 1 & mask; - wheelCursor = newBucketHead; - - ReusableIterator i = iterators[oldBucketHead]; - fetchExpiredTimeouts(expiredTimeouts, i); + int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; + ReusableIterator i = iterators[newWheelCursor]; + fetchExpiredTimeouts(expiredTimeouts, i, deadline); } finally { lock.writeLock().unlock(); } @@ -342,24 +330,37 @@ public class HashedWheelTimer implements Timer { private void fetchExpiredTimeouts( List expiredTimeouts, - ReusableIterator i) { + ReusableIterator i, long deadline) { - long currentDeadline = System.currentTimeMillis() + tickDuration; + List slipped = null; i.rewind(); while (i.hasNext()) { HashedWheelTimeout timeout = i.next(); if (timeout.remainingRounds <= 0) { - if (timeout.deadline < currentDeadline) { - i.remove(); + i.remove(); + if (timeout.deadline <= deadline) { expiredTimeouts.add(timeout); } else { - // A rare case where a timeout is put for the next - // round: just wait for the next round. + // Handle the case where the timeout is put into a wrong + // place, usually one tick earlier. For now, just add + // it to a temporary list - we will reschedule it in a + // separate loop. + if (slipped == null) { + slipped = new ArrayList(); + } + slipped.add(timeout); } } else { timeout.remainingRounds--; } } + + // Reschedule the slipped timeouts. + if (slipped != null) { + for (HashedWheelTimeout timeout : slipped) { + scheduleTimeout(timeout, timeout.deadline - deadline); + } + } } private void notifyExpiredTimeouts( @@ -373,7 +374,9 @@ public class HashedWheelTimer implements Timer { expiredTimeouts.clear(); } - private void waitForNextTick() { + private long waitForNextTick() { + long deadline = startTime + tickDuration * tick; + for (; ;) { final long currentTime = System.currentTimeMillis(); final long sleepTime = tickDuration * tick - (currentTime - startTime); @@ -386,46 +389,41 @@ public class HashedWheelTimer implements Timer { Thread.sleep(sleepTime); } catch (InterruptedException e) { if (shutdown.get()) { - return; + return -1; } } } - // Reset the tick if overflow is expected. - if (tickDuration * tick > Long.MAX_VALUE - tickDuration) { - startTime = System.currentTimeMillis(); - tick = 1; - } else { - // Increase the tick if overflow is not likely to happen. - tick++; - } + // Increase the tick. + tick++; + return deadline; } } private final class HashedWheelTimeout implements Timeout { private final TimerTask task; - final int stopIndex; final long deadline; + volatile int stopIndex; volatile long remainingRounds; private volatile boolean cancelled; - HashedWheelTimeout( - TimerTask task, long deadline, int stopIndex, long remainingRounds) { + HashedWheelTimeout(TimerTask task, long deadline) { this.task = task; this.deadline = deadline; - this.stopIndex = stopIndex; - this.remainingRounds = remainingRounds; } + @Override public Timer getTimer() { return HashedWheelTimer.this; } + @Override public TimerTask getTask() { return task; } + @Override public void cancel() { if (isExpired()) { return; @@ -437,10 +435,12 @@ public class HashedWheelTimer implements Timer { wheel[stopIndex].remove(this); } + @Override public boolean isCancelled() { return cancelled; } + @Override public boolean isExpired() { return cancelled || System.currentTimeMillis() > deadline; }