From 8c10ea8a9bf551561ad6ea936729c6a960100578 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 28 Nov 2023 12:56:17 +0100 Subject: [PATCH] Fixes #10912 - Document Request listeners (#10920) * Fixes #10912 - Document Request listeners * Documented Request listeners and updated javadocs. * Removed code in HttpChannelState.onIdleTimeout() that was automatically complete the Handler callback. * Invoking failure listeners only once (although HttpChannelState.onFailure() may be called multiple times). * Made sure that in ChannelCallback.succeeded() the last stream send uses the ChannelResponse as Callback, like it is done in Response.write(). * Moved Request listeners tests from various test classes into RequestListenersTest. Signed-off-by: Simone Bordet --- .../http/server-http-handler-implement.adoc | 51 ++ .../org/eclipse/jetty/server/Request.java | 56 +- .../org/eclipse/jetty/server/Response.java | 2 +- .../jetty/server/handler/ErrorHandler.java | 13 +- .../server/internal/HttpChannelState.java | 69 ++- .../jetty/server/HttpServerTestBase.java | 52 -- .../jetty/server/RequestListenersTest.java | 483 ++++++++++++++++++ .../org/eclipse/jetty/server/ServerTest.java | 195 ------- .../client/transport/ServerTimeoutsTest.java | 68 +-- 9 files changed, 597 insertions(+), 392 deletions(-) create mode 100644 jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java diff --git a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/server/http/server-http-handler-implement.adoc b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/server/http/server-http-handler-implement.adoc index 535a6d14efc..5408847a116 100644 --- a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/server/http/server-http-handler-implement.adoc +++ b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/server/http/server-http-handler-implement.adoc @@ -156,6 +156,57 @@ It is therefore mandatory to use `CompletableFuture` APIs that are invoked even Failing to do so may result in the `Handler` `callback` parameter to never be completed, causing the request processing to hang forever. ==== +[[pg-server-http-handler-impl-request-listeners]] +====== `Request` Listeners + +Application may add listeners to the `Request` object to be notified of particular events happening during the request/response processing. + +`Request.addIdleTimeoutListener(Predicate)` allows you to add an idle timeout listener, which is invoked when an idle timeout period elapses during the request/response processing, if the idle timeout event is not notified otherwise. + +When an idle timeout event happens, it is delivered to the application as follows: + +* If there is pending demand (via `Request.demand(Runnable)`), then the demand `Runnable` is invoked and the application may see the idle timeout failure by reading from the `Request`, obtaining a xref:pg-arch-io-content-source[transient failure chunk]. +* If there is a pending response write (via `Response.write(boolean, ByteBuffer, Callback)`), the response write `Callback` is failed. +* If neither of the above, the idle timeout listeners are invoked, in the same order they have been added. +The first idle timeout listener that returns `true` stops the Jetty implementation from invoking the idle timeout listeners that follow. + +The idle timeout listeners are therefore invoked only when the application is really idle, neither trying to read nor trying to write. + +An idle timeout listener may return `true` to indicate that the idle timeout should be treated as a fatal failure of the request/response processing; otherwise the listener may return `false` to indicate that no further handling of the idle timeout is needed from the Jetty implementation. + +When idle timeout listeners return `false`, then any subsequent idle timeouts are handled as above. +In the case that the application does not initiate any read or write, then the idle timeout listeners are invoked again after another idle timeout period. + +`Request.addFailureListener(Consumer)` allows you to add a failure listener, which is invoked when a failure happens during the request/response processing. + +When a failure happens during the request/response processing, then: + +* The pending demand for request content, if any, is invoked; that is, the `Runnable` passed to `Request.demand(Runnable)` is invoked. +* The callback of an outstanding call to `Response.write(boolean, ByteBuffer, Callback)`, if any, is failed. +* The failure listeners are invoked, in the same order they have been added. + +Failure listeners are invoked also in case of idle timeouts, in the following cases: + +* At least one idle timeout listener returned `true` to indicate to the Jetty implementation to treat the idle timeout as a fatal failure. +* There are no idle timeout listeners. + +Failures reported to a failure listener are always fatal failures; see also xref:pg-arch-io-content-source[this section] about fatal versus transient failures. + +[NOTE] +==== +Applications are always required to complete the `Handler` callback, as described xref:pg-server-http-handler-impl[here]. +In case of asynchronous failures, failure listeners are a good place to complete (typically by failing it) the `Handler` callback. +==== + +`Request.addCompletionListener(Consumer)` allows you to add a completion listener, which is invoked at the very end of the request/response processing. +This is equivalent to adding an `HttpStream` wrapper and overriding both `HttpStream.succeeded()` and `HttpStream.failed(Throwable)`. + +Completion listeners are typically (but not only) used to recycle or dispose resources used during the request/response processing, or get a precise timing for when the request/response processing finishes, to be paired with `Request.getBeginNanoTime()`. + +Note that while failure listeners are invoked as soon as the failure happens, completion listeners are invoked only at the very end of the request/response processing: after the `Callback` passed to `Handler.handle(Request, Response, Callback)` has been completed, all container dispatched threads have returned, and all the response writes have been completed. + +In case of many completion listeners, they are invoked in the reverse order they have been added. + [[pg-server-http-handler-impl-response]] ====== Using the `Response` diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index c260ca0d69d..560f30078d6 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -287,36 +287,34 @@ public interface Request extends Attributes, Content.Source /** *

Adds a listener for idle timeouts.

*

The listener is a predicate function that should return {@code true} to indicate - * that the idle timeout should be handled by the container as a hard failure - * (see {@link #addFailureListener(Consumer)}); or {@code false} to ignore that specific timeout and for another timeout - * to occur after another idle period.

- *

Any pending {@link #demand(Runnable)} or {@link Response#write(boolean, ByteBuffer, Callback)} operations - * are not affected by this call. Applications need to be mindful of any such pending operations if attempting - * to make new operations.

- *

Listeners are processed in sequence, and the first that returns {@code true} - * stops the processing of subsequent listeners, which are therefore not invoked.

+ * that the idle timeout should be handled by the container as a fatal failure + * (see {@link #addFailureListener(Consumer)}); or {@code false} to ignore that specific + * timeout and for another timeout to occur after another idle period.

+ *

Idle timeout listeners are only invoked if there are no pending + * {@link #demand(Runnable)} or {@link Response#write(boolean, ByteBuffer, Callback)} + * operations.

+ *

Listeners are processed in the same order they are added, and the first that + * returns {@code true} stops the processing of subsequent listeners, which are + * therefore not invoked.

* - * @param onIdleTimeout the predicate function + * @param onIdleTimeout the idle timeout listener as a predicate function * @see #addFailureListener(Consumer) */ void addIdleTimeoutListener(Predicate onIdleTimeout); /** - *

Adds a listener for asynchronous hard errors.

- *

When a listener is called, the effects of the error will already have taken place:

+ *

Adds a listener for asynchronous fatal failures.

+ *

When a listener is called, the effects of the failure have already taken place:

*
    - *
  • Pending {@link #demand(Runnable)} will be woken up.
  • - *
  • Calls to {@link #read()} will return the {@code Throwable}.
  • - *
  • Pending and new {@link Response#write(boolean, ByteBuffer, Callback)} calls will be failed by - * calling {@link Callback#failed(Throwable)} on the callback passed to {@code write(...)}.
  • - *
  • Any call to {@link Callback#succeeded()} on the callback passed to - * {@link Handler#handle(Request, Response, Callback)} will effectively be a call to {@link Callback#failed(Throwable)} - * with the notified {@link Throwable}.
  • + *
  • Pending {@link #demand(Runnable)} have been woken up.
  • + *
  • Calls to {@link #read()} will return the {@code Throwable} failure.
  • + *
  • Pending and new {@link Response#write(boolean, ByteBuffer, Callback)} calls + * will be failed by calling {@link Callback#failed(Throwable)} on the callback + * passed to {@link Response#write(boolean, ByteBuffer, Callback)}.
  • *
- *

Listeners are processed in sequence. When all listeners are invoked then {@link Callback#failed(Throwable)} - * will be called on the callback passed to {@link Handler#handle(Request, Response, Callback)}.

+ *

Listeners are processed in the same order they are added.

* - * @param onFailure the consumer function + * @param onFailure the failure listener as a consumer function * @see #addIdleTimeoutListener(Predicate) */ void addFailureListener(Consumer onFailure); @@ -331,15 +329,17 @@ public interface Request extends Attributes, Content.Source void addHttpStreamWrapper(Function wrapper); /** - * Adds a completion listener that is an optimized equivalent to overriding the - * {@link HttpStream#succeeded()} and {@link HttpStream#failed(Throwable)} methods - * of a {@link HttpStream.Wrapper} created by a call to {@link #addHttpStreamWrapper(Function)}. - * In the case of a failure, the {@link Throwable} cause is passed to the listener, but unlike + *

Adds a completion listener that is an optimized equivalent to overriding the + * {@link HttpStream#succeeded()} and {@link HttpStream#failed(Throwable)} methods of a + * {@link HttpStream.Wrapper} created by a call to {@link #addHttpStreamWrapper(Function)}.

+ *

Because adding completion listeners relies on {@link HttpStream} wrapping, + * the completion listeners are invoked in reverse order they are added.

+ *

In the case of a failure, the {@link Throwable} cause is passed to the listener, but unlike * {@link #addFailureListener(Consumer)} listeners, which are called when the failure occurs, completion - * listeners are called only once the {@link HttpStream} is completed at the very end of processing. + * listeners are called only once the {@link HttpStream} is completed at the very end of processing.

* - * @param listener A {@link Consumer} of {@link Throwable} to call when the request handling is complete. The - * listener is passed a null {@link Throwable} on success. + * @param listener A {@link Consumer} of {@link Throwable} to call when the request handling is complete. + * The listener is passed a {@code null} {@link Throwable} on success. * @see #addHttpStreamWrapper(Function) */ static void addCompletionListener(Request request, Consumer listener) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index af24370e119..3800130afd6 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -530,7 +530,7 @@ public interface Response extends Content.Sink if (errorHandler.handle(errorRequest, response, callback)) return; } - catch (Exception e) + catch (Throwable e) { if (cause != null && cause != e) cause.addSuppressed(e); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java index 97fabe0b461..caad1fb0639 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java @@ -86,7 +86,7 @@ public class ErrorHandler implements Request.Handler } @Override - public boolean handle(Request request, Response response, Callback callback) + public boolean handle(Request request, Response response, Callback callback) throws Exception { if (LOG.isDebugEnabled()) LOG.debug("handle({}, {}, {})", request, response, callback); @@ -112,16 +112,7 @@ public class ErrorHandler implements Request.Handler { if (message == null) message = cause == null ? HttpStatus.getMessage(code) : cause.toString(); - - try - { - generateResponse(request, response, code, message, cause, callback); - } - catch (Throwable x) - { - // TODO: cannot write the error response, give up and close the stream. - LOG.warn("Failure whilst generate error response", x); - } + generateResponse(request, response, code, message, cause, callback); } return true; } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 6093fab1b47..811f4237d48 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -91,7 +91,7 @@ public class HttpChannelState implements HttpChannel, Components } private static final Logger LOG = LoggerFactory.getLogger(HttpChannelState.class); - private static final Throwable DO_NOT_SEND = new Throwable("No Send"); + private static final Throwable NOTHING_TO_SEND = new Throwable("nothing_to_send"); private static final HttpField SERVER_VERSION = new ResponseHttpFields.PersistentPreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); private static final HttpField POWERED_BY = new ResponseHttpFields.PersistentPreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION); @@ -362,14 +362,6 @@ public class HttpChannelState implements HttpChannel, Components } }); } - - // otherwise, if there is no failure listener, then we can fail the callback directly without a double lock - ChannelRequest request = _request; - if (_onFailure == null && request != null) - { - _failure = Content.Chunk.from(t, true); - return () -> request._callback.failed(t); - } } } @@ -388,9 +380,9 @@ public class HttpChannelState implements HttpChannel, Components LOG.debug("onFailure {}", this, x); // If the channel doesn't have a stream, then the error is ignored. - if (_stream == null) - return null; stream = _stream; + if (stream == null) + return null; if (_request == null) { @@ -414,16 +406,16 @@ public class HttpChannelState implements HttpChannel, Components } else { - // if we are currently demanding, take the onContentAvailable runnable to invoke below. + // If there is demand, take the onContentAvailable runnable to invoke below. Runnable invokeOnContentAvailable = _onContentAvailable; _onContentAvailable = null; - // If a write call is in progress, take the writeCallback to fail below + // If a write call is in progress, take the writeCallback to fail below. Runnable invokeWriteFailure = _response.lockedFailWrite(x); - // Create runnable to invoke any onError listeners - + // Notify the failure listeners only once. Consumer onFailure = _onFailure; + _onFailure = null; Runnable invokeOnFailureListeners = onFailure == null ? null : () -> { try @@ -504,7 +496,7 @@ public class HttpChannelState implements HttpChannel, Components // they are flushed. The DO_NOT_SEND option supports these by turning such writes into a NOOP. case LAST_SENDING, LAST_COMPLETE -> (length > 0) ? new IllegalStateException("last already written") - : DO_NOT_SEND; + : NOTHING_TO_SEND; }; } @@ -1124,7 +1116,7 @@ public class HttpChannelState implements HttpChannel, Components long length = BufferUtil.length(content); HttpChannelState httpChannelState; - HttpStream stream = null; + HttpStream stream; Throwable failure; MetaData.Response responseMetaData = null; try (AutoLock ignored = _request._lock.lock()) @@ -1135,7 +1127,9 @@ public class HttpChannelState implements HttpChannel, Components long contentLength = committedContentLength >= 0 ? committedContentLength : getHeaders().getLongField(HttpHeader.CONTENT_LENGTH); if (_writeCallback != null) + { failure = new IllegalStateException("write pending"); + } else { failure = getFailure(httpChannelState); @@ -1157,37 +1151,34 @@ public class HttpChannelState implements HttpChannel, Components } } - // If no failure by this point, we can try to send + // If no failure by this point, we can try to switch to sending state. if (failure == null) failure = httpChannelState.lockedStreamSend(last, length); - // Have we failed in some way? - if (failure == DO_NOT_SEND) + if (failure == NOTHING_TO_SEND) { httpChannelState._serializedInvoker.run(callback::succeeded); + return; } - else if (failure != null) + // Have we failed in some way? + if (failure != null) { Throwable throwable = failure; httpChannelState._serializedInvoker.run(() -> callback.failed(throwable)); + return; } - else - { - // We have not failed, so we will do a stream send - _writeCallback = callback; - _contentBytesWritten = totalWritten; - stream = httpChannelState._stream; - if (_httpFields.commit()) - responseMetaData = lockedPrepareResponse(httpChannelState, last); - } + + // No failure, do the actual stream send using the ChannelResponse as the callback. + _writeCallback = callback; + _contentBytesWritten = totalWritten; + stream = httpChannelState._stream; + if (_httpFields.commit()) + responseMetaData = lockedPrepareResponse(httpChannelState, last); } - if (failure == null) - { - if (LOG.isDebugEnabled()) - LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this); - stream.send(_request._metaData, responseMetaData, last, content, this); - } + if (LOG.isDebugEnabled()) + LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this); + stream.send(_request._metaData, responseMetaData, last, content, this); } protected Throwable getFailure(HttpChannelState httpChannelState) @@ -1395,6 +1386,8 @@ public class HttpChannelState implements HttpChannel, Components needLastStreamSend = httpChannelState.lockedLastStreamSend(); completeStream = !needLastStreamSend && httpChannelState._handling == null && httpChannelState.lockedIsLastStreamSendCompleted(); + if (needLastStreamSend) + response._writeCallback = httpChannelState._handlerInvoker; if (httpChannelState._responseHeaders.commit()) responseMetaData = response.lockedPrepareResponse(httpChannelState, true); @@ -1405,7 +1398,7 @@ public class HttpChannelState implements HttpChannel, Components if (committedContentLength >= 0 && committedContentLength != totalWritten && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod()))) failure = new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten)); - // is the request fully consumed? + // Is the request fully consumed? Throwable unconsumed = stream.consumeAvailable(); if (LOG.isDebugEnabled()) LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState); @@ -1429,7 +1422,7 @@ public class HttpChannelState implements HttpChannel, Components if (errorResponse != null) Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure); else if (needLastStreamSend) - stream.send(_request._metaData, responseMetaData, true, null, httpChannelState._handlerInvoker); + stream.send(_request._metaData, responseMetaData, true, null, response); else if (completeStream) httpChannelState._handlerInvoker.completeStream(stream, failure); else if (LOG.isDebugEnabled()) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index 534acc68ff7..8f1c60271b1 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -61,7 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -2012,55 +2011,4 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture Awaitility.await().atMost(5, TimeUnit.SECONDS).until(result::get, equalTo("value")); } } - - @Test - public void testCompletionListener() throws Exception - { - Queue history = new ConcurrentLinkedQueue<>(); - startServer(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) throws Exception - { - Request.addCompletionListener(request, t -> history.add("four")); - Request.addCompletionListener(request, t -> history.add("three")); - request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s) - { - @Override - public void succeeded() - { - history.add("two"); - super.succeeded(); - } - }); - Request.addCompletionListener(request, t -> history.add("one")); - - callback.succeeded(); - - history.add("zero"); - return true; - } - }); - - try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) - { - OutputStream os = client.getOutputStream(); - - String request = """ - GET / HTTP/1.1 - Host: localhost - Connection: close - - """; - os.write(request.getBytes(StandardCharsets.ISO_8859_1)); - os.flush(); - - // Read the response. - String rawResponse = readResponse(client); - assertThat(rawResponse, containsString("HTTP/1.1 200 OK")); - - // Check the history - assertThat(history, contains("zero", "one", "two", "three", "four")); - } - } } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java new file mode 100644 index 00000000000..c93a16f520e --- /dev/null +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java @@ -0,0 +1,483 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.containsStringIgnoringCase; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RequestListenersTest +{ + private Server server; + private LocalConnector connector; + private ContextHandler context; + + private void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new LocalConnector(server); + server.addConnector(connector); + context = new ContextHandler("/"); + server.setHandler(context); + context.setHandler(handler); + server.start(); + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(server); + } + + @Test + public void testCompletionListeners() throws Exception + { + Queue history = new ConcurrentLinkedQueue<>(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + // Completion listeners are invoked in reverse order + // because they are based on HttpStream wrapping. + Request.addCompletionListener(request, t -> history.add("four")); + Request.addCompletionListener(request, t -> history.add("three")); + request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s) + { + @Override + public void succeeded() + { + history.add("two"); + super.succeeded(); + } + }); + Request.addCompletionListener(request, t -> history.add("one")); + + callback.succeeded(); + + history.add("zero"); + return true; + } + }); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + GET / HTTP/1.1 + Host: localhost + Connection: close + + """)); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertThat(history, contains("zero", "one", "two", "three", "four")); + } + + @Test + public void testListenersAreCalledInContext() throws Exception + { + CountDownLatch latch = new CountDownLatch(3); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext())); + latch.countDown(); + + request.addIdleTimeoutListener(t -> + { + assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext())); + latch.countDown(); + return true; + }); + + request.addFailureListener(t -> + { + assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext())); + callback.failed(t); + latch.countDown(); + }); + + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + GET /path HTTP/1.0 + Host: localhost + + """)); + + assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); + } + + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + public void testIdleTimeoutListenerCompletesCallback(boolean failIdleTimeout, boolean succeedCallback) throws Exception + { + CountDownLatch failureLatch = new CountDownLatch(1); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + request.addIdleTimeoutListener(x -> + { + if (succeedCallback) + callback.succeeded(); + else + callback.failed(x); + return failIdleTimeout; + }); + + request.addFailureListener(x -> failureLatch.countDown()); + + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + Connection: close + + """, 2000 * idleTimeout, TimeUnit.MILLISECONDS)); + + int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; + assertEquals(expectedStatus, response.getStatus()); + assertThat(failureLatch.await(1, TimeUnit.SECONDS), is(failIdleTimeout)); + } + + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + public void testIdleTimeoutListenerFailsRequest(boolean failIdleTimeout, boolean succeedCallback) throws Exception + { + AtomicInteger failures = new AtomicInteger(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + request.addIdleTimeoutListener(x -> + { + // Fail the request side, we should + // be able to write any response. + request.fail(x); + return failIdleTimeout; + }); + + request.addFailureListener(x -> + { + failures.incrementAndGet(); + if (succeedCallback) + callback.succeeded(); + else + callback.failed(x); + }); + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + + """, 2 * idleTimeout, TimeUnit.MILLISECONDS)); + + int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; + assertEquals(expectedStatus, response.getStatus()); + assertEquals(HttpHeaderValue.CLOSE.asString(), response.get(HttpHeader.CONNECTION)); + assertEquals(1, failures.get()); + } + + @Test + public void testIdleTimeoutListenerInvokedMultipleTimesWhenReturningFalse() throws Exception + { + AtomicInteger idleTimeouts = new AtomicInteger(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + request.addIdleTimeoutListener(x -> + { + if (idleTimeouts.incrementAndGet() == 2) + callback.succeeded(); + return false; + }); + + return true; + } + }); + long idleTimeout = 500; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + GET / HTTP/1.1 + Host: localhost + Connection: close + + """, 3 * idleTimeout, TimeUnit.MILLISECONDS)); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertThat(idleTimeouts.get(), greaterThan(1)); + } + + @Test + public void testIdleTimeoutListenerReturnsFalseThenAsyncSendResponse() throws Exception + { + AtomicReference responseRef = new AtomicReference<>(); + CompletableFuture callbackOnTimeout = new CompletableFuture<>(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + responseRef.set(response); + request.addIdleTimeoutListener(t -> + { + callbackOnTimeout.complete(callback); + return false; // ignore timeout + }); + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + Connection: close + + """)) + { + + // Get the callback as promised by the error listener. + Callback callback = callbackOnTimeout.get(2 * idleTimeout, TimeUnit.MILLISECONDS); + assertNotNull(callback); + Content.Sink.write(responseRef.get(), true, "OK", callback); + + HttpTester.Response response = HttpTester.parseResponse(endPoint.waitForResponse(false, 3 * idleTimeout, TimeUnit.MILLISECONDS)); + + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(response.getContent(), is("OK")); + } + } + + @Test + public void testIdleTimeoutListenerReturnsFalseThenTrue() throws Exception + { + AtomicReference idleTimeoutFailure = new AtomicReference<>(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + request.addIdleTimeoutListener(t -> idleTimeoutFailure.getAndSet(t) != null); + request.addFailureListener(callback::failed); + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + Connection: close + + """, 3 * idleTimeout, TimeUnit.MILLISECONDS)); + + // The first time the listener returns false, but does not + // complete the callback, so another idle timeout elapses. + // The second time the listener returns true, the failure + // listener is called, which fails the Handler callback. + assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); + assertThat(response.getContent(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout")); + assertThat(idleTimeoutFailure.get(), instanceOf(TimeoutException.class)); + } + + @Test + public void testIdleTimeoutWithoutIdleTimeoutListener() throws Exception + { + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + // Handler does not complete the callback, but the failure listener does. + request.addFailureListener(callback::failed); + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(""" + GET / HTTP/1.1 + Host: localhost + Connection: close + + """)); + assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); + assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:")); + } + + @Test + public void testIdleTimeoutInvokesDemandCallback() throws Exception + { + AtomicReference requestRef = new AtomicReference<>(); + CompletableFuture callbackCompletable = new CompletableFuture<>(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + requestRef.set(request); + request.demand(() -> callbackCompletable.complete(callback)); + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + Connection: close + + """)) + { + + Callback callback = callbackCompletable.get(5 * idleTimeout, TimeUnit.MILLISECONDS); + Request request = requestRef.get(); + + Content.Chunk chunk = request.read(); + assertNotNull(chunk); + assertTrue(Content.Chunk.isFailure(chunk, false)); + chunk = request.read(); + assertNull(chunk); + + callback.succeeded(); + + HttpTester.Response response = HttpTester.parseResponse(endPoint.waitForResponse(false, idleTimeout, TimeUnit.MILLISECONDS)); + + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + } + } + + // TODO: test pending writes are failed fatally, but you can still read and still have to complete callback. + + @Test + public void testIdleTimeoutFailsWriteCallback() throws Exception + { + AtomicReference responseRef = new AtomicReference<>(); + CompletableFuture writeFailed = new CompletableFuture<>(); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + responseRef.set(response); + + // Issue a large write that will be congested. + // The idle timeout should fail the write callback. + ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024 * 1024); + response.write(false, byteBuffer, Callback.from(() -> {}, x -> writeFailed.complete(callback))); + + return true; + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + try (LocalConnector.LocalEndPoint endPoint = connector.connect()) + { + // Do not grow the output so the response will be congested. + endPoint.setGrowOutput(false); + endPoint.addInputAndExecute(""" + POST / HTTP/1.1 + Host: localhost + Content-Length: 1 + Connection: close + + """); + + Callback callback = writeFailed.get(5 * idleTimeout, TimeUnit.MILLISECONDS); + + // Cannot write anymore. + Response response = responseRef.get(); + CountDownLatch writeFailedLatch = new CountDownLatch(1); + // Use a non-empty buffer to avoid short-circuit the write. + response.write(false, ByteBuffer.allocate(16), Callback.from(() -> {}, x -> writeFailedLatch.countDown())); + assertTrue(writeFailedLatch.await(5, TimeUnit.SECONDS)); + + // The write side has failed, but the read side has not. + Request request = response.getRequest(); + Content.Chunk chunk = request.read(); + assertNull(chunk); + + // Since we cannot write, only choice is to fail the Handler callback. + callback.failed(new IOException()); + + // Should throw. + endPoint.waitForResponse(false, idleTimeout, TimeUnit.MILLISECONDS); + } + } +} diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java index 2c1a99ff5a0..60f5c817b33 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java @@ -13,13 +13,8 @@ package org.eclipse.jetty.server; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -47,15 +42,11 @@ import org.junit.jupiter.params.provider.MethodSource; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.jupiter.api.Assertions.assertTrue; public class ServerTest { - private static final long IDLE_TIMEOUT = 1000L; private Server _server; private ContextHandler _context; private LocalConnector _connector; @@ -108,8 +99,6 @@ public class ServerTest return configure(connection, connector, endPoint); } }); - _connector.setIdleTimeout(IDLE_TIMEOUT); - _connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT); _server.addConnector(_connector); } @@ -224,188 +213,4 @@ public class ServerTest assertThat(rawResponse, containsString("Content-Length:")); } } - - @Test - public void testIdleTimeoutNoListener() throws Exception - { - // See ServerTimeoutsTest for more complete idle timeout testing. - _context.setHandler(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - // Handler never completes the callback - return true; - } - }); - _server.start(); - - String request = """ - GET /path HTTP/1.0\r - Host: hostname\r - \r - """; - String rawResponse = _connector.getResponse(request); - HttpTester.Response response = HttpTester.parseResponse(rawResponse); - assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); - assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:")); - } - - @Test - public void testIdleTimeoutNoListenerHttpConfigurationOnly() throws Exception - { - // See ServerTimeoutsTest for more complete idle timeout testing. - - _context.setHandler(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - // Handler never completes the callback - return true; - } - }); - - _connector.setIdleTimeout(10 * IDLE_TIMEOUT); - _connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT); - - _server.start(); - - String request = """ - GET /path HTTP/1.0\r - Host: hostname\r - \r - """; - String rawResponse = _connector.getResponse(request); - HttpTester.Response response = HttpTester.parseResponse(rawResponse); - assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); - assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:")); - } - - @Test - public void testIdleTimeoutFalseListener() throws Exception - { - // See ServerTimeoutsTest for more complete idle timeout testing. - CompletableFuture callbackOnTimeout = new CompletableFuture<>(); - _context.setHandler(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - request.addIdleTimeoutListener(t -> !callbackOnTimeout.complete(callback)); - return true; - } - }); - _server.start(); - - String request = """ - GET /path HTTP/1.0\r - Host: hostname\r - \r - """; - - try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request)) - { - callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS).succeeded(); - String rawResponse = localEndPoint.getResponse(); - HttpTester.Response response = HttpTester.parseResponse(rawResponse); - assertThat(response.getStatus(), is(HttpStatus.OK_200)); - } - } - - @Test - public void testIdleTimeoutWriteCallback() throws Exception - { - CompletableFuture onTimeout = new CompletableFuture<>(); - CompletableFuture writeFail = new CompletableFuture<>(); - _context.setHandler(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - Runnable write = new Runnable() - { - final ByteBuffer buffer = ByteBuffer.allocate(128 * 1024 * 1024); - @Override - public void run() - { - response.write(false, buffer, Callback.from(this, - t -> - { - writeFail.complete(t); - callback.failed(t); - })); - } - }; - - request.addIdleTimeoutListener(t -> - { - request.getComponents().getThreadPool().execute(write); - return onTimeout.complete(t); - }); - - return true; - } - }); - _server.start(); - - String request = """ - GET /path HTTP/1.0\r - Host: localhost\r - \r - """; - try (LocalConnector.LocalEndPoint ignored = _connector.executeRequest(request)) - { - Throwable x = onTimeout.get(2 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS); - assertThat(x, instanceOf(TimeoutException.class)); - x = writeFail.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS); - assertThat(x, instanceOf(TimeoutException.class)); - } - } - - @Test - public void testListenersInContext() throws Exception - { - CountDownLatch latch = new CountDownLatch(3); - _context.setHandler(new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext())); - latch.countDown(); - - request.addIdleTimeoutListener(t -> - { - assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext())); - latch.countDown(); - return true; - }); - - request.addFailureListener(t -> - { - assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext())); - callback.failed(t); - latch.countDown(); - }); - return true; - } - }); - _server.start(); - - String request = """ - GET /path HTTP/1.0\r - Host: hostname\r - \r - """; - - try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request)) - { - assertTrue(latch.await(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); - String rawResponse = localEndPoint.getResponse(); - HttpTester.Response response = HttpTester.parseResponse(rawResponse); - assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); - } - } - } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ServerTimeoutsTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ServerTimeoutsTest.java index b47a079db3d..98bfdd5396e 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ServerTimeoutsTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ServerTimeoutsTest.java @@ -41,7 +41,6 @@ import static org.hamcrest.Matchers.containsStringIgnoringCase; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -75,10 +74,8 @@ public class ServerTimeoutsTest extends AbstractTest public boolean handle(Request request, Response response, Callback callback) { if (addIdleTimeoutListener) - { request.addIdleTimeoutListener(t -> listenerCalled.compareAndSet(false, true)); - request.addFailureListener(callback::failed); - } + request.addFailureListener(callback::failed); // Do not complete the callback, so it idle times out. return true; @@ -150,68 +147,5 @@ public class ServerTimeoutsTest extends AbstractTest assertFalse(listenerCalled.get()); } - @ParameterizedTest - @MethodSource("transports") - public void testIdleTimeoutErrorListenerReturnsFalse(Transport transport) throws Exception - { - AtomicReference responseRef = new AtomicReference<>(); - CompletableFuture callbackOnTimeout = new CompletableFuture<>(); - start(transport, new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - responseRef.set(response); - request.addIdleTimeoutListener(t -> - { - callbackOnTimeout.complete(callback); - return false; // ignore timeout - }); - return true; - } - }); - - org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport)) - .timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS); - CompletableFuture completable = new CompletableResponseListener(request).send(); - - // Get the callback as promised by the error listener. - Callback callback = callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS); - assertNotNull(callback); - Content.Sink.write(responseRef.get(), true, "OK", callback); - - ContentResponse response = completable.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS); - assertThat(response.getStatus(), is(HttpStatus.OK_200)); - assertThat(response.getContentAsString(), is("OK")); - } - - @ParameterizedTest - @MethodSource("transports") - public void testIdleTimeoutErrorListenerReturnsFalseThenTrue(Transport transport) throws Exception - { - AtomicReference error = new AtomicReference<>(); - start(transport, new Handler.Abstract() - { - @Override - public boolean handle(Request request, Response response, Callback callback) - { - request.addIdleTimeoutListener(t -> error.getAndSet(t) != null); - request.addFailureListener(callback::failed); - return true; - } - }); - - ContentResponse response = client.newRequest(newURI(transport)) - .timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS) - .send(); - - // The first time the listener returns false, but does not complete the callback, - // so another idle timeout elapses. - // The second time the listener returns true and the implementation produces the response. - assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500)); - assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout")); - assertThat(error.get(), instanceOf(TimeoutException.class)); - } - // TODO write side tests }