From 5f23689aa7f44c0660ba2ad92c7c6a15d7c4af15 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 11 May 2021 22:14:45 +0200 Subject: [PATCH] Issue #6254 - Total timeout not enforced for queued requests. Various code cleanups. Renamed HttpDestination.TimeoutTask to RequestTimeouts for clarity. Improved javadocs, code comments and logging. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpDestination.java | 68 +++++++++---------- .../jetty/client/TimeoutCompleteListener.java | 23 +++---- .../org/eclipse/jetty/io/CyclicTimeout.java | 20 +++++- 3 files changed, 59 insertions(+), 52 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index f82a194eaf4..13c1d20662f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -65,7 +65,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest private final ProxyConfiguration.Proxy proxy; private final ClientConnectionFactory connectionFactory; private final HttpField hostField; - private final TimeoutTask timeout; + private final RequestTimeouts requestTimeouts; private ConnectionPool connectionPool; public HttpDestination(HttpClient client, Origin origin) @@ -78,7 +78,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest this.requestNotifier = new RequestNotifier(client); this.responseNotifier = new ResponseNotifier(); - this.timeout = new TimeoutTask(client.getScheduler()); + this.requestTimeouts = new RequestTimeouts(client.getScheduler()); ProxyConfiguration proxyConfig = client.getProxyConfiguration(); proxy = proxyConfig.match(origin); @@ -272,7 +272,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest { long expiresAt = request.getTimeoutAt(); if (expiresAt != -1) - timeout.schedule(expiresAt); + requestTimeouts.schedule(expiresAt); if (!client.isRunning() && exchanges.remove(exchange)) { @@ -425,7 +425,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest if (LOG.isDebugEnabled()) LOG.debug("Closed {}", this); connectionPool.close(); - timeout.destroy(); + requestTimeouts.destroy(); } public void release(Connection connection) @@ -547,15 +547,15 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest } /** - * This class enforces the total timeout for exchanges that are still in the queue. - * The total timeout for exchanges that are not in the destination queue is enforced - * by {@link HttpChannel}. + *

Enforces the total timeout for for exchanges that are still in the queue.

+ *

The total timeout for exchanges that are not in the destination queue + * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.

*/ - private class TimeoutTask extends CyclicTimeout + private class RequestTimeouts extends CyclicTimeout { - private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE); - private TimeoutTask(Scheduler scheduler) + private RequestTimeouts(Scheduler scheduler) { super(scheduler); } @@ -564,14 +564,17 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest public void onTimeoutExpired() { if (LOG.isDebugEnabled()) - LOG.debug("{} timeout expired", this); + LOG.debug("{} timeouts check", this); - nextTimeout.set(Long.MAX_VALUE); long now = System.nanoTime(); - long nextExpiresAt = Long.MAX_VALUE; - // Check all queued exchanges for those that have expired - // and to determine when the next check must be. + // Reset the earliest timeout so we can expire again. + // A concurrent call to schedule(long) may lose an earliest + // value, but the corresponding exchange is already enqueued + // and will be seen by scanning the exchange queue below. + earliestTimeout.set(Long.MAX_VALUE); + + long earliest = Long.MAX_VALUE; for (HttpExchange exchange : exchanges) { HttpRequest request = exchange.getRequest(); @@ -580,34 +583,27 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest continue; if (expiresAt <= now) request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); - else if (expiresAt < nextExpiresAt) - nextExpiresAt = expiresAt; + else if (expiresAt < earliest) + earliest = expiresAt; } - if (nextExpiresAt < Long.MAX_VALUE && client.isRunning()) - schedule(nextExpiresAt); + if (earliest < Long.MAX_VALUE && client.isRunning()) + schedule(earliest); } private void schedule(long expiresAt) { - // Schedule a timeout for the soonest any known exchange can expire. - // If subsequently that exchange is removed from the queue, the - // timeout is not cancelled, instead the entire queue is swept - // for expired exchanges and a new timeout is set. - long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt)); - if (timeoutAt != expiresAt) + // Schedule a timeout for the earliest exchange that may expire. + // When the timeout expires, scan the exchange queue for the next + // earliest exchange that may expire, and reschedule a new timeout. + long earliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt)); + if (expiresAt != earliest) { - long delay = expiresAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); - } + // A new request expires earlier than previous requests, schedule it. + long delay = Math.max(0, expiresAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java index 7457fbd32e1..46417885779 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java @@ -34,7 +34,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C { private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class); - private final AtomicReference request = new AtomicReference<>(); + private final AtomicReference requestTimeout = new AtomicReference<>(); public TimeoutCompleteListener(Scheduler scheduler) { @@ -44,7 +44,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C @Override public void onTimeoutExpired() { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (LOG.isDebugEnabled()) LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this); if (request != null) @@ -54,7 +54,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C @Override public void onComplete(Result result) { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (request != null) { boolean cancelled = cancel(); @@ -65,19 +65,12 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C void schedule(HttpRequest request, long timeoutAt) { - if (this.request.compareAndSet(null, request)) + if (requestTimeout.compareAndSet(null, request)) { - long delay = timeoutAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); - } + long delay = Math.max(0, timeoutAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java index e4b5f7e6cf0..fae0374c403 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java @@ -33,6 +33,23 @@ import static java.lang.Long.MAX_VALUE; *

Subclasses should implement {@link #onTimeoutExpired()}.

*

This implementation is optimised assuming that the timeout * will mostly be cancelled and then reused with a similar value.

+ *

The typical scenario to use this class is when you have events + * that postpone (by re-scheduling), or cancel then re-schedule, a + * timeout for a single entity. + * For example: connection idleness, where for each connection there + * is a CyclicTimeout and a read/write postpones the timeout; when + * the timeout expires, the implementation checks against a timestamp + * if the connection is really idle. + * Another example: HTTP session expiration, where for each HTTP + * session there is a CyclicTimeout and at the beginning of the + * request processing the timeout is canceled (via cancel()), but at + * the end of the request processing the timeout is re-scheduled.

+ *

Another typical scenario is for a parent entity to manage + * the timeouts of many children entities; the timeout is scheduled + * for the child entity that expires the earlier; when the timeout + * expires, the implementation scans the children entities to find + * the expired child entities and to find the next child entity + * that expires the earlier.

*

This implementation has a {@link Timeout} holding the time * at which the scheduled task should fire, and a linked list of * {@link Wakeup}, each holding the actual scheduled task.

@@ -102,8 +119,9 @@ public abstract class CyclicTimeout implements Destroyable if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup))) { if (LOG.isDebugEnabled()) - LOG.debug("Installed timeout in {} ms, waking up in {} ms", + LOG.debug("Installed timeout in {} ms, {} wake up in {} ms", units.toMillis(delay), + newWakeup != null ? "new" : "existing", TimeUnit.NANOSECONDS.toMillis(wakeup._at - now)); break; }