improve hash wheel timer

This commit is contained in:
kimchy 2011-01-31 16:26:17 +02:00
parent cebdae5707
commit 54c770bcc9
1 changed files with 60 additions and 60 deletions

View File

@ -75,11 +75,6 @@ public class HashedWheelTimer implements Timer {
private static final AtomicInteger id = new AtomicInteger(); private static final AtomicInteger id = new AtomicInteger();
// I'd say 64 active timer threads are obvious misuse.
private static final int MISUSE_WARNING_THRESHOLD = 64;
private static final AtomicInteger activeInstances = new AtomicInteger();
private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean();
private final Worker worker = new Worker(); private final Worker worker = new Worker();
final Thread workerThread; final Thread workerThread;
final AtomicBoolean shutdown = new AtomicBoolean(); final AtomicBoolean shutdown = new AtomicBoolean();
@ -153,17 +148,6 @@ public class HashedWheelTimer implements Timer {
workerThread = threadFactory.newThread(new ThreadRenamingRunnable( workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet())); worker, "Hashed wheel timer #" + id.incrementAndGet()));
// Misuse check
int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet();
if (activeInstances >= MISUSE_WARNING_THRESHOLD &&
loggedMisuseWarning.compareAndSet(false, true)) {
logger.debug(
"There are too many active " +
HashedWheelTimer.class.getSimpleName() + " instances (" +
activeInstances + ") - you should share the small number " +
"of instances to avoid excessive resource consumption.");
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -239,8 +223,6 @@ public class HashedWheelTimer implements Timer {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
activeInstances.decrementAndGet();
Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
for (Set<HashedWheelTimeout> bucket : wheel) { for (Set<HashedWheelTimeout> bucket : wheel) {
unprocessedTimeouts.addAll(bucket); unprocessedTimeouts.addAll(bucket);
@ -269,13 +251,23 @@ public class HashedWheelTimer implements Timer {
start(); start();
} }
// Prepare the required parameters to create the timeout object. HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
HashedWheelTimeout timeout; scheduleTimeout(timeout, delay);
return timeout;
}
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
// delay must be equal to or greater than tickDuration so that the
// worker thread never misses the timeout.
if (delay < tickDuration) {
delay = tickDuration;
}
// Prepare the required parameters to schedule the timeout object.
final long lastRoundDelay = delay % roundDuration; final long lastRoundDelay = delay % roundDuration;
final long lastTickDelay = delay % tickDuration; final long lastTickDelay = delay % tickDuration;
final long relativeIndex = final long relativeIndex =
lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0); lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0);
final long deadline = currentTime + delay;
final long remainingRounds = final long remainingRounds =
delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0); delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0);
@ -283,18 +275,14 @@ public class HashedWheelTimer implements Timer {
// Add the timeout to the wheel. // Add the timeout to the wheel.
lock.readLock().lock(); lock.readLock().lock();
try { try {
timeout = int stopIndex = (int) (wheelCursor + relativeIndex & mask);
new HashedWheelTimeout( timeout.stopIndex = stopIndex;
task, deadline, timeout.remainingRounds = remainingRounds;
(int) (wheelCursor + relativeIndex & mask),
remainingRounds);
wheel[timeout.stopIndex].add(timeout); wheel[stopIndex].add(timeout);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
return timeout;
} }
private final class Worker implements Runnable { private final class Worker implements Runnable {
@ -306,6 +294,7 @@ public class HashedWheelTimer implements Timer {
super(); super();
} }
@Override
public void run() { public void run() {
List<HashedWheelTimeout> expiredTimeouts = List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>(); new ArrayList<HashedWheelTimeout>();
@ -314,14 +303,16 @@ public class HashedWheelTimer implements Timer {
tick = 1; tick = 1;
while (!shutdown.get()) { while (!shutdown.get()) {
waitForNextTick(); final long deadline = waitForNextTick();
fetchExpiredTimeouts(expiredTimeouts); if (deadline > 0) {
notifyExpiredTimeouts(expiredTimeouts); fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts);
}
} }
} }
private void fetchExpiredTimeouts( private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) { List<HashedWheelTimeout> expiredTimeouts, long deadline) {
// Find the expired timeouts and decrease the round counter // Find the expired timeouts and decrease the round counter
// if necessary. Note that we don't send the notification // if necessary. Note that we don't send the notification
@ -329,12 +320,9 @@ public class HashedWheelTimer implements Timer {
// an exclusive lock. // an exclusive lock.
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
int oldBucketHead = wheelCursor; int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
int newBucketHead = oldBucketHead + 1 & mask; ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
wheelCursor = newBucketHead; fetchExpiredTimeouts(expiredTimeouts, i, deadline);
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
fetchExpiredTimeouts(expiredTimeouts, i);
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
@ -342,24 +330,37 @@ public class HashedWheelTimer implements Timer {
private void fetchExpiredTimeouts( private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts, List<HashedWheelTimeout> expiredTimeouts,
ReusableIterator<HashedWheelTimeout> i) { ReusableIterator<HashedWheelTimeout> i, long deadline) {
long currentDeadline = System.currentTimeMillis() + tickDuration; List<HashedWheelTimeout> slipped = null;
i.rewind(); i.rewind();
while (i.hasNext()) { while (i.hasNext()) {
HashedWheelTimeout timeout = i.next(); HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) { if (timeout.remainingRounds <= 0) {
if (timeout.deadline < currentDeadline) { i.remove();
i.remove(); if (timeout.deadline <= deadline) {
expiredTimeouts.add(timeout); expiredTimeouts.add(timeout);
} else { } else {
// A rare case where a timeout is put for the next // Handle the case where the timeout is put into a wrong
// round: just wait for the next round. // place, usually one tick earlier. For now, just add
// it to a temporary list - we will reschedule it in a
// separate loop.
if (slipped == null) {
slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
}
slipped.add(timeout);
} }
} else { } else {
timeout.remainingRounds--; timeout.remainingRounds--;
} }
} }
// Reschedule the slipped timeouts.
if (slipped != null) {
for (HashedWheelTimeout timeout : slipped) {
scheduleTimeout(timeout, timeout.deadline - deadline);
}
}
} }
private void notifyExpiredTimeouts( private void notifyExpiredTimeouts(
@ -373,7 +374,9 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear(); expiredTimeouts.clear();
} }
private void waitForNextTick() { private long waitForNextTick() {
long deadline = startTime + tickDuration * tick;
for (; ;) { for (; ;) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();
final long sleepTime = tickDuration * tick - (currentTime - startTime); final long sleepTime = tickDuration * tick - (currentTime - startTime);
@ -386,46 +389,41 @@ public class HashedWheelTimer implements Timer {
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (shutdown.get()) { if (shutdown.get()) {
return; return -1;
} }
} }
} }
// Reset the tick if overflow is expected. // Increase the tick.
if (tickDuration * tick > Long.MAX_VALUE - tickDuration) { tick++;
startTime = System.currentTimeMillis(); return deadline;
tick = 1;
} else {
// Increase the tick if overflow is not likely to happen.
tick++;
}
} }
} }
private final class HashedWheelTimeout implements Timeout { private final class HashedWheelTimeout implements Timeout {
private final TimerTask task; private final TimerTask task;
final int stopIndex;
final long deadline; final long deadline;
volatile int stopIndex;
volatile long remainingRounds; volatile long remainingRounds;
private volatile boolean cancelled; private volatile boolean cancelled;
HashedWheelTimeout( HashedWheelTimeout(TimerTask task, long deadline) {
TimerTask task, long deadline, int stopIndex, long remainingRounds) {
this.task = task; this.task = task;
this.deadline = deadline; this.deadline = deadline;
this.stopIndex = stopIndex;
this.remainingRounds = remainingRounds;
} }
@Override
public Timer getTimer() { public Timer getTimer() {
return HashedWheelTimer.this; return HashedWheelTimer.this;
} }
@Override
public TimerTask getTask() { public TimerTask getTask() {
return task; return task;
} }
@Override
public void cancel() { public void cancel() {
if (isExpired()) { if (isExpired()) {
return; return;
@ -437,10 +435,12 @@ public class HashedWheelTimer implements Timer {
wheel[stopIndex].remove(this); wheel[stopIndex].remove(this);
} }
@Override
public boolean isCancelled() { public boolean isCancelled() {
return cancelled; return cancelled;
} }
@Override
public boolean isExpired() { public boolean isExpired() {
return cancelled || System.currentTimeMillis() > deadline; return cancelled || System.currentTimeMillis() > deadline;
} }