Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-08-30 12:12:52 +02:00
commit 4ee55c2a4d
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
22 changed files with 390 additions and 245 deletions

View File

@ -127,6 +127,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
{ {
// The request may still be sending content, stop it. // The request may still be sending content, stop it.
Request request = response.getRequest(); Request request = response.getRequest();
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
} }

View File

@ -61,6 +61,7 @@ public class RedirectProtocolHandler implements ProtocolHandler, Response.Listen
{ {
// The request may still be sending content, stop it. // The request may still be sending content, stop it.
Request request = response.getRequest(); Request request = response.getRequest();
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
} }

View File

@ -67,7 +67,7 @@ public abstract class HttpReceiver
{ {
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class); private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel; private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE; private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource; private NotifiableContentSource contentSource;
@ -332,19 +332,8 @@ public abstract class HttpReceiver
if (exchange.isResponseCompleteOrTerminated()) if (exchange.isResponseCompleteOrTerminated())
return; return;
responseContentAvailable();
});
}
/**
* Method to be invoked when response content is available to be read.
* <p>
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
protected void responseContentAvailable()
{
contentSource.onDataAvailable(); contentSource.onDataAvailable();
});
} }
/** /**
@ -720,6 +709,9 @@ public abstract class HttpReceiver
current = HttpReceiver.this.read(false); current = HttpReceiver.this.read(false);
if (LOG.isDebugEnabled())
LOG.debug("Read {} from {}", current, this);
try (AutoLock ignored = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
if (currentChunk != null) if (currentChunk != null)
@ -739,6 +731,7 @@ public abstract class HttpReceiver
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this); LOG.debug("onDataAvailable on {}", this);
invoker.assertCurrentThreadInvoking();
// The onDataAvailable() method is only ever called // The onDataAvailable() method is only ever called
// by the invoker so avoid using the invoker again. // by the invoker so avoid using the invoker again.
invokeDemandCallback(false); invokeDemandCallback(false);
@ -763,6 +756,8 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this); LOG.debug("Processing demand on {}", this);
invoker.assertCurrentThreadInvoking();
Content.Chunk current; Content.Chunk current;
try (AutoLock ignored = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
@ -802,10 +797,15 @@ public abstract class HttpReceiver
try try
{ {
if (invoke) if (invoke)
{
invoker.run(demandCallback); invoker.run(demandCallback);
}
else else
{
invoker.assertCurrentThreadInvoking();
demandCallback.run(); demandCallback.run();
} }
}
catch (Throwable x) catch (Throwable x)
{ {
fail(x); fail(x);

View File

@ -43,17 +43,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{ {
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class); private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class);
private final Runnable receiveNext = this::receiveNext;
private final LongAdder inMessages = new LongAdder(); private final LongAdder inMessages = new LongAdder();
private final HttpParser parser; private final HttpParser parser;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private RetainableByteBuffer networkBuffer; private RetainableByteBuffer networkBuffer;
private boolean shutdown; private State state = State.STATUS;
private boolean complete;
private boolean unsolicited; private boolean unsolicited;
private String method;
private int status; private int status;
private String method;
private Content.Chunk chunk; private Content.Chunk chunk;
private Runnable action; private boolean shutdown;
private boolean disposed;
public HttpReceiverOverHTTP(HttpChannelOverHTTP channel) public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
{ {
@ -73,7 +74,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{ {
if (!hasContent()) if (!hasContent())
{ {
boolean setFillInterest = parseAndFill(); boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest) if (!hasContent() && setFillInterest)
fillInterested(); fillInterested();
} }
@ -97,11 +98,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
super.reset(); super.reset();
parser.reset(); parser.reset();
if (chunk != null) if (chunk != null)
{
chunk.release(); chunk.release();
chunk = null; chunk = null;
} }
}
@Override @Override
protected void dispose() protected void dispose()
@ -109,10 +108,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
super.dispose(); super.dispose();
parser.close(); parser.close();
if (chunk != null) if (chunk != null)
{
chunk.release(); chunk.release();
chunk = null; chunk = null;
} disposed = true;
} }
@Override @Override
@ -124,7 +122,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
Content.Chunk chunk = consumeChunk(); Content.Chunk chunk = consumeChunk();
if (chunk != null) if (chunk != null)
return chunk; return chunk;
boolean needFillInterest = parseAndFill(); boolean needFillInterest = parseAndFill(false);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this); LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this);
chunk = consumeChunk(); chunk = consumeChunk();
@ -236,7 +234,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
* If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte. * If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte.
* @return true if no bytes were filled. * @return true if no bytes were filled.
*/ */
private boolean parseAndFill() private boolean parseAndFill(boolean notifyContentAvailable)
{ {
HttpConnectionOverHTTP connection = getHttpConnection(); HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint(); EndPoint endPoint = connection.getEndPoint();
@ -246,23 +244,22 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
acquireNetworkBuffer(); acquireNetworkBuffer();
while (true) while (true)
{ {
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", networkBuffer, this);
// Always parse even empty buffers to advance the parser. // Always parse even empty buffers to advance the parser.
if (parse()) boolean stopParsing = parse(notifyContentAvailable);
if (LOG.isDebugEnabled())
LOG.debug("Parsed stop={} in {}", stopParsing, this);
if (stopParsing)
{ {
// Return immediately, as this thread may be in a race // Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content. // with e.g. another thread demanding more content.
return false; return false;
} }
if (LOG.isDebugEnabled())
LOG.debug("Parser willing to advance in {}", this);
// Connection may be closed in a parser callback. // Connection may be closed in a parser callback.
if (connection.isClosed()) if (connection.isClosed() || isShutdown())
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Closed {} in {}", connection, this); LOG.debug("Closed/Shutdown {} in {}", connection, this);
releaseNetworkBuffer(); releaseNetworkBuffer();
return false; return false;
} }
@ -271,6 +268,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
reacquireNetworkBuffer(); reacquireNetworkBuffer();
// The networkBuffer may have been reacquired. // The networkBuffer may have been reacquired.
assert !networkBuffer.hasRemaining();
int read = endPoint.fill(networkBuffer.getByteBuffer()); int read = endPoint.fill(networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this); LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this);
@ -286,9 +284,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
} }
else else
{ {
releaseNetworkBuffer();
shutdown(); shutdown();
return false; // Loop around to parse again to advance the parser,
// for example for HTTP/1.0 connection-delimited content.
} }
} }
} }
@ -307,62 +305,80 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
* *
* @return true to indicate that parsing should be interrupted (and will be resumed by another thread). * @return true to indicate that parsing should be interrupted (and will be resumed by another thread).
*/ */
private boolean parse() private boolean parse(boolean notifyContentAvailable)
{ {
// HttpParser is not reentrant, so we cannot invoke the
// application from the parser event callbacks.
// However, the mechanism in general (and this method)
// is reentrant: it notifies the application which may
// read response content, which reenters here.
ByteBuffer byteBuffer = networkBuffer.getByteBuffer();
while (true) while (true)
{ {
boolean handle = parser.parseNext(networkBuffer.getByteBuffer()); boolean handle = parser.parseNext(byteBuffer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Parse result={} on {}", handle, this); LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(byteBuffer), parser, this);
Runnable action = getAndSetAction(null); if (!handle)
if (action != null) return false;
{
if (LOG.isDebugEnabled())
LOG.debug("Executing action after parser returned: {} on {}", action, this);
action.run();
if (LOG.isDebugEnabled())
LOG.debug("Action executed after Parse result={} on {}", handle, this);
}
if (handle)
{
// When the receiver is aborted, the parser is closed in dispose() which changes
// its state to State.CLOSE; so checking parser.isClose() is just a way to check
// if the receiver was aborted or not.
return !parser.isClose();
}
boolean complete = this.complete; HttpExchange exchange = getHttpExchange();
this.complete = false; if (exchange == null)
if (LOG.isDebugEnabled()) throw new IllegalStateException("No exchange");
LOG.debug("Parse complete={}, {} {} in {}", complete, networkBuffer, parser, this);
if (complete) switch (state)
{ {
int status = this.status; case HEADERS -> responseHeaders(exchange);
this.status = 0; case CONTENT ->
// Connection upgrade due to 101, bail out. {
if (status == HttpStatus.SWITCHING_PROTOCOLS_101) if (notifyContentAvailable)
return true; responseContentAvailable(exchange);
// Connection upgrade due to CONNECT + 200, bail out. }
String method = this.method; case COMPLETE ->
this.method = null; {
if (getHttpChannel().isTunnel(method, status)) boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
boolean isTunnel = getHttpChannel().isTunnel(method, status);
Runnable task = isUpgrade || isTunnel ? null : this.receiveNext;
responseSuccess(exchange, task);
// Connection upgrade, bail out.
if (isUpgrade || isTunnel)
return true; return true;
if (networkBuffer.isEmpty()) if (byteBuffer.hasRemaining())
return false; {
if (HttpStatus.isInterim(status))
if (!HttpStatus.isInformational(status)) {
// There may be multiple interim responses in
// the same network buffer, continue parsing.
continue;
}
else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, networkBuffer, this); LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this);
networkBuffer.clear(); BufferUtil.clear(byteBuffer);
return false;
} }
}
// Continue to read from the network.
return false;
}
default -> throw new IllegalStateException("Invalid state " + state);
}
// The application may have aborted the request.
if (disposed)
{
BufferUtil.clear(byteBuffer);
return false; return false;
} }
if (networkBuffer.isEmpty()) // The application has been invoked,
return false; // and it is now driving the parsing.
return true;
} }
} }
@ -386,7 +402,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// header, the connection will be closed at exchange termination // header, the connection will be closed at exchange termination
// thanks to the flag we have set above. // thanks to the flag we have set above.
parser.atEOF(); parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
} }
protected boolean isShutdown() protected boolean isShutdown()
@ -406,6 +421,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
this.status = status; this.status = status;
parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status)); parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status));
exchange.getResponse().version(version).status(status).reason(reason); exchange.getResponse().version(version).status(status).reason(reason);
state = State.STATUS;
responseBegin(exchange); responseBegin(exchange);
} }
@ -432,10 +448,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// Store the EndPoint is case of upgrades, tunnels, etc. // Store the EndPoint is case of upgrades, tunnels, etc.
exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint()); exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint());
getHttpConnection().onResponseHeaders(exchange); getHttpConnection().onResponseHeaders(exchange);
if (LOG.isDebugEnabled()) state = State.HEADERS;
LOG.debug("Setting action to responseHeaders(exchange, boolean) on {}", this);
if (getAndSetAction(() -> responseHeaders(exchange)) != null)
throw new IllegalStateException();
return true; return true;
} }
@ -451,17 +464,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (chunk != null) if (chunk != null)
throw new IllegalStateException("Content generated with unconsumed content left"); throw new IllegalStateException("Content generated with unconsumed content left");
if (getHttpConnection().isFillInterested())
throw new IllegalStateException("Fill interested while parsing for content");
// Retain the chunk because it is stored for later use. // Retain the chunk because it is stored for later use.
networkBuffer.retain(); networkBuffer.retain();
chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
state = State.CONTENT;
if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
throw new IllegalStateException();
if (getHttpConnection().isFillInterested())
throw new IllegalStateException();
return true; return true;
} }
@ -491,28 +500,20 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null || unsolicited) if (exchange == null || unsolicited)
{ {
// We received an unsolicited response from the server. // We received an unsolicited response from the server.
networkBuffer.clear();
getHttpConnection().close(); getHttpConnection().close();
return false; return false;
} }
int status = exchange.getResponse().getStatus(); int status = exchange.getResponse().getStatus();
if (!HttpStatus.isInterim(status)) if (!HttpStatus.isInterim(status))
{
inMessages.increment(); inMessages.increment();
complete = true;
}
if (chunk != null) if (chunk != null)
throw new IllegalStateException(); throw new IllegalStateException();
chunk = Content.Chunk.EOF; chunk = Content.Chunk.EOF;
state = State.COMPLETE;
boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101; return true;
boolean isTunnel = getHttpChannel().isTunnel(method, status);
Runnable task = isUpgrade || isTunnel ? null : this::receiveNext;
if (LOG.isDebugEnabled())
LOG.debug("Message complete, calling response success with task {} in {}", task, this);
responseSuccess(exchange, task);
return false;
} }
private void receiveNext() private void receiveNext()
@ -524,7 +525,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Receiving next request in {}", this); LOG.debug("Receiving next request in {}", this);
boolean setFillInterest = parseAndFill(); boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest) if (!hasContent() && setFillInterest)
fillInterested(); fillInterested();
} }
@ -556,13 +557,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
} }
} }
private Runnable getAndSetAction(Runnable action)
{
Runnable r = this.action;
this.action = action;
return r;
}
long getMessagesIn() long getMessagesIn()
{ {
return inMessages.longValue(); return inMessages.longValue();
@ -573,4 +567,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{ {
return String.format("%s[%s]", super.toString(), parser); return String.format("%s[%s]", super.toString(), parser);
} }
private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}
} }

View File

@ -332,7 +332,10 @@ public class NetworkTrafficListenerTest
@Override @Override
public boolean handle(Request request, Response response, Callback callback) public boolean handle(Request request, Response response, Callback callback)
{ {
Response.sendRedirect(request, response, callback, location); Content.Source.consumeAll(request, Callback.from(
() -> Response.sendRedirect(request, response, callback, location),
callback::failed
));
return true; return true;
} }
}); });

View File

@ -119,11 +119,25 @@ public class HttpChannelOverFCGI extends HttpChannel
receiver.content(chunk); receiver.content(chunk);
} }
protected void responseContentAvailable()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseContentAvailable(exchange);
}
protected void end() protected void end()
{ {
HttpExchange exchange = getHttpExchange(); HttpExchange exchange = getHttpExchange();
if (exchange != null) if (exchange != null)
receiver.end(exchange); receiver.end();
}
protected void responseSuccess()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseSuccess(exchange);
} }
protected void responseFailure(Throwable failure, Promise<Boolean> promise) protected void responseFailure(Throwable failure, Promise<Boolean> promise)

View File

@ -67,7 +67,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final HttpChannelOverFCGI channel; private final HttpChannelOverFCGI channel;
private RetainableByteBuffer networkBuffer; private RetainableByteBuffer networkBuffer;
private Object attachment; private Object attachment;
private Runnable action; private State state = State.STATUS;
private long idleTimeout; private long idleTimeout;
private boolean shutdown; private boolean shutdown;
@ -168,7 +168,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.networkBuffer = null; this.networkBuffer = null;
} }
boolean parseAndFill() boolean parseAndFill(boolean notifyContentAvailable)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", networkBuffer); LOG.debug("parseAndFill {}", networkBuffer);
@ -179,7 +179,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{ {
while (true) while (true)
{ {
if (parse(networkBuffer.getByteBuffer())) if (parse(networkBuffer.getByteBuffer(), notifyContentAvailable))
return false; return false;
if (networkBuffer.isRetained()) if (networkBuffer.isRetained())
@ -214,13 +214,35 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
} }
} }
private boolean parse(ByteBuffer buffer) private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable)
{ {
boolean parse = parser.parse(buffer); boolean handle = parser.parse(buffer);
Runnable action = getAndSetAction(null);
if (action != null) switch (state)
action.run(); {
return parse; case STATUS ->
{
// Nothing to do.
}
case HEADERS -> channel.responseHeaders();
case CONTENT ->
{
if (notifyContentAvailable)
channel.responseContentAvailable();
}
case COMPLETE ->
{
// For the complete event, handle==false, and cannot
// differentiate between a complete event and a parse()
// with zero or not enough bytes, so the state is reset
// here to avoid calling responseSuccess() again.
state = State.STATUS;
channel.responseSuccess();
}
default -> throw new IllegalStateException("Invalid state " + state);
}
return handle;
} }
private void shutdown() private void shutdown()
@ -318,13 +340,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
}, x -> close(failure))); }, x -> close(failure)));
} }
private Runnable getAndSetAction(Runnable action)
{
Runnable r = this.action;
this.action = action;
return r;
}
protected HttpChannelOverFCGI newHttpChannel() protected HttpChannelOverFCGI newHttpChannel()
{ {
return new HttpChannelOverFCGI(this); return new HttpChannelOverFCGI(this);
@ -414,6 +429,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onBegin r={},c={},reason={}", request, code, reason); LOG.debug("onBegin r={},c={},reason={}", request, code, reason);
state = State.STATUS;
channel.responseBegin(code, reason); channel.responseBegin(code, reason);
} }
@ -430,8 +446,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onHeaders r={} {}", request, networkBuffer); LOG.debug("onHeaders r={} {}", request, networkBuffer);
if (getAndSetAction(channel::responseHeaders) != null) state = State.HEADERS;
throw new IllegalStateException();
return true; return true;
} }
@ -444,13 +459,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{ {
case STD_OUT -> case STD_OUT ->
{ {
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) == null) channel.content(chunk);
state = State.CONTENT;
return true; return true;
throw new IllegalStateException();
} }
case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer)); case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer));
default -> throw new IllegalArgumentException(); default -> throw new IllegalArgumentException();
@ -464,6 +476,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onEnd r={}", request); LOG.debug("onEnd r={}", request);
channel.end(); channel.end();
state = State.COMPLETE;
} }
@Override @Override
@ -474,4 +487,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
failAndClose(failure); failAndClose(failure);
} }
} }
private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}
} }

View File

@ -34,7 +34,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
if (!hasContent()) if (!hasContent())
{ {
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(); boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest) if (!hasContent() && setFillInterest)
httpConnection.fillInterested(); httpConnection.fillInterested();
} }
@ -81,7 +81,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
if (chunk != null) if (chunk != null)
return chunk; return chunk;
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean needFillInterest = httpConnection.parseAndFill(); boolean needFillInterest = httpConnection.parseAndFill(false);
chunk = consumeChunk(); chunk = consumeChunk();
if (chunk != null) if (chunk != null)
return chunk; return chunk;
@ -109,23 +109,23 @@ public class HttpReceiverOverFCGI extends HttpReceiver
void content(Content.Chunk chunk) void content(Content.Chunk chunk)
{ {
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
if (this.chunk != null) if (this.chunk != null)
throw new IllegalStateException(); throw new IllegalStateException();
// Retain the chunk because it is stored for later reads. // Retain the chunk because it is stored for later reads.
chunk.retain(); chunk.retain();
this.chunk = chunk; this.chunk = chunk;
responseContentAvailable();
} }
void end(HttpExchange exchange) void end()
{ {
if (chunk != null) if (chunk != null)
throw new IllegalStateException(); throw new IllegalStateException();
chunk = Content.Chunk.EOF; chunk = Content.Chunk.EOF;
responseSuccess(exchange, this::receiveNext); }
void responseSuccess(HttpExchange exchange)
{
super.responseSuccess(exchange, this::receiveNext);
} }
private void receiveNext() private void receiveNext()
@ -136,7 +136,7 @@ public class HttpReceiverOverFCGI extends HttpReceiver
throw new IllegalStateException(); throw new IllegalStateException();
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(); boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest) if (!hasContent() && setFillInterest)
httpConnection.fillInterested(); httpConnection.fillInterested();
} }
@ -165,6 +165,12 @@ public class HttpReceiverOverFCGI extends HttpReceiver
super.responseHeaders(exchange); super.responseHeaders(exchange);
} }
@Override
protected void responseContentAvailable(HttpExchange exchange)
{
super.responseContentAvailable(exchange);
}
@Override @Override
protected void responseFailure(Throwable failure, Promise<Boolean> promise) protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{ {

View File

@ -15,8 +15,6 @@ package org.eclipse.jetty.fcgi.parser;
import java.io.EOFException; import java.io.EOFException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpCompliance;
@ -43,13 +41,12 @@ public class ResponseContentParser extends StreamContentParser
{ {
private static final Logger LOG = LoggerFactory.getLogger(ResponseContentParser.class); private static final Logger LOG = LoggerFactory.getLogger(ResponseContentParser.class);
private final Map<Integer, ResponseParser> parsers = new ConcurrentHashMap<>(); private final ResponseParser parser;
private final ClientParser.Listener listener;
public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener) public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener)
{ {
super(headerParser, FCGI.StreamType.STD_OUT, listener); super(headerParser, FCGI.StreamType.STD_OUT, listener);
this.listener = listener; this.parser = new ResponseParser(listener);
} }
@Override @Override
@ -63,13 +60,6 @@ public class ResponseContentParser extends StreamContentParser
@Override @Override
protected boolean onContent(ByteBuffer buffer) protected boolean onContent(ByteBuffer buffer)
{ {
int request = getRequest();
ResponseParser parser = parsers.get(request);
if (parser == null)
{
parser = new ResponseParser(listener, request);
parsers.put(request, parser);
}
return parser.parse(buffer); return parser.parse(buffer);
} }
@ -77,37 +67,44 @@ public class ResponseContentParser extends StreamContentParser
protected void end(int request) protected void end(int request)
{ {
super.end(request); super.end(request);
parsers.remove(request); parser.reset();
} }
private static class ResponseParser implements HttpParser.ResponseHandler private class ResponseParser implements HttpParser.ResponseHandler
{ {
private final HttpFields.Mutable fields = HttpFields.build(); private final HttpFields.Mutable fields = HttpFields.build();
private final ClientParser.Listener listener; private final ClientParser.Listener listener;
private final int request;
private final FCGIHttpParser httpParser; private final FCGIHttpParser httpParser;
private State state = State.HEADERS; private State state = State.HEADERS;
private boolean seenResponseCode; private boolean seenResponseCode;
private boolean stalled; private boolean stalled;
private ResponseParser(ClientParser.Listener listener, int request) private ResponseParser(ClientParser.Listener listener)
{ {
this.listener = listener; this.listener = listener;
this.request = request;
this.httpParser = new FCGIHttpParser(this); this.httpParser = new FCGIHttpParser(this);
} }
private void reset()
{
fields.clear();
httpParser.reset();
state = State.HEADERS;
seenResponseCode = false;
stalled = false;
}
public boolean parse(ByteBuffer buffer) public boolean parse(ByteBuffer buffer)
{ {
int remaining = buffer.remaining(); int remaining = buffer.remaining();
while (remaining > 0) while (remaining > 0)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response {} {}, state {} {}", request, FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer)); LOG.debug("Response {} {}, state {} {}", getRequest(), FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer));
switch (state) switch (state)
{ {
case HEADERS: case HEADERS ->
{ {
if (httpParser.parseNext(buffer)) if (httpParser.parseNext(buffer))
{ {
@ -116,9 +113,8 @@ public class ResponseContentParser extends StreamContentParser
return true; return true;
} }
remaining = buffer.remaining(); remaining = buffer.remaining();
break;
} }
case CONTENT_MODE: case CONTENT_MODE ->
{ {
// If we have no indication of the content, then // If we have no indication of the content, then
// the HTTP parser will assume there is no content // the HTTP parser will assume there is no content
@ -128,28 +124,22 @@ public class ResponseContentParser extends StreamContentParser
(fields.get(HttpHeader.CONTENT_LENGTH) == null && (fields.get(HttpHeader.CONTENT_LENGTH) == null &&
fields.get(HttpHeader.TRANSFER_ENCODING) == null); fields.get(HttpHeader.TRANSFER_ENCODING) == null);
state = rawContent ? State.RAW_CONTENT : State.HTTP_CONTENT; state = rawContent ? State.RAW_CONTENT : State.HTTP_CONTENT;
break;
} }
case RAW_CONTENT: case RAW_CONTENT ->
{ {
ByteBuffer content = buffer.asReadOnlyBuffer(); ByteBuffer content = buffer.asReadOnlyBuffer();
buffer.position(buffer.limit()); buffer.position(buffer.limit());
if (notifyContent(content)) if (notifyContent(content))
return true; return true;
remaining = 0; remaining = 0;
break;
} }
case HTTP_CONTENT: case HTTP_CONTENT ->
{ {
if (httpParser.parseNext(buffer)) if (httpParser.parseNext(buffer))
return true; return true;
remaining = buffer.remaining(); remaining = buffer.remaining();
break;
}
default:
{
throw new IllegalStateException();
} }
default -> throw new IllegalStateException();
} }
} }
return false; return false;
@ -205,7 +195,7 @@ public class ResponseContentParser extends StreamContentParser
{ {
try try
{ {
listener.onBegin(request, code, reason); listener.onBegin(getRequest(), code, reason);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -218,7 +208,7 @@ public class ResponseContentParser extends StreamContentParser
{ {
try try
{ {
listener.onHeader(request, httpField); listener.onHeader(getRequest(), httpField);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -242,7 +232,7 @@ public class ResponseContentParser extends StreamContentParser
{ {
try try
{ {
return listener.onHeaders(request); return listener.onHeaders(getRequest());
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -278,7 +268,7 @@ public class ResponseContentParser extends StreamContentParser
{ {
try try
{ {
return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer); return listener.onContent(getRequest(), FCGI.StreamType.STD_OUT, buffer);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -318,7 +308,7 @@ public class ResponseContentParser extends StreamContentParser
{ {
try try
{ {
listener.onFailure(request, failure); listener.onFailure(getRequest(), failure);
} }
catch (Throwable x) catch (Throwable x)
{ {

View File

@ -564,7 +564,7 @@ public class MultiPart
public abstract static class AbstractContentSource implements Content.Source, Closeable public abstract static class AbstractContentSource implements Content.Source, Closeable
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(AbstractContentSource.class);
private final Queue<Part> parts = new ArrayDeque<>(); private final Queue<Part> parts = new ArrayDeque<>();
private final String boundary; private final String boundary;
private final ByteBuffer firstBoundary; private final ByteBuffer firstBoundary;

View File

@ -50,7 +50,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
}; };
private final AutoLock.WithCondition lock = new AutoLock.WithCondition(); private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(AsyncContent.class);
private final Queue<Content.Chunk> chunks = new ArrayDeque<>(); private final Queue<Content.Chunk> chunks = new ArrayDeque<>();
private Content.Chunk persistentFailure; private Content.Chunk persistentFailure;
private boolean readClosed; private boolean readClosed;

View File

@ -44,7 +44,7 @@ public class BufferedContentSink implements Content.Sink
private final Content.Sink _delegate; private final Content.Sink _delegate;
private final RetainableByteBuffer.DynamicCapacity _aggregator; private final RetainableByteBuffer.DynamicCapacity _aggregator;
private final SerializedInvoker _serializer = new SerializedInvoker(); private final SerializedInvoker _serializer = new SerializedInvoker(BufferedContentSink.class);
private boolean _firstWrite = true; private boolean _firstWrite = true;
private boolean _lastWritten; private boolean _lastWritten;

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ByteBufferContentSource implements Content.Source public class ByteBufferContentSource implements Content.Source
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(ByteBufferContentSource.class);
private final long length; private final long length;
private final Collection<ByteBuffer> byteBuffers; private final Collection<ByteBuffer> byteBuffers;
private Iterator<ByteBuffer> iterator; private Iterator<ByteBuffer> iterator;

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ChunksContentSource implements Content.Source public class ChunksContentSource implements Content.Source
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(ChunksContentSource.class);
private final long length; private final long length;
private final Collection<Content.Chunk> chunks; private final Collection<Content.Chunk> chunks;
private Iterator<Content.Chunk> iterator; private Iterator<Content.Chunk> iterator;

View File

@ -39,7 +39,7 @@ public abstract class ContentSourceTransformer implements Content.Source
protected ContentSourceTransformer(Content.Source rawSource) protected ContentSourceTransformer(Content.Source rawSource)
{ {
this(rawSource, new SerializedInvoker()); this(rawSource, new SerializedInvoker(ContentSourceTransformer.class));
} }
protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker) protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker)

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class InputStreamContentSource implements Content.Source public class InputStreamContentSource implements Content.Source
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker(); private final SerializedInvoker invoker = new SerializedInvoker(InputStreamContentSource.class);
private final InputStream inputStream; private final InputStream inputStream;
private final ByteBufferPool.Sized bufferPool; private final ByteBufferPool.Sized bufferPool;
private Runnable demandCallback; private Runnable demandCallback;

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
public class ByteChannelContentSource implements Content.Source public class ByteChannelContentSource implements Content.Source
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker(); private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class);
private final ByteBufferPool.Sized _byteBufferPool; private final ByteBufferPool.Sized _byteBufferPool;
private ByteChannel _byteChannel; private ByteChannel _byteChannel;
private final long _offset; private final long _offset;

View File

@ -130,8 +130,8 @@ public class HttpChannelState implements HttpChannel, Components
{ {
_connectionMetaData = connectionMetaData; _connectionMetaData = connectionMetaData;
// The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc. // The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc.
_readInvoker = new HttpChannelSerializedInvoker(); _readInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_readInvoker");
_writeInvoker = new HttpChannelSerializedInvoker(); _writeInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_writeInvoker");
} }
@Override @Override
@ -1825,6 +1825,11 @@ public class HttpChannelState implements HttpChannel, Components
private class HttpChannelSerializedInvoker extends SerializedInvoker private class HttpChannelSerializedInvoker extends SerializedInvoker
{ {
public HttpChannelSerializedInvoker(String name)
{
super(name);
}
@Override @Override
protected void onError(Runnable task, Throwable failure) protected void onError(Runnable task, Throwable failure)
{ {

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.NanoTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -263,7 +262,7 @@ public class HttpClientDemandTest extends AbstractTest
.timeout(5, TimeUnit.SECONDS) .timeout(5, TimeUnit.SECONDS)
.send(result -> .send(result ->
{ {
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse(); Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus()); assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown(); resultLatch.countDown();
@ -346,7 +345,7 @@ public class HttpClientDemandTest extends AbstractTest
.onResponseContentAsync(listener2) .onResponseContentAsync(listener2)
.send(result -> .send(result ->
{ {
Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
Response response = result.getResponse(); Response response = result.getResponse();
assertEquals(HttpStatus.OK_200, response.getStatus()); assertEquals(HttpStatus.OK_200, response.getStatus());
resultLatch.countDown(); resultLatch.countDown();
@ -415,8 +414,8 @@ public class HttpClientDemandTest extends AbstractTest
}) })
.send(result -> .send(result ->
{ {
Assertions.assertTrue(result.isSucceeded()); assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown(); resultLatch.countDown();
}); });
assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
@ -480,8 +479,8 @@ public class HttpClientDemandTest extends AbstractTest
}) })
.send(result -> .send(result ->
{ {
Assertions.assertTrue(result.isSucceeded()); assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown(); resultLatch.countDown();
}); });
@ -540,8 +539,8 @@ public class HttpClientDemandTest extends AbstractTest
}) })
.send(result -> .send(result ->
{ {
Assertions.assertTrue(result.isSucceeded()); assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown(); resultLatch.countDown();
}); });
@ -572,8 +571,8 @@ public class HttpClientDemandTest extends AbstractTest
.onResponseContentSource((response, contentSource) -> contentSource.demand(() -> new Thread(new Accumulator(contentSource, chunks)).start())) .onResponseContentSource((response, contentSource) -> contentSource.demand(() -> new Thread(new Accumulator(contentSource, chunks)).start()))
.send(result -> .send(result ->
{ {
Assertions.assertTrue(result.isSucceeded()); assertTrue(result.isSucceeded());
Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown(); resultLatch.countDown();
}); });

View File

@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
*/ */
public class SerializedExecutor implements Executor public class SerializedExecutor implements Executor
{ {
private final SerializedInvoker _invoker = new SerializedInvoker() private final SerializedInvoker _invoker = new SerializedInvoker(SerializedExecutor.class)
{ {
@Override @Override
protected void onError(Runnable task, Throwable t) protected void onError(Runnable task, Throwable t)

View File

@ -13,8 +13,12 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,6 +39,51 @@ public class SerializedInvoker
private static final Logger LOG = LoggerFactory.getLogger(SerializedInvoker.class); private static final Logger LOG = LoggerFactory.getLogger(SerializedInvoker.class);
private final AtomicReference<Link> _tail = new AtomicReference<>(); private final AtomicReference<Link> _tail = new AtomicReference<>();
private final String _name;
private volatile Thread _invokerThread;
/**
* Create a new instance whose name is {@code anonymous}.
*/
public SerializedInvoker()
{
this("anonymous");
}
/**
* Create a new instance whose name is derived from the given class.
* @param nameFrom the class to use as a name.
*/
public SerializedInvoker(Class<?> nameFrom)
{
this(nameFrom.getSimpleName());
}
/**
* Create a new instance with the given name.
* @param name the name.
*/
public SerializedInvoker(String name)
{
_name = name;
}
/**
* @return whether the current thread is currently executing a task using this invoker
*/
boolean isCurrentThreadInvoking()
{
return _invokerThread == Thread.currentThread();
}
/**
* @throws IllegalStateException when the current thread is not currently executing a task using this invoker
*/
public void assertCurrentThreadInvoking() throws IllegalStateException
{
if (!isCurrentThreadInvoking())
throw new IllegalStateException();
}
/** /**
* Arrange for a task to be invoked, mutually excluded from other tasks. * Arrange for a task to be invoked, mutually excluded from other tasks.
@ -59,7 +108,7 @@ public class SerializedInvoker
{ {
// Wrap the given task with another one that's going to delegate run() to the wrapped task while the // Wrap the given task with another one that's going to delegate run() to the wrapped task while the
// wrapper's toString() returns a description of the place in code where SerializedInvoker.run() was called. // wrapper's toString() returns a description of the place in code where SerializedInvoker.run() was called.
task = new NamedRunnable(task, deriveTaskName(task)); task = new NamedRunnable(task);
} }
} }
Link link = new Link(task); Link link = new Link(task);
@ -72,18 +121,6 @@ public class SerializedInvoker
return null; return null;
} }
protected String deriveTaskName(Runnable task)
{
StackTraceElement[] stackTrace = new Exception().getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace)
{
String className = stackTraceElement.getClassName();
if (!className.equals(SerializedInvoker.class.getName()) && !className.equals(getClass().getName()))
return "Queued at " + stackTraceElement;
}
return task.toString();
}
/** /**
* Arrange for tasks to be invoked, mutually excluded from other tasks. * Arrange for tasks to be invoked, mutually excluded from other tasks.
* @param tasks The tasks to invoke * @param tasks The tasks to invoke
@ -115,8 +152,7 @@ public class SerializedInvoker
Runnable todo = offer(task); Runnable todo = offer(task);
if (todo != null) if (todo != null)
todo.run(); todo.run();
else else if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled())
LOG.debug("Queued link in {}", this); LOG.debug("Queued link in {}", this);
} }
@ -130,15 +166,14 @@ public class SerializedInvoker
Runnable todo = offer(tasks); Runnable todo = offer(tasks);
if (todo != null) if (todo != null)
todo.run(); todo.run();
else else if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled())
LOG.debug("Queued links in {}", this); LOG.debug("Queued links in {}", this);
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{tail=%s}", getClass().getSimpleName(), hashCode(), _tail); return String.format("%s@%x{name=%s,tail=%s,invoker=%s}", getClass().getSimpleName(), hashCode(), _name, _tail, _invokerThread);
} }
protected void onError(Runnable task, Throwable t) protected void onError(Runnable task, Throwable t)
@ -146,7 +181,7 @@ public class SerializedInvoker
LOG.warn("Serialized invocation error", t); LOG.warn("Serialized invocation error", t);
} }
private class Link implements Runnable, Invocable private class Link implements Runnable, Invocable, Dumpable
{ {
private final Runnable _task; private final Runnable _task;
private final AtomicReference<Link> _next = new AtomicReference<>(); private final AtomicReference<Link> _next = new AtomicReference<>();
@ -156,6 +191,24 @@ public class SerializedInvoker
_task = task; _task = task;
} }
@Override
public void dump(Appendable out, String indent) throws IOException
{
if (_task instanceof NamedRunnable nr)
{
StringWriter sw = new StringWriter();
nr.stack.printStackTrace(new PrintWriter(sw));
Dumpable.dumpObjects(out, indent, nr.toString(), sw.toString());
}
else
{
Dumpable.dumpObjects(out, indent, _task);
}
Link link = _next.get();
if (link != null)
link.dump(out, indent);
}
@Override @Override
public InvocationType getInvocationType() public InvocationType getInvocationType()
{ {
@ -186,6 +239,7 @@ public class SerializedInvoker
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Running link {} of {}", link, SerializedInvoker.this); LOG.debug("Running link {} of {}", link, SerializedInvoker.this);
_invokerThread = Thread.currentThread();
try try
{ {
link._task.run(); link._task.run();
@ -196,6 +250,12 @@ public class SerializedInvoker
LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t); LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t);
onError(link._task, t); onError(link._task, t);
} }
finally
{
// _invokerThread must be nulled before calling link.next() as
// once the latter has executed, another thread can enter Link.run().
_invokerThread = null;
}
link = link.next(); link = link.next();
if (link == null && LOG.isDebugEnabled()) if (link == null && LOG.isDebugEnabled())
LOG.debug("Next link is null, execution is over in {}", SerializedInvoker.this); LOG.debug("Next link is null, execution is over in {}", SerializedInvoker.this);
@ -209,10 +269,35 @@ public class SerializedInvoker
} }
} }
private record NamedRunnable(Runnable delegate, String name) implements Runnable private class NamedRunnable implements Runnable
{ {
private static final Logger LOG = LoggerFactory.getLogger(NamedRunnable.class); private static final Logger LOG = LoggerFactory.getLogger(NamedRunnable.class);
private final Runnable delegate;
private final String name;
private final Throwable stack;
private NamedRunnable(Runnable delegate)
{
this.delegate = delegate;
this.stack = new Throwable();
this.name = deriveTaskName(delegate, stack);
}
private String deriveTaskName(Runnable task, Throwable stack)
{
StackTraceElement[] stackTrace = stack.getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace)
{
String className = stackTraceElement.getClassName();
if (!className.equals(SerializedInvoker.class.getName()) &&
!className.equals(SerializedInvoker.this.getClass().getName()) &&
!className.equals(getClass().getName()))
return "Queued by " + Thread.currentThread().getName() + " at " + stackTraceElement;
}
return task.toString();
}
@Override @Override
public void run() public void run()
{ {

View File

@ -14,6 +14,8 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -25,17 +27,20 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SerializedInvokerTest public class SerializedInvokerTest
{ {
SerializedInvoker _serialedInvoker; private SerializedInvoker _serializedInvoker;
private ExecutorService _executor;
@BeforeEach @BeforeEach
public void beforeEach() public void beforeEach()
{ {
_serialedInvoker = new SerializedInvoker(); _serializedInvoker = new SerializedInvoker(SerializedInvokerTest.class);
_executor = Executors.newSingleThreadExecutor();
} }
@AfterEach @AfterEach
public void afterEach() public void afterEach()
{ {
_executor.shutdownNow();
} }
@Test @Test
@ -45,24 +50,27 @@ public class SerializedInvokerTest
Task task2 = new Task(); Task task2 = new Task();
Task task3 = new Task(); Task task3 = new Task();
Runnable todo = _serialedInvoker.offer(task1); Runnable todo = _serializedInvoker.offer(task1);
assertNull(_serialedInvoker.offer(task2)); assertNull(_serializedInvoker.offer(task2));
assertNull(_serialedInvoker.offer(task3)); assertNull(_serializedInvoker.offer(task3));
assertFalse(task1.hasRun()); assertFalse(task1.hasRun());
assertFalse(task2.hasRun()); assertFalse(task2.hasRun());
assertFalse(task3.hasRun()); assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run(); todo.run();
assertTrue(task1.hasRun()); assertTrue(task1.hasRun());
assertTrue(task2.hasRun()); assertTrue(task2.hasRun());
assertTrue(task3.hasRun()); assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task(); Task task4 = new Task();
todo = _serialedInvoker.offer(task4); todo = _serializedInvoker.offer(task4);
todo.run(); todo.run();
assertTrue(task4.hasRun()); assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
} }
@Test @Test
@ -72,22 +80,25 @@ public class SerializedInvokerTest
Task task2 = new Task(); Task task2 = new Task();
Task task3 = new Task(); Task task3 = new Task();
Runnable todo = _serialedInvoker.offer(null, task1, null, task2, null, task3, null); Runnable todo = _serializedInvoker.offer(null, task1, null, task2, null, task3, null);
assertFalse(task1.hasRun()); assertFalse(task1.hasRun());
assertFalse(task2.hasRun()); assertFalse(task2.hasRun());
assertFalse(task3.hasRun()); assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run(); todo.run();
assertTrue(task1.hasRun()); assertTrue(task1.hasRun());
assertTrue(task2.hasRun()); assertTrue(task2.hasRun());
assertTrue(task3.hasRun()); assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task(); Task task4 = new Task();
todo = _serialedInvoker.offer(task4); todo = _serializedInvoker.offer(task4);
todo.run(); todo.run();
assertTrue(task4.hasRun()); assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
} }
@Test @Test
@ -99,7 +110,7 @@ public class SerializedInvokerTest
@Override @Override
public void run() public void run()
{ {
assertNull(_serialedInvoker.offer(task3)); assertNull(_serializedInvoker.offer(task3));
super.run(); super.run();
} }
}; };
@ -108,32 +119,35 @@ public class SerializedInvokerTest
@Override @Override
public void run() public void run()
{ {
assertNull(_serialedInvoker.offer(task2)); assertNull(_serializedInvoker.offer(task2));
super.run(); super.run();
} }
}; };
Runnable todo = _serialedInvoker.offer(task1); Runnable todo = _serializedInvoker.offer(task1);
assertFalse(task1.hasRun()); assertFalse(task1.hasRun());
assertFalse(task2.hasRun()); assertFalse(task2.hasRun());
assertFalse(task3.hasRun()); assertFalse(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
todo.run(); todo.run();
assertTrue(task1.hasRun()); assertTrue(task1.hasRun());
assertTrue(task2.hasRun()); assertTrue(task2.hasRun());
assertTrue(task3.hasRun()); assertTrue(task3.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
Task task4 = new Task(); Task task4 = new Task();
todo = _serialedInvoker.offer(task4); todo = _serializedInvoker.offer(task4);
todo.run(); todo.run();
assertTrue(task4.hasRun()); assertTrue(task4.hasRun());
assertFalse(_serializedInvoker.isCurrentThreadInvoking());
} }
public static class Task implements Runnable public class Task implements Runnable
{ {
CountDownLatch _run = new CountDownLatch(1); final CountDownLatch _run = new CountDownLatch(1);
boolean hasRun() boolean hasRun()
{ {
@ -143,7 +157,17 @@ public class SerializedInvokerTest
@Override @Override
public void run() public void run()
{ {
try
{
assertTrue(_serializedInvoker.isCurrentThreadInvoking());
assertFalse(_executor.submit(() -> _serializedInvoker.isCurrentThreadInvoking()).get());
_run.countDown(); _run.countDown();
} }
catch (Exception e)
{
throw new RuntimeException(e);
}
}
} }
} }