Fixes #11072 - Jetty 12: CompleteCallbackHandler (#11786)

Introduced StateTrackingHandler.

StateTrackingHandler is a troubleshooting Handler that helps to identify those cases where the Handler/Request/Response APIs are used improperly.

In particular, it tracks the events described in StateTrackingHandler.Listener, such as the Handler callback not completed, or blocking demand callback, or a write callback not completed, etc.

It also provides dump() capabilities, so the current requests and their state is dumped to help troubleshooting.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-05-19 19:32:37 +02:00 committed by GitHub
parent c97c995642
commit a4c297011b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1958 additions and 13 deletions

View File

@ -0,0 +1,31 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
[[og-module-state-tracking]]
===== Module `state-tracking`
The `state-tracking` Jetty module inserts the `StateTrackingHandler` at the beginning of the Handler chain.
`StateTrackingHandler` is a xref:og-troubleshooting[troubleshooting] `Handler` that tracks usages of `Handler`/`Request`/`Response` asynchronous APIs, and logs at warning level invalid usages of the APIs that may lead to blockages, deadlocks, or missing completion of ``Callback``s.
This module can be enabled to troubleshoot web applications that do not behave as expected, for example:
* That consume a lot of threads (possibly because they block).
* That do not send responses (or send only partial responses) to clients.
* That timeout when apparently they have received or have sent all data.
The module properties are:
----
include::{jetty-home}/modules/state-tracking.mod[tags=documentation]
----

View File

@ -20,6 +20,7 @@ include::module-console-capture.adoc[]
include::module-core-deploy.adoc[]
include::module-cross-origin.adoc[]
include::module-eeN-deploy.adoc[]
include::module-eeN-webapp.adoc[]
include::module-http.adoc[]
include::module-http2.adoc[]
include::module-http2c.adoc[]
@ -34,9 +35,9 @@ include::module-rewrite.adoc[]
include::module-server.adoc[]
include::module-ssl.adoc[]
include::module-ssl-reload.adoc[]
include::module-state-tracking.adoc[]
include::module-test-keystore.adoc[]
include::module-threadpool.adoc[]
include::module-threadpool-virtual.adoc[]
include::module-threadpool-virtual-preview.adoc[]
include::module-eeN-webapp.adoc[]
include::module-well-known.adoc[]

View File

@ -14,7 +14,7 @@
[[og-troubleshooting]]
=== Troubleshooting
To troubleshoot Jetty when used as a production server, there are two main tools: the Jetty Server Dump and enabling DEBUG level logging.
To troubleshoot Jetty when used as a standalone server, there are two main tools: the Jetty Server Dump and enabling DEBUG level logging.
Jetty is based on components organized as a tree, with the `Server` instance at the root of the tree.
@ -36,3 +36,4 @@ IMPORTANT: Make sure you read about how to secure the access to Jetty when using
include::troubleshooting-dump.adoc[]
include::troubleshooting-logging.adoc[]
include::troubleshooting-debugging.adoc[]
include::troubleshooting-handlers.adoc[]

View File

@ -14,7 +14,7 @@
[[og-troubleshooting-dump]]
==== Server Dump
The Jetty Server Dump is obtained by invoking, via JMX, the `Server.dump()` operation, as shown below.
The Jetty Server Dump is obtained by invoking, via JMX, the `Server.dump()` operation, as shown below using link:https://adoptium.net/jmc.html[Java Mission Control (JMC)]:
image::jmc-server-dump.png[]

View File

@ -0,0 +1,28 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
[[og-troubleshooting-handlers]]
==== Troubleshooting Handlers
[[og-troubleshooting-handlers-state-tracking]]
===== `StateTrackingHandler`
Jetty's `StateTrackingHandler` (described in xref:og-module-state-tracking[this module]) can be used to troubleshoot problems in web applications.
`StateTrackingHandler` tracks the usages of `Handler`/`Request`/`Response` asynchronous APIs by web applications, emitting events (logged at warning level) when an invalid usage of the APIs is detected.
In conjunction with xref:og-troubleshooting-dump[dumping the Jetty component tree], it dumps the state of current requests, detailing whether they have reads or writes that are pending, whether callbacks have been completed, along with thread stack traces (including virtual threads) of operations that have been started but not completed, or are stuck in blocking code.
You need to enable the `state-tracking` Jetty module, and configure it to track what you are interested in tracking (for more details, see the link:{javadoc-url}/org/eclipse/jetty/server/handler/StateTrackingHandler.html[javadocs]).
// TODO: add a section about DebugHandler.

Binary file not shown.

After

Width:  |  Height:  |  Size: 262 KiB

View File

@ -456,6 +456,34 @@ If the application deployed in the cross domain requires cookies or authenticati
For more `CrossOriginHandler` configuration options, refer to the link:{javadoc-url}/org/eclipse/jetty/server/handler/CrossOriginHandler.html[`CrossOriginHandler` javadocs].
[[pg-server-http-handler-use-state-tracking]]
====== StateTrackingHandler
`StateTrackingHandler` is a xref:pg-troubleshooting[troubleshooting] `Handler` that tracks whether `Handler`/`Request`/`Response` asynchronous APIs are properly used by applications.</p>
Asynchronous APIs are notoriously more difficult to troubleshoot than blocking APIs, and may be subject to restrictions that applications need to respect (a typical case is that they cannot perform blocking operations).
For example, a `Handler` implementation whose `handle(\...)` method returns `true` _must_ eventually complete the callback passed to `handle(\...)` (for more details on the `Handler` APIs, see xref:pg-server-http-handler-impl[this section]).
When an application forgets to complete the callback passed to `handle(\...)`, the HTTP response may not be sent to the client, but it will be difficult to troubleshoot why the client is not receiving responses.
`StateTrackingHandler` helps with this troubleshooting because it tracks the callback passed to `handle(\...)` and emits an event if the callback is not completed within a configurable timeout:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=stateTrackingHandle]
----
By default, events are logged at warning level, but it is possible to specify a listener to be notified of the events tracked by `StateTrackingHandler`:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/http/HTTPServerDocs.java[tags=stateTrackingListener]
----
Other events tracked by `StateTrackingHandler` are demand callbacks that block, writes that do not complete their callbacks, or write callbacks that block.
The complete list of events is specified by the `StateTrackingHandler.Listener` class (link:{javadoc-url}/org/eclipse/jetty/server/handler/StateTrackingHandler.Listener.html[javadocs]).
[[pg-server-http-handler-use-default]]
====== DefaultHandler

View File

@ -72,22 +72,34 @@ java -Dorg.eclipse.jetty.http2.LEVEL=DEBUG --class-path ...
=== JVM Thread Dump
TODO
[[pg-troubleshooting-state-tracking]]
=== `StateTrackingHandler`
`StateTrackingHandler` (described xref:pg-server-http-handler-use-state-tracking[here]) is a troubleshooting `Handler` that can be inserted in the `Handler` chain to track usages of `Handler`/`Request`/`Response` asynchronous APIs.
xref:pg-troubleshooting-component-dump[Dumping the Jetty component tree] will dump the `StateTrackingHandler`, which will dump the state of the current requests.
This will help detecting whether requests are not completed due to callbacks not being completed, or whether callback code is stuck while invoking blocking APIs, etc.
Thread stack traces (including virtual threads) of operations that have been started but not completed, or are stuck in blocking code are provided in the component tree dump.
[[pg-troubleshooting-component-dump]]
=== Jetty Component Tree Dump
=== Component Tree Dump
Jetty components are organized in a xref:pg-arch-bean[component tree].
At the root of the component tree there is typically a `ContainerLifeCycle` instance -- typically a `Server` instance on the server and an `HttpClient` instance on the client.
`ContainerLifeCycle` has built-in _dump_ APIs that can be invoked either directly or xref:pg-arch-jmx[via JMX].
`ContainerLifeCycle` has built-in _dump_ APIs that can be invoked either directly on the `Server` instance, or xref:pg-arch-jmx[via JMX].
// TODO: images from JMC?
// TODO: Command line JMX will be in JMX section.
You can invoke `Server.dump()` via JMX using a JMX console such as link:https://adoptium.net/jmc.html[Java Mission Control (JMC)]:
image::jmc-server-dump.png[]
TIP: You can get more details from a Jetty's `QueuedThreadPool` dump by enabling detailed dumps via `queuedThreadPool.setDetailedDump(true)`.
[[pg-troubleshooting-debugging]]
=== Debugging
=== Remote Debugging
Sometimes, in order to figure out a problem, enabling xref:pg-troubleshooting-logging[DEBUG logging] is not enough and you really need to debug the code with a debugger.

View File

@ -85,6 +85,7 @@ import org.eclipse.jetty.server.handler.EventsHandler;
import org.eclipse.jetty.server.handler.QoSHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.SecuredRedirectHandler;
import org.eclipse.jetty.server.handler.StateTrackingHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.unixdomain.server.UnixDomainServerConnector;
@ -1535,6 +1536,33 @@ public class HTTPServerDocs
// end::crossOriginAllowedOrigins[]
}
public void stateTrackingHandle()
{
// tag::stateTrackingHandle[]
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler();
// Emit an event if the Handler callback is not completed with 5 seconds.
stateTrackingHandler.setHandlerCallbackTimeout(5000);
// end::stateTrackingHandle[]
}
public void stateTrackingListener()
{
// tag::stateTrackingListener[]
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(new StateTrackingHandler.Listener()
{
@Override
public void onHandlerCallbackNotCompleted(Request request, StateTrackingHandler.ThreadInfo handlerThreadInfo)
{
// Your own event handling logic.
}
});
// Emit an event if the Handler callback is not completed with 5 seconds.
stateTrackingHandler.setHandlerCallbackTimeout(5000);
// end::stateTrackingListener[]
}
public void defaultHandler() throws Exception
{
// tag::defaultHandler[]

View File

@ -0,0 +1,16 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New class="org.eclipse.jetty.server.handler.StateTrackingHandler">
<Set name="handlerCallbackTimeout" property="jetty.stateTracking.handlerCallbackTimeout" />
<Set name="completeHandlerCallbackAtTimeout" property="jetty.stateTracking.completeHandlerCallbackAtTimeout" />
<Set name="demandCallbackTimeout" property="jetty.stateTracking.demandCallbackTimeout" />
<Set name="writeTimeout" property="jetty.stateTracking.writeTimeout" />
<Set name="writeCallbackTimeout" property="jetty.stateTracking.writeCallbackTimeout" />
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,39 @@
[description]
Enables the StateTrackingHandler as the outermost Handler, to track
the state of various Handler/Request/Response asynchronous APIs.
[tags]
server
debug
[depends]
server
[before]
cross-origin
graceful
gzip
secure-redirect
statistics
thread-limit
[xml]
etc/jetty-state-tracking.xml
[ini-template]
# tag::documentation[]
## The timeout in ms for the completion of the handle() callback.
# jetty.stateTracking.handlerCallbackTimeout=0
## Whether the handle() callback is completed in case of timeout.
# jetty.stateTracking.completeHandlerCallbackAtTimeout=false
## The timeout in ms for the execution of the demand callback.
# jetty.stateTracking.demandCallbackTimeout=0
## The timeout in ms for the execution of a response write.
# jetty.stateTracking.writeTimeout=0
## The timeout in ms for the execution of the response write callback.
# jetty.stateTracking.writeCallbackTimeout=0
# end::documentation[]

View File

@ -19,7 +19,7 @@ etc/well-known.xml
# tag::documentation[]
## Well Known Directory (relative to $JETTY_BASE if relative path, otherwise it is an absolute path).
# jetty.wellknown.dir=.well-known
# end::documentation[]
## Allow contents of the well-known directory to be listed.
# jetty.wellknown.listDirectories=false
# end::documentation[]

View File

@ -696,7 +696,7 @@ public interface Request extends Attributes, Content.Source
/**
* <p>A wrapper for {@code Request} instances.</p>
*/
class Wrapper implements Request, Attributes
class Wrapper implements Request
{
/**
* Implementation note: {@link Request.Wrapper} does not extend from {@link Attributes.Wrapper}

View File

@ -0,0 +1,938 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EventListener;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* <p>A troubleshooting {@link Handler.Wrapper} that tracks whether
* {@link Handler}/{@link Request}/{@link Response} asynchronous APIs
* are properly used by applications.</p>
* <p>The violation of these tracked APIs are reported to a {@link Listener}
* instance; the default listener implementation emits warning logs.</p>
* <p>{@code StateTrackingHandler} can be linked in at any point in
* the {@code Handler} chain, and even be present in multiple instances,
* likely configured differently.</p>
* <p>For example, to troubleshoot wrong usages of the callback passed to method
* {@link #handle(Request, Response, Callback)}, a {@code StateTrackingHandler}
* should be configured as the outermost {@code Handler}.
* This is because the {@code handle(...)} call propagates inwards.
* In this way, {@code StateTrackingHandler} can wrap the callback passed
* to inner {@code Handler}s and verify that it is eventually completed.</p>
* <p>On the other hand, to troubleshoot custom {@code Handler} implementations
* that perform wrapping of {@link Response#write(boolean, ByteBuffer, Callback)},
* a {@code StateTrackingHandler} should be configured after the custom
* {@code Handler} implementation.
* This is because the {@code write(...)} call propagates outwards.
* In this way, {@code StateTrackingHandler} can wrap the {@code write(...)}
* call before forwarding it to outer {@code Handler}s and eventually to the
* Jetty implementation, and verify that it is eventually completed.</p>
*
* @see Listener
*/
@ManagedObject
public class StateTrackingHandler extends Handler.Wrapper
{
private static final Logger LOG = LoggerFactory.getLogger(StateTrackingHandler.class);
private final Map<Request, StateInfo> stateInfos = new ConcurrentHashMap<>();
private final Listener listener;
private long handlerCallbackTimeout;
private boolean completeHandlerCallbackAtTimeout;
private long demandCallbackTimeout;
private long writeTimeout;
private long writeCallbackTimeout;
/**
* <p>Creates a new instance with a default {@link Listener}
* that logs events at warning level.</p>
*/
public StateTrackingHandler()
{
this(new WarnListener());
}
/**
* <p>Creates a new instance with the given {@link Listener}.</p>
*
* @param listener the event listener
*/
public StateTrackingHandler(Listener listener)
{
this.listener = listener;
}
/**
* @return the timeout in ms for the completion of the {@link #handle(Request, Response, Callback)} callback
*/
@ManagedAttribute("The timeout in ms for the completion of the handle() callback")
public long getHandlerCallbackTimeout()
{
return handlerCallbackTimeout;
}
public void setHandlerCallbackTimeout(long timeout)
{
this.handlerCallbackTimeout = timeout;
}
/**
* @return whether the {@link #handle(Request, Response, Callback)} callback is completed
* in case the {@link #getHandlerCallbackTimeout()} expires
* @see #getHandlerCallbackTimeout()
*/
@ManagedAttribute("Whether the handle() callback is completed in case of timeout")
public boolean isCompleteHandlerCallbackAtTimeout()
{
return completeHandlerCallbackAtTimeout;
}
public void setCompleteHandlerCallbackAtTimeout(boolean completeHandlerCallbackAtTimeout)
{
this.completeHandlerCallbackAtTimeout = completeHandlerCallbackAtTimeout;
}
/**
* @return the timeout in ms for the execution of the demand callback passed to {@link Request#demand(Runnable)}
*/
@ManagedAttribute("The timeout in ms for the execution of the demand callback")
public long getDemandCallbackTimeout()
{
return demandCallbackTimeout;
}
public void setDemandCallbackTimeout(long timeout)
{
this.demandCallbackTimeout = timeout;
}
/**
* @return the timeout in ms for the execution of a {@link Response#write(boolean, ByteBuffer, Callback)} call
*/
@ManagedAttribute("The timeout in ms for the execution of a response write")
public long getWriteTimeout()
{
return writeTimeout;
}
public void setWriteTimeout(long timeout)
{
this.writeTimeout = timeout;
}
/**
* @return the timeout in ms for the execution of the response write callback passed to {@link Response#write(boolean, ByteBuffer, Callback)}
*/
@ManagedAttribute("The timeout in ms for the execution of the response write callback")
public long getWriteCallbackTimeout()
{
return writeCallbackTimeout;
}
public void setWriteCallbackTimeout(long timeout)
{
this.writeCallbackTimeout = timeout;
}
@Override
public boolean handle(Request originalRequest, Response originalResponse, Callback originalCallback) throws Exception
{
StateInfo stateInfo = new StateInfo(originalRequest);
stateInfos.put(originalRequest, stateInfo);
Request.addCompletionListener(originalRequest, x -> stateInfos.remove(originalRequest));
Request request = originalRequest;
if (demandCallbackTimeout > 0)
request = new RequestWrapper(stateInfo);
Response response = originalResponse;
if (writeTimeout > 0 || writeCallbackTimeout > 0)
response = new ResponseWrapper(stateInfo, originalResponse);
HandlerCallback callback = new HandlerCallback(stateInfo, originalCallback);
stateInfo.handlerCallback = callback;
try
{
boolean handled = super.handle(request, response, callback);
callback.setHandled(handled);
if (!handled)
{
// Check if the callback was completed.
ThreadInfo completionThreadInfo = callback.getCompletionThreadInfo();
if (completionThreadInfo != null)
notifyInvalidHandlerReturnValue(stateInfo.request, completionThreadInfo);
}
return handled;
}
catch (Throwable x)
{
stateInfos.remove(originalRequest);
notifyHandlerException(stateInfo.request, x, callback.getCompletionThreadInfo());
throw x;
}
}
private void notifyInvalidHandlerReturnValue(Request request, ThreadInfo completionThreadInfo)
{
try
{
listener.onInvalidHandlerReturnValue(request, completionThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyHandlerException(Request request, Throwable failure, ThreadInfo completionThreadInfo)
{
try
{
listener.onHandlerException(request, failure, completionThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyHandlerCallbackNotCompleted(Request request, ThreadInfo handlerThreadInfo)
{
try
{
listener.onHandlerCallbackNotCompleted(request, handlerThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyDemandCallbackBlocked(Request request, ThreadInfo demandThreadInfo, ThreadInfo runThreadInfo)
{
try
{
listener.onDemandCallbackBlocked(request, demandThreadInfo, runThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyWriteBlocked(Request request, ThreadInfo writeThreadInfo, ThreadInfo writingThreadInfo)
{
try
{
listener.onWriteBlocked(request, writeThreadInfo, writingThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyWriteCallbackNotCompleted(Request request, Throwable failure, ThreadInfo writeThreadInfo)
{
try
{
listener.onWriteCallbackNotCompleted(request, failure, writeThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
private void notifyWriteCallbackBlocked(Request request, Throwable writeFailure, ThreadInfo writeThreadInfo, ThreadInfo callbackThreadInfo)
{
try
{
listener.onWriteCallbackBlocked(request, writeFailure, writeThreadInfo, callbackThreadInfo);
}
catch (Throwable x)
{
LOG.info("failure while notifying {}", listener, x);
}
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
dumpObjects(out, indent, new DumpableCollection("requests", stateInfos.values()));
}
@Override
public String toString()
{
return "%s@%x".formatted(getClass().getSimpleName(), hashCode());
}
public static class ThreadInfo
{
private final String info;
private final StackTraceElement[] stackFrames;
private ThreadInfo(Thread thread)
{
this.info = thread.toString();
this.stackFrames = thread.getStackTrace();
}
public String getInfo()
{
return info;
}
public StackTraceElement[] getStackFrames()
{
return stackFrames;
}
@Override
public String toString()
{
return toString("");
}
private String toString(String indent)
{
StringBuilder builder = new StringBuilder();
builder.append(getInfo()).append(System.lineSeparator());
for (StackTraceElement stackFrame : getStackFrames())
{
builder.append(indent).append("\tat ").append(stackFrame).append(System.lineSeparator());
}
return builder.toString();
}
}
/**
* <p>Listener of events emitted by {@link StateTrackingHandler}.</p>
* <p>The methods of this interface are named after the wrong API usages
* tracked by {@code StateTrackingHandler}.</p>
*/
public interface Listener extends EventListener
{
/**
* <p>Invoked when the {@link Handler} chain returns {@code false},
* but the handler callback has been completed.</p>
* <p>This event is always enabled.</p>
*
* @param request the current request
* @param completionThreadInfo the {@link ThreadInfo} of the thread that completed the handler callback
*/
default void onInvalidHandlerReturnValue(Request request, ThreadInfo completionThreadInfo)
{
}
/**
* <p>Invoked when the {@link Handler} chain throws an exception from
* the {@link Handler#handle(Request, Response, Callback)} method.</p>
* <p>This event is always enabled.</p>
*
* @param request the current request
* @param failure the exception thrown
* @param completionThreadInfo the {@link ThreadInfo} of the thread that completed the handler callback,
* or {@code null} if the handler callback has not been completed
*/
default void onHandlerException(Request request, Throwable failure, ThreadInfo completionThreadInfo)
{
}
/**
* <p>Invoked when the {@link Handler} callback is not completed within
* the timeout specified with {@link #getHandlerCallbackTimeout()}.</p>
* <p>This event is enabled only when {@link #getHandlerCallbackTimeout()}
* is non-{@code null}.</p>
* <p>When handler thread has already returned from the handler chain,
* the thread info parameter is {@code null}.
* Otherwise, the handler thread has not returned yet and may be blocked,
* and the thread info parameter is not {@code null}.</p>
* <p>Note: when present, the thread info stack trace may not be accurate,
* as the thread blockage might have resolved just before the thread info
* was taken.</p>
*
* @param request the current request
* @param handlerThreadInfo the handler thread info, or {@code null}
* if the handler thread already returned from the handler chain
*/
default void onHandlerCallbackNotCompleted(Request request, ThreadInfo handlerThreadInfo)
{
}
/**
* <p>Invoked when the {@link Request#demand(Runnable) request demand callback}
* {@code run()} method blocks for longer than the timeout specified with
* {@link #getDemandCallbackTimeout()}.</p>
* <p>This event is enabled only when {@link #getDemandCallbackTimeout()}
* is non-{@code null}.</p>
* <p>Note: the thread info stack trace of the thread that is running the
* demand callback may not be accurate, as the thread blockage might have
* resolved just before the thread info was taken.</p>
*
* @param request the current request
* @param demandThreadInfo the thread info of the thread that called {@link Request#demand(Runnable)}
* @param runThreadInfo the thread info of the thread running the demand callback
*/
default void onDemandCallbackBlocked(Request request, ThreadInfo demandThreadInfo, ThreadInfo runThreadInfo)
{
}
/**
* <p>Invoked when the {@link Response#write(boolean, ByteBuffer, Callback)} call
* blocks for longer than the timeout specified with {@link #getWriteTimeout()}.</p>
* <p>This event is enabled only when {@link #getWriteTimeout()} is non-{@code null}.</p>
* <p>Note: the thread info stack trace of the thread that is writing may not be
* accurate, as the thread blockage might have resolved just before the thread
* info was taken.</p>
*
* @param request the current request
* @param writeThreadInfo the thread info of the thread that called {@link Response#write(boolean, ByteBuffer, Callback)}
* @param writingThreadInfo the thread info of the thread tht is writing
*/
default void onWriteBlocked(Request request, ThreadInfo writeThreadInfo, ThreadInfo writingThreadInfo)
{
}
/**
* <p>Invoked when the write callback passed to {@link Response#write(boolean, ByteBuffer, Callback)}
* is not completed for longer than the timeout specified with {@link #getWriteTimeout()}.</p>
* <p>This event is enabled only when {@link #getWriteTimeout()} is non-{@code null}.</p>
* <p>Note that the write might have been fully performed, but since the callback is not
* completed, this case is indistinguishable from the case where the callback is not complete
* because the write has not been fully performed.</p>
*
* @param request the current request
* @param writeFailure the write failure, or {@code null} if the write succeeded
* @param writeThreadInfo the thread info of the thread that called {@link Response#write(boolean, ByteBuffer, Callback)}
*/
default void onWriteCallbackNotCompleted(Request request, Throwable writeFailure, ThreadInfo writeThreadInfo)
{
}
/**
* <p>Invoked when the write callback passed to {@link Response#write(boolean, ByteBuffer, Callback)}
* blocks for longer than the timeout specified with {@link #getWriteCallbackTimeout()}.</p>
* <p>This event is enabled only when {@link #getWriteCallbackTimeout()} is non-{@code null}.</p>
* <p>Note: the thread info stack trace of the thread that is running the write callback may not be
* accurate, as the thread blockage might have resolved just before the thread info was taken.</p>
*
* @param request the current request
* @param writeFailure the write failure, or {@code null} if the write succeeded
* @param writeThreadInfo the thread info of the thread that called {@link Response#write(boolean, ByteBuffer, Callback)}
* @param callbackThreadInfo the thread info of the thread invoking the write callback
*/
default void onWriteCallbackBlocked(Request request, Throwable writeFailure, ThreadInfo writeThreadInfo, ThreadInfo callbackThreadInfo)
{
}
}
private static class WarnListener implements Listener
{
@Override
public void onInvalidHandlerReturnValue(Request request, ThreadInfo completionThreadInfo)
{
LOG.warn("handler callback completed but false returned for: {}{}completed by: {}",
request,
System.lineSeparator(),
completionThreadInfo
);
}
@Override
public void onHandlerException(Request request, Throwable failure, ThreadInfo completionThreadInfo)
{
String format = "handler exception thrown for {}";
List<Object> args = new ArrayList<>();
args.add(request);
if (completionThreadInfo != null)
{
format += "{}completed by: {}";
args.add(System.lineSeparator());
args.add(completionThreadInfo);
}
args.add(failure);
LOG.warn(format, args.toArray());
}
@Override
public void onHandlerCallbackNotCompleted(Request request, ThreadInfo handlerThreadInfo)
{
LOG.warn("handler callback not completed for: {}{}handled by: {}",
request,
System.lineSeparator(),
handlerThreadInfo
);
}
@Override
public void onDemandCallbackBlocked(Request request, ThreadInfo demandThreadInfo, ThreadInfo runThreadInfo)
{
LOG.warn("demand callback blocked for: {}{}demanded by: {}{}possibly blocked: {}",
request,
System.lineSeparator(),
demandThreadInfo,
System.lineSeparator(),
runThreadInfo
);
}
@Override
public void onWriteBlocked(Request request, ThreadInfo writeThreadInfo, ThreadInfo writingThreadInfo)
{
LOG.warn("write blocked for: {}{}write by: {}{}possibly blocked: {}",
request,
System.lineSeparator(),
writeThreadInfo,
System.lineSeparator(),
writingThreadInfo
);
}
@Override
public void onWriteCallbackNotCompleted(Request request, Throwable writeFailure, ThreadInfo writeThreadInfo)
{
LOG.warn("write callback not completed for: {}{}write {} by: {}",
request,
System.lineSeparator(),
writeFailure == null ? "succeeded" : "failed with " + writeFailure,
writeThreadInfo
);
}
@Override
public void onWriteCallbackBlocked(Request request, Throwable writeFailure, ThreadInfo writeThreadInfo, ThreadInfo callbackThreadInfo)
{
LOG.warn("write callback blocked for: {}{}write {} by: {}{}possibly blocked: {}",
request,
System.lineSeparator(),
writeFailure == null ? "succeeded" : "failed with " + writeFailure,
writeThreadInfo,
System.lineSeparator(),
callbackThreadInfo
);
}
}
private class StateInfo implements Dumpable
{
private final Queue<RequestWrapper.DemandCallback> demandCallbacks = new ConcurrentLinkedQueue<>();
private final Queue<ResponseWrapper.WriteCallback> writeCallbacks = new ConcurrentLinkedQueue<>();
private final Request request;
private volatile HandlerCallback handlerCallback;
private StateInfo(Request request)
{
this.request = request;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable demandDumpable = demandCallbackTimeout > 0
? new DumpableCollection("demands", demandCallbacks)
: (o, i) -> o.append("demands not tracked\n");
Dumpable writeDumpable = writeTimeout > 0 || writeCallbackTimeout > 0
? new DumpableCollection("writes", writeCallbacks)
: (o, i) -> o.append("writes not tracked\n");
Dumpable.dumpObjects(out, indent, request.toString(), handlerCallback, demandDumpable, writeDumpable);
}
}
private class HandlerCallback extends Callback.Nested implements Runnable, Dumpable
{
private final AtomicBoolean completed = new AtomicBoolean();
private final Request request;
private final Scheduler.Task task;
private final Thread handleThread;
private volatile Boolean handled;
private volatile ThreadInfo completionThreadInfo;
private volatile String completion;
private HandlerCallback(StateInfo stateInfo, Callback callback)
{
super(callback);
this.request = stateInfo.request;
long timeout = getHandlerCallbackTimeout();
this.task = timeout > 0 ? request.getComponents().getScheduler().schedule(this, timeout, MILLISECONDS) : () -> true;
this.handleThread = Thread.currentThread();
}
private void setHandled(boolean handled)
{
this.handled = handled;
}
@Override
public void succeeded()
{
if (completed(null))
super.succeeded();
}
@Override
public void failed(Throwable x)
{
if (completed(x))
super.failed(x);
}
private boolean completed(Throwable failure)
{
if (!completed.compareAndSet(false, true))
return false;
boolean cancelled = task.cancel();
if (LOG.isDebugEnabled())
LOG.debug("handler callback timeout cancelled={} for {}", cancelled, request);
completion = failure == null ? "succeeded" : "failed with " + failure;
ThreadInfo threadInfo = completionThreadInfo = new ThreadInfo(Thread.currentThread());
boolean notify = handled == Boolean.FALSE;
if (notify)
notifyInvalidHandlerReturnValue(request, threadInfo);
return true;
}
private ThreadInfo getCompletionThreadInfo()
{
return completionThreadInfo;
}
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("handler callback not completed within {} for {}", getHandlerCallbackTimeout(), request);
Boolean handled = this.handled;
ThreadInfo handlerThreadInfo = null;
// The Handler chain has not returned yet, likely the thread is blocked.
if (handled == null)
{
// The thread info of the thread that blocks.
// Only create it if the handler thread is blocked,
// otherwise we will have a stack trace possibly
// belonging to a different request.
handlerThreadInfo = new ThreadInfo(handleThread);
}
notifyHandlerCallbackNotCompleted(request, handlerThreadInfo);
if (isCompleteHandlerCallbackAtTimeout())
super.failed(new TimeoutException());
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Boolean handled = this.handled;
ThreadInfo handleThreadInfo = null;
if (handled == null)
handleThreadInfo = new ThreadInfo(handleThread);
String completion = this.completion;
ThreadInfo completionThreadInfo = this.completionThreadInfo;
out.append("handle() result: %s\n".formatted(handled == null ? "pending" : handled));
if (handleThreadInfo != null)
out.append(indent).append(handleThreadInfo.toString(indent));
out.append(indent).append("handler callback: %s [%s]\n".formatted(Objects.toString(completion, "not completed"), getCallback()));
if (completionThreadInfo != null)
out.append(indent).append(completionThreadInfo.toString(indent));
}
}
private class RequestWrapper extends Request.Wrapper
{
private final StateInfo stateInfo;
private RequestWrapper(StateInfo stateInfo)
{
super(stateInfo.request);
this.stateInfo = stateInfo;
}
@Override
public void demand(Runnable reader)
{
DemandCallback demandCallback = new DemandCallback(reader);
stateInfo.demandCallbacks.offer(demandCallback);
super.demand(demandCallback);
}
private class DemandCallback implements Invocable.Task, Dumpable
{
private final Runnable callback;
private final ThreadInfo demandThreadInfo;
// Tri-state: null -> no demand, thread -> running demand, this -> done running demand.
private volatile Object demandRunner;
private DemandCallback(Runnable callback)
{
this.callback = callback;
this.demandThreadInfo = new ThreadInfo(Thread.currentThread());
}
@Override
public void run()
{
demandRunner = Thread.currentThread();
Scheduler.Task task = getComponents().getScheduler().schedule(this::expired, getDemandCallbackTimeout(), MILLISECONDS);
try
{
callback.run();
}
finally
{
demandRunner = this;
stateInfo.demandCallbacks.remove(this);
task.cancel();
}
}
private void expired()
{
Object demandRunner = this.demandRunner;
// If expired() lost the race with run(), return.
if (demandRunner == this)
return;
// Avoid clash with Request.LOG.
Logger log = StateTrackingHandler.LOG;
if (log.isDebugEnabled())
log.debug("demand callback blocked more than {} for {}", getDemandCallbackTimeout(), getWrapped());
// The thread info of the thread that blocks.
ThreadInfo runThreadInfo = new ThreadInfo((Thread)demandRunner);
notifyDemandCallbackBlocked(getWrapped(), demandThreadInfo, runThreadInfo);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Object demandRunner = this.demandRunner;
if (demandRunner instanceof Thread runThread)
{
out.append("demand: running [%s]\n".formatted(callback));
out.append(indent).append(new ThreadInfo(runThread).toString(indent));
}
else
{
out.append("demand: %s [%s]\n".formatted(demandRunner == null ? "pending" : "none", callback));
}
}
@Override
public InvocationType getInvocationType()
{
return Invocable.getInvocationType(callback);
}
}
}
private class ResponseWrapper extends Response.Wrapper
{
private final StateInfo stateInfo;
private ResponseWrapper(StateInfo stateInfo, Response wrapped)
{
super(stateInfo.request, wrapped);
this.stateInfo = stateInfo;
}
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
WriteCallback writeCallback = new WriteCallback(callback);
stateInfo.writeCallbacks.offer(writeCallback);
try
{
super.write(last, byteBuffer, writeCallback);
writeCallback.writeComplete(null);
}
catch (Throwable x)
{
writeCallback.writeComplete(x);
throw x;
}
}
private class WriteCallback extends Callback.Nested implements Dumpable
{
private final AtomicBoolean callbackCompleted = new AtomicBoolean();
private final Thread writeThread;
private final ThreadInfo writeThreadInfo;
private final Scheduler.Task writeTask;
// Tri-state: null -> write pending, this -> write succeeded, Throwable -> write failed.
private volatile Object writeCompleted;
// Tri-state: null -> callback pending, thread -> callback running, this -> callback completed.
private volatile Object callbackRunner;
private WriteCallback(Callback callback)
{
super(callback);
this.writeThread = Thread.currentThread();
this.writeThreadInfo = new ThreadInfo(writeThread);
long writeTimeout = getWriteTimeout();
this.writeTask = writeTimeout > 0 ? stateInfo.request.getComponents().getScheduler().schedule(this::writeExpired, writeTimeout, MILLISECONDS) : null;
}
private void writeComplete(Throwable failure)
{
writeCompleted = failure == null ? this : failure;
}
private void writeExpired()
{
Object writeCompleted = this.writeCompleted;
Request request = stateInfo.request;
if (LOG.isDebugEnabled())
LOG.debug("write not completed within {} for {}", getWriteTimeout(), request);
if (writeCompleted == null)
notifyWriteBlocked(request, writeThreadInfo, new ThreadInfo(writeThread));
else
notifyWriteCallbackNotCompleted(request, writeCompleted == this ? null : (Throwable)writeCompleted, writeThreadInfo);
}
@Override
public void succeeded()
{
if (!callbackCompleted.compareAndSet(false, true))
return;
if (writeTask != null)
writeTask.cancel();
callbackRunner = Thread.currentThread();
long timeout = getWriteCallbackTimeout();
Scheduler.Task task = timeout > 0 ? stateInfo.request.getComponents().getScheduler().schedule(() -> callbackExpired(null), timeout, MILLISECONDS) : null;
try
{
super.succeeded();
}
finally
{
callbackRunner = this;
stateInfo.writeCallbacks.remove(this);
if (task != null)
task.cancel();
}
}
@Override
public void failed(Throwable x)
{
if (!callbackCompleted.compareAndSet(false, true))
return;
if (writeTask != null)
writeTask.cancel();
callbackRunner = Thread.currentThread();
long timeout = getWriteCallbackTimeout();
Scheduler.Task task = timeout > 0 ? stateInfo.request.getComponents().getScheduler().schedule(() -> callbackExpired(x), timeout, MILLISECONDS) : null;
try
{
super.failed(x);
}
finally
{
callbackRunner = this;
stateInfo.writeCallbacks.remove(this);
if (task != null)
task.cancel();
}
}
private void callbackExpired(Throwable failure)
{
Object callbackRunner = this.callbackRunner;
// Return if callbackExpired() lost the race with succeeded()/failed().
if (callbackRunner == this)
return;
if (LOG.isDebugEnabled())
LOG.debug("write callback not completed within {} for {}", getWriteCallbackTimeout(), getRequest());
// The thread info of the thread that blocks.
ThreadInfo callbackThreadInfo = new ThreadInfo((Thread)callbackRunner);
notifyWriteCallbackBlocked(getRequest(), failure, writeThreadInfo, callbackThreadInfo);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Object writeCompleted = this.writeCompleted;
if (writeCompleted == null)
{
out.append("write: pending\n");
out.append(indent).append(new ThreadInfo(writeThread).toString(indent));
}
else
{
out.append("write: %s\n".formatted(writeCompleted == this ? "succeeded" : "failed with " + writeCompleted));
}
Object callbackRunner = this.callbackRunner;
if (callbackRunner instanceof Thread callbackThread)
{
out.append(indent).append("write callback: running [%s]\n".formatted(getCallback()));
out.append(indent).append(new ThreadInfo(callbackThread).toString(indent));
}
else
{
out.append(indent).append("write callback: %s [%s]\n".formatted(callbackRunner == null ? "pending" : "completed", getCallback()));
}
}
}
}
}

View File

@ -1073,6 +1073,22 @@ public class HttpChannelState implements HttpChannel, Components
return null;
}
@Override
public int hashCode()
{
// Override the implementation from the base class,
// and align with the implementation of Request.Wrapper.
return System.identityHashCode(this);
}
@Override
public boolean equals(Object obj)
{
// Override the implementation from the base class,
// and align with the implementation of Request.Wrapper.
return this == obj;
}
@Override
public String toString()
{
@ -1460,6 +1476,9 @@ public class HttpChannelState implements HttpChannel, Components
try (AutoLock ignored = _request._lock.lock())
{
if (lockedCompleteCallback())
return;
request = _request;
httpChannelState = _request._httpChannelState;
response = httpChannelState._response;
@ -1471,9 +1490,6 @@ public class HttpChannelState implements HttpChannel, Components
if (response.lockedIsWriting())
failure = ExceptionUtil.combine(failure, new IllegalStateException("write pending"));
if (lockedCompleteCallback())
return;
assert httpChannelState._callbackFailure == null;
needLastStreamSend = httpChannelState.lockedLastStreamSend();

View File

@ -0,0 +1,742 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class StateTrackingHandlerTest
{
private Server server;
private LocalConnector connector;
public void start(Handler handler) throws Exception
{
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
@AfterEach
public void destroy()
{
LifeCycle.stop(server);
}
@Test
public void testHandlerCallbackSucceededThenHandlerReturnTrue() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Request.addCompletionListener(request, x -> latch.countDown());
callback.succeeded();
return true;
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
"""));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertThat(listener.events(), empty());
}
@Test
public void testHandlerReturnTrueThenHandlerCallbackSucceeded() throws Exception
{
long delay = 500;
CountDownLatch latch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Request.addCompletionListener(request, x -> latch.countDown());
request.getComponents().getScheduler().schedule(callback::succeeded, delay, TimeUnit.MILLISECONDS);
return true;
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
"""));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS));
assertThat(listener.events(), empty());
}
@Test
public void testHandlerCallbackSucceededThenHandlerReturnFalse() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Request.addCompletionListener(request, x -> latch.countDown());
callback.succeeded();
return false;
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
"""));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertThat(listener.events(), contains("invalid"));
}
@Test
public void testHandlerReturnsFalseThenHandlerCallbackSucceeded() throws Exception
{
long delay = 500;
String threadName = "cch-test";
CountDownLatch latch = new CountDownLatch(2);
AtomicReference<StateTrackingHandler.ThreadInfo> threadInfoRef = new AtomicReference<>();
EventsListener listener = new EventsListener()
{
@Override
public void onInvalidHandlerReturnValue(Request request, StateTrackingHandler.ThreadInfo completionThreadInfo)
{
super.onInvalidHandlerReturnValue(request, completionThreadInfo);
threadInfoRef.set(completionThreadInfo);
}
};
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Request.addCompletionListener(request, x -> latch.countDown());
request.getComponents().getScheduler().schedule(() -> new Thread(() ->
{
callback.succeeded();
latch.countDown();
}, threadName).start(), delay, TimeUnit.MILLISECONDS);
return false;
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
"""));
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS));
assertThat(listener.events(), contains("invalid"));
assertThat(threadInfoRef.get().getInfo(), containsString(threadName));
}
@Test
public void testHandlerReturnsTrueHandlerCallbackNotCompleted() throws Exception
{
long timeout = 1000;
AtomicReference<StateTrackingHandler.ThreadInfo> threadInfoRef = new AtomicReference<>();
EventsListener listener = new EventsListener()
{
@Override
public void onHandlerCallbackNotCompleted(Request request, StateTrackingHandler.ThreadInfo handlerThreadInfo)
{
super.onHandlerCallbackNotCompleted(request, handlerThreadInfo);
threadInfoRef.set(handlerThreadInfo);
}
};
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandlerCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Do not complete the callback.
return true;
}
});
start(stateTrackingHandler);
String response = connector.getResponse("""
GET / HTTP/1.1
Host: localhost
""", 2 * timeout, TimeUnit.MILLISECONDS);
// There should be no response, as the callback was not completed.
assertNull(response);
assertThat(listener.events(), contains("handler"));
assertNull(threadInfoRef.get());
}
@Test
public void testHandlerReturnsTrueHandlerCallbackNotCompletedThenHandlerCallbackIsForcefullyFailed() throws Exception
{
long timeout = 1000;
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandlerCallbackTimeout(timeout);
stateTrackingHandler.setCompleteHandlerCallbackAtTimeout(true);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Do not complete the callback.
return true;
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
""", 2 * timeout, TimeUnit.MILLISECONDS));
// There should be an error response.
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus());
assertThat(listener.events(), contains("handler"));
}
@Test
public void testHandlerBlocksHandlerCallbackNotCompleted() throws Exception
{
long timeout = 1000;
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> threadNameRef = new AtomicReference<>();
AtomicReference<StateTrackingHandler.ThreadInfo> threadInfoRef = new AtomicReference<>();
EventsListener listener = new EventsListener()
{
@Override
public void onHandlerCallbackNotCompleted(Request request, StateTrackingHandler.ThreadInfo handlerThreadInfo)
{
super.onHandlerCallbackNotCompleted(request, handlerThreadInfo);
threadInfoRef.set(handlerThreadInfo);
}
};
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandlerCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
threadNameRef.set(Thread.currentThread().getName());
// Block.
latch.await();
return true;
}
});
start(stateTrackingHandler);
String response = connector.getResponse("""
GET / HTTP/1.1
Host: localhost
""", 2 * timeout, TimeUnit.MILLISECONDS);
// There should be no response, as the callback was not completed.
assertNull(response);
latch.countDown();
assertThat(listener.events(), contains("handler"));
assertThat(threadInfoRef.get().getInfo(), containsString(threadNameRef.get()));
}
@Test
public void testDemandCallbackBlocks() throws Exception
{
long timeout = 1000;
CountDownLatch latch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setDemandCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.demand(new Runnable()
{
@Override
public void run()
{
Content.Chunk chunk = request.read();
if (chunk != null)
{
chunk.release();
if (chunk.isLast())
{
try
{
// Block.
latch.await();
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
return;
}
}
request.demand(this);
}
});
return true;
}
});
start(stateTrackingHandler);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
POST / HTTP/1.1
Host: localhost
Content-Length: 1
A"""))
{
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("demand-blocked"));
// Let the server send the response.
latch.countDown();
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, timeout, TimeUnit.MILLISECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
@Test
public void testWriteBlocks() throws Exception
{
// A Handler that blocks in write().
CountDownLatch writeLatch = new CountDownLatch(1);
Handler.Wrapper wrapper = new Handler.Wrapper()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
Response.Wrapper wrapper = new Response.Wrapper(request, response)
{
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
try
{
// Block.
writeLatch.await();
super.write(last, byteBuffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
}
}
};
return super.handle(request, wrapper, callback);
}
};
long timeout = 1000;
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
wrapper.setHandler(stateTrackingHandler);
stateTrackingHandler.setWriteTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.write(true, null, callback);
return true;
}
});
start(wrapper);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET / HTTP/1.1
Host: localhost
"""))
{
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("write-blocked"));
writeLatch.countDown();
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, timeout, TimeUnit.MILLISECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
@Test
public void testWriteCallbackNotCompleted() throws Exception
{
// Simulates a Handler with a bug: it does not complete write callbacks.
Handler.Wrapper wrapper = new Handler.Wrapper()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
Response wrapped = new Response.Wrapper(request, response)
{
@Override
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
// The callback parameter is the write callback from
// StateTrackingHandler that will not be completed.
super.write(last, byteBuffer, Callback.NOOP);
}
};
return super.handle(request, wrapped, callback);
}
};
long timeout = 1000;
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setWriteTimeout(timeout);
stateTrackingHandler.setHandlerCallbackTimeout(2 * timeout);
wrapper.setHandler(stateTrackingHandler);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.write(true, null, callback);
return true;
}
});
start(wrapper);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET / HTTP/1.1
Host: localhost
"""))
{
await().atMost(3 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("write-callback", "handler"));
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, timeout, TimeUnit.MILLISECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
@Test
public void testWriteCallbackBlocks() throws Exception
{
long timeout = 1000;
CountDownLatch latch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setWriteCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.write(false, null, Callback.from(() ->
{
try
{
// Block.
latch.await();
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}, callback::failed));
return true;
}
});
start(stateTrackingHandler);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET / HTTP/1.1
Host: localhost
"""))
{
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("write-callback-blocked"));
// Let the server send the response.
latch.countDown();
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, timeout, TimeUnit.MILLISECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
@Test
public void testDemandCallbackCallsRequestDemand() throws Exception
{
long timeout = 1000;
CountDownLatch demandLatch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setDemandCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
request.demand(new Runnable()
{
@Override
public void run()
{
try
{
while (true)
{
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
// Bad behavior: must not block a demand
// callback because they are serialized.
demandLatch.await();
return;
}
chunk.release();
if (chunk.isLast())
{
callback.succeeded();
return;
}
}
}
catch (Throwable x)
{
callback.failed(x);
}
}
});
return true;
}
});
start(stateTrackingHandler);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
POST / HTTP/1.1
Host: localhost
Content-Length: 2
"""))
{
// Wait to return from handle(), then send the first chunk of content.
Thread.sleep(500);
endPoint.addInputAndExecute("A");
// Wait to detect the blocked demand callback, then add the last chunk of content.
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("demand-blocked"));
assertThat(stateTrackingHandler.dump(), containsString("demands size=2"));
demandLatch.countDown();
endPoint.addInputAndExecute("B");
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, 5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
@Test
public void testWriteCallbackCallsResponseWrite() throws Exception
{
long timeout = 1000;
CountDownLatch writeLatch = new CountDownLatch(1);
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setWriteCallbackTimeout(timeout);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// This write should call the callback synchronously.
Content.Sink.write(response, false, "A", Callback.from(() ->
{
try
{
Content.Sink.write(response, true, "B", callback);
// Bad behavior: must not block a write
// callback because they are serialized.
writeLatch.await();
}
catch (Throwable x)
{
callback.failed(x);
}
}, callback::failed));
return true;
}
});
start(stateTrackingHandler);
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET / HTTP/1.1
Host: localhost
"""))
{
// Wait to detect the blocked demand callback, then add the last chunk of content.
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(listener::events, contains("write-callback-blocked"));
assertThat(stateTrackingHandler.dump(), containsString("writes size=2"));
writeLatch.countDown();
endPoint.addInputAndExecute("B");
HttpTester.Response response = HttpTester.parseResponse(endPoint.getResponse(false, 5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals("AB", response.getContent());
}
}
@Test
public void testHandlerThrows() throws Exception
{
EventsListener listener = new EventsListener();
StateTrackingHandler stateTrackingHandler = new StateTrackingHandler(listener);
stateTrackingHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
throw new QuietException.RuntimeException();
}
});
start(stateTrackingHandler);
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse("""
GET / HTTP/1.1
Host: localhost
""", 5, TimeUnit.SECONDS));
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus());
assertThat(listener.events(), contains("exception"));
}
private static class EventsListener implements StateTrackingHandler.Listener
{
private final List<String> events = new CopyOnWriteArrayList<>();
private List<String> events()
{
return events;
}
@Override
public void onInvalidHandlerReturnValue(Request request, StateTrackingHandler.ThreadInfo completionThreadInfo)
{
events.add("invalid");
}
@Override
public void onHandlerException(Request request, Throwable failure, StateTrackingHandler.ThreadInfo completionThreadInfo)
{
events.add("exception");
}
@Override
public void onHandlerCallbackNotCompleted(Request request, StateTrackingHandler.ThreadInfo handlerThreadInfo)
{
events.add("handler");
}
@Override
public void onDemandCallbackBlocked(Request request, StateTrackingHandler.ThreadInfo demandThreadInfo, StateTrackingHandler.ThreadInfo runThreadInfo)
{
events.add("demand-blocked");
}
@Override
public void onWriteBlocked(Request request, StateTrackingHandler.ThreadInfo writeThreadInfo, StateTrackingHandler.ThreadInfo writingThreadInfo)
{
events.add("write-blocked");
}
@Override
public void onWriteCallbackNotCompleted(Request request, Throwable writeFailure, StateTrackingHandler.ThreadInfo writeThreadInfo)
{
events.add("write-callback");
}
@Override
public void onWriteCallbackBlocked(Request request, Throwable writeFailure, StateTrackingHandler.ThreadInfo writeThreadInfo, StateTrackingHandler.ThreadInfo callbackThreadInfo)
{
events.add("write-callback-blocked");
}
}
}

View File

@ -13,16 +13,52 @@
package org.eclipse.jetty.util.thread;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.LifeCycle;
/**
* <p>Schedules tasks to be executed after a delay.</p>
*/
public interface Scheduler extends LifeCycle
{
/**
* <p>A delayed task that can be cancelled.</p>
*/
interface Task
{
/**
* <p>Attempts to cancel the execution of this task.</p>
* <p>If the task is already cancelled, or already executed,
* this method has no effect and returns {@code false}.</p>
* <p>Otherwise, the execution of this task is cancelled
* and this method returns {@code true}.</p>
*
* @return whether the task was cancelled
*/
boolean cancel();
}
/**
* <p>Schedules a task to be executed after the given delay.</p>
*
* @param task the task to execute
* @param delay the delay value
* @param units the unit of time of the delay
* @return a delayed task
*/
Task schedule(Runnable task, long delay, TimeUnit units);
/**
* <p>Schedules a task to be executed after the given delay.</p>
*
* @param task the task to execute
* @param delay the delay duration
* @return a delayed task
*/
default Task schedule(Runnable task, Duration delay)
{
return schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS);
}
}

View File

@ -1863,4 +1863,33 @@ public class DistributionTests extends AbstractJettyHomeTest
}
}
}
@Test
public void testStateTrackingModule() throws Exception
{
String jettyVersion = System.getProperty("jettyVersion");
JettyHomeTester distribution = JettyHomeTester.Builder.newInstance()
.jettyVersion(jettyVersion)
.build();
try (JettyHomeTester.Run run1 = distribution.start("--add-modules=state-tracking,http,demo-handler"))
{
run1.awaitFor(START_TIMEOUT, TimeUnit.SECONDS);
assertThat(run1.getExitValue(), is(0));
int httpPort = Tester.freePort();
try (JettyHomeTester.Run run2 = distribution.start("jetty.http.port=" + httpPort))
{
assertThat(run2.awaitConsoleLogsFor("Started oejs.Server", START_TIMEOUT, TimeUnit.SECONDS), is(true));
startHttpClient();
ContentResponse response = client.newRequest("http://localhost:" + httpPort + "/demo-handler/")
.timeout(15, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertThat(response.getContentAsString(), containsString("Hello World"));
}
}
}
}