From 1726c8778c0c1a2040d5aafeb265c59ad927e599 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 30 Aug 2024 13:01:43 +0300 Subject: [PATCH] Fixes HttpClient Content.Source reads from arbitrary threads (#12203) * Reworked HttpReceiverOverHTTP state machine, in particular: ** Introduced a boolean parameter to parseAndFill() and parse(), that specifies whether to notify the application demand callback. This is necessary because reads may happen from any threads, and must not notify the application demand callback. Only when there is no data, and fill interest is set, then the application demand callback must be notified. ** Removed action field to avoid lambda allocation. ** Now the application is called directly from the parse() method. ** Reading -1 from the network drives the parser by calling again parse(), rather than the parser directly. This allows to have a central place to notify the response success event. * Fixed FastCGI similarly to HTTP/1.1. * Removed leftover of the multiplex implementation. * Fixed test flakyness in `NetworkTrafficListenerTest`: consume the request content before sending the response. * Follow up after #10880: only abort the request if there is request content in `AuthenticationProtocolHandler` and `RedirectProtocolHandler`. This avoids the rare case where the response arrives before the request thread has modified the request state, even if the request has been fully sent over the network, causing the request to be failed even if it should not. * added `SerializedInvoker` assertions about current thread invoking. * Name all SerializedInvoker instances for better troubleshooting. Signed-off-by: Simone Bordet Signed-off-by: Ludovic Orban Co-authored-by: Ludovic Orban --- .../client/AuthenticationProtocolHandler.java | 3 +- .../jetty/client/RedirectProtocolHandler.java | 3 +- .../jetty/client/transport/HttpReceiver.java | 26 +-- .../internal/HttpReceiverOverHTTP.java | 197 +++++++++--------- .../client/NetworkTrafficListenerTest.java | 5 +- .../internal/HttpChannelOverFCGI.java | 16 +- .../internal/HttpConnectionOverFCGI.java | 66 +++--- .../internal/HttpReceiverOverFCGI.java | 24 ++- .../fcgi/parser/ResponseContentParser.java | 64 +++--- .../org/eclipse/jetty/http/MultiPart.java | 2 +- .../jetty/io/content/AsyncContent.java | 2 +- .../io/content/ByteBufferContentSource.java | 2 +- .../jetty/io/content/ChunksContentSource.java | 2 +- .../io/content/ContentSourceTransformer.java | 2 +- .../io/content/InputStreamContentSource.java | 2 +- .../jetty/io/content/PathContentSource.java | 2 +- .../io/internal/ByteChannelContentSource.java | 2 +- .../server/internal/HttpChannelState.java | 9 +- .../transport/HttpClientDemandTest.java | 21 +- .../jetty/util/thread/SerializedExecutor.java | 2 +- .../jetty/util/thread/SerializedInvoker.java | 129 ++++++++++-- .../util/thread/SerializedInvokerTest.java | 54 +++-- 22 files changed, 390 insertions(+), 245 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 894461c8fda..1d018b89a21 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -127,7 +127,8 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler { // The request may still be sending content, stop it. Request request = response.getRequest(); - request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); + if (request.getBody() != null) + request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index 161a29948a6..56692b7a200 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -61,7 +61,8 @@ public class RedirectProtocolHandler implements ProtocolHandler, Response.Listen { // The request may still be sending content, stop it. Request request = response.getRequest(); - request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); + if (request.getBody() != null) + request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 93859d695d7..1da157b97f2 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -67,7 +67,7 @@ public abstract class HttpReceiver { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class); private final HttpChannel channel; private ResponseState responseState = ResponseState.IDLE; private NotifiableContentSource contentSource; @@ -332,21 +332,10 @@ public abstract class HttpReceiver if (exchange.isResponseCompleteOrTerminated()) return; - responseContentAvailable(); + contentSource.onDataAvailable(); }); } - /** - * Method to be invoked when response content is available to be read. - *

- * This method directly invokes the demand callback, assuming the caller - * is already serialized with other events. - */ - protected void responseContentAvailable() - { - contentSource.onDataAvailable(); - } - /** * Method to be invoked when the response is successful. *

@@ -720,6 +709,9 @@ public abstract class HttpReceiver current = HttpReceiver.this.read(false); + if (LOG.isDebugEnabled()) + LOG.debug("Read {} from {}", current, this); + try (AutoLock ignored = lock.lock()) { if (currentChunk != null) @@ -739,6 +731,7 @@ public abstract class HttpReceiver { if (LOG.isDebugEnabled()) LOG.debug("onDataAvailable on {}", this); + invoker.assertCurrentThreadInvoking(); // The onDataAvailable() method is only ever called // by the invoker so avoid using the invoker again. invokeDemandCallback(false); @@ -763,6 +756,8 @@ public abstract class HttpReceiver if (LOG.isDebugEnabled()) LOG.debug("Processing demand on {}", this); + invoker.assertCurrentThreadInvoking(); + Content.Chunk current; try (AutoLock ignored = lock.lock()) { @@ -802,9 +797,14 @@ public abstract class HttpReceiver try { if (invoke) + { invoker.run(demandCallback); + } else + { + invoker.assertCurrentThreadInvoking(); demandCallback.run(); + } } catch (Throwable x) { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index de6ff4973a3..f74dbc6c149 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -43,17 +43,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class); + private final Runnable receiveNext = this::receiveNext; private final LongAdder inMessages = new LongAdder(); private final HttpParser parser; private final ByteBufferPool byteBufferPool; private RetainableByteBuffer networkBuffer; - private boolean shutdown; - private boolean complete; + private State state = State.STATUS; private boolean unsolicited; - private String method; private int status; + private String method; private Content.Chunk chunk; - private Runnable action; + private boolean shutdown; + private boolean disposed; public HttpReceiverOverHTTP(HttpChannelOverHTTP channel) { @@ -73,7 +74,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { if (!hasContent()) { - boolean setFillInterest = parseAndFill(); + boolean setFillInterest = parseAndFill(true); if (!hasContent() && setFillInterest) fillInterested(); } @@ -97,10 +98,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res super.reset(); parser.reset(); if (chunk != null) - { chunk.release(); - chunk = null; - } + chunk = null; } @Override @@ -109,10 +108,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res super.dispose(); parser.close(); if (chunk != null) - { chunk.release(); - chunk = null; - } + chunk = null; + disposed = true; } @Override @@ -124,7 +122,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res Content.Chunk chunk = consumeChunk(); if (chunk != null) return chunk; - boolean needFillInterest = parseAndFill(); + boolean needFillInterest = parseAndFill(false); if (LOG.isDebugEnabled()) LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this); chunk = consumeChunk(); @@ -236,7 +234,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res * If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte. * @return true if no bytes were filled. */ - private boolean parseAndFill() + private boolean parseAndFill(boolean notifyContentAvailable) { HttpConnectionOverHTTP connection = getHttpConnection(); EndPoint endPoint = connection.getEndPoint(); @@ -246,23 +244,22 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res acquireNetworkBuffer(); while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this); // Always parse even empty buffers to advance the parser. - if (parse()) + boolean stopParsing = parse(notifyContentAvailable); + if (LOG.isDebugEnabled()) + LOG.debug("Parsed stop={} in {}", stopParsing, this); + if (stopParsing) { // Return immediately, as this thread may be in a race // with e.g. another thread demanding more content. return false; } - if (LOG.isDebugEnabled()) - LOG.debug("Parser willing to advance in {}", this); // Connection may be closed in a parser callback. - if (connection.isClosed()) + if (connection.isClosed() || isShutdown()) { if (LOG.isDebugEnabled()) - LOG.debug("Closed {} in {}", connection, this); + LOG.debug("Closed/Shutdown {} in {}", connection, this); releaseNetworkBuffer(); return false; } @@ -271,6 +268,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res reacquireNetworkBuffer(); // The networkBuffer may have been reacquired. + assert !networkBuffer.hasRemaining(); int read = endPoint.fill(networkBuffer.getByteBuffer()); if (LOG.isDebugEnabled()) LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this); @@ -286,9 +284,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res } else { - releaseNetworkBuffer(); shutdown(); - return false; + // Loop around to parse again to advance the parser, + // for example for HTTP/1.0 connection-delimited content. } } } @@ -307,62 +305,80 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res * * @return true to indicate that parsing should be interrupted (and will be resumed by another thread). */ - private boolean parse() + private boolean parse(boolean notifyContentAvailable) { + // HttpParser is not reentrant, so we cannot invoke the + // application from the parser event callbacks. + // However, the mechanism in general (and this method) + // is reentrant: it notifies the application which may + // read response content, which reenters here. + + ByteBuffer byteBuffer = networkBuffer.getByteBuffer(); while (true) { - boolean handle = parser.parseNext(networkBuffer.getByteBuffer()); + boolean handle = parser.parseNext(byteBuffer); if (LOG.isDebugEnabled()) - LOG.debug("Parse result={} on {}", handle, this); - Runnable action = getAndSetAction(null); - if (action != null) + LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(byteBuffer), parser, this); + if (!handle) + return false; + + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + throw new IllegalStateException("No exchange"); + + switch (state) { - if (LOG.isDebugEnabled()) - LOG.debug("Executing action after parser returned: {} on {}", action, this); - action.run(); - if (LOG.isDebugEnabled()) - LOG.debug("Action executed after Parse result={} on {}", handle, this); - } - if (handle) - { - // When the receiver is aborted, the parser is closed in dispose() which changes - // its state to State.CLOSE; so checking parser.isClose() is just a way to check - // if the receiver was aborted or not. - return !parser.isClose(); - } - - boolean complete = this.complete; - this.complete = false; - if (LOG.isDebugEnabled()) - LOG.debug("Parse complete={}, {} {} in {}", complete, networkBuffer, parser, this); - - if (complete) - { - int status = this.status; - this.status = 0; - // Connection upgrade due to 101, bail out. - if (status == HttpStatus.SWITCHING_PROTOCOLS_101) - return true; - // Connection upgrade due to CONNECT + 200, bail out. - String method = this.method; - this.method = null; - if (getHttpChannel().isTunnel(method, status)) - return true; - - if (!networkBuffer.hasRemaining()) - return false; - - if (!HttpStatus.isInformational(status)) + case HEADERS -> responseHeaders(exchange); + case CONTENT -> { - if (LOG.isDebugEnabled()) - LOG.debug("Discarding unexpected content after response {}: {} in {}", status, networkBuffer, this); - networkBuffer.clear(); + if (notifyContentAvailable) + responseContentAvailable(exchange); } + case COMPLETE -> + { + boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101; + boolean isTunnel = getHttpChannel().isTunnel(method, status); + + Runnable task = isUpgrade || isTunnel ? null : this.receiveNext; + responseSuccess(exchange, task); + + // Connection upgrade, bail out. + if (isUpgrade || isTunnel) + return true; + + if (byteBuffer.hasRemaining()) + { + if (HttpStatus.isInterim(status)) + { + // There may be multiple interim responses in + // the same network buffer, continue parsing. + continue; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this); + BufferUtil.clear(byteBuffer); + return false; + } + } + + // Continue to read from the network. + return false; + } + default -> throw new IllegalStateException("Invalid state " + state); + } + + // The application may have aborted the request. + if (disposed) + { + BufferUtil.clear(byteBuffer); return false; } - if (!networkBuffer.hasRemaining()) - return false; + // The application has been invoked, + // and it is now driving the parsing. + return true; } } @@ -386,7 +402,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res // header, the connection will be closed at exchange termination // thanks to the flag we have set above. parser.atEOF(); - parser.parseNext(BufferUtil.EMPTY_BUFFER); } protected boolean isShutdown() @@ -406,6 +421,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res this.status = status; parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status)); exchange.getResponse().version(version).status(status).reason(reason); + state = State.STATUS; responseBegin(exchange); } @@ -432,10 +448,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res // Store the EndPoint is case of upgrades, tunnels, etc. exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint()); getHttpConnection().onResponseHeaders(exchange); - if (LOG.isDebugEnabled()) - LOG.debug("Setting action to responseHeaders(exchange, boolean) on {}", this); - if (getAndSetAction(() -> responseHeaders(exchange)) != null) - throw new IllegalStateException(); + state = State.HEADERS; return true; } @@ -451,17 +464,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (chunk != null) throw new IllegalStateException("Content generated with unconsumed content left"); + if (getHttpConnection().isFillInterested()) + throw new IllegalStateException("Fill interested while parsing for content"); // Retain the chunk because it is stored for later use. networkBuffer.retain(); chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); - - if (LOG.isDebugEnabled()) - LOG.debug("Setting action to responseContentAvailable on {}", this); - if (getAndSetAction(this::responseContentAvailable) != null) - throw new IllegalStateException(); - if (getHttpConnection().isFillInterested()) - throw new IllegalStateException(); + state = State.CONTENT; return true; } @@ -491,28 +500,20 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (exchange == null || unsolicited) { // We received an unsolicited response from the server. + networkBuffer.clear(); getHttpConnection().close(); return false; } int status = exchange.getResponse().getStatus(); if (!HttpStatus.isInterim(status)) - { inMessages.increment(); - complete = true; - } if (chunk != null) throw new IllegalStateException(); chunk = Content.Chunk.EOF; - - boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101; - boolean isTunnel = getHttpChannel().isTunnel(method, status); - Runnable task = isUpgrade || isTunnel ? null : this::receiveNext; - if (LOG.isDebugEnabled()) - LOG.debug("Message complete, calling response success with task {} in {}", task, this); - responseSuccess(exchange, task); - return false; + state = State.COMPLETE; + return true; } private void receiveNext() @@ -524,7 +525,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (LOG.isDebugEnabled()) LOG.debug("Receiving next request in {}", this); - boolean setFillInterest = parseAndFill(); + boolean setFillInterest = parseAndFill(true); if (!hasContent() && setFillInterest) fillInterested(); } @@ -556,13 +557,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res } } - private Runnable getAndSetAction(Runnable action) - { - Runnable r = this.action; - this.action = action; - return r; - } - long getMessagesIn() { return inMessages.longValue(); @@ -573,4 +567,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { return String.format("%s[%s]", super.toString(), parser); } + + private enum State + { + STATUS, HEADERS, CONTENT, COMPLETE + } } diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java index cb9e242ccc5..70be097bc97 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java @@ -332,7 +332,10 @@ public class NetworkTrafficListenerTest @Override public boolean handle(Request request, Response response, Callback callback) { - Response.sendRedirect(request, response, callback, location); + Content.Source.consumeAll(request, Callback.from( + () -> Response.sendRedirect(request, response, callback, location), + callback::failed + )); return true; } }); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java index 0d96492a654..8d636a63de4 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java @@ -119,11 +119,25 @@ public class HttpChannelOverFCGI extends HttpChannel receiver.content(chunk); } + protected void responseContentAvailable() + { + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + receiver.responseContentAvailable(exchange); + } + protected void end() { HttpExchange exchange = getHttpExchange(); if (exchange != null) - receiver.end(exchange); + receiver.end(); + } + + protected void responseSuccess() + { + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + receiver.responseSuccess(exchange); } protected void responseFailure(Throwable failure, Promise promise) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java index 0309c2872b6..4bdc222b125 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java @@ -67,7 +67,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne private final HttpChannelOverFCGI channel; private RetainableByteBuffer networkBuffer; private Object attachment; - private Runnable action; + private State state = State.STATUS; private long idleTimeout; private boolean shutdown; @@ -168,7 +168,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne this.networkBuffer = null; } - boolean parseAndFill() + boolean parseAndFill(boolean notifyContentAvailable) { if (LOG.isDebugEnabled()) LOG.debug("parseAndFill {}", networkBuffer); @@ -179,7 +179,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { while (true) { - if (parse(networkBuffer.getByteBuffer())) + if (parse(networkBuffer.getByteBuffer(), notifyContentAvailable)) return false; if (networkBuffer.isRetained()) @@ -214,13 +214,35 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne } } - private boolean parse(ByteBuffer buffer) + private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable) { - boolean parse = parser.parse(buffer); - Runnable action = getAndSetAction(null); - if (action != null) - action.run(); - return parse; + boolean handle = parser.parse(buffer); + + switch (state) + { + case STATUS -> + { + // Nothing to do. + } + case HEADERS -> channel.responseHeaders(); + case CONTENT -> + { + if (notifyContentAvailable) + channel.responseContentAvailable(); + } + case COMPLETE -> + { + // For the complete event, handle==false, and cannot + // differentiate between a complete event and a parse() + // with zero or not enough bytes, so the state is reset + // here to avoid calling responseSuccess() again. + state = State.STATUS; + channel.responseSuccess(); + } + default -> throw new IllegalStateException("Invalid state " + state); + } + + return handle; } private void shutdown() @@ -318,13 +340,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne }, x -> close(failure))); } - private Runnable getAndSetAction(Runnable action) - { - Runnable r = this.action; - this.action = action; - return r; - } - protected HttpChannelOverFCGI newHttpChannel() { return new HttpChannelOverFCGI(this); @@ -414,6 +429,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { if (LOG.isDebugEnabled()) LOG.debug("onBegin r={},c={},reason={}", request, code, reason); + state = State.STATUS; channel.responseBegin(code, reason); } @@ -430,8 +446,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { if (LOG.isDebugEnabled()) LOG.debug("onHeaders r={} {}", request, networkBuffer); - if (getAndSetAction(channel::responseHeaders) != null) - throw new IllegalStateException(); + state = State.HEADERS; return true; } @@ -444,13 +459,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { case STD_OUT -> { - // No need to call networkBuffer.retain() here, since we know - // that the action will be run before releasing the networkBuffer. - // The receiver of the chunk decides whether to consume/retain it. Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); - if (getAndSetAction(() -> channel.content(chunk)) == null) - return true; - throw new IllegalStateException(); + channel.content(chunk); + state = State.CONTENT; + return true; } case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer)); default -> throw new IllegalArgumentException(); @@ -464,6 +476,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne if (LOG.isDebugEnabled()) LOG.debug("onEnd r={}", request); channel.end(); + state = State.COMPLETE; } @Override @@ -474,4 +487,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne failAndClose(failure); } } + + private enum State + { + STATUS, HEADERS, CONTENT, COMPLETE + } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java index 8c810e215ac..805cff268ac 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java @@ -34,7 +34,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver if (!hasContent()) { HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean setFillInterest = httpConnection.parseAndFill(); + boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) httpConnection.fillInterested(); } @@ -81,7 +81,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver if (chunk != null) return chunk; HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean needFillInterest = httpConnection.parseAndFill(); + boolean needFillInterest = httpConnection.parseAndFill(false); chunk = consumeChunk(); if (chunk != null) return chunk; @@ -109,23 +109,23 @@ public class HttpReceiverOverFCGI extends HttpReceiver void content(Content.Chunk chunk) { - HttpExchange exchange = getHttpExchange(); - if (exchange == null) - return; if (this.chunk != null) throw new IllegalStateException(); // Retain the chunk because it is stored for later reads. chunk.retain(); this.chunk = chunk; - responseContentAvailable(); } - void end(HttpExchange exchange) + void end() { if (chunk != null) throw new IllegalStateException(); chunk = Content.Chunk.EOF; - responseSuccess(exchange, this::receiveNext); + } + + void responseSuccess(HttpExchange exchange) + { + super.responseSuccess(exchange, this::receiveNext); } private void receiveNext() @@ -136,7 +136,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver throw new IllegalStateException(); HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean setFillInterest = httpConnection.parseAndFill(); + boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) httpConnection.fillInterested(); } @@ -165,6 +165,12 @@ public class HttpReceiverOverFCGI extends HttpReceiver super.responseHeaders(exchange); } + @Override + protected void responseContentAvailable(HttpExchange exchange) + { + super.responseContentAvailable(exchange); + } + @Override protected void responseFailure(Throwable failure, Promise promise) { diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index d86c22b78d9..85eec1b85ba 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -15,8 +15,6 @@ package org.eclipse.jetty.fcgi.parser; import java.io.EOFException; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.http.HttpCompliance; @@ -43,13 +41,12 @@ public class ResponseContentParser extends StreamContentParser { private static final Logger LOG = LoggerFactory.getLogger(ResponseContentParser.class); - private final Map parsers = new ConcurrentHashMap<>(); - private final ClientParser.Listener listener; + private final ResponseParser parser; public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener) { super(headerParser, FCGI.StreamType.STD_OUT, listener); - this.listener = listener; + this.parser = new ResponseParser(listener); } @Override @@ -63,13 +60,6 @@ public class ResponseContentParser extends StreamContentParser @Override protected boolean onContent(ByteBuffer buffer) { - int request = getRequest(); - ResponseParser parser = parsers.get(request); - if (parser == null) - { - parser = new ResponseParser(listener, request); - parsers.put(request, parser); - } return parser.parse(buffer); } @@ -77,37 +67,44 @@ public class ResponseContentParser extends StreamContentParser protected void end(int request) { super.end(request); - parsers.remove(request); + parser.reset(); } - private static class ResponseParser implements HttpParser.ResponseHandler + private class ResponseParser implements HttpParser.ResponseHandler { private final HttpFields.Mutable fields = HttpFields.build(); private final ClientParser.Listener listener; - private final int request; private final FCGIHttpParser httpParser; private State state = State.HEADERS; private boolean seenResponseCode; private boolean stalled; - private ResponseParser(ClientParser.Listener listener, int request) + private ResponseParser(ClientParser.Listener listener) { this.listener = listener; - this.request = request; this.httpParser = new FCGIHttpParser(this); } + private void reset() + { + fields.clear(); + httpParser.reset(); + state = State.HEADERS; + seenResponseCode = false; + stalled = false; + } + public boolean parse(ByteBuffer buffer) { int remaining = buffer.remaining(); while (remaining > 0) { if (LOG.isDebugEnabled()) - LOG.debug("Response {} {}, state {} {}", request, FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer)); + LOG.debug("Response {} {}, state {} {}", getRequest(), FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer)); switch (state) { - case HEADERS: + case HEADERS -> { if (httpParser.parseNext(buffer)) { @@ -116,40 +113,33 @@ public class ResponseContentParser extends StreamContentParser return true; } remaining = buffer.remaining(); - break; } - case CONTENT_MODE: + case CONTENT_MODE -> { // If we have no indication of the content, then // the HTTP parser will assume there is no content // and will not parse it even if it is provided, // so we have to parse it raw ourselves here. boolean rawContent = fields.size() == 0 || - (fields.get(HttpHeader.CONTENT_LENGTH) == null && - fields.get(HttpHeader.TRANSFER_ENCODING) == null); + (fields.get(HttpHeader.CONTENT_LENGTH) == null && + fields.get(HttpHeader.TRANSFER_ENCODING) == null); state = rawContent ? State.RAW_CONTENT : State.HTTP_CONTENT; - break; } - case RAW_CONTENT: + case RAW_CONTENT -> { ByteBuffer content = buffer.asReadOnlyBuffer(); buffer.position(buffer.limit()); if (notifyContent(content)) return true; remaining = 0; - break; } - case HTTP_CONTENT: + case HTTP_CONTENT -> { if (httpParser.parseNext(buffer)) return true; remaining = buffer.remaining(); - break; - } - default: - { - throw new IllegalStateException(); } + default -> throw new IllegalStateException(); } } return false; @@ -205,7 +195,7 @@ public class ResponseContentParser extends StreamContentParser { try { - listener.onBegin(request, code, reason); + listener.onBegin(getRequest(), code, reason); } catch (Throwable x) { @@ -218,7 +208,7 @@ public class ResponseContentParser extends StreamContentParser { try { - listener.onHeader(request, httpField); + listener.onHeader(getRequest(), httpField); } catch (Throwable x) { @@ -242,7 +232,7 @@ public class ResponseContentParser extends StreamContentParser { try { - return listener.onHeaders(request); + return listener.onHeaders(getRequest()); } catch (Throwable x) { @@ -278,7 +268,7 @@ public class ResponseContentParser extends StreamContentParser { try { - return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer); + return listener.onContent(getRequest(), FCGI.StreamType.STD_OUT, buffer); } catch (Throwable x) { @@ -318,7 +308,7 @@ public class ResponseContentParser extends StreamContentParser { try { - listener.onFailure(request, failure); + listener.onFailure(getRequest(), failure); } catch (Throwable x) { diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java index 452cd394bbd..fd9d89673f3 100644 --- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java +++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java @@ -564,7 +564,7 @@ public class MultiPart public abstract static class AbstractContentSource implements Content.Source, Closeable { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(AbstractContentSource.class); private final Queue parts = new ArrayDeque<>(); private final String boundary; private final ByteBuffer firstBoundary; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java index 6bd5eeebc32..02bc1d0cf6b 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java @@ -50,7 +50,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable }; private final AutoLock.WithCondition lock = new AutoLock.WithCondition(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(AsyncContent.class); private final Queue chunks = new ArrayDeque<>(); private Content.Chunk persistentFailure; private boolean readClosed; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java index 23a4e6fb97d..6273cab9e76 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java @@ -31,7 +31,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker; public class ByteBufferContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(ByteBufferContentSource.class); private final long length; private final Collection byteBuffers; private Iterator iterator; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java index a00680251cf..b065efeb618 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java @@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker; public class ChunksContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(ChunksContentSource.class); private final long length; private final Collection chunks; private Iterator iterator; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java index 760e4f1f42d..c4a6a9fe9c5 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java @@ -39,7 +39,7 @@ public abstract class ContentSourceTransformer implements Content.Source protected ContentSourceTransformer(Content.Source rawSource) { - this(rawSource, new SerializedInvoker()); + this(rawSource, new SerializedInvoker(ContentSourceTransformer.class)); } protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java index bc14ba9d9e9..dcce4d5c7ce 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java @@ -38,7 +38,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker; public class InputStreamContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(InputStreamContentSource.class); private final InputStream inputStream; private ByteBufferPool.Sized bufferPool; private Runnable demandCallback; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java index f731280f737..0481761f5b9 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java @@ -41,7 +41,7 @@ public class PathContentSource implements Content.Source // TODO in 12.1.x reimplement this class based on ByteChannelContentSource private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(PathContentSource.class); private final Path path; private final long length; private final ByteBufferPool byteBufferPool; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java index 65336ac6520..c8713b2b39b 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java @@ -39,7 +39,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker; public class ByteChannelContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker _invoker = new SerializedInvoker(); + private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class); private final ByteBufferPool.Sized _byteBufferPool; private ByteChannel _byteChannel; private final long _offset; diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index d2068f6381e..bd9a75581c6 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -130,8 +130,8 @@ public class HttpChannelState implements HttpChannel, Components { _connectionMetaData = connectionMetaData; // The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc. - _readInvoker = new HttpChannelSerializedInvoker(); - _writeInvoker = new HttpChannelSerializedInvoker(); + _readInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_readInvoker"); + _writeInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_writeInvoker"); } @Override @@ -1825,6 +1825,11 @@ public class HttpChannelState implements HttpChannel, Components private class HttpChannelSerializedInvoker extends SerializedInvoker { + public HttpChannelSerializedInvoker(String name) + { + super(name); + } + @Override protected void onError(Runnable task, Throwable failure) { diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java index 2403239119f..4bbf288a1ab 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.NanoTime; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -263,7 +262,7 @@ public class HttpClientDemandTest extends AbstractTest .timeout(5, TimeUnit.SECONDS) .send(result -> { - Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); + assertFalse(result.isFailed(), String.valueOf(result.getFailure())); Response response = result.getResponse(); assertEquals(HttpStatus.OK_200, response.getStatus()); resultLatch.countDown(); @@ -346,7 +345,7 @@ public class HttpClientDemandTest extends AbstractTest .onResponseContentAsync(listener2) .send(result -> { - Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); + assertFalse(result.isFailed(), String.valueOf(result.getFailure())); Response response = result.getResponse(); assertEquals(HttpStatus.OK_200, response.getStatus()); resultLatch.countDown(); @@ -415,8 +414,8 @@ public class HttpClientDemandTest extends AbstractTest }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); @@ -480,8 +479,8 @@ public class HttpClientDemandTest extends AbstractTest }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); @@ -540,8 +539,8 @@ public class HttpClientDemandTest extends AbstractTest }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); @@ -572,8 +571,8 @@ public class HttpClientDemandTest extends AbstractTest .onResponseContentSource((response, contentSource) -> contentSource.demand(() -> new Thread(new Accumulator(contentSource, chunks)).start())) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java index 5c09087c522..9377d4cad70 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; */ public class SerializedExecutor implements Executor { - private final SerializedInvoker _invoker = new SerializedInvoker() + private final SerializedInvoker _invoker = new SerializedInvoker(SerializedExecutor.class) { @Override protected void onError(Runnable task, Throwable t) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java index 72bcd693a73..8da7c9861f1 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java @@ -13,8 +13,12 @@ package org.eclipse.jetty.util.thread; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +39,51 @@ public class SerializedInvoker private static final Logger LOG = LoggerFactory.getLogger(SerializedInvoker.class); private final AtomicReference _tail = new AtomicReference<>(); + private final String _name; + private volatile Thread _invokerThread; + + /** + * Create a new instance whose name is {@code anonymous}. + */ + public SerializedInvoker() + { + this("anonymous"); + } + + /** + * Create a new instance whose name is derived from the given class. + * @param nameFrom the class to use as a name. + */ + public SerializedInvoker(Class nameFrom) + { + this(nameFrom.getSimpleName()); + } + + /** + * Create a new instance with the given name. + * @param name the name. + */ + public SerializedInvoker(String name) + { + _name = name; + } + + /** + * @return whether the current thread is currently executing a task using this invoker + */ + boolean isCurrentThreadInvoking() + { + return _invokerThread == Thread.currentThread(); + } + + /** + * @throws IllegalStateException when the current thread is not currently executing a task using this invoker + */ + public void assertCurrentThreadInvoking() throws IllegalStateException + { + if (!isCurrentThreadInvoking()) + throw new IllegalStateException(); + } /** * Arrange for a task to be invoked, mutually excluded from other tasks. @@ -59,7 +108,7 @@ public class SerializedInvoker { // Wrap the given task with another one that's going to delegate run() to the wrapped task while the // wrapper's toString() returns a description of the place in code where SerializedInvoker.run() was called. - task = new NamedRunnable(task, deriveTaskName(task)); + task = new NamedRunnable(task); } } Link link = new Link(task); @@ -72,18 +121,6 @@ public class SerializedInvoker return null; } - protected String deriveTaskName(Runnable task) - { - StackTraceElement[] stackTrace = new Exception().getStackTrace(); - for (StackTraceElement stackTraceElement : stackTrace) - { - String className = stackTraceElement.getClassName(); - if (!className.equals(SerializedInvoker.class.getName()) && !className.equals(getClass().getName())) - return "Queued at " + stackTraceElement; - } - return task.toString(); - } - /** * Arrange for tasks to be invoked, mutually excluded from other tasks. * @param tasks The tasks to invoke @@ -115,9 +152,8 @@ public class SerializedInvoker Runnable todo = offer(task); if (todo != null) todo.run(); - else - if (LOG.isDebugEnabled()) - LOG.debug("Queued link in {}", this); + else if (LOG.isDebugEnabled()) + LOG.debug("Queued link in {}", this); } /** @@ -130,15 +166,14 @@ public class SerializedInvoker Runnable todo = offer(tasks); if (todo != null) todo.run(); - else - if (LOG.isDebugEnabled()) - LOG.debug("Queued links in {}", this); + else if (LOG.isDebugEnabled()) + LOG.debug("Queued links in {}", this); } @Override public String toString() { - return String.format("%s@%x{tail=%s}", getClass().getSimpleName(), hashCode(), _tail); + return String.format("%s@%x{name=%s,tail=%s,invoker=%s}", getClass().getSimpleName(), hashCode(), _name, _tail, _invokerThread); } protected void onError(Runnable task, Throwable t) @@ -146,7 +181,7 @@ public class SerializedInvoker LOG.warn("Serialized invocation error", t); } - private class Link implements Runnable, Invocable + private class Link implements Runnable, Invocable, Dumpable { private final Runnable _task; private final AtomicReference _next = new AtomicReference<>(); @@ -156,6 +191,24 @@ public class SerializedInvoker _task = task; } + @Override + public void dump(Appendable out, String indent) throws IOException + { + if (_task instanceof NamedRunnable nr) + { + StringWriter sw = new StringWriter(); + nr.stack.printStackTrace(new PrintWriter(sw)); + Dumpable.dumpObjects(out, indent, nr.toString(), sw.toString()); + } + else + { + Dumpable.dumpObjects(out, indent, _task); + } + Link link = _next.get(); + if (link != null) + link.dump(out, indent); + } + @Override public InvocationType getInvocationType() { @@ -186,6 +239,7 @@ public class SerializedInvoker { if (LOG.isDebugEnabled()) LOG.debug("Running link {} of {}", link, SerializedInvoker.this); + _invokerThread = Thread.currentThread(); try { link._task.run(); @@ -196,6 +250,12 @@ public class SerializedInvoker LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t); onError(link._task, t); } + finally + { + // _invokerThread must be nulled before calling link.next() as + // once the latter has executed, another thread can enter Link.run(). + _invokerThread = null; + } link = link.next(); if (link == null && LOG.isDebugEnabled()) LOG.debug("Next link is null, execution is over in {}", SerializedInvoker.this); @@ -209,10 +269,35 @@ public class SerializedInvoker } } - private record NamedRunnable(Runnable delegate, String name) implements Runnable + private class NamedRunnable implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(NamedRunnable.class); + private final Runnable delegate; + private final String name; + private final Throwable stack; + + private NamedRunnable(Runnable delegate) + { + this.delegate = delegate; + this.stack = new Throwable(); + this.name = deriveTaskName(delegate, stack); + } + + private String deriveTaskName(Runnable task, Throwable stack) + { + StackTraceElement[] stackTrace = stack.getStackTrace(); + for (StackTraceElement stackTraceElement : stackTrace) + { + String className = stackTraceElement.getClassName(); + if (!className.equals(SerializedInvoker.class.getName()) && + !className.equals(SerializedInvoker.this.getClass().getName()) && + !className.equals(getClass().getName())) + return "Queued by " + Thread.currentThread().getName() + " at " + stackTraceElement; + } + return task.toString(); + } + @Override public void run() { diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java index 2fc9011d779..16a5dff80d2 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java @@ -14,6 +14,8 @@ package org.eclipse.jetty.util.thread; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -25,17 +27,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class SerializedInvokerTest { - SerializedInvoker _serialedInvoker; + private SerializedInvoker _serializedInvoker; + private ExecutorService _executor; @BeforeEach public void beforeEach() { - _serialedInvoker = new SerializedInvoker(); + _serializedInvoker = new SerializedInvoker(SerializedInvokerTest.class); + _executor = Executors.newSingleThreadExecutor(); } @AfterEach public void afterEach() { + _executor.shutdownNow(); } @Test @@ -45,24 +50,27 @@ public class SerializedInvokerTest Task task2 = new Task(); Task task3 = new Task(); - Runnable todo = _serialedInvoker.offer(task1); - assertNull(_serialedInvoker.offer(task2)); - assertNull(_serialedInvoker.offer(task3)); + Runnable todo = _serializedInvoker.offer(task1); + assertNull(_serializedInvoker.offer(task2)); + assertNull(_serializedInvoker.offer(task3)); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } @Test @@ -72,22 +80,25 @@ public class SerializedInvokerTest Task task2 = new Task(); Task task3 = new Task(); - Runnable todo = _serialedInvoker.offer(null, task1, null, task2, null, task3, null); + Runnable todo = _serializedInvoker.offer(null, task1, null, task2, null, task3, null); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } @Test @@ -99,7 +110,7 @@ public class SerializedInvokerTest @Override public void run() { - assertNull(_serialedInvoker.offer(task3)); + assertNull(_serializedInvoker.offer(task3)); super.run(); } }; @@ -108,32 +119,35 @@ public class SerializedInvokerTest @Override public void run() { - assertNull(_serialedInvoker.offer(task2)); + assertNull(_serializedInvoker.offer(task2)); super.run(); } }; - Runnable todo = _serialedInvoker.offer(task1); + Runnable todo = _serializedInvoker.offer(task1); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } - public static class Task implements Runnable + public class Task implements Runnable { - CountDownLatch _run = new CountDownLatch(1); + final CountDownLatch _run = new CountDownLatch(1); boolean hasRun() { @@ -143,7 +157,17 @@ public class SerializedInvokerTest @Override public void run() { - _run.countDown(); + try + { + assertTrue(_serializedInvoker.isCurrentThreadInvoking()); + assertFalse(_executor.submit(() -> _serializedInvoker.isCurrentThreadInvoking()).get()); + + _run.countDown(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } } }