Issue #3758 - Avoid sending empty trailer frames for http/2 requests.

Modified the sender logic to allow specific subclasses to decide
when to send the trailers, if any.
This allows HTTP/2 to correctly compute the end_stream flag and avoid
sending empty trailers frames with end_stream=true.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-06-12 10:51:15 +02:00
parent da4f116c63
commit 8f53d14e15
6 changed files with 259 additions and 216 deletions

View File

@ -22,12 +22,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.BufferUtil;
@ -67,7 +65,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
private final Callback commitCallback = new CommitCallback();
private final IteratingCallback contentCallback = new ContentCallback();
private final Callback trailersCallback = new TrailersCallback();
private final Callback lastCallback = new LastCallback();
private final HttpChannel channel;
private HttpContent content;
@ -444,15 +441,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
*/
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
/**
* Implementations should send the HTTP trailers and notify the given {@code callback} of the
* result of this operation.
*
* @param exchange the exchange to send
* @param callback the callback to notify
*/
protected abstract void sendTrailers(HttpExchange exchange, Callback callback);
protected void reset()
{
HttpContent content = this.content;
@ -745,20 +733,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (content == null)
return;
HttpRequest request = exchange.getRequest();
Supplier<HttpFields> trailers = request.getTrailers();
boolean hasContent = content.hasContent();
if (!hasContent)
if (!content.hasContent())
{
if (trailers == null)
{
// No trailers or content to send, we are done.
someToSuccess(exchange);
}
else
{
sendTrailers(exchange, lastCallback);
}
// No content to send, we are done.
someToSuccess(exchange);
}
else
{
@ -859,9 +837,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (lastContent)
{
HttpRequest request = exchange.getRequest();
Supplier<HttpFields> trailers = request.getTrailers();
sendContent(exchange, content, trailers == null ? lastCallback : trailersCallback);
sendContent(exchange, content, lastCallback);
return Action.IDLE;
}
@ -925,28 +901,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
private class TrailersCallback implements Callback
{
@Override
public void succeeded()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
sendTrailers(exchange, lastCallback);
}
@Override
public void failed(Throwable x)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(x);
anyToFailure(x);
}
}
private class LastCallback implements Callback
{
@Override

View File

@ -59,7 +59,7 @@ public class HttpSenderOverHTTP extends HttpSender
{
try
{
new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
new HeadersCallback(exchange, content, callback).iterate();
}
catch (Throwable x)
{
@ -83,8 +83,8 @@ public class HttpSenderOverHTTP extends HttpSender
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated content ({} bytes) - {}/{}",
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_CHUNK:
@ -138,21 +138,6 @@ public class HttpSenderOverHTTP extends HttpSender
}
}
@Override
protected void sendTrailers(HttpExchange exchange, Callback callback)
{
try
{
new TrailersCallback(callback).iterate();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
callback.failed(x);
}
}
@Override
protected void reset()
{
@ -191,19 +176,17 @@ public class HttpSenderOverHTTP extends HttpSender
private final HttpExchange exchange;
private final Callback callback;
private final MetaData.Request metaData;
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
private ByteBuffer headerBuffer;
private ByteBuffer chunkBuffer;
private ByteBuffer contentBuffer;
private boolean lastContent;
private boolean generated;
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
{
super(false);
this.exchange = exchange;
this.callback = callback;
this.httpConnectionOverHTTP = httpConnectionOverHTTP;
HttpRequest request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
@ -231,10 +214,10 @@ public class HttpSenderOverHTTP extends HttpSender
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
headerBuffer == null ? -1 : headerBuffer.remaining(),
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
headerBuffer == null ? -1 : headerBuffer.remaining(),
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_HEADER:
@ -249,7 +232,8 @@ public class HttpSenderOverHTTP extends HttpSender
}
case NEED_CHUNK_TRAILER:
{
return Action.SUCCEEDED;
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
break;
}
case FLUSH:
{
@ -260,11 +244,8 @@ public class HttpSenderOverHTTP extends HttpSender
chunkBuffer = BufferUtil.EMPTY_BUFFER;
if (contentBuffer == null)
contentBuffer = BufferUtil.EMPTY_BUFFER;
httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer)
+ BufferUtil.length(chunkBuffer)
+ BufferUtil.length(contentBuffer));
long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining();
getHttpChannel().getHttpConnection().addBytesOut(bytes);
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
generated = true;
return Action.SCHEDULED;
@ -331,83 +312,6 @@ public class HttpSenderOverHTTP extends HttpSender
}
}
private class TrailersCallback extends IteratingCallback
{
private final Callback callback;
private ByteBuffer chunkBuffer;
public TrailersCallback(Callback callback)
{
this.callback = callback;
}
@Override
protected Action process() throws Throwable
{
while (true)
{
HttpGenerator.Result result = generator.generateRequest(null, null, chunkBuffer, null, true);
if (LOG.isDebugEnabled())
LOG.debug("Generated trailers {}/{}", result, generator);
switch (result)
{
case NEED_CHUNK_TRAILER:
{
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
break;
}
case FLUSH:
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
endPoint.write(this, chunkBuffer);
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
shutdownOutput();
return Action.SUCCEEDED;
}
case DONE:
{
return Action.SUCCEEDED;
}
default:
{
throw new IllegalStateException(result.toString());
}
}
}
}
@Override
public void succeeded()
{
release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
release();
callback.failed(x);
super.failed(x);
}
@Override
protected void onCompleteSuccess()
{
super.onCompleteSuccess();
callback.succeeded();
}
private void release()
{
httpClient.getByteBufferPool().release(chunkBuffer);
chunkBuffer = null;
}
}
private class ByteBufferRecyclerCallback extends Callback.Nested
{
private final ByteBufferPool pool;
@ -435,7 +339,9 @@ public class HttpSenderOverHTTP extends HttpSender
public void failed(Throwable x)
{
for (ByteBuffer buffer : buffers)
{
pool.release(buffer);
}
super.failed(x);
}
}

View File

@ -125,10 +125,4 @@ public class HttpSenderOverFCGI extends HttpSender
getHttpChannel().flush(result);
}
}
@Override
protected void sendTrailers(HttpExchange exchange, Callback callback)
{
callback.succeeded();
}
}

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.http2.client;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -35,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -58,10 +47,14 @@ import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TrailersTest extends AbstractTest
{
@ -289,7 +282,7 @@ public class TrailersTest extends AbstractTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue( frames.size()==3, frames.toString());
assertEquals(3, frames.size(), frames.toString());
HeadersFrame headers = (HeadersFrame)frames.get(0);
DataFrame data = (DataFrame)frames.get(1);
@ -298,7 +291,7 @@ public class TrailersTest extends AbstractTest
assertFalse(headers.isEndStream());
assertFalse(data.isEndStream());
assertTrue(trailers.isEndStream());
assertTrue(trailers.getMetaData().getFields().get(trailerName).equals(trailerValue));
assertEquals(trailers.getMetaData().getFields().get(trailerName), trailerValue);
}
@Test
@ -358,6 +351,5 @@ public class TrailersTest extends AbstractTest
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -56,42 +56,57 @@ public class HttpSenderOverHTTP2 extends HttpSender
String path = relativize(request.getPath());
HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null);
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
Supplier<HttpFields> trailers = request.getTrailers();
metaData.setTrailerSupplier(trailers);
HeadersFrame headersFrame = new HeadersFrame(metaData, null, trailers == null && !content.hasContent());
HttpChannelOverHTTP2 channel = getHttpChannel();
Promise<Stream> promise = new Promise<Stream>()
{
@Override
public void succeeded(Stream stream)
{
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
Supplier<HttpFields> trailerSupplier = request.getTrailers();
metaData.setTrailerSupplier(trailerSupplier);
if (content.hasContent() && !expects100Continue(request))
HeadersFrame headersFrame;
Promise<Stream> promise;
if (content.hasContent())
{
headersFrame = new HeadersFrame(metaData, null, false);
promise = new HeadersPromise(request, callback)
{
@Override
public void succeeded(Stream stream)
{
boolean advanced = content.advance();
boolean lastContent = trailers == null && content.isLast();
if (advanced || lastContent)
super.succeeded(stream);
if (expects100Continue(request))
{
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent);
stream.data(dataFrame, callback);
return;
// Don't send the content yet.
callback.succeeded();
}
else
{
boolean advanced = content.advance();
boolean lastContent = content.isLast();
if (advanced || lastContent)
sendContent(stream, content, trailerSupplier, callback);
else
callback.succeeded();
}
}
callback.succeeded();
}
@Override
public void failed(Throwable failure)
};
}
else
{
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
boolean endStream = trailers == null || trailers.size() == 0;
headersFrame = new HeadersFrame(metaData, null, endStream);
promise = new HeadersPromise(request, callback)
{
callback.failed(failure);
}
};
@Override
public void succeeded(Stream stream)
{
super.succeeded(stream);
if (endStream)
callback.succeeded();
else
sendTrailers(stream, trailers, callback);
}
};
}
// TODO optimize the send of HEADERS and DATA frames.
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
}
@ -123,19 +138,59 @@ public class HttpSenderOverHTTP2 extends HttpSender
else
{
Stream stream = getHttpChannel().getStream();
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), trailers == null && content.isLast());
stream.data(frame, callback);
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers();
sendContent(stream, content, trailerSupplier, callback);
}
}
@Override
protected void sendTrailers(HttpExchange exchange, Callback callback)
private void sendContent(Stream stream, HttpContent content, Supplier<HttpFields> trailerSupplier, Callback callback)
{
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers.get());
Stream stream = getHttpChannel().getStream();
boolean lastContent = content.isLast();
HttpFields trailers = null;
boolean endStream = false;
if (lastContent)
{
trailers = trailerSupplier == null ? null : trailerSupplier.get();
endStream = trailers == null || trailers.size() == 0;
}
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream);
HttpFields fTrailers = trailers;
stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed));
}
private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
{
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers);
HeadersFrame trailersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(trailersFrame, callback);
}
private class HeadersPromise implements Promise<Stream>
{
private final HttpRequest request;
private final Callback callback;
private HeadersPromise(HttpRequest request, Callback callback)
{
this.request = request;
this.callback = callback;
}
@Override
public void succeeded(Stream stream)
{
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
}
}

View File

@ -0,0 +1,142 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RequestTrailersTest extends AbstractTest
{
@Test
public void testEmptyTrailersWithoutContent() throws Exception
{
testEmptyTrailers(null);
}
@Test
public void testEmptyTrailersWithEagerContent() throws Exception
{
testEmptyTrailers("eager_content");
}
private void testEmptyTrailers(String content) throws Exception
{
CountDownLatch trailersLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
return new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
trailersLatch.countDown();
}
};
}
});
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
HttpFields trailers = new HttpFields();
request.trailers(() -> trailers);
if (content != null)
request.content(new StringContentProvider(content));
ContentResponse response = request.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
// The client must not send the trailers.
assertFalse(trailersLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testEmptyTrailersWithDeferredContent() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame dataFrame, Callback callback)
{
callback.succeeded();
// We should not receive an empty HEADERS frame for the
// trailers, but instead a DATA frame with endStream=true.
if (dataFrame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
}
}
};
}
});
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
HttpFields trailers = new HttpFields();
request.trailers(() -> trailers);
DeferredContentProvider content = new DeferredContentProvider();
request.content(content);
CountDownLatch latch = new CountDownLatch(1);
request.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
// Send deferred content after a while.
Thread.sleep(1000);
content.offer(ByteBuffer.wrap("deferred_content".getBytes(StandardCharsets.UTF_8)));
content.close();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}