Fixes HttpClient Content.Source reads from arbitrary threads (#12203)

* Reworked HttpReceiverOverHTTP state machine, in particular:
** Introduced a boolean parameter to parseAndFill() and parse(), that specifies whether to notify the application demand callback.
   This is necessary because reads may happen from any threads, and must not notify the application demand callback.
   Only when there is no data, and fill interest is set, then the application demand callback must be notified.
** Removed action field to avoid lambda allocation.
** Now the application is called directly from the parse() method.
** Reading -1 from the network drives the parser by calling again parse(), rather than the parser directly.
  This allows to have a central place to notify the response success event.

* Fixed FastCGI similarly to HTTP/1.1.
* Removed leftover of the multiplex implementation.

* Fixed test flakyness in `NetworkTrafficListenerTest`: consume the request content before sending the response.

* Follow up after #10880: only abort the request if there is request content in `AuthenticationProtocolHandler` and `RedirectProtocolHandler`.
  This avoids the rare case where the response arrives before the request thread has modified the request state, even if the request has been fully sent over the network, causing the request to be failed even if it should not.

* added `SerializedInvoker` assertions about current thread invoking.
* Name all SerializedInvoker instances for better troubleshooting.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Simone Bordet 2024-08-30 13:01:43 +03:00 committed by GitHub
parent fb82a44bc6
commit 1726c8778c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 390 additions and 245 deletions

View File

@ -127,7 +127,8 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}
@Override

View File

@ -61,7 +61,8 @@ public class RedirectProtocolHandler implements ProtocolHandler, Response.Listen
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}
@Override

View File

@ -67,7 +67,7 @@ public abstract class HttpReceiver
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource;
@ -332,21 +332,10 @@ public abstract class HttpReceiver
if (exchange.isResponseCompleteOrTerminated())
return;
responseContentAvailable();
contentSource.onDataAvailable();
});
}
/**
* Method to be invoked when response content is available to be read.
* <p>
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
protected void responseContentAvailable()
{
contentSource.onDataAvailable();
}
/**
* Method to be invoked when the response is successful.
* <p>
@ -720,6 +709,9 @@ public abstract class HttpReceiver
current = HttpReceiver.this.read(false);
if (LOG.isDebugEnabled())
LOG.debug("Read {} from {}", current, this);
try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
@ -739,6 +731,7 @@ public abstract class HttpReceiver
{
if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this);
invoker.assertCurrentThreadInvoking();
// The onDataAvailable() method is only ever called
// by the invoker so avoid using the invoker again.
invokeDemandCallback(false);
@ -763,6 +756,8 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);
invoker.assertCurrentThreadInvoking();
Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
@ -802,9 +797,14 @@ public abstract class HttpReceiver
try
{
if (invoke)
{
invoker.run(demandCallback);
}
else
{
invoker.assertCurrentThreadInvoking();
demandCallback.run();
}
}
catch (Throwable x)
{

View File

@ -43,17 +43,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class);
private final Runnable receiveNext = this::receiveNext;
private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private final ByteBufferPool byteBufferPool;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
private boolean complete;
private State state = State.STATUS;
private boolean unsolicited;
private String method;
private int status;
private String method;
private Content.Chunk chunk;
private Runnable action;
private boolean shutdown;
private boolean disposed;
public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
{
@ -73,7 +74,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (!hasContent())
{
boolean setFillInterest = parseAndFill();
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
}
@ -97,10 +98,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
super.reset();
parser.reset();
if (chunk != null)
{
chunk.release();
chunk = null;
}
chunk = null;
}
@Override
@ -109,10 +108,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
super.dispose();
parser.close();
if (chunk != null)
{
chunk.release();
chunk = null;
}
chunk = null;
disposed = true;
}
@Override
@ -124,7 +122,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
Content.Chunk chunk = consumeChunk();
if (chunk != null)
return chunk;
boolean needFillInterest = parseAndFill();
boolean needFillInterest = parseAndFill(false);
if (LOG.isDebugEnabled())
LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this);
chunk = consumeChunk();
@ -236,7 +234,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
* If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte.
* @return true if no bytes were filled.
*/
private boolean parseAndFill()
private boolean parseAndFill(boolean notifyContentAvailable)
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
@ -246,23 +244,22 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
acquireNetworkBuffer();
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
// Always parse even empty buffers to advance the parser.
if (parse())
boolean stopParsing = parse(notifyContentAvailable);
if (LOG.isDebugEnabled())
LOG.debug("Parsed stop={} in {}", stopParsing, this);
if (stopParsing)
{
// Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content.
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("Parser willing to advance in {}", this);
// Connection may be closed in a parser callback.
if (connection.isClosed())
if (connection.isClosed() || isShutdown())
{
if (LOG.isDebugEnabled())
LOG.debug("Closed {} in {}", connection, this);
LOG.debug("Closed/Shutdown {} in {}", connection, this);
releaseNetworkBuffer();
return false;
}
@ -271,6 +268,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
reacquireNetworkBuffer();
// The networkBuffer may have been reacquired.
assert !networkBuffer.hasRemaining();
int read = endPoint.fill(networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this);
@ -286,9 +284,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
}
else
{
releaseNetworkBuffer();
shutdown();
return false;
// Loop around to parse again to advance the parser,
// for example for HTTP/1.0 connection-delimited content.
}
}
}
@ -307,62 +305,80 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
*
* @return true to indicate that parsing should be interrupted (and will be resumed by another thread).
*/
private boolean parse()
private boolean parse(boolean notifyContentAvailable)
{
// HttpParser is not reentrant, so we cannot invoke the
// application from the parser event callbacks.
// However, the mechanism in general (and this method)
// is reentrant: it notifies the application which may
// read response content, which reenters here.
ByteBuffer byteBuffer = networkBuffer.getByteBuffer();
while (true)
{
boolean handle = parser.parseNext(networkBuffer.getByteBuffer());
boolean handle = parser.parseNext(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("Parse result={} on {}", handle, this);
Runnable action = getAndSetAction(null);
if (action != null)
LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(byteBuffer), parser, this);
if (!handle)
return false;
HttpExchange exchange = getHttpExchange();
if (exchange == null)
throw new IllegalStateException("No exchange");
switch (state)
{
if (LOG.isDebugEnabled())
LOG.debug("Executing action after parser returned: {} on {}", action, this);
action.run();
if (LOG.isDebugEnabled())
LOG.debug("Action executed after Parse result={} on {}", handle, this);
}
if (handle)
{
// When the receiver is aborted, the parser is closed in dispose() which changes
// its state to State.CLOSE; so checking parser.isClose() is just a way to check
// if the receiver was aborted or not.
return !parser.isClose();
}
boolean complete = this.complete;
this.complete = false;
if (LOG.isDebugEnabled())
LOG.debug("Parse complete={}, {} {} in {}", complete, networkBuffer, parser, this);
if (complete)
{
int status = this.status;
this.status = 0;
// Connection upgrade due to 101, bail out.
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
// Connection upgrade due to CONNECT + 200, bail out.
String method = this.method;
this.method = null;
if (getHttpChannel().isTunnel(method, status))
return true;
if (!networkBuffer.hasRemaining())
return false;
if (!HttpStatus.isInformational(status))
case HEADERS -> responseHeaders(exchange);
case CONTENT ->
{
if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, networkBuffer, this);
networkBuffer.clear();
if (notifyContentAvailable)
responseContentAvailable(exchange);
}
case COMPLETE ->
{
boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
boolean isTunnel = getHttpChannel().isTunnel(method, status);
Runnable task = isUpgrade || isTunnel ? null : this.receiveNext;
responseSuccess(exchange, task);
// Connection upgrade, bail out.
if (isUpgrade || isTunnel)
return true;
if (byteBuffer.hasRemaining())
{
if (HttpStatus.isInterim(status))
{
// There may be multiple interim responses in
// the same network buffer, continue parsing.
continue;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this);
BufferUtil.clear(byteBuffer);
return false;
}
}
// Continue to read from the network.
return false;
}
default -> throw new IllegalStateException("Invalid state " + state);
}
// The application may have aborted the request.
if (disposed)
{
BufferUtil.clear(byteBuffer);
return false;
}
if (!networkBuffer.hasRemaining())
return false;
// The application has been invoked,
// and it is now driving the parsing.
return true;
}
}
@ -386,7 +402,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// header, the connection will be closed at exchange termination
// thanks to the flag we have set above.
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
}
protected boolean isShutdown()
@ -406,6 +421,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
this.status = status;
parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status));
exchange.getResponse().version(version).status(status).reason(reason);
state = State.STATUS;
responseBegin(exchange);
}
@ -432,10 +448,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// Store the EndPoint is case of upgrades, tunnels, etc.
exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint());
getHttpConnection().onResponseHeaders(exchange);
if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseHeaders(exchange, boolean) on {}", this);
if (getAndSetAction(() -> responseHeaders(exchange)) != null)
throw new IllegalStateException();
state = State.HEADERS;
return true;
}
@ -451,17 +464,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (chunk != null)
throw new IllegalStateException("Content generated with unconsumed content left");
if (getHttpConnection().isFillInterested())
throw new IllegalStateException("Fill interested while parsing for content");
// Retain the chunk because it is stored for later use.
networkBuffer.retain();
chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
throw new IllegalStateException();
if (getHttpConnection().isFillInterested())
throw new IllegalStateException();
state = State.CONTENT;
return true;
}
@ -491,28 +500,20 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null || unsolicited)
{
// We received an unsolicited response from the server.
networkBuffer.clear();
getHttpConnection().close();
return false;
}
int status = exchange.getResponse().getStatus();
if (!HttpStatus.isInterim(status))
{
inMessages.increment();
complete = true;
}
if (chunk != null)
throw new IllegalStateException();
chunk = Content.Chunk.EOF;
boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
boolean isTunnel = getHttpChannel().isTunnel(method, status);
Runnable task = isUpgrade || isTunnel ? null : this::receiveNext;
if (LOG.isDebugEnabled())
LOG.debug("Message complete, calling response success with task {} in {}", task, this);
responseSuccess(exchange, task);
return false;
state = State.COMPLETE;
return true;
}
private void receiveNext()
@ -524,7 +525,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (LOG.isDebugEnabled())
LOG.debug("Receiving next request in {}", this);
boolean setFillInterest = parseAndFill();
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
}
@ -556,13 +557,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
}
}
private Runnable getAndSetAction(Runnable action)
{
Runnable r = this.action;
this.action = action;
return r;
}
long getMessagesIn()
{
return inMessages.longValue();
@ -573,4 +567,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
return String.format("%s[%s]", super.toString(), parser);
}
private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}
}

View File

@ -332,7 +332,10 @@ public class NetworkTrafficListenerTest
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Response.sendRedirect(request, response, callback, location);
Content.Source.consumeAll(request, Callback.from(
() -> Response.sendRedirect(request, response, callback, location),
callback::failed
));
return true;
}
});

View File

@ -119,11 +119,25 @@ public class HttpChannelOverFCGI extends HttpChannel
receiver.content(chunk);
}
protected void responseContentAvailable()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseContentAvailable(exchange);
}
protected void end()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.end(exchange);
receiver.end();
}
protected void responseSuccess()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseSuccess(exchange);
}
protected void responseFailure(Throwable failure, Promise<Boolean> promise)

View File

@ -67,7 +67,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final HttpChannelOverFCGI channel;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private Runnable action;
private State state = State.STATUS;
private long idleTimeout;
private boolean shutdown;
@ -168,7 +168,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.networkBuffer = null;
}
boolean parseAndFill()
boolean parseAndFill(boolean notifyContentAvailable)
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", networkBuffer);
@ -179,7 +179,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
while (true)
{
if (parse(networkBuffer.getByteBuffer()))
if (parse(networkBuffer.getByteBuffer(), notifyContentAvailable))
return false;
if (networkBuffer.isRetained())
@ -214,13 +214,35 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
}
}
private boolean parse(ByteBuffer buffer)
private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable)
{
boolean parse = parser.parse(buffer);
Runnable action = getAndSetAction(null);
if (action != null)
action.run();
return parse;
boolean handle = parser.parse(buffer);
switch (state)
{
case STATUS ->
{
// Nothing to do.
}
case HEADERS -> channel.responseHeaders();
case CONTENT ->
{
if (notifyContentAvailable)
channel.responseContentAvailable();
}
case COMPLETE ->
{
// For the complete event, handle==false, and cannot
// differentiate between a complete event and a parse()
// with zero or not enough bytes, so the state is reset
// here to avoid calling responseSuccess() again.
state = State.STATUS;
channel.responseSuccess();
}
default -> throw new IllegalStateException("Invalid state " + state);
}
return handle;
}
private void shutdown()
@ -318,13 +340,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
}, x -> close(failure)));
}
private Runnable getAndSetAction(Runnable action)
{
Runnable r = this.action;
this.action = action;
return r;
}
protected HttpChannelOverFCGI newHttpChannel()
{
return new HttpChannelOverFCGI(this);
@ -414,6 +429,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onBegin r={},c={},reason={}", request, code, reason);
state = State.STATUS;
channel.responseBegin(code, reason);
}
@ -430,8 +446,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (LOG.isDebugEnabled())
LOG.debug("onHeaders r={} {}", request, networkBuffer);
if (getAndSetAction(channel::responseHeaders) != null)
throw new IllegalStateException();
state = State.HEADERS;
return true;
}
@ -444,13 +459,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
case STD_OUT ->
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) == null)
return true;
throw new IllegalStateException();
channel.content(chunk);
state = State.CONTENT;
return true;
}
case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer));
default -> throw new IllegalArgumentException();
@ -464,6 +476,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
if (LOG.isDebugEnabled())
LOG.debug("onEnd r={}", request);
channel.end();
state = State.COMPLETE;
}
@Override
@ -474,4 +487,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
failAndClose(failure);
}
}
private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}
}

View File

@ -34,7 +34,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
if (!hasContent())
{
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
}
@ -81,7 +81,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
if (chunk != null)
return chunk;
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean needFillInterest = httpConnection.parseAndFill();
boolean needFillInterest = httpConnection.parseAndFill(false);
chunk = consumeChunk();
if (chunk != null)
return chunk;
@ -109,23 +109,23 @@ public class HttpReceiverOverFCGI extends HttpReceiver
void content(Content.Chunk chunk)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
if (this.chunk != null)
throw new IllegalStateException();
// Retain the chunk because it is stored for later reads.
chunk.retain();
this.chunk = chunk;
responseContentAvailable();
}
void end(HttpExchange exchange)
void end()
{
if (chunk != null)
throw new IllegalStateException();
chunk = Content.Chunk.EOF;
responseSuccess(exchange, this::receiveNext);
}
void responseSuccess(HttpExchange exchange)
{
super.responseSuccess(exchange, this::receiveNext);
}
private void receiveNext()
@ -136,7 +136,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
throw new IllegalStateException();
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
}
@ -165,6 +165,12 @@ public class HttpReceiverOverFCGI extends HttpReceiver
super.responseHeaders(exchange);
}
@Override
protected void responseContentAvailable(HttpExchange exchange)
{
super.responseContentAvailable(exchange);
}
@Override
protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{

View File

@ -15,8 +15,6 @@ package org.eclipse.jetty.fcgi.parser;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpCompliance;
@ -43,13 +41,12 @@ public class ResponseContentParser extends StreamContentParser
{
private static final Logger LOG = LoggerFactory.getLogger(ResponseContentParser.class);
private final Map<Integer, ResponseParser> parsers = new ConcurrentHashMap<>();
private final ClientParser.Listener listener;
private final ResponseParser parser;
public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener)
{
super(headerParser, FCGI.StreamType.STD_OUT, listener);
this.listener = listener;
this.parser = new ResponseParser(listener);
}
@Override
@ -63,13 +60,6 @@ public class ResponseContentParser extends StreamContentParser
@Override
protected boolean onContent(ByteBuffer buffer)
{
int request = getRequest();
ResponseParser parser = parsers.get(request);
if (parser == null)
{
parser = new ResponseParser(listener, request);
parsers.put(request, parser);
}
return parser.parse(buffer);
}
@ -77,37 +67,44 @@ public class ResponseContentParser extends StreamContentParser
protected void end(int request)
{
super.end(request);
parsers.remove(request);
parser.reset();
}
private static class ResponseParser implements HttpParser.ResponseHandler
private class ResponseParser implements HttpParser.ResponseHandler
{
private final HttpFields.Mutable fields = HttpFields.build();
private final ClientParser.Listener listener;
private final int request;
private final FCGIHttpParser httpParser;
private State state = State.HEADERS;
private boolean seenResponseCode;
private boolean stalled;
private ResponseParser(ClientParser.Listener listener, int request)
private ResponseParser(ClientParser.Listener listener)
{
this.listener = listener;
this.request = request;
this.httpParser = new FCGIHttpParser(this);
}
private void reset()
{
fields.clear();
httpParser.reset();
state = State.HEADERS;
seenResponseCode = false;
stalled = false;
}
public boolean parse(ByteBuffer buffer)
{
int remaining = buffer.remaining();
while (remaining > 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Response {} {}, state {} {}", request, FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer));
LOG.debug("Response {} {}, state {} {}", getRequest(), FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer));
switch (state)
{
case HEADERS:
case HEADERS ->
{
if (httpParser.parseNext(buffer))
{
@ -116,40 +113,33 @@ public class ResponseContentParser extends StreamContentParser
return true;
}
remaining = buffer.remaining();
break;
}
case CONTENT_MODE:
case CONTENT_MODE ->
{
// If we have no indication of the content, then
// the HTTP parser will assume there is no content
// and will not parse it even if it is provided,
// so we have to parse it raw ourselves here.
boolean rawContent = fields.size() == 0 ||
(fields.get(HttpHeader.CONTENT_LENGTH) == null &&
fields.get(HttpHeader.TRANSFER_ENCODING) == null);
(fields.get(HttpHeader.CONTENT_LENGTH) == null &&
fields.get(HttpHeader.TRANSFER_ENCODING) == null);
state = rawContent ? State.RAW_CONTENT : State.HTTP_CONTENT;
break;
}
case RAW_CONTENT:
case RAW_CONTENT ->
{
ByteBuffer content = buffer.asReadOnlyBuffer();
buffer.position(buffer.limit());
if (notifyContent(content))
return true;
remaining = 0;
break;
}
case HTTP_CONTENT:
case HTTP_CONTENT ->
{
if (httpParser.parseNext(buffer))
return true;
remaining = buffer.remaining();
break;
}
default:
{
throw new IllegalStateException();
}
default -> throw new IllegalStateException();
}
}
return false;
@ -205,7 +195,7 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
listener.onBegin(request, code, reason);
listener.onBegin(getRequest(), code, reason);
}
catch (Throwable x)
{
@ -218,7 +208,7 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
listener.onHeader(request, httpField);
listener.onHeader(getRequest(), httpField);
}
catch (Throwable x)
{
@ -242,7 +232,7 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
return listener.onHeaders(request);
return listener.onHeaders(getRequest());
}
catch (Throwable x)
{
@ -278,7 +268,7 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer);
return listener.onContent(getRequest(), FCGI.StreamType.STD_OUT, buffer);
}
catch (Throwable x)
{
@ -318,7 +308,7 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
listener.onFailure(request, failure);
listener.onFailure(getRequest(), failure);
}
catch (Throwable x)
{

View File

@ -564,7 +564,7 @@ public class MultiPart
public abstract static class AbstractContentSource implements Content.Source, Closeable
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(AbstractContentSource.class);
private final Queue<Part> parts = new ArrayDeque<>();
private final String boundary;
private final ByteBuffer firstBoundary;

View File

@ -50,7 +50,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
};
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(AsyncContent.class);
private final Queue<Content.Chunk> chunks = new ArrayDeque<>();
private Content.Chunk persistentFailure;
private boolean readClosed;

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ByteBufferContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(ByteBufferContentSource.class);
private final long length;
private final Collection<ByteBuffer> byteBuffers;
private Iterator<ByteBuffer> iterator;

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ChunksContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(ChunksContentSource.class);
private final long length;
private final Collection<Content.Chunk> chunks;
private Iterator<Content.Chunk> iterator;

View File

@ -39,7 +39,7 @@ public abstract class ContentSourceTransformer implements Content.Source
protected ContentSourceTransformer(Content.Source rawSource)
{
this(rawSource, new SerializedInvoker());
this(rawSource, new SerializedInvoker(ContentSourceTransformer.class));
}
protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker)

View File

@ -38,7 +38,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class InputStreamContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(InputStreamContentSource.class);
private final InputStream inputStream;
private ByteBufferPool.Sized bufferPool;
private Runnable demandCallback;

View File

@ -41,7 +41,7 @@ public class PathContentSource implements Content.Source
// TODO in 12.1.x reimplement this class based on ByteChannelContentSource
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(PathContentSource.class);
private final Path path;
private final long length;
private final ByteBufferPool byteBufferPool;

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ByteChannelContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker();
private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class);
private final ByteBufferPool.Sized _byteBufferPool;
private ByteChannel _byteChannel;
private final long _offset;

View File

@ -130,8 +130,8 @@ public class HttpChannelState implements HttpChannel, Components
{
_connectionMetaData = connectionMetaData;
// The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc.
_readInvoker = new HttpChannelSerializedInvoker();
_writeInvoker = new HttpChannelSerializedInvoker();
_readInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_readInvoker");
_writeInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_writeInvoker");
}
@Override
@ -1825,6 +1825,11 @@ public class HttpChannelState implements HttpChannel, Components
private class HttpChannelSerializedInvoker extends SerializedInvoker
{
public HttpChannelSerializedInvoker(String name)
{
super(name);
}
@Override
protected void onError(Runnable task, Throwable failure)
{

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.NanoTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -263,7 +262,7 @@ public class HttpClientDemandTest extends AbstractTest
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown();
@ -346,7 +345,7 @@ public class HttpClientDemandTest extends AbstractTest
.onResponseContentAsync(listener2)
.send(result ->
{
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown();
@ -415,8 +414,8 @@ public class HttpClientDemandTest extends AbstractTest
})
.send(result ->
{
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
@ -480,8 +479,8 @@ public class HttpClientDemandTest extends AbstractTest
})
.send(result ->
{
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
@ -540,8 +539,8 @@ public class HttpClientDemandTest extends AbstractTest
})
.send(result ->
{
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
@ -572,8 +571,8 @@ public class HttpClientDemandTest extends AbstractTest
.onResponseContentSource((response, contentSource) -> contentSource.demand(() -> new Thread(new Accumulator(contentSource, chunks)).start()))
.send(result ->
{
Assertions.assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});

View File

@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
*/
public class SerializedExecutor implements Executor
{
private final SerializedInvoker _invoker = new SerializedInvoker()
private final SerializedInvoker _invoker = new SerializedInvoker(SerializedExecutor.class)
{
@Override
protected void onError(Runnable task, Throwable t)

View File

@ -13,8 +13,12 @@
package org.eclipse.jetty.util.thread;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,6 +39,51 @@ public class SerializedInvoker
private static final Logger LOG = LoggerFactory.getLogger(SerializedInvoker.class);
private final AtomicReference<Link> _tail = new AtomicReference<>();
private final String _name;
private volatile Thread _invokerThread;
/**
* Create a new instance whose name is {@code anonymous}.
*/
public SerializedInvoker()
{
this("anonymous");
}
/**
* Create a new instance whose name is derived from the given class.
* @param nameFrom the class to use as a name.
*/
public SerializedInvoker(Class<?> nameFrom)
{
this(nameFrom.getSimpleName());
}
/**
* Create a new instance with the given name.
* @param name the name.
*/
public SerializedInvoker(String name)
{
_name = name;
}
/**
* @return whether the current thread is currently executing a task using this invoker
*/
boolean isCurrentThreadInvoking()
{
return _invokerThread == Thread.currentThread();
}
/**
* @throws IllegalStateException when the current thread is not currently executing a task using this invoker
*/
public void assertCurrentThreadInvoking() throws IllegalStateException
{
if (!isCurrentThreadInvoking())
throw new IllegalStateException();
}
/**
* Arrange for a task to be invoked, mutually excluded from other tasks.
@ -59,7 +108,7 @@ public class SerializedInvoker
{
// Wrap the given task with another one that's going to delegate run() to the wrapped task while the
// wrapper's toString() returns a description of the place in code where SerializedInvoker.run() was called.
task = new NamedRunnable(task, deriveTaskName(task));
task = new NamedRunnable(task);
}
}
Link link = new Link(task);
@ -72,18 +121,6 @@ public class SerializedInvoker
return null;
}
protected String deriveTaskName(Runnable task)
{
StackTraceElement[] stackTrace = new Exception().getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace)
{
String className = stackTraceElement.getClassName();
if (!className.equals(SerializedInvoker.class.getName()) && !className.equals(getClass().getName()))
return "Queued at " + stackTraceElement;
}
return task.toString();
}
/**
* Arrange for tasks to be invoked, mutually excluded from other tasks.
* @param tasks The tasks to invoke
@ -115,9 +152,8 @@ public class SerializedInvoker
Runnable todo = offer(task);
if (todo != null)
todo.run();
else
if (LOG.isDebugEnabled())
LOG.debug("Queued link in {}", this);
else if (LOG.isDebugEnabled())
LOG.debug("Queued link in {}", this);
}
/**
@ -130,15 +166,14 @@ public class SerializedInvoker
Runnable todo = offer(tasks);
if (todo != null)
todo.run();
else
if (LOG.isDebugEnabled())
LOG.debug("Queued links in {}", this);
else if (LOG.isDebugEnabled())
LOG.debug("Queued links in {}", this);
}
@Override
public String toString()
{
return String.format("%s@%x{tail=%s}", getClass().getSimpleName(), hashCode(), _tail);
return String.format("%s@%x{name=%s,tail=%s,invoker=%s}", getClass().getSimpleName(), hashCode(), _name, _tail, _invokerThread);
}
protected void onError(Runnable task, Throwable t)
@ -146,7 +181,7 @@ public class SerializedInvoker
LOG.warn("Serialized invocation error", t);
}
private class Link implements Runnable, Invocable
private class Link implements Runnable, Invocable, Dumpable
{
private final Runnable _task;
private final AtomicReference<Link> _next = new AtomicReference<>();
@ -156,6 +191,24 @@ public class SerializedInvoker
_task = task;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
if (_task instanceof NamedRunnable nr)
{
StringWriter sw = new StringWriter();
nr.stack.printStackTrace(new PrintWriter(sw));
Dumpable.dumpObjects(out, indent, nr.toString(), sw.toString());
}
else
{
Dumpable.dumpObjects(out, indent, _task);
}
Link link = _next.get();
if (link != null)
link.dump(out, indent);
}
@Override
public InvocationType getInvocationType()
{
@ -186,6 +239,7 @@ public class SerializedInvoker
{
if (LOG.isDebugEnabled())
LOG.debug("Running link {} of {}", link, SerializedInvoker.this);
_invokerThread = Thread.currentThread();
try
{
link._task.run();
@ -196,6 +250,12 @@ public class SerializedInvoker
LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t);
onError(link._task, t);
}
finally
{
// _invokerThread must be nulled before calling link.next() as
// once the latter has executed, another thread can enter Link.run().
_invokerThread = null;
}
link = link.next();
if (link == null && LOG.isDebugEnabled())
LOG.debug("Next link is null, execution is over in {}", SerializedInvoker.this);
@ -209,10 +269,35 @@ public class SerializedInvoker
}
}
private record NamedRunnable(Runnable delegate, String name) implements Runnable
private class NamedRunnable implements Runnable
{
private static final Logger LOG = LoggerFactory.getLogger(NamedRunnable.class);
private final Runnable delegate;
private final String name;
private final Throwable stack;
private NamedRunnable(Runnable delegate)
{
this.delegate = delegate;
this.stack = new Throwable();
this.name = deriveTaskName(delegate, stack);
}
private String deriveTaskName(Runnable task, Throwable stack)
{
StackTraceElement[] stackTrace = stack.getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace)
{
String className = stackTraceElement.getClassName();
if (!className.equals(SerializedInvoker.class.getName()) &&
!className.equals(SerializedInvoker.this.getClass().getName()) &&
!className.equals(getClass().getName()))
return "Queued by " + Thread.currentThread().getName() + " at " + stackTraceElement;
}
return task.toString();
}
@Override
public void run()
{

View File

@ -14,6 +14,8 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -25,17 +27,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SerializedInvokerTest
{
SerializedInvoker _serialedInvoker;
private SerializedInvoker _serializedInvoker;
private ExecutorService _executor;
@BeforeEach
public void beforeEach()
{
_serialedInvoker = new SerializedInvoker();
_serializedInvoker = new SerializedInvoker(SerializedInvokerTest.class);
_executor = Executors.newSingleThreadExecutor();
}
@AfterEach
public void afterEach()
{
_executor.shutdownNow();
}
@Test
@ -45,24 +50,27 @@ public class SerializedInvokerTest
Task task2 = new Task();
Task task3 = new Task();
Runnable todo = _serialedInvoker.offer(task1);
assertNull(_serialedInvoker.offer(task2));
assertNull(_serialedInvoker.offer(task3));
Runnable todo = _serializedInvoker.offer(task1);
assertNull(_serializedInvoker.offer(task2));
assertNull(_serializedInvoker.offer(task3));
assertFalse(task1.hasRun());
assertFalse(task2.hasRun());
assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run();
assertTrue(task1.hasRun());
assertTrue(task2.hasRun());
assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task();
todo = _serialedInvoker.offer(task4);
todo = _serializedInvoker.offer(task4);
todo.run();
assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
}
@Test
@ -72,22 +80,25 @@ public class SerializedInvokerTest
Task task2 = new Task();
Task task3 = new Task();
Runnable todo = _serialedInvoker.offer(null, task1, null, task2, null, task3, null);
Runnable todo = _serializedInvoker.offer(null, task1, null, task2, null, task3, null);
assertFalse(task1.hasRun());
assertFalse(task2.hasRun());
assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run();
assertTrue(task1.hasRun());
assertTrue(task2.hasRun());
assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task();
todo = _serialedInvoker.offer(task4);
todo = _serializedInvoker.offer(task4);
todo.run();
assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
}
@Test
@ -99,7 +110,7 @@ public class SerializedInvokerTest
@Override
public void run()
{
assertNull(_serialedInvoker.offer(task3));
assertNull(_serializedInvoker.offer(task3));
super.run();
}
};
@ -108,32 +119,35 @@ public class SerializedInvokerTest
@Override
public void run()
{
assertNull(_serialedInvoker.offer(task2));
assertNull(_serializedInvoker.offer(task2));
super.run();
}
};
Runnable todo = _serialedInvoker.offer(task1);
Runnable todo = _serializedInvoker.offer(task1);
assertFalse(task1.hasRun());
assertFalse(task2.hasRun());
assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run();
assertTrue(task1.hasRun());
assertTrue(task2.hasRun());
assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task();
todo = _serialedInvoker.offer(task4);
todo = _serializedInvoker.offer(task4);
todo.run();
assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
}
public static class Task implements Runnable
public class Task implements Runnable
{
CountDownLatch _run = new CountDownLatch(1);
final CountDownLatch _run = new CountDownLatch(1);
boolean hasRun()
{
@ -143,7 +157,17 @@ public class SerializedInvokerTest
@Override
public void run()
{
_run.countDown();
try
{
assertTrue(_serializedInvoker.isCurrentThreadInvoking());
assertFalse(_executor.submit(() -> _serializedInvoker.isCurrentThreadInvoking()).get());
_run.countDown();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}