Improved locking for HttpReceiver.ContentSource. (#9007)

* Improved locking for HttpReceiver.ContentSource.

Improved response failure code path.
Now either responseFailure() must be called, or exchange.responseComplete() followed by HttpReceiver.abort().

Fixed failAndClose() for HTTP/2 and HTTP/3: the connection must not be closed, stream.reset() is sufficient.

Fixed flaky test HttpClientDemandTest.testTwoListenersWithDifferentDemand().

Fixed DistributionTests.testVirtualThreadPool().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-12-06 15:09:48 +01:00 committed by GitHub
parent f8c4783802
commit d5054667df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 112 deletions

View File

@ -244,7 +244,7 @@ public class HttpExchange implements CyclicTimeouts.Expirable
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure); LOG.debug("Failed {}: req={}/rsp={}", this, abortRequest, abortResponse, failure);
if (!abortRequest && !abortResponse) if (!abortRequest && !abortResponse)
{ {

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ContentSourceTransformer; import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker; import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -266,6 +267,9 @@ public abstract class HttpReceiver
List<Response.ResponseListener> responseListeners = exchange.getConversation().getResponseListeners(); List<Response.ResponseListener> responseListeners = exchange.getConversation().getResponseListeners();
notifier.notifyHeaders(responseListeners, response); notifier.notifyHeaders(responseListeners, response);
if (exchange.isResponseComplete())
return;
if (HttpStatus.isInterim(response.getStatus())) if (HttpStatus.isInterim(response.getStatus()))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -338,24 +342,18 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Invoking responseSuccess on {}", this); LOG.debug("Invoking responseSuccess on {}", this);
NotifiableContentSource contentSource = this.contentSource; // Mark atomically the response as completed, with respect
if (contentSource != null) // to concurrency between response success and response failure.
{ if (!exchange.responseComplete(null))
this.contentSource = null; return;
contentSource.eof();
}
invoker.run(() -> invoker.run(() ->
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this); LOG.debug("Executing responseSuccess on {}", this);
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (!exchange.responseComplete(null))
return;
responseState = ResponseState.IDLE; responseState = ResponseState.IDLE;
reset(); reset();
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
@ -385,34 +383,26 @@ public abstract class HttpReceiver
protected void responseFailure(Throwable failure, Promise<Boolean> promise) protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Invoking responseFailure with {} on {}", failure, this); LOG.debug("Failing with {} on {}", failure, this);
invoker.run(() -> HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null)
{ {
if (LOG.isDebugEnabled()) promise.succeeded(false);
LOG.debug("Executing responseFailure on {}", this); return;
}
HttpExchange exchange = getHttpExchange(); // Mark atomically the response as completed, with respect
// In case of a response error, the failure has already been notified // to concurrency between response success and response failure.
// and it is possible that a further attempt to read in the receive boolean completed = exchange.responseComplete(failure);
// loop throws an exception that reenters here but without exchange; if (completed)
// or, the server could just have timed out the connection. abort(exchange, failure, promise);
if (exchange == null) else
{ promise.succeeded(false);
promise.succeeded(false);
return;
}
if (LOG.isDebugEnabled())
LOG.debug("Response failure {}", exchange.getResponse(), failure);
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (exchange.responseComplete(failure))
abort(exchange, failure, promise);
else
promise.succeeded(false);
});
} }
private void terminateResponse(HttpExchange exchange) private void terminateResponse(HttpExchange exchange)
@ -482,10 +472,14 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Invoking abort with {} on {}", failure, this); LOG.debug("Invoking abort with {} on {}", failure, this);
// This method should be called only after calling HttpExchange.responseComplete().
if (!exchange.isResponseComplete())
throw new IllegalStateException();
invoker.run(() -> invoker.run(() ->
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Executing abort on {}", this); LOG.debug("Executing abort with {} on {}", failure, this);
if (responseState == ResponseState.FAILURE) if (responseState == ResponseState.FAILURE)
{ {
@ -496,7 +490,7 @@ public abstract class HttpReceiver
responseState = ResponseState.FAILURE; responseState = ResponseState.FAILURE;
this.failure = failure; this.failure = failure;
if (contentSource != null) if (contentSource != null)
contentSource.fail(failure); contentSource.error(failure);
dispose(); dispose();
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
@ -557,7 +551,7 @@ public abstract class HttpReceiver
private interface NotifiableContentSource extends Content.Source private interface NotifiableContentSource extends Content.Source
{ {
void eof(); boolean error(Throwable failure);
void onDataAvailable(); void onDataAvailable();
} }
@ -577,12 +571,6 @@ public abstract class HttpReceiver
_decoder = decoder; _decoder = decoder;
} }
@Override
public void eof()
{
_rawSource.eof();
}
@Override @Override
public void onDataAvailable() public void onDataAvailable()
{ {
@ -643,6 +631,15 @@ public abstract class HttpReceiver
} }
} }
} }
@Override
public boolean error(Throwable failure)
{
if (_chunk != null)
_chunk.release();
_chunk = null;
return _rawSource.error(failure);
}
} }
/** /**
@ -654,42 +651,37 @@ public abstract class HttpReceiver
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class); private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);
private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>(); private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk currentChunk; private final AutoLock lock = new AutoLock();
private Content.Chunk currentChunk;
@Override @Override
public Content.Chunk read() public Content.Chunk read()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Reading from {}", this); LOG.debug("Reading from {}", this);
Content.Chunk chunk = consumeCurrentChunk();
if (chunk != null)
return chunk;
currentChunk = HttpReceiver.this.read(false);
return consumeCurrentChunk();
}
@Override Content.Chunk current;
public void eof() try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("Setting EOF on {}", this);
if (currentChunk != null)
throw new IllegalStateException();
currentChunk = Content.Chunk.EOF;
Runnable demandCallback = demandCallbackRef.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("Calling demand callback on {}", this);
if (demandCallback != null)
{ {
try current = currentChunk;
currentChunk = Content.Chunk.next(current);
if (current != null)
return current;
}
current = HttpReceiver.this.read(false);
try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
{ {
demandCallback.run(); // There was a concurrent call to fail().
} if (current != null)
catch (Throwable x) current.release();
{ return currentChunk;
fail(x);
} }
currentChunk = Content.Chunk.next(current);
return current;
} }
} }
@ -703,15 +695,6 @@ public abstract class HttpReceiver
invokeDemandCallback(true); invokeDemandCallback(true);
} }
private Content.Chunk consumeCurrentChunk()
{
if (LOG.isDebugEnabled())
LOG.debug("Consuming current chunk from {}", this);
Content.Chunk chunk = currentChunk;
currentChunk = Content.Chunk.next(chunk);
return chunk;
}
@Override @Override
public void demand(Runnable demandCallback) public void demand(Runnable demandCallback)
{ {
@ -731,12 +714,30 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this); LOG.debug("Processing demand on {}", this);
if (currentChunk == null) Content.Chunk current;
try (AutoLock ignored = lock.lock())
{ {
currentChunk = HttpReceiver.this.read(true); current = currentChunk;
if (currentChunk == null)
return;
} }
if (current == null)
{
current = HttpReceiver.this.read(true);
if (current == null)
return;
try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
{
// There was a concurrent call to fail().
current.release();
return;
}
currentChunk = current;
}
}
// The processDemand method is only ever called by the // The processDemand method is only ever called by the
// invoker so there is no need to use the latter here. // invoker so there is no need to use the latter here.
invokeDemandCallback(false); invokeDemandCallback(false);
@ -768,17 +769,40 @@ public abstract class HttpReceiver
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Failing {}", this); LOG.debug("Failing {}", this);
if (currentChunk != null) boolean failed = error(failure);
currentChunk.release(); if (failed)
if (currentChunk == null || !(currentChunk instanceof Content.Chunk.Error))
HttpReceiver.this.failAndClose(failure); HttpReceiver.this.failAndClose(failure);
currentChunk = Content.Chunk.from(failure); invokeDemandCallback(true);
}
@Override
public boolean error(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Erroring {}", this);
try (AutoLock ignored = lock.lock())
{
if (currentChunk instanceof Content.Chunk.Error)
return false;
if (currentChunk != null)
currentChunk.release();
currentChunk = Content.Chunk.from(failure);
}
return true;
}
private Content.Chunk chunk()
{
try (AutoLock ignored = lock.lock())
{
return currentChunk;
}
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), currentChunk, demandCallbackRef); return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), chunk(), demandCallbackRef);
} }
} }
} }

View File

@ -92,13 +92,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
Stream stream = getHttpChannel().getStream(); Stream stream = getHttpChannel().getStream();
responseFailure(failure, Promise.from(failed -> responseFailure(failure, Promise.from(failed ->
{ {
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); if (failed)
getHttpChannel().getHttpConnection().close(failure); stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}, x -> }, x -> stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP)));
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
getHttpChannel().getHttpConnection().close(failure);
}));
} }
@Override @Override
@ -127,15 +123,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason());
responseBegin(exchange); responseBegin(exchange);
if (exchange.isResponseComplete())
return;
HttpFields headers = response.getFields(); HttpFields headers = response.getFields();
for (HttpField header : headers) for (HttpField header : headers)
{ {
responseHeader(exchange, header); responseHeader(exchange, header);
if (exchange.isResponseComplete())
return;
} }
HttpRequest httpRequest = exchange.getRequest(); HttpRequest httpRequest = exchange.getRequest();

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -73,8 +74,12 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
@Override @Override
public void failAndClose(Throwable failure) public void failAndClose(Throwable failure)
{ {
responseFailure(failure, Promise.from(failed -> getHttpChannel().getHttpConnection().close(failure), Stream stream = getHttpChannel().getStream();
x -> getHttpChannel().getHttpConnection().close(failure))); responseFailure(failure, Promise.from(failed ->
{
if (failed)
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure);
}, x -> stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure)));
} }
@Override @Override
@ -95,15 +100,11 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason());
responseBegin(exchange); responseBegin(exchange);
if (exchange.isResponseComplete())
return;
HttpFields headers = response.getFields(); HttpFields headers = response.getFields();
for (HttpField header : headers) for (HttpField header : headers)
{ {
responseHeader(exchange, header); responseHeader(exchange, header);
if (exchange.isResponseComplete())
return;
} }
// TODO: add support for HttpMethod.CONNECT. // TODO: add support for HttpMethod.CONNECT.

View File

@ -335,22 +335,22 @@ public class HttpClientDemandTest extends AbstractTest
resultLatch.countDown(); resultLatch.countDown();
}); });
await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue()));
await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue()));
// Make both listeners progress in locksteps. // Make both listeners progress in locksteps.
int i = 0; int i = 0;
while (resultLatch.getCount() > 0) while (resultLatch.getCount() > 0)
{ {
i++; i++;
await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue()));
await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue()));
// Assert that no listener can progress for as long as both listeners did not demand. // Assert that no listener can progress for as long as both listeners did not demand.
assertThat(listener1Chunks.get(), is(i)); assertThat(listener1Chunks.get(), is(i));
assertThat(listener2Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i));
listener2DemandRef.get().accept(1); listener2DemandRef.getAndSet(null).accept(1);
assertThat(listener1Chunks.get(), is(i)); assertThat(listener1Chunks.get(), is(i));
assertThat(listener2Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i));
listener1DemandRef.get().accept(1); listener1DemandRef.getAndSet(null).accept(1);
} }
assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); assertTrue(resultLatch.await(5, TimeUnit.SECONDS));