Issue #6254 - Total timeout not enforced for queued requests.
Updates after review. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
da50e06b64
commit
88ac10439a
|
@ -567,14 +567,15 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
LOG.debug("{} timeouts check", this);
|
LOG.debug("{} timeouts check", this);
|
||||||
|
|
||||||
long now = System.nanoTime();
|
long now = System.nanoTime();
|
||||||
|
long earliest = Long.MAX_VALUE;
|
||||||
// Reset the earliest timeout so we can expire again.
|
// Reset the earliest timeout so we can expire again.
|
||||||
// A concurrent call to schedule(long) may lose an earliest
|
// A concurrent call to schedule(long) may lose an earliest
|
||||||
// value, but the corresponding exchange is already enqueued
|
// value, but the corresponding exchange is already enqueued
|
||||||
// and will be seen by scanning the exchange queue below.
|
// and will be seen by scanning the exchange queue below.
|
||||||
earliestTimeout.set(Long.MAX_VALUE);
|
earliestTimeout.set(earliest);
|
||||||
|
|
||||||
long earliest = Long.MAX_VALUE;
|
// Scan the message queue to abort expired exchanges
|
||||||
|
// and to find the exchange that expire the earliest.
|
||||||
for (HttpExchange exchange : exchanges)
|
for (HttpExchange exchange : exchanges)
|
||||||
{
|
{
|
||||||
HttpRequest request = exchange.getRequest();
|
HttpRequest request = exchange.getRequest();
|
||||||
|
@ -596,8 +597,8 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
// Schedule a timeout for the earliest exchange that may expire.
|
// Schedule a timeout for the earliest exchange that may expire.
|
||||||
// When the timeout expires, scan the exchange queue for the next
|
// When the timeout expires, scan the exchange queue for the next
|
||||||
// earliest exchange that may expire, and reschedule a new timeout.
|
// earliest exchange that may expire, and reschedule a new timeout.
|
||||||
long earliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
|
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
|
||||||
if (expiresAt < earliest)
|
if (expiresAt < prevEarliest)
|
||||||
{
|
{
|
||||||
// A new request expires earlier than previous requests, schedule it.
|
// A new request expires earlier than previous requests, schedule it.
|
||||||
long delay = Math.max(0, expiresAt - System.nanoTime());
|
long delay = Math.max(0, expiresAt - System.nanoTime());
|
||||||
|
|
|
@ -119,10 +119,12 @@ public abstract class CyclicTimeout implements Destroyable
|
||||||
if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup)))
|
if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup)))
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
|
{
|
||||||
LOG.debug("Installed timeout in {} ms, {} wake up in {} ms",
|
LOG.debug("Installed timeout in {} ms, {} wake up in {} ms",
|
||||||
units.toMillis(delay),
|
units.toMillis(delay),
|
||||||
newWakeup != null ? "new" : "existing",
|
newWakeup != null ? "new" : "existing",
|
||||||
TimeUnit.NANOSECONDS.toMillis(wakeup._at - now));
|
TimeUnit.NANOSECONDS.toMillis(wakeup._at - now));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue