Fixes #10912 - Document Request listeners (#10920)

* Fixes #10912 - Document Request listeners

* Documented Request listeners and updated javadocs.
* Removed code in HttpChannelState.onIdleTimeout() that was automatically complete the Handler callback.
* Invoking failure listeners only once (although HttpChannelState.onFailure() may be called multiple times).
* Made sure that in ChannelCallback.succeeded() the last stream send uses the ChannelResponse as Callback, like it is done in Response.write().
* Moved Request listeners tests from various test classes into RequestListenersTest.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-11-28 12:56:17 +01:00 committed by GitHub
parent 5a273f0d3e
commit 8c10ea8a9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 597 additions and 392 deletions

View File

@ -156,6 +156,57 @@ It is therefore mandatory to use `CompletableFuture` APIs that are invoked even
Failing to do so may result in the `Handler` `callback` parameter to never be completed, causing the request processing to hang forever.
====
[[pg-server-http-handler-impl-request-listeners]]
====== `Request` Listeners
Application may add listeners to the `Request` object to be notified of particular events happening during the request/response processing.
`Request.addIdleTimeoutListener(Predicate<TimeoutException>)` allows you to add an idle timeout listener, which is invoked when an idle timeout period elapses during the request/response processing, if the idle timeout event is not notified otherwise.
When an idle timeout event happens, it is delivered to the application as follows:
* If there is pending demand (via `Request.demand(Runnable)`), then the demand `Runnable` is invoked and the application may see the idle timeout failure by reading from the `Request`, obtaining a xref:pg-arch-io-content-source[transient failure chunk].
* If there is a pending response write (via `Response.write(boolean, ByteBuffer, Callback)`), the response write `Callback` is failed.
* If neither of the above, the idle timeout listeners are invoked, in the same order they have been added.
The first idle timeout listener that returns `true` stops the Jetty implementation from invoking the idle timeout listeners that follow.
The idle timeout listeners are therefore invoked only when the application is really idle, neither trying to read nor trying to write.
An idle timeout listener may return `true` to indicate that the idle timeout should be treated as a fatal failure of the request/response processing; otherwise the listener may return `false` to indicate that no further handling of the idle timeout is needed from the Jetty implementation.
When idle timeout listeners return `false`, then any subsequent idle timeouts are handled as above.
In the case that the application does not initiate any read or write, then the idle timeout listeners are invoked again after another idle timeout period.
`Request.addFailureListener(Consumer<Throwable>)` allows you to add a failure listener, which is invoked when a failure happens during the request/response processing.
When a failure happens during the request/response processing, then:
* The pending demand for request content, if any, is invoked; that is, the `Runnable` passed to `Request.demand(Runnable)` is invoked.
* The callback of an outstanding call to `Response.write(boolean, ByteBuffer, Callback)`, if any, is failed.
* The failure listeners are invoked, in the same order they have been added.
Failure listeners are invoked also in case of idle timeouts, in the following cases:
* At least one idle timeout listener returned `true` to indicate to the Jetty implementation to treat the idle timeout as a fatal failure.
* There are no idle timeout listeners.
Failures reported to a failure listener are always fatal failures; see also xref:pg-arch-io-content-source[this section] about fatal versus transient failures.
[NOTE]
====
Applications are always required to complete the `Handler` callback, as described xref:pg-server-http-handler-impl[here].
In case of asynchronous failures, failure listeners are a good place to complete (typically by failing it) the `Handler` callback.
====
`Request.addCompletionListener(Consumer<Throwable>)` allows you to add a completion listener, which is invoked at the very end of the request/response processing.
This is equivalent to adding an `HttpStream` wrapper and overriding both `HttpStream.succeeded()` and `HttpStream.failed(Throwable)`.
Completion listeners are typically (but not only) used to recycle or dispose resources used during the request/response processing, or get a precise timing for when the request/response processing finishes, to be paired with `Request.getBeginNanoTime()`.
Note that while failure listeners are invoked as soon as the failure happens, completion listeners are invoked only at the very end of the request/response processing: after the `Callback` passed to `Handler.handle(Request, Response, Callback)` has been completed, all container dispatched threads have returned, and all the response writes have been completed.
In case of many completion listeners, they are invoked in the reverse order they have been added.
[[pg-server-http-handler-impl-response]]
====== Using the `Response`

View File

@ -287,36 +287,34 @@ public interface Request extends Attributes, Content.Source
/**
* <p>Adds a listener for idle timeouts.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the idle timeout should be handled by the container as a hard failure
* (see {@link #addFailureListener(Consumer)}); or {@code false} to ignore that specific timeout and for another timeout
* to occur after another idle period.</p>
* <p>Any pending {@link #demand(Runnable)} or {@link Response#write(boolean, ByteBuffer, Callback)} operations
* are not affected by this call. Applications need to be mindful of any such pending operations if attempting
* to make new operations.</p>
* <p>Listeners are processed in sequence, and the first that returns {@code true}
* stops the processing of subsequent listeners, which are therefore not invoked.</p>
* that the idle timeout should be handled by the container as a fatal failure
* (see {@link #addFailureListener(Consumer)}); or {@code false} to ignore that specific
* timeout and for another timeout to occur after another idle period.</p>
* <p>Idle timeout listeners are only invoked if there are no pending
* {@link #demand(Runnable)} or {@link Response#write(boolean, ByteBuffer, Callback)}
* operations.</p>
* <p>Listeners are processed in the same order they are added, and the first that
* returns {@code true} stops the processing of subsequent listeners, which are
* therefore not invoked.</p>
*
* @param onIdleTimeout the predicate function
* @param onIdleTimeout the idle timeout listener as a predicate function
* @see #addFailureListener(Consumer)
*/
void addIdleTimeoutListener(Predicate<TimeoutException> onIdleTimeout);
/**
* <p>Adds a listener for asynchronous hard errors.</p>
* <p>When a listener is called, the effects of the error will already have taken place:</p>
* <p>Adds a listener for asynchronous fatal failures.</p>
* <p>When a listener is called, the effects of the failure have already taken place:</p>
* <ul>
* <li>Pending {@link #demand(Runnable)} will be woken up.</li>
* <li>Calls to {@link #read()} will return the {@code Throwable}.</li>
* <li>Pending and new {@link Response#write(boolean, ByteBuffer, Callback)} calls will be failed by
* calling {@link Callback#failed(Throwable)} on the callback passed to {@code write(...)}.</li>
* <li>Any call to {@link Callback#succeeded()} on the callback passed to
* {@link Handler#handle(Request, Response, Callback)} will effectively be a call to {@link Callback#failed(Throwable)}
* with the notified {@link Throwable}.</li>
* <li>Pending {@link #demand(Runnable)} have been woken up.</li>
* <li>Calls to {@link #read()} will return the {@code Throwable} failure.</li>
* <li>Pending and new {@link Response#write(boolean, ByteBuffer, Callback)} calls
* will be failed by calling {@link Callback#failed(Throwable)} on the callback
* passed to {@link Response#write(boolean, ByteBuffer, Callback)}.</li>
* </ul>
* <p>Listeners are processed in sequence. When all listeners are invoked then {@link Callback#failed(Throwable)}
* will be called on the callback passed to {@link Handler#handle(Request, Response, Callback)}.</p>
* <p>Listeners are processed in the same order they are added.</p>
*
* @param onFailure the consumer function
* @param onFailure the failure listener as a consumer function
* @see #addIdleTimeoutListener(Predicate)
*/
void addFailureListener(Consumer<Throwable> onFailure);
@ -331,15 +329,17 @@ public interface Request extends Attributes, Content.Source
void addHttpStreamWrapper(Function<HttpStream, HttpStream> wrapper);
/**
* Adds a completion listener that is an optimized equivalent to overriding the
* {@link HttpStream#succeeded()} and {@link HttpStream#failed(Throwable)} methods
* of a {@link HttpStream.Wrapper} created by a call to {@link #addHttpStreamWrapper(Function)}.
* In the case of a failure, the {@link Throwable} cause is passed to the listener, but unlike
* <p>Adds a completion listener that is an optimized equivalent to overriding the
* {@link HttpStream#succeeded()} and {@link HttpStream#failed(Throwable)} methods of a
* {@link HttpStream.Wrapper} created by a call to {@link #addHttpStreamWrapper(Function)}.</p>
* <p>Because adding completion listeners relies on {@link HttpStream} wrapping,
* the completion listeners are invoked in reverse order they are added.</p>
* <p>In the case of a failure, the {@link Throwable} cause is passed to the listener, but unlike
* {@link #addFailureListener(Consumer)} listeners, which are called when the failure occurs, completion
* listeners are called only once the {@link HttpStream} is completed at the very end of processing.
* listeners are called only once the {@link HttpStream} is completed at the very end of processing.</p>
*
* @param listener A {@link Consumer} of {@link Throwable} to call when the request handling is complete. The
* listener is passed a null {@link Throwable} on success.
* @param listener A {@link Consumer} of {@link Throwable} to call when the request handling is complete.
* The listener is passed a {@code null} {@link Throwable} on success.
* @see #addHttpStreamWrapper(Function)
*/
static void addCompletionListener(Request request, Consumer<Throwable> listener)

View File

@ -530,7 +530,7 @@ public interface Response extends Content.Sink
if (errorHandler.handle(errorRequest, response, callback))
return;
}
catch (Exception e)
catch (Throwable e)
{
if (cause != null && cause != e)
cause.addSuppressed(e);

View File

@ -86,7 +86,7 @@ public class ErrorHandler implements Request.Handler
}
@Override
public boolean handle(Request request, Response response, Callback callback)
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("handle({}, {}, {})", request, response, callback);
@ -112,16 +112,7 @@ public class ErrorHandler implements Request.Handler
{
if (message == null)
message = cause == null ? HttpStatus.getMessage(code) : cause.toString();
try
{
generateResponse(request, response, code, message, cause, callback);
}
catch (Throwable x)
{
// TODO: cannot write the error response, give up and close the stream.
LOG.warn("Failure whilst generate error response", x);
}
generateResponse(request, response, code, message, cause, callback);
}
return true;
}

View File

@ -91,7 +91,7 @@ public class HttpChannelState implements HttpChannel, Components
}
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelState.class);
private static final Throwable DO_NOT_SEND = new Throwable("No Send");
private static final Throwable NOTHING_TO_SEND = new Throwable("nothing_to_send");
private static final HttpField SERVER_VERSION = new ResponseHttpFields.PersistentPreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new ResponseHttpFields.PersistentPreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
@ -362,14 +362,6 @@ public class HttpChannelState implements HttpChannel, Components
}
});
}
// otherwise, if there is no failure listener, then we can fail the callback directly without a double lock
ChannelRequest request = _request;
if (_onFailure == null && request != null)
{
_failure = Content.Chunk.from(t, true);
return () -> request._callback.failed(t);
}
}
}
@ -388,9 +380,9 @@ public class HttpChannelState implements HttpChannel, Components
LOG.debug("onFailure {}", this, x);
// If the channel doesn't have a stream, then the error is ignored.
if (_stream == null)
return null;
stream = _stream;
if (stream == null)
return null;
if (_request == null)
{
@ -414,16 +406,16 @@ public class HttpChannelState implements HttpChannel, Components
}
else
{
// if we are currently demanding, take the onContentAvailable runnable to invoke below.
// If there is demand, take the onContentAvailable runnable to invoke below.
Runnable invokeOnContentAvailable = _onContentAvailable;
_onContentAvailable = null;
// If a write call is in progress, take the writeCallback to fail below
// If a write call is in progress, take the writeCallback to fail below.
Runnable invokeWriteFailure = _response.lockedFailWrite(x);
// Create runnable to invoke any onError listeners
// Notify the failure listeners only once.
Consumer<Throwable> onFailure = _onFailure;
_onFailure = null;
Runnable invokeOnFailureListeners = onFailure == null ? null : () ->
{
try
@ -504,7 +496,7 @@ public class HttpChannelState implements HttpChannel, Components
// they are flushed. The DO_NOT_SEND option supports these by turning such writes into a NOOP.
case LAST_SENDING, LAST_COMPLETE -> (length > 0)
? new IllegalStateException("last already written")
: DO_NOT_SEND;
: NOTHING_TO_SEND;
};
}
@ -1124,7 +1116,7 @@ public class HttpChannelState implements HttpChannel, Components
long length = BufferUtil.length(content);
HttpChannelState httpChannelState;
HttpStream stream = null;
HttpStream stream;
Throwable failure;
MetaData.Response responseMetaData = null;
try (AutoLock ignored = _request._lock.lock())
@ -1135,7 +1127,9 @@ public class HttpChannelState implements HttpChannel, Components
long contentLength = committedContentLength >= 0 ? committedContentLength : getHeaders().getLongField(HttpHeader.CONTENT_LENGTH);
if (_writeCallback != null)
{
failure = new IllegalStateException("write pending");
}
else
{
failure = getFailure(httpChannelState);
@ -1157,37 +1151,34 @@ public class HttpChannelState implements HttpChannel, Components
}
}
// If no failure by this point, we can try to send
// If no failure by this point, we can try to switch to sending state.
if (failure == null)
failure = httpChannelState.lockedStreamSend(last, length);
// Have we failed in some way?
if (failure == DO_NOT_SEND)
if (failure == NOTHING_TO_SEND)
{
httpChannelState._serializedInvoker.run(callback::succeeded);
return;
}
else if (failure != null)
// Have we failed in some way?
if (failure != null)
{
Throwable throwable = failure;
httpChannelState._serializedInvoker.run(() -> callback.failed(throwable));
return;
}
else
{
// We have not failed, so we will do a stream send
_writeCallback = callback;
_contentBytesWritten = totalWritten;
stream = httpChannelState._stream;
if (_httpFields.commit())
responseMetaData = lockedPrepareResponse(httpChannelState, last);
}
// No failure, do the actual stream send using the ChannelResponse as the callback.
_writeCallback = callback;
_contentBytesWritten = totalWritten;
stream = httpChannelState._stream;
if (_httpFields.commit())
responseMetaData = lockedPrepareResponse(httpChannelState, last);
}
if (failure == null)
{
if (LOG.isDebugEnabled())
LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this);
stream.send(_request._metaData, responseMetaData, last, content, this);
}
if (LOG.isDebugEnabled())
LOG.debug("writing last={} {} {}", last, BufferUtil.toDetailString(content), this);
stream.send(_request._metaData, responseMetaData, last, content, this);
}
protected Throwable getFailure(HttpChannelState httpChannelState)
@ -1395,6 +1386,8 @@ public class HttpChannelState implements HttpChannel, Components
needLastStreamSend = httpChannelState.lockedLastStreamSend();
completeStream = !needLastStreamSend && httpChannelState._handling == null && httpChannelState.lockedIsLastStreamSendCompleted();
if (needLastStreamSend)
response._writeCallback = httpChannelState._handlerInvoker;
if (httpChannelState._responseHeaders.commit())
responseMetaData = response.lockedPrepareResponse(httpChannelState, true);
@ -1405,7 +1398,7 @@ public class HttpChannelState implements HttpChannel, Components
if (committedContentLength >= 0 && committedContentLength != totalWritten && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())))
failure = new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten));
// is the request fully consumed?
// Is the request fully consumed?
Throwable unconsumed = stream.consumeAvailable();
if (LOG.isDebugEnabled())
LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState);
@ -1429,7 +1422,7 @@ public class HttpChannelState implements HttpChannel, Components
if (errorResponse != null)
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
else if (needLastStreamSend)
stream.send(_request._metaData, responseMetaData, true, null, httpChannelState._handlerInvoker);
stream.send(_request._metaData, responseMetaData, true, null, response);
else if (completeStream)
httpChannelState._handlerInvoker.completeStream(stream, failure);
else if (LOG.isDebugEnabled())

View File

@ -61,7 +61,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
@ -2012,55 +2011,4 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(result::get, equalTo("value"));
}
}
@Test
public void testCompletionListener() throws Exception
{
Queue<String> history = new ConcurrentLinkedQueue<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
Request.addCompletionListener(request, t -> history.add("four"));
Request.addCompletionListener(request, t -> history.add("three"));
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
{
@Override
public void succeeded()
{
history.add("two");
super.succeeded();
}
});
Request.addCompletionListener(request, t -> history.add("one"));
callback.succeeded();
history.add("zero");
return true;
}
});
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream os = client.getOutputStream();
String request = """
GET / HTTP/1.1
Host: localhost
Connection: close
""";
os.write(request.getBytes(StandardCharsets.ISO_8859_1));
os.flush();
// Read the response.
String rawResponse = readResponse(client);
assertThat(rawResponse, containsString("HTTP/1.1 200 OK"));
// Check the history
assertThat(history, contains("zero", "one", "two", "three", "four"));
}
}
}

View File

@ -0,0 +1,483 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.containsStringIgnoringCase;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RequestListenersTest
{
private Server server;
private LocalConnector connector;
private ContextHandler context;
private void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
context = new ContextHandler("/");
server.setHandler(context);
context.setHandler(handler);
server.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(server);
}
@Test
public void testCompletionListeners() throws Exception
{
Queue<String> history = new ConcurrentLinkedQueue<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Completion listeners are invoked in reverse order
// because they are based on HttpStream wrapping.
Request.addCompletionListener(request, t -> history.add("four"));
Request.addCompletionListener(request, t -> history.add("three"));
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
{
@Override
public void succeeded()
{
history.add("two");
super.succeeded();
}
});
Request.addCompletionListener(request, t -> history.add("one"));
callback.succeeded();
history.add("zero");
return true;
}
});
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
Connection: close
"""));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertThat(history, contains("zero", "one", "two", "three", "four"));
}
@Test
public void testListenersAreCalledInContext() throws Exception
{
CountDownLatch latch = new CountDownLatch(3);
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext()));
latch.countDown();
request.addIdleTimeoutListener(t ->
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext()));
latch.countDown();
return true;
});
request.addFailureListener(t ->
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext()));
callback.failed(t);
latch.countDown();
});
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET /path HTTP/1.0
Host: localhost
"""));
assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
}
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testIdleTimeoutListenerCompletesCallback(boolean failIdleTimeout, boolean succeedCallback) throws Exception
{
CountDownLatch failureLatch = new CountDownLatch(1);
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(x ->
{
if (succeedCallback)
callback.succeeded();
else
callback.failed(x);
return failIdleTimeout;
});
request.addFailureListener(x -> failureLatch.countDown());
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
Connection: close
""", 2000 * idleTimeout, TimeUnit.MILLISECONDS));
int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500;
assertEquals(expectedStatus, response.getStatus());
assertThat(failureLatch.await(1, TimeUnit.SECONDS), is(failIdleTimeout));
}
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testIdleTimeoutListenerFailsRequest(boolean failIdleTimeout, boolean succeedCallback) throws Exception
{
AtomicInteger failures = new AtomicInteger();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(x ->
{
// Fail the request side, we should
// be able to write any response.
request.fail(x);
return failIdleTimeout;
});
request.addFailureListener(x ->
{
failures.incrementAndGet();
if (succeedCallback)
callback.succeeded();
else
callback.failed(x);
});
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
""", 2 * idleTimeout, TimeUnit.MILLISECONDS));
int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500;
assertEquals(expectedStatus, response.getStatus());
assertEquals(HttpHeaderValue.CLOSE.asString(), response.get(HttpHeader.CONNECTION));
assertEquals(1, failures.get());
}
@Test
public void testIdleTimeoutListenerInvokedMultipleTimesWhenReturningFalse() throws Exception
{
AtomicInteger idleTimeouts = new AtomicInteger();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(x ->
{
if (idleTimeouts.incrementAndGet() == 2)
callback.succeeded();
return false;
});
return true;
}
});
long idleTimeout = 500;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
Connection: close
""", 3 * idleTimeout, TimeUnit.MILLISECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertThat(idleTimeouts.get(), greaterThan(1));
}
@Test
public void testIdleTimeoutListenerReturnsFalseThenAsyncSendResponse() throws Exception
{
AtomicReference<Response> responseRef = new AtomicReference<>();
CompletableFuture<Callback> callbackOnTimeout = new CompletableFuture<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
responseRef.set(response);
request.addIdleTimeoutListener(t ->
{
callbackOnTimeout.complete(callback);
return false; // ignore timeout
});
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
Connection: close
"""))
{
// Get the callback as promised by the error listener.
Callback callback = callbackOnTimeout.get(2 * idleTimeout, TimeUnit.MILLISECONDS);
assertNotNull(callback);
Content.Sink.write(responseRef.get(), true, "OK", callback);
HttpTester.Response response = HttpTester.parseResponse(endPoint.waitForResponse(false, 3 * idleTimeout, TimeUnit.MILLISECONDS));
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(response.getContent(), is("OK"));
}
}
@Test
public void testIdleTimeoutListenerReturnsFalseThenTrue() throws Exception
{
AtomicReference<Throwable> idleTimeoutFailure = new AtomicReference<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(t -> idleTimeoutFailure.getAndSet(t) != null);
request.addFailureListener(callback::failed);
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
Connection: close
""", 3 * idleTimeout, TimeUnit.MILLISECONDS));
// The first time the listener returns false, but does not
// complete the callback, so another idle timeout elapses.
// The second time the listener returns true, the failure
// listener is called, which fails the Handler callback.
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContent(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
assertThat(idleTimeoutFailure.get(), instanceOf(TimeoutException.class));
}
@Test
public void testIdleTimeoutWithoutIdleTimeoutListener() throws Exception
{
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Handler does not complete the callback, but the failure listener does.
request.addFailureListener(callback::failed);
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
Connection: close
"""));
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:"));
}
@Test
public void testIdleTimeoutInvokesDemandCallback() throws Exception
{
AtomicReference<Request> requestRef = new AtomicReference<>();
CompletableFuture<Callback> callbackCompletable = new CompletableFuture<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
requestRef.set(request);
request.demand(() -> callbackCompletable.complete(callback));
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
Connection: close
"""))
{
Callback callback = callbackCompletable.get(5 * idleTimeout, TimeUnit.MILLISECONDS);
Request request = requestRef.get();
Content.Chunk chunk = request.read();
assertNotNull(chunk);
assertTrue(Content.Chunk.isFailure(chunk, false));
chunk = request.read();
assertNull(chunk);
callback.succeeded();
HttpTester.Response response = HttpTester.parseResponse(endPoint.waitForResponse(false, idleTimeout, TimeUnit.MILLISECONDS));
assertThat(response.getStatus(), is(HttpStatus.OK_200));
}
}
// TODO: test pending writes are failed fatally, but you can still read and still have to complete callback.
@Test
public void testIdleTimeoutFailsWriteCallback() throws Exception
{
AtomicReference<Response> responseRef = new AtomicReference<>();
CompletableFuture<Callback> writeFailed = new CompletableFuture<>();
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
responseRef.set(response);
// Issue a large write that will be congested.
// The idle timeout should fail the write callback.
ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024 * 1024);
response.write(false, byteBuffer, Callback.from(() -> {}, x -> writeFailed.complete(callback)));
return true;
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);
try (LocalConnector.LocalEndPoint endPoint = connector.connect())
{
// Do not grow the output so the response will be congested.
endPoint.setGrowOutput(false);
endPoint.addInputAndExecute("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
Connection: close
""");
Callback callback = writeFailed.get(5 * idleTimeout, TimeUnit.MILLISECONDS);
// Cannot write anymore.
Response response = responseRef.get();
CountDownLatch writeFailedLatch = new CountDownLatch(1);
// Use a non-empty buffer to avoid short-circuit the write.
response.write(false, ByteBuffer.allocate(16), Callback.from(() -> {}, x -> writeFailedLatch.countDown()));
assertTrue(writeFailedLatch.await(5, TimeUnit.SECONDS));
// The write side has failed, but the read side has not.
Request request = response.getRequest();
Content.Chunk chunk = request.read();
assertNull(chunk);
// Since we cannot write, only choice is to fail the Handler callback.
callback.failed(new IOException());
// Should throw.
endPoint.waitForResponse(false, idleTimeout, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -13,13 +13,8 @@
package org.eclipse.jetty.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@ -47,15 +42,11 @@ import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServerTest
{
private static final long IDLE_TIMEOUT = 1000L;
private Server _server;
private ContextHandler _context;
private LocalConnector _connector;
@ -108,8 +99,6 @@ public class ServerTest
return configure(connection, connector, endPoint);
}
});
_connector.setIdleTimeout(IDLE_TIMEOUT);
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT);
_server.addConnector(_connector);
}
@ -224,188 +213,4 @@ public class ServerTest
assertThat(rawResponse, containsString("Content-Length:"));
}
}
@Test
public void testIdleTimeoutNoListener() throws Exception
{
// See ServerTimeoutsTest for more complete idle timeout testing.
_context.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Handler never completes the callback
return true;
}
});
_server.start();
String request = """
GET /path HTTP/1.0\r
Host: hostname\r
\r
""";
String rawResponse = _connector.getResponse(request);
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:"));
}
@Test
public void testIdleTimeoutNoListenerHttpConfigurationOnly() throws Exception
{
// See ServerTimeoutsTest for more complete idle timeout testing.
_context.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Handler never completes the callback
return true;
}
});
_connector.setIdleTimeout(10 * IDLE_TIMEOUT);
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setIdleTimeout(IDLE_TIMEOUT);
_server.start();
String request = """
GET /path HTTP/1.0\r
Host: hostname\r
\r
""";
String rawResponse = _connector.getResponse(request);
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContent(), containsString("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout expired:"));
}
@Test
public void testIdleTimeoutFalseListener() throws Exception
{
// See ServerTimeoutsTest for more complete idle timeout testing.
CompletableFuture<Callback> callbackOnTimeout = new CompletableFuture<>();
_context.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(t -> !callbackOnTimeout.complete(callback));
return true;
}
});
_server.start();
String request = """
GET /path HTTP/1.0\r
Host: hostname\r
\r
""";
try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request))
{
callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS).succeeded();
String rawResponse = localEndPoint.getResponse();
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
assertThat(response.getStatus(), is(HttpStatus.OK_200));
}
}
@Test
public void testIdleTimeoutWriteCallback() throws Exception
{
CompletableFuture<Throwable> onTimeout = new CompletableFuture<>();
CompletableFuture<Throwable> writeFail = new CompletableFuture<>();
_context.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Runnable write = new Runnable()
{
final ByteBuffer buffer = ByteBuffer.allocate(128 * 1024 * 1024);
@Override
public void run()
{
response.write(false, buffer, Callback.from(this,
t ->
{
writeFail.complete(t);
callback.failed(t);
}));
}
};
request.addIdleTimeoutListener(t ->
{
request.getComponents().getThreadPool().execute(write);
return onTimeout.complete(t);
});
return true;
}
});
_server.start();
String request = """
GET /path HTTP/1.0\r
Host: localhost\r
\r
""";
try (LocalConnector.LocalEndPoint ignored = _connector.executeRequest(request))
{
Throwable x = onTimeout.get(2 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
assertThat(x, instanceOf(TimeoutException.class));
x = writeFail.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
assertThat(x, instanceOf(TimeoutException.class));
}
}
@Test
public void testListenersInContext() throws Exception
{
CountDownLatch latch = new CountDownLatch(3);
_context.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
latch.countDown();
request.addIdleTimeoutListener(t ->
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
latch.countDown();
return true;
});
request.addFailureListener(t ->
{
assertThat(ContextHandler.getCurrentContext(), sameInstance(_context.getContext()));
callback.failed(t);
latch.countDown();
});
return true;
}
});
_server.start();
String request = """
GET /path HTTP/1.0\r
Host: hostname\r
\r
""";
try (LocalConnector.LocalEndPoint localEndPoint = _connector.executeRequest(request))
{
assertTrue(latch.await(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
String rawResponse = localEndPoint.getResponse();
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
}
}
}

View File

@ -41,7 +41,6 @@ import static org.hamcrest.Matchers.containsStringIgnoringCase;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -75,10 +74,8 @@ public class ServerTimeoutsTest extends AbstractTest
public boolean handle(Request request, Response response, Callback callback)
{
if (addIdleTimeoutListener)
{
request.addIdleTimeoutListener(t -> listenerCalled.compareAndSet(false, true));
request.addFailureListener(callback::failed);
}
request.addFailureListener(callback::failed);
// Do not complete the callback, so it idle times out.
return true;
@ -150,68 +147,5 @@ public class ServerTimeoutsTest extends AbstractTest
assertFalse(listenerCalled.get());
}
@ParameterizedTest
@MethodSource("transports")
public void testIdleTimeoutErrorListenerReturnsFalse(Transport transport) throws Exception
{
AtomicReference<Response> responseRef = new AtomicReference<>();
CompletableFuture<Callback> callbackOnTimeout = new CompletableFuture<>();
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
responseRef.set(response);
request.addIdleTimeoutListener(t ->
{
callbackOnTimeout.complete(callback);
return false; // ignore timeout
});
return true;
}
});
org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS);
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
// Get the callback as promised by the error listener.
Callback callback = callbackOnTimeout.get(3 * IDLE_TIMEOUT, TimeUnit.MILLISECONDS);
assertNotNull(callback);
Content.Sink.write(responseRef.get(), true, "OK", callback);
ContentResponse response = completable.get(IDLE_TIMEOUT / 2, TimeUnit.MILLISECONDS);
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(response.getContentAsString(), is("OK"));
}
@ParameterizedTest
@MethodSource("transports")
public void testIdleTimeoutErrorListenerReturnsFalseThenTrue(Transport transport) throws Exception
{
AtomicReference<Throwable> error = new AtomicReference<>();
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.addIdleTimeoutListener(t -> error.getAndSet(t) != null);
request.addFailureListener(callback::failed);
return true;
}
});
ContentResponse response = client.newRequest(newURI(transport))
.timeout(IDLE_TIMEOUT * 5, TimeUnit.MILLISECONDS)
.send();
// The first time the listener returns false, but does not complete the callback,
// so another idle timeout elapses.
// The second time the listener returns true and the implementation produces the response.
assertThat(response.getStatus(), is(HttpStatus.INTERNAL_SERVER_ERROR_500));
assertThat(response.getContentAsString(), containsStringIgnoringCase("HTTP ERROR 500 java.util.concurrent.TimeoutException: Idle timeout"));
assertThat(error.get(), instanceOf(TimeoutException.class));
}
// TODO write side tests
}