Fixes #10087 - Flaky EventsHandlerTest due to trailers. (#10093)

* Fixes #10087 - Flaky EventsHandlerTest due to trailers.

HTTP/2 trailers may arrive and be processed before the application reads request chunks.

Avoid the race condition by storing the trailers aside and returning them during reads.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-07-13 08:19:03 +02:00 committed by GitHub
parent 8c9d6e4528
commit a08e953c74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 49 deletions

View File

@ -371,32 +371,43 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
private void onHeaders(HeadersFrame frame, Callback callback)
{
boolean offered = false;
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
boolean isTrailer = !metaData.isRequest() && !metaData.isResponse();
if (isTrailer)
{
// In case of trailers, notify first and then offer EOF to
// avoid race conditions due to concurrent calls to readData().
boolean closed = updateClose(true, CloseState.Event.RECEIVED);
notifyHeaders(this, frame);
if (closed)
getSession().removeStream(this);
// Offer EOF in case the application calls readData() or demand().
offered = offer(Data.eof(getId()));
}
else
{
HttpFields fields = metaData.getHttpFields();
long length = -1;
if (fields != null && !HttpMethod.CONNECT.is(request.getMethod()))
length = fields.getLongField(HttpHeader.CONTENT_LENGTH);
dataLength = length;
}
boolean offered = false;
if (frame.isEndStream())
{
// Offer EOF for either the request, the response or the trailers
// in case the application calls readData() or demand().
offered = offer(Data.eof(getId()));
}
if (frame.isEndStream())
{
// Offer EOF for either the request or the response in
// case the application calls readData() or demand().
offered = offer(Data.eof(getId()));
}
// Requests are notified to a Session.Listener,
// here only handle responses and trailers.
if (metaData.isResponse() || !metaData.isRequest())
{
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(this, frame);
if (closed)
getSession().removeStream(this);
// Requests are notified to a Session.Listener, here only notify responses.
if (metaData.isResponse())
{
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(this, frame);
if (closed)
getSession().removeStream(this);
}
}
if (offered)

View File

@ -62,6 +62,7 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
private MetaData.Response _responseMetaData;
private TunnelSupport tunnelSupport;
private Content.Chunk _chunk;
private Content.Chunk _trailer;
private boolean committed;
private boolean _demand;
private boolean _expects100Continue;
@ -150,37 +151,51 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
if (tunnelSupport != null)
return null;
while (true)
// Check if there already is a chunk, e.g. EOF.
Content.Chunk chunk;
try (AutoLock ignored = lock.lock())
{
Content.Chunk chunk;
chunk = _chunk;
_chunk = Content.Chunk.next(chunk);
}
if (chunk != null)
return chunk;
Stream.Data data = _stream.readData();
if (data == null)
return null;
// Check if the trailers must be returned.
if (data.frame().isEndStream())
{
Content.Chunk trailer;
try (AutoLock ignored = lock.lock())
{
chunk = _chunk;
_chunk = Content.Chunk.next(chunk);
}
if (chunk != null)
return chunk;
Stream.Data data = _stream.readData();
if (data == null)
return null;
// The data instance should be released after readData() above;
// the chunk is stored below for later use, so should be retained;
// the two actions cancel each other, no need to further retain or release.
chunk = createChunk(data);
// Some content is read, but the 100 Continue interim
// response has not been sent yet, then don't bother
// sending it later, as the client already sent the content.
if (_expects100Continue && chunk.hasRemaining())
_expects100Continue = false;
try (AutoLock ignored = lock.lock())
{
_chunk = chunk;
trailer = _trailer;
if (trailer != null)
{
_chunk = Content.Chunk.next(trailer);
return trailer;
}
}
}
// The data instance should be released after readData() above;
// the chunk is stored below for later use, so should be retained;
// the two actions cancel each other, no need to further retain or release.
chunk = createChunk(data);
// Some content is read, but the 100 Continue interim
// response has not been sent yet, then don't bother
// sending it later, as the client already sent the content.
if (_expects100Continue && chunk.hasRemaining())
_expects100Continue = false;
try (AutoLock ignored = lock.lock())
{
_chunk = Content.Chunk.next(chunk);
}
return chunk;
}
@Override
@ -190,8 +205,7 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
boolean demand = false;
try (AutoLock ignored = lock.lock())
{
// We may have a non-demanded chunk in case of trailers.
if (_chunk != null)
if (_chunk != null || _trailer != null)
notify = true;
else if (!_demand)
demand = _demand = true;
@ -237,8 +251,7 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
HttpFields trailers = frame.getMetaData().getHttpFields().asImmutable();
try (AutoLock ignored = lock.lock())
{
_demand = false;
_chunk = new Trailers(trailers);
_trailer = new Trailers(trailers);
}
if (LOG.isDebugEnabled())

View File

@ -18,15 +18,20 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.FutureResponseListener;
import org.eclipse.jetty.client.InputStreamResponseListener;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.client.StringRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -37,7 +42,6 @@ public class TrailersTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Tag("flaky") // https://github.com/eclipse/jetty.project/issues/9662
public void testTrailers(Transport transport) throws Exception
{
String trailerName = "Some-Trailer";
@ -111,4 +115,42 @@ public class TrailersTest extends AbstractTest
assertNotNull(responseTrailers);
assertEquals(trailerValue, responseTrailers.get(trailerName));
}
@ParameterizedTest
@MethodSource("transportsNoFCGI")
public void testTrailersWithDelayedRead(Transport transport) throws Exception
{
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
// Do not read immediately, to cause the trailers to
// arrive at the server, especially in case of HTTP/2.
Thread.sleep(500);
HttpFields.Mutable trailers = HttpFields.build();
response.setTrailersSupplier(() -> trailers);
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
return true;
}
});
String content = "Some-Content";
String trailerName = "X-Trailer";
String trailerValue = "0xC0FFEE";
var request = client.newRequest(newURI(transport))
.method(HttpMethod.POST)
.headers(headers -> headers.put(HttpHeader.TRAILER, trailerName))
.body(new StringRequestContent(content))
.trailersSupplier(() -> HttpFields.build().put(trailerName, trailerValue));
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(content, response.getContentAsString());
assertEquals(trailerValue, response.getTrailers().get(trailerName));
}
}