diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 2e0ecf0d34b..e55582217aa 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -217,8 +218,25 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler path = request.getPath(); } Request newRequest = client.copyRequest(request, requestURI); - // Disable the timeout so that only the one from the initial request applies. - newRequest.timeout(0, TimeUnit.MILLISECONDS); + + // Adjust the timeout of the new request, taking into account the + // timeout of the previous request and the time already elapsed. + long timeoutAt = request.getTimeoutAt(); + if (timeoutAt < Long.MAX_VALUE) + { + long newTimeout = timeoutAt - System.nanoTime(); + if (newTimeout > 0) + { + newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS); + } + else + { + TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed"); + forwardFailureComplete(request, failure, response, failure); + return; + } + } + if (path != null) newRequest.path(path); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java index d9f97dd5e00..e86eeea7c26 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java @@ -19,26 +19,24 @@ package org.eclipse.jetty.client; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.io.CyclicTimeouts; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public abstract class HttpChannel +public abstract class HttpChannel implements CyclicTimeouts.Expirable { protected static final Logger LOG = Log.getLogger(HttpChannel.class); private final HttpDestination _destination; - private final TimeoutCompleteListener _totalTimeout; private HttpExchange _exchange; protected HttpChannel(HttpDestination destination) { _destination = destination; - _totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler()); } public void destroy() { - _totalTimeout.destroy(); } public HttpDestination getHttpDestination() @@ -109,6 +107,13 @@ public abstract class HttpChannel } } + @Override + public long getExpireNanoTime() + { + HttpExchange exchange = getHttpExchange(); + return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE; + } + protected abstract HttpSender getHttpSender(); protected abstract HttpReceiver getHttpReceiver(); @@ -117,16 +122,7 @@ public abstract class HttpChannel { HttpExchange exchange = getHttpExchange(); if (exchange != null) - { - HttpRequest request = exchange.getRequest(); - long timeoutAt = request.getTimeoutAt(); - if (timeoutAt != -1) - { - exchange.getResponseListeners().add(_totalTimeout); - _totalTimeout.schedule(request, timeoutAt); - } send(exchange); - } } public abstract void send(HttpExchange exchange); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index ed8bc1cce18..76f6422f56b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -22,6 +22,7 @@ import java.net.CookieStore; import java.net.HttpCookie; import java.net.URI; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -35,16 +36,19 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.io.CyclicTimeouts; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.HttpCookieStore; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; public abstract class HttpConnection implements Connection, Attachable { private static final Logger LOG = Log.getLogger(HttpConnection.class); private final HttpDestination destination; + private final RequestTimeouts requestTimeouts; private Object attachment; private int idleTimeoutGuard; private long idleTimeoutStamp; @@ -52,6 +56,7 @@ public abstract class HttpConnection implements Connection, Attachable protected HttpConnection(HttpDestination destination) { this.destination = destination; + this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler()); this.idleTimeoutStamp = System.nanoTime(); } @@ -65,6 +70,8 @@ public abstract class HttpConnection implements Connection, Attachable return destination; } + protected abstract Iterator getHttpChannels(); + @Override public void send(Request request, Response.CompleteListener listener) { @@ -230,6 +237,7 @@ public abstract class HttpConnection implements Connection, Attachable SendFailure result; if (channel.associate(exchange)) { + requestTimeouts.schedule(channel); channel.send(); result = null; } @@ -292,9 +300,40 @@ public abstract class HttpConnection implements Connection, Attachable return attachment; } + protected void destroy() + { + requestTimeouts.destroy(); + } + @Override public String toString() { return String.format("%s@%h", getClass().getSimpleName(), this); } + + private class RequestTimeouts extends CyclicTimeouts + { + private RequestTimeouts(Scheduler scheduler) + { + super(scheduler); + } + + @Override + protected Iterator iterator() + { + return getHttpChannels(); + } + + @Override + protected boolean onExpired(HttpChannel channel) + { + HttpExchange exchange = channel.getHttpExchange(); + if (exchange != null) + { + HttpRequest request = exchange.getRequest(); + request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); + } + return false; + } + } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java index 3817ecbfe97..fb345a1e79d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java @@ -23,6 +23,7 @@ import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.util.AttributesMap; import org.eclipse.jetty.util.log.Log; @@ -143,6 +144,20 @@ public class HttpConversation extends AttributesMap this.listeners = listeners; } + /** + *

Returns the total timeout for the conversation.

+ *

The conversation total timeout is the total timeout + * of the first request in the conversation.

+ * + * @return the total timeout of the conversation + * @see Request#getTimeout() + */ + public long getTimeout() + { + HttpExchange firstExchange = exchanges.peekFirst(); + return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout(); + } + public boolean abort(Throwable cause) { HttpExchange exchange = exchanges.peekLast(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 2686b22cf95..8d80fc378a9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -22,12 +22,11 @@ import java.io.Closeable; import java.io.IOException; import java.nio.channels.AsynchronousCloseException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; @@ -36,7 +35,7 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ClientConnectionFactory; -import org.eclipse.jetty.io.CyclicTimeout; +import org.eclipse.jetty.io.CyclicTimeouts; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.HostPort; @@ -55,7 +54,7 @@ import org.eclipse.jetty.util.thread.Sweeper; @ManagedObject public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable { - protected static final Logger LOG = Log.getLogger(HttpDestination.class); + private static final Logger LOG = Log.getLogger(HttpDestination.class); private final HttpClient client; private final Origin origin; @@ -270,10 +269,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest { if (enqueue(exchanges, exchange)) { - long expiresAt = request.getTimeoutAt(); - if (expiresAt != -1) - requestTimeouts.schedule(expiresAt); - + requestTimeouts.schedule(exchange); if (!client.isRunning() && exchanges.remove(exchange)) { request.abort(new RejectedExecutionException(client + " is stopping")); @@ -549,63 +545,27 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest /** *

Enforces the total timeout for for exchanges that are still in the queue.

*

The total timeout for exchanges that are not in the destination queue - * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.

+ * is enforced in {@link HttpConnection}.

*/ - private class RequestTimeouts extends CyclicTimeout + private class RequestTimeouts extends CyclicTimeouts { - private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE); - private RequestTimeouts(Scheduler scheduler) { super(scheduler); } @Override - public void onTimeoutExpired() + protected Iterator iterator() { - if (LOG.isDebugEnabled()) - LOG.debug("{} timeouts check", this); - - long now = System.nanoTime(); - long earliest = Long.MAX_VALUE; - // Reset the earliest timeout so we can expire again. - // A concurrent call to schedule(long) may lose an earliest - // value, but the corresponding exchange is already enqueued - // and will be seen by scanning the exchange queue below. - earliestTimeout.set(earliest); - - // Scan the message queue to abort expired exchanges - // and to find the exchange that expire the earliest. - for (HttpExchange exchange : exchanges) - { - HttpRequest request = exchange.getRequest(); - long expiresAt = request.getTimeoutAt(); - if (expiresAt == -1) - continue; - if (expiresAt <= now) - request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); - else if (expiresAt < earliest) - earliest = expiresAt; - } - - if (earliest < Long.MAX_VALUE && client.isRunning()) - schedule(earliest); + return exchanges.iterator(); } - private void schedule(long expiresAt) + @Override + protected boolean onExpired(HttpExchange exchange) { - // Schedule a timeout for the earliest exchange that may expire. - // When the timeout expires, scan the exchange queue for the next - // earliest exchange that may expire, and reschedule a new timeout. - long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt)); - if (expiresAt < prevEarliest) - { - // A new request expires earlier than previous requests, schedule it. - long delay = Math.max(0, expiresAt - System.nanoTime()); - if (LOG.isDebugEnabled()) - LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); - schedule(delay, TimeUnit.NANOSECONDS); - } + HttpRequest request = exchange.getRequest(); + request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); + return false; } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 439be1817ad..f1fdcaf505c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -22,10 +22,11 @@ import java.util.List; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.io.CyclicTimeouts; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HttpExchange +public class HttpExchange implements CyclicTimeouts.Expirable { private static final Logger LOG = Log.getLogger(HttpExchange.class); @@ -86,6 +87,12 @@ public class HttpExchange } } + @Override + public long getExpireNanoTime() + { + return request.getTimeoutAt(); + } + /** *

Associates the given {@code channel} to this exchange.

*

Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.

diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 33985f909b8..e1e0c779d4c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -474,9 +474,9 @@ public abstract class HttpReceiver boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); if (!ordered) channel.exchangeTerminated(exchange, result); - if (LOG.isDebugEnabled()) - LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); List listeners = exchange.getConversation().getResponseListeners(); + if (LOG.isDebugEnabled()) + LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners); ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyComplete(listeners, result); if (ordered) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java index 6b378e3f9db..4eb85ebd818 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -111,8 +112,8 @@ public class HttpRedirector */ public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException { - final AtomicReference resultRef = new AtomicReference<>(); - final CountDownLatch latch = new CountDownLatch(1); + AtomicReference resultRef = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); Request redirect = redirect(request, response, new BufferingResponseListener() { @Override @@ -307,13 +308,29 @@ public class HttpRedirector } } - private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method) + private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method) { try { Request redirect = client.copyRequest(httpRequest, location); - // Disable the timeout so that only the one from the initial request applies. - redirect.timeout(0, TimeUnit.MILLISECONDS); + + // Adjust the timeout of the new request, taking into account the + // timeout of the previous request and the time already elapsed. + long timeoutAt = httpRequest.getTimeoutAt(); + if (timeoutAt < Long.MAX_VALUE) + { + long newTimeout = timeoutAt - System.nanoTime(); + if (newTimeout > 0) + { + redirect.timeout(newTimeout, TimeUnit.NANOSECONDS); + } + else + { + TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getConversation().getTimeout() + " ms elapsed"); + fail(httpRequest, failure, response); + return null; + } + } // Use given method redirect.method(method); @@ -330,17 +347,27 @@ public class HttpRedirector } catch (Throwable x) { - fail(httpRequest, response, x); + fail(httpRequest, x, response); return null; } } protected void fail(Request request, Response response, Throwable failure) + { + fail(request, null, response, failure); + } + + protected void fail(Request request, Throwable failure, Response response) + { + fail(request, failure, response, failure); + } + + private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure) { HttpConversation conversation = ((HttpRequest)request).getConversation(); conversation.updateResponseListeners(null); List listeners = conversation.getResponseListeners(); - notifier.notifyFailure(listeners, response, failure); - notifier.notifyComplete(listeners, new Result(request, response, failure)); + notifier.notifyFailure(listeners, response, responseFailure); + notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure)); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 2b94c47edcc..0f52bdb3f22 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -80,7 +80,7 @@ public class HttpRequest implements Request private HttpVersion version = HttpVersion.HTTP_1_1; private long idleTimeout = -1; private long timeout; - private long timeoutAt; + private long timeoutAt = Long.MAX_VALUE; private ContentProvider content; private boolean followRedirects; private List cookies; @@ -781,11 +781,12 @@ public class HttpRequest implements Request void sent() { long timeout = getTimeout(); - timeoutAt = timeout > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) : -1; + if (timeout > 0) + timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout); } /** - * @return The nanoTime at which the timeout expires or -1 if there is no timeout. + * @return The nanoTime at which the timeout expires or {@link Long#MAX_VALUE} if there is no timeout. * @see #timeout(long, TimeUnit) */ long getTimeoutAt() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java index 46417885779..c9006758156 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java @@ -26,10 +26,15 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.io.CyclicTimeout; +import org.eclipse.jetty.io.CyclicTimeouts; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; +/** + * @deprecated Do not use it, use {@link CyclicTimeouts} instead. + */ +@Deprecated public class TimeoutCompleteListener extends CyclicTimeout implements Response.CompleteListener { private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 2acc7569a08..f0ae5c88b15 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -20,11 +20,14 @@ package org.eclipse.jetty.client.http; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.util.Collections; +import java.util.Iterator; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; @@ -243,6 +246,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec super(destination); } + @Override + protected Iterator getHttpChannels() + { + return Collections.singleton(channel).iterator(); + } + @Override protected SendFailure send(HttpExchange exchange) { @@ -264,6 +273,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec public void close() { HttpConnectionOverHTTP.this.close(); + destroy(); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java index 498980a9636..50e6add2cf6 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java @@ -521,6 +521,117 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest }); } + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testRedirectToDifferentHostThenRequestToFirstHostExpires(Scenario scenario) throws Exception + { + long timeout = 1000; + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + if ("/one".equals(target)) + { + response.setStatus(HttpStatus.SEE_OTHER_303); + response.setHeader(HttpHeader.LOCATION.asString(), scenario.getScheme() + "://127.0.0.1:" + connector.getLocalPort() + "/two"); + } + else if ("/two".equals(target)) + { + try + { + // Send another request to "localhost", therefore reusing the + // connection used for the first request, it must timeout. + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .path("/three") + .timeout(timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + if (result.getFailure() instanceof TimeoutException) + latch.countDown(); + }); + // Wait for the request to fail as it should. + assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS)); + } + catch (Throwable x) + { + throw new ServletException(x); + } + } + else if ("/three".equals(target)) + { + try + { + // The third request must timeout. + Thread.sleep(2 * timeout); + } + catch (InterruptedException x) + { + throw new ServletException(x); + } + } + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .path("/one") + // The timeout should not expire, but must be present to trigger the test conditions. + .timeout(3 * timeout, TimeUnit.MILLISECONDS) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testManyRedirectsTotalTimeoutExpires(Scenario scenario) throws Exception + { + long timeout = 1000; + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + try + { + String serverURI = scenario.getScheme() + "://localhost:" + connector.getLocalPort(); + if ("/one".equals(target)) + { + Thread.sleep(timeout); + response.setStatus(HttpStatus.SEE_OTHER_303); + response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/two"); + } + else if ("/two".equals(target)) + { + Thread.sleep(timeout); + response.setStatus(HttpStatus.SEE_OTHER_303); + response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/three"); + } + else if ("/three".equals(target)) + { + Thread.sleep(2 * timeout); + } + } + catch (InterruptedException x) + { + throw new ServletException(x); + } + } + }); + + assertThrows(TimeoutException.class, () -> + { + client.setMaxRedirects(-1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .path("/one") + .timeout(3 * timeout, TimeUnit.MILLISECONDS) + .send(); + }); + } + private void testSameMethodRedirect(final Scenario scenario, final HttpMethod method, int redirectCode) throws Exception { testMethodRedirect(scenario, method, method, redirectCode); @@ -569,7 +680,7 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest assertEquals(200, response.getStatus()); } - private class RedirectHandler extends EmptyServerHandler + private static class RedirectHandler extends EmptyServerHandler { @Override protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index b38e1f738e9..5149eba51e8 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.fcgi.client.http; import java.io.EOFException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Queue; @@ -373,6 +374,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec super(destination); } + @Override + protected Iterator getHttpChannels() + { + return new IteratorWrapper<>(activeChannels.values().iterator()); + } + @Override protected SendFailure send(HttpExchange exchange) { @@ -391,6 +398,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec public void close() { HttpConnectionOverFCGI.this.close(); + destroy(); } protected void close(Throwable failure) @@ -511,4 +519,32 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec LOG.debug("Channel not found for request {}", request); } } + + private static final class IteratorWrapper implements Iterator + { + private final Iterator iterator; + + private IteratorWrapper(Iterator iterator) + { + this.iterator = iterator; + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public T next() + { + return iterator.next(); + } + + @Override + public void remove() + { + iterator.remove(); + } + } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index b7dbad78bde..54c34582a68 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2.client.http; import java.nio.channels.AsynchronousCloseException; +import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -73,6 +74,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S this.recycleHttpChannels = recycleHttpChannels; } + @Override + protected Iterator getHttpChannels() + { + return activeChannels.iterator(); + } + @Override protected SendFailure send(HttpExchange exchange) { @@ -133,6 +140,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S public void close() { close(new AsynchronousCloseException()); + destroy(); } protected void close(Throwable failure) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java index 60a04ec0b39..35086629b9e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java @@ -44,12 +44,6 @@ import static java.lang.Long.MAX_VALUE; * session there is a CyclicTimeout and at the beginning of the * request processing the timeout is canceled (via cancel()), but at * the end of the request processing the timeout is re-scheduled.

- *

Another typical scenario is for a parent entity to manage - * the timeouts of many children entities; the timeout is scheduled - * for the child entity that expires the earlier; when the timeout - * expires, the implementation scans the children entities to find - * the expired child entities and to find the next child entity - * that expires the earlier.

*

This implementation has a {@link Timeout} holding the time * at which the scheduled task should fire, and a linked list of * {@link Wakeup}, each holding the actual scheduled task.

@@ -64,6 +58,8 @@ import static java.lang.Long.MAX_VALUE; * When the Wakeup task fires, it will see that the Timeout is now * in the future and will attach a new Wakeup with the future time * to the Timeout, and submit a scheduler task for the new Wakeup.

+ * + * @see CyclicTimeouts */ public abstract class CyclicTimeout implements Destroyable { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java new file mode 100644 index 00000000000..a36da726563 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java @@ -0,0 +1,199 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.component.Destroyable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + *

An implementation of a timeout that manages many {@link Expirable expirable} entities whose + * timeouts are mostly cancelled or re-scheduled.

+ *

A typical scenario is for a parent entity to manage the timeouts of many children entities.

+ *

When a new entity is created, call {@link #schedule(Expirable)} with the new entity so that + * this instance can be aware and manage the timeout of the new entity.

+ *

Eventually, this instance wakes up and iterates over the entities provided by {@link #iterator()}. + * During the iteration, each entity:

+ *
    + *
  • may never expire (see {@link Expirable#getExpireNanoTime()}; the entity is ignored
  • + *
  • may be expired; {@link #onExpired(Expirable)} is called with that entity as parameter
  • + *
  • may expire at a future time; the iteration records the earliest expiration time among + * all non-expired entities
  • + *
+ *

When the iteration is complete, this instance is re-scheduled with the earliest expiration time + * calculated during the iteration.

+ * + * @param the {@link Expirable} entity type + * @see CyclicTimeout + */ +public abstract class CyclicTimeouts implements Destroyable +{ + private static final Logger LOG = Log.getLogger(CyclicTimeouts.class); + + private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE); + private final CyclicTimeout cyclicTimeout; + + public CyclicTimeouts(Scheduler scheduler) + { + cyclicTimeout = new Timeouts(scheduler); + } + + /** + * @return the entities to iterate over when this instance expires + */ + protected abstract Iterator iterator(); + + /** + *

Invoked during the iteration when the given entity is expired.

+ *

This method may be invoked multiple times, and even concurrently, + * for the same expirable entity and therefore the expiration of the + * entity, if any, should be an idempotent action.

+ * + * @param expirable the entity that is expired + * @return whether the entity should be removed from the iterator via {@link Iterator#remove()} + */ + protected abstract boolean onExpired(T expirable); + + private void onTimeoutExpired() + { + if (LOG.isDebugEnabled()) + LOG.debug("Timeouts check for {}", this); + + long now = System.nanoTime(); + long earliest = Long.MAX_VALUE; + // Reset the earliest timeout so we can expire again. + // A concurrent call to schedule(long) may lose an + // earliest value, but the corresponding entity will + // be seen during the iteration below. + earliestTimeout.set(earliest); + + Iterator iterator = iterator(); + if (iterator == null) + return; + + // Scan the entities to abort expired entities + // and to find the entity that expires the earliest. + while (iterator.hasNext()) + { + T expirable = iterator.next(); + long expiresAt = expirable.getExpireNanoTime(); + + if (LOG.isDebugEnabled()) + LOG.debug("Entity {} expires in {} ms for {}", expirable, TimeUnit.NANOSECONDS.toMillis(expiresAt - now), this); + + if (expiresAt == -1) + continue; + + if (expiresAt <= now) + { + boolean remove = onExpired(expirable); + if (LOG.isDebugEnabled()) + LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this); + if (remove) + iterator.remove(); + continue; + } + earliest = Math.min(earliest, expiresAt); + } + + if (earliest < Long.MAX_VALUE) + schedule(earliest); + } + + /** + *

Manages the timeout of a new entity.

+ * + * @param expirable the new entity to manage the timeout for + */ + public void schedule(T expirable) + { + long expiresAt = expirable.getExpireNanoTime(); + if (expiresAt < Long.MAX_VALUE) + schedule(expiresAt); + } + + private void schedule(long expiresAt) + { + // Schedule a timeout for the earliest entity that may expire. + // When the timeout expires, scan the entities for the next + // earliest entity that may expire, and reschedule a new timeout. + long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt)); + long expires = expiresAt; + while (expires < prevEarliest) + { + // A new entity expires earlier than previous entities, schedule it. + long delay = Math.max(0, expires - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling timeout in {} ms for {}", TimeUnit.NANOSECONDS.toMillis(delay), this); + schedule(cyclicTimeout, delay, TimeUnit.NANOSECONDS); + + // If we lost a race and overwrote a schedule() with an earlier time, then that earlier time + // is remembered by earliestTimeout, in which case we will loop and set it again ourselves. + prevEarliest = expires; + expires = earliestTimeout.get(); + } + } + + @Override + public void destroy() + { + cyclicTimeout.destroy(); + } + + boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit) + { + return cyclicTimeout.schedule(delay, unit); + } + + /** + *

An entity that may expire.

+ */ + public interface Expirable + { + /** + *

Returns the expiration time in nanoseconds.

+ *

The value to return must be calculated taking into account {@link System#nanoTime()}, + * for example:

+ * {@code expireNanoTime = System.nanoTime() + timeoutNanos} + *

Returning {@link Long#MAX_VALUE} indicates that this entity does not expire.

+ * + * @return the expiration time in nanoseconds, or {@link Long#MAX_VALUE} if this entity does not expire + */ + public long getExpireNanoTime(); + } + + private class Timeouts extends CyclicTimeout + { + private Timeouts(Scheduler scheduler) + { + super(scheduler); + } + + @Override + public void onTimeoutExpired() + { + CyclicTimeouts.this.onTimeoutExpired(); + } + } +} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java new file mode 100644 index 00000000000..62769882a44 --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java @@ -0,0 +1,266 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.eclipse.jetty.util.thread.Scheduler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CyclicTimeoutsTest +{ + private Scheduler scheduler; + private CyclicTimeouts timeouts; + + @BeforeEach + public void prepare() + { + scheduler = new ScheduledExecutorScheduler(); + LifeCycle.start(scheduler); + } + + @AfterEach + public void dispose() + { + if (timeouts != null) + timeouts.destroy(); + LifeCycle.stop(scheduler); + } + + @Test + public void testNoExpirationForNonExpiringEntity() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + timeouts = new CyclicTimeouts(scheduler) + { + @Override + protected Iterator iterator() + { + latch.countDown(); + return null; + } + + @Override + protected boolean onExpired(ConstantExpirable expirable) + { + return false; + } + }; + + // Schedule an entity that does not expire. + timeouts.schedule(ConstantExpirable.noExpire()); + + Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testScheduleZero() throws Exception + { + ConstantExpirable entity = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS); + CountDownLatch iteratorLatch = new CountDownLatch(1); + CountDownLatch expiredLatch = new CountDownLatch(1); + timeouts = new CyclicTimeouts(scheduler) + { + @Override + protected Iterator iterator() + { + iteratorLatch.countDown(); + return Collections.emptyIterator(); + } + + @Override + protected boolean onExpired(ConstantExpirable expirable) + { + expiredLatch.countDown(); + return false; + } + }; + + timeouts.schedule(entity); + + Assertions.assertTrue(iteratorLatch.await(1, TimeUnit.SECONDS)); + Assertions.assertFalse(expiredLatch.await(1, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testIterateAndExpire(boolean remove) throws Exception + { + ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS); + ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS); + Collection collection = new ArrayList<>(); + collection.add(one); + AtomicInteger iterations = new AtomicInteger(); + CountDownLatch expiredLatch = new CountDownLatch(1); + timeouts = new CyclicTimeouts(scheduler) + { + @Override + protected Iterator iterator() + { + iterations.incrementAndGet(); + return collection.iterator(); + } + + @Override + protected boolean onExpired(ConstantExpirable expirable) + { + assertSame(one, expirable); + expiredLatch.countDown(); + return remove; + } + }; + + // Triggers immediate call to iterator(), which + // returns an entity that expires in 1 second. + timeouts.schedule(zero); + + // After 1 second there is a second call to + // iterator(), which returns the now expired + // entity, which is passed to onExpired(). + assertTrue(expiredLatch.await(2, TimeUnit.SECONDS)); + + // Wait for the collection to be processed + // with the return value of onExpired(). + Thread.sleep(1000); + + // Verify the processing of the return value of onExpired(). + assertEquals(remove ? 0 : 1, collection.size()); + + // Wait to see if iterator() is called again (it should not). + Thread.sleep(1000); + assertEquals(2, iterations.get()); + } + + @Test + public void testScheduleOvertake() throws Exception + { + ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS); + long delayMs = 2000; + ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS); + ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS); + Collection collection = new ArrayList<>(); + collection.add(two); + CountDownLatch expiredLatch = new CountDownLatch(2); + List expired = new ArrayList<>(); + timeouts = new CyclicTimeouts(scheduler) + { + private final AtomicBoolean overtakeScheduled = new AtomicBoolean(); + + @Override + protected Iterator iterator() + { + return collection.iterator(); + } + + @Override + protected boolean onExpired(ConstantExpirable expirable) + { + expired.add(expirable); + expiredLatch.countDown(); + return true; + } + + @Override + boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit) + { + if (delay <= 0) + return super.schedule(cyclicTimeout, delay, unit); + + // Simulate that an entity with a shorter timeout + // overtakes the entity that is currently being scheduled. + // Only schedule the overtake once. + if (overtakeScheduled.compareAndSet(false, true)) + { + collection.add(overtake); + schedule(overtake); + } + return super.schedule(cyclicTimeout, delay, unit); + } + }; + + // Trigger the initial call to iterator(). + timeouts.schedule(zero); + + // Make sure that the overtake entity expires properly. + assertTrue(expiredLatch.await(2 * delayMs, TimeUnit.MILLISECONDS)); + + // Make sure all entities expired properly. + assertSame(overtake, expired.get(0)); + assertSame(two, expired.get(1)); + } + + private static class ConstantExpirable implements CyclicTimeouts.Expirable + { + private static ConstantExpirable noExpire() + { + return new ConstantExpirable(); + } + + private static ConstantExpirable ofDelay(long delay, TimeUnit unit) + { + return new ConstantExpirable(delay, unit); + } + + private final long expireNanos; + private final String asString; + + private ConstantExpirable() + { + this.expireNanos = Long.MAX_VALUE; + this.asString = "noexp"; + } + + public ConstantExpirable(long delay, TimeUnit unit) + { + this.expireNanos = System.nanoTime() + unit.toNanos(delay); + this.asString = String.valueOf(unit.toMillis(delay)); + } + + @Override + public long getExpireNanoTime() + { + return expireNanos; + } + + @Override + public String toString() + { + return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString); + } + } +}