diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpConnection.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpConnection.java index d2d2c9fffdf..c9f26a6806b 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpConnection.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpConnection.java @@ -315,6 +315,11 @@ public abstract class HttpConnection implements IConnection, Attachable return String.format("%s@%h", getClass().getSimpleName(), this); } + /** + *

Enforces the total timeout for requests that have been sent.

+ *

The total timeout for exchanges that are in the destination queue + * is enforced in {@link HttpDestination}.

+ */ private class RequestTimeouts extends CyclicTimeouts { private RequestTimeouts(Scheduler scheduler) @@ -332,11 +337,14 @@ public abstract class HttpConnection implements IConnection, Attachable protected boolean onExpired(HttpChannel channel) { HttpExchange exchange = channel.getHttpExchange(); - if (exchange != null) - { - HttpRequest request = exchange.getRequest(); - request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed")); - } + // The expiration lost the race, as the + // exchange may have just been completed. + if (exchange == null) + return false; + HttpRequest request = exchange.getRequest(); + request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed")); + // The implementation of the Iterator returned above may not support + // removal, but the HttpChannel will be removed by request.abort(). return false; } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpDestination.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpDestination.java index 1dfb5b02fbc..4c61e00350b 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpDestination.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpDestination.java @@ -550,7 +550,7 @@ public class HttpDestination extends ContainerLifeCycle implements Destination, } /** - *

Enforces the total timeout for for exchanges that are still in the queue.

+ *

Enforces the total timeout for exchanges that are still in the queue.

*

The total timeout for exchanges that are not in the destination queue * is enforced in {@link HttpConnection}.

*/ @@ -572,6 +572,8 @@ public class HttpDestination extends ContainerLifeCycle implements Destination, { HttpRequest request = exchange.getRequest(); request.abort(new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed")); + // The implementation of the Iterator returned above does not support + // removal, but the HttpExchange will be removed by request.abort(). return false; } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 386a6eac3af..12d6202ec20 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -2385,6 +2385,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session protected boolean onExpired(HTTP2Stream stream) { stream.onIdleTimeout(new TimeoutException("Idle timeout " + stream.getIdleTimeout() + " ms elapsed")); + // The implementation of the Iterator returned above does not support + // removal, but the HTTP2Stream will be removed by stream.onIdleTimeout(). return false; } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 596207a070f..f992f2fc0de 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -311,10 +311,9 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum notifyIdleTimeout(this, timeout, Promise.from(timedOut -> { if (timedOut) - { - // Tell the other peer that we timed out. reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - } + else + notIdle(); }, x -> reset(new ResetFrame(getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP))); } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Stream.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Stream.java index 6795b6686c2..2eadbe90012 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Stream.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Stream.java @@ -46,7 +46,7 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A private CloseState closeState = CloseState.NOT_CLOSED; private FrameState frameState = FrameState.INITIAL; private long idleTimeout; - private long expireNanoTime; + private long expireNanoTime = Long.MAX_VALUE; private Object attachment; private boolean dataDemand; private boolean dataStalled; @@ -129,6 +129,8 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A { if (timedOut) endPoint.close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout); + else + notIdle(); promise.succeeded(timedOut); }, promise::failed)); } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java index 50fa36844a5..3f0f0aefc8d 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java @@ -65,6 +65,10 @@ public abstract class CyclicTimeouts impleme *

This method may be invoked multiple times, and even concurrently, * for the same expirable entity and therefore the expiration of the * entity, if any, should be an idempotent action.

+ *

When {@code false} is returned, the implementation should adjust + * the {@link Expirable} expiration, so that a call to + * {@link Expirable#getExpireNanoTime()} after this method has returned + * yields a new expiration nanoTime.

* * @param expirable the entity that is expired * @return whether the entity should be removed from the iterator via {@link Iterator#remove()} @@ -78,11 +82,10 @@ public abstract class CyclicTimeouts impleme long now = NanoTime.now(); long earliest = Long.MAX_VALUE; - // Reset the earliest timeout so we can expire again. - // A concurrent call to schedule(long) may lose an - // earliest value, but the corresponding entity will - // be seen during the iteration below. - earliestTimeout.set(earliest); + // Move the earliest timeout far in the future, so we can expire again. + // A concurrent call to schedule(long) may lose an earliest value, but + // the corresponding entity will be seen during the iteration below. + earliestTimeout.set(now + Long.MAX_VALUE); Iterator iterator = iterator(); if (iterator == null) @@ -98,23 +101,26 @@ public abstract class CyclicTimeouts impleme if (LOG.isDebugEnabled()) LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this); - if (expiresAt == -1) - continue; - if (NanoTime.isBeforeOrSame(expiresAt, now)) { boolean remove = onExpired(expirable); if (LOG.isDebugEnabled()) LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this); if (remove) + { iterator.remove(); - continue; + continue; + } + long newExpiresAt = expirable.getExpireNanoTime(); + if (newExpiresAt == expiresAt) + continue; + expiresAt = newExpiresAt; } earliest = Math.min(earliest, NanoTime.elapsed(now, expiresAt)); } - if (earliest < Long.MAX_VALUE) + if (earliest != Long.MAX_VALUE) schedule(now + earliest); } @@ -126,7 +132,7 @@ public abstract class CyclicTimeouts impleme public void schedule(T expirable) { long expiresAt = expirable.getExpireNanoTime(); - if (expiresAt < Long.MAX_VALUE) + if (expiresAt != Long.MAX_VALUE) schedule(expiresAt); } diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java index 0e55d0d050a..4756e533223 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java @@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class CyclicTimeoutsTest { private Scheduler scheduler; - private CyclicTimeouts timeouts; + private CyclicTimeouts timeouts; @BeforeEach public void prepare() @@ -65,14 +65,14 @@ public class CyclicTimeoutsTest timeouts = new CyclicTimeouts<>(scheduler) { @Override - protected Iterator iterator() + protected Iterator iterator() { latch.countDown(); return null; } @Override - protected boolean onExpired(ConstantExpirable expirable) + protected boolean onExpired(CyclicTimeouts.Expirable expirable) { return false; } @@ -93,14 +93,14 @@ public class CyclicTimeoutsTest timeouts = new CyclicTimeouts<>(scheduler) { @Override - protected Iterator iterator() + protected Iterator iterator() { iteratorLatch.countDown(); return Collections.emptyIterator(); } @Override - protected boolean onExpired(ConstantExpirable expirable) + protected boolean onExpired(CyclicTimeouts.Expirable expirable) { expiredLatch.countDown(); return false; @@ -118,22 +118,22 @@ public class CyclicTimeoutsTest public void testIterateAndExpire(boolean remove) throws Exception { ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS); - ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS); - Collection collection = new ArrayList<>(); + DynamicExpirable one = new DynamicExpirable(NanoTime.now() + TimeUnit.SECONDS.toNanos(1)); + Collection collection = new ArrayList<>(); collection.add(one); AtomicInteger iterations = new AtomicInteger(); CountDownLatch expiredLatch = new CountDownLatch(1); timeouts = new CyclicTimeouts<>(scheduler) { @Override - protected Iterator iterator() + protected Iterator iterator() { iterations.incrementAndGet(); return collection.iterator(); } @Override - protected boolean onExpired(ConstantExpirable expirable) + protected boolean onExpired(CyclicTimeouts.Expirable expirable) { assertSame(one, expirable); expiredLatch.countDown(); @@ -169,22 +169,22 @@ public class CyclicTimeoutsTest long delayMs = 2000; ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS); ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS); - Collection collection = new ArrayList<>(); + Collection collection = new ArrayList<>(); collection.add(two); CountDownLatch expiredLatch = new CountDownLatch(2); - List expired = new ArrayList<>(); + List expired = new ArrayList<>(); timeouts = new CyclicTimeouts<>(scheduler) { private final AtomicBoolean overtakeScheduled = new AtomicBoolean(); @Override - protected Iterator iterator() + protected Iterator iterator() { return collection.iterator(); } @Override - protected boolean onExpired(ConstantExpirable expirable) + protected boolean onExpired(CyclicTimeouts.Expirable expirable) { expired.add(expirable); expiredLatch.countDown(); @@ -220,6 +220,39 @@ public class CyclicTimeoutsTest assertSame(two, expired.get(1)); } + @Test + public void testDynamicExpirableEntityIsNotifiedMultipleTimes() throws Exception + { + long delay = 500; + DynamicExpirable entity = new DynamicExpirable(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(delay)); + List entities = List.of(entity); + + CountDownLatch latch = new CountDownLatch(2); + timeouts = new CyclicTimeouts<>(scheduler) + { + @Override + protected Iterator iterator() + { + return entities.iterator(); + } + + @Override + protected boolean onExpired(CyclicTimeouts.Expirable expirable) + { + assertSame(entity, expirable); + // Postpone expiration. + entity.expireNanoTime = NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(delay); + latch.countDown(); + return false; + } + }; + + // Trigger the initial call to iterator(). + timeouts.schedule(entities.get(0)); + + assertTrue(latch.await(3 * delay, TimeUnit.MILLISECONDS), latch.toString()); + } + private static class ConstantExpirable implements CyclicTimeouts.Expirable { private static ConstantExpirable noExpire() @@ -259,4 +292,26 @@ public class CyclicTimeoutsTest return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString); } } + + private static class DynamicExpirable implements CyclicTimeouts.Expirable + { + private long expireNanoTime; + + public DynamicExpirable(long expireNanoTime) + { + this.expireNanoTime = expireNanoTime; + } + + @Override + public long getExpireNanoTime() + { + return expireNanoTime; + } + + @Override + public String toString() + { + return String.format("%s@%x[%dms]", getClass().getSimpleName(), hashCode(), NanoTime.millisUntil(expireNanoTime)); + } + } } diff --git a/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java b/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java index bba0c545f63..01a1a8b34ed 100644 --- a/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java +++ b/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicConnection.java @@ -133,6 +133,8 @@ public class ServerQuicConnection extends QuicConnection protected boolean onExpired(ServerQuicSession session) { session.onIdleTimeout(); + // The implementation of the Iterator returned above does not support + // removal, but the session will be removed by session.onIdleTimeout(). return false; } } diff --git a/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java b/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java index 625b036dd38..17598e557ca 100644 --- a/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java +++ b/jetty-core/jetty-quic/jetty-quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerQuicSession.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Scheduler; public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable { private final Connector connector; - private long expireNanoTime; + private long expireNanoTime = Long.MAX_VALUE; protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector) { @@ -103,6 +103,15 @@ public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Exp getQuicConnection().schedule(this); } + @Override + public boolean onIdleTimeout() + { + boolean result = super.onIdleTimeout(); + if (!result) + notIdle(); + return result; + } + @Override public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException {