Merge pull request #3772 from eclipse/jetty-9.4.x-3758-http2_dont_send_empty_trailers
Issue #3758 - Avoid sending empty trailer frames for http/2 requests.
This commit is contained in:
commit
0b56089327
|
@ -22,12 +22,10 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.client.api.ContentProvider;
|
import org.eclipse.jetty.client.api.ContentProvider;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.api.Result;
|
import org.eclipse.jetty.client.api.Result;
|
||||||
import org.eclipse.jetty.http.HttpFields;
|
|
||||||
import org.eclipse.jetty.http.HttpHeader;
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
@ -67,7 +65,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
|
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
|
||||||
private final Callback commitCallback = new CommitCallback();
|
private final Callback commitCallback = new CommitCallback();
|
||||||
private final IteratingCallback contentCallback = new ContentCallback();
|
private final IteratingCallback contentCallback = new ContentCallback();
|
||||||
private final Callback trailersCallback = new TrailersCallback();
|
|
||||||
private final Callback lastCallback = new LastCallback();
|
private final Callback lastCallback = new LastCallback();
|
||||||
private final HttpChannel channel;
|
private final HttpChannel channel;
|
||||||
private HttpContent content;
|
private HttpContent content;
|
||||||
|
@ -444,15 +441,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
*/
|
*/
|
||||||
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
|
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
|
||||||
|
|
||||||
/**
|
|
||||||
* Implementations should send the HTTP trailers and notify the given {@code callback} of the
|
|
||||||
* result of this operation.
|
|
||||||
*
|
|
||||||
* @param exchange the exchange to send
|
|
||||||
* @param callback the callback to notify
|
|
||||||
*/
|
|
||||||
protected abstract void sendTrailers(HttpExchange exchange, Callback callback);
|
|
||||||
|
|
||||||
protected void reset()
|
protected void reset()
|
||||||
{
|
{
|
||||||
HttpContent content = this.content;
|
HttpContent content = this.content;
|
||||||
|
@ -745,24 +733,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
if (content == null)
|
if (content == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
HttpRequest request = exchange.getRequest();
|
if (!content.hasContent())
|
||||||
Supplier<HttpFields> trailers = request.getTrailers();
|
|
||||||
boolean hasContent = content.hasContent();
|
|
||||||
if (!hasContent)
|
|
||||||
{
|
{
|
||||||
if (trailers == null)
|
// No content to send, we are done.
|
||||||
{
|
someToSuccess(exchange);
|
||||||
// No trailers or content to send, we are done.
|
|
||||||
someToSuccess(exchange);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
sendTrailers(exchange, lastCallback);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Was any content sent while committing ?
|
// Was any content sent while committing?
|
||||||
ByteBuffer contentBuffer = content.getContent();
|
ByteBuffer contentBuffer = content.getContent();
|
||||||
if (contentBuffer != null)
|
if (contentBuffer != null)
|
||||||
{
|
{
|
||||||
|
@ -859,9 +837,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
|
|
||||||
if (lastContent)
|
if (lastContent)
|
||||||
{
|
{
|
||||||
HttpRequest request = exchange.getRequest();
|
sendContent(exchange, content, lastCallback);
|
||||||
Supplier<HttpFields> trailers = request.getTrailers();
|
|
||||||
sendContent(exchange, content, trailers == null ? lastCallback : trailersCallback);
|
|
||||||
return Action.IDLE;
|
return Action.IDLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -925,28 +901,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TrailersCallback implements Callback
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void succeeded()
|
|
||||||
{
|
|
||||||
HttpExchange exchange = getHttpExchange();
|
|
||||||
if (exchange == null)
|
|
||||||
return;
|
|
||||||
sendTrailers(exchange, lastCallback);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failed(Throwable x)
|
|
||||||
{
|
|
||||||
HttpContent content = HttpSender.this.content;
|
|
||||||
if (content == null)
|
|
||||||
return;
|
|
||||||
content.failed(x);
|
|
||||||
anyToFailure(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class LastCallback implements Callback
|
private class LastCallback implements Callback
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
|
new HeadersCallback(exchange, content, callback).iterate();
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
|
@ -83,8 +83,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
|
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Generated content ({} bytes) - {}/{}",
|
LOG.debug("Generated content ({} bytes) - {}/{}",
|
||||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||||
result, generator);
|
result, generator);
|
||||||
switch (result)
|
switch (result)
|
||||||
{
|
{
|
||||||
case NEED_CHUNK:
|
case NEED_CHUNK:
|
||||||
|
@ -94,8 +94,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
}
|
}
|
||||||
case NEED_CHUNK_TRAILER:
|
case NEED_CHUNK_TRAILER:
|
||||||
{
|
{
|
||||||
callback.succeeded();
|
chunk = bufferPool.acquire(httpClient.getRequestBufferSize(), false);
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
case FLUSH:
|
case FLUSH:
|
||||||
{
|
{
|
||||||
|
@ -138,21 +138,6 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
new TrailersCallback(callback).iterate();
|
|
||||||
}
|
|
||||||
catch (Throwable x)
|
|
||||||
{
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug(x);
|
|
||||||
callback.failed(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void reset()
|
protected void reset()
|
||||||
{
|
{
|
||||||
|
@ -191,19 +176,17 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
private final HttpExchange exchange;
|
private final HttpExchange exchange;
|
||||||
private final Callback callback;
|
private final Callback callback;
|
||||||
private final MetaData.Request metaData;
|
private final MetaData.Request metaData;
|
||||||
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
|
|
||||||
private ByteBuffer headerBuffer;
|
private ByteBuffer headerBuffer;
|
||||||
private ByteBuffer chunkBuffer;
|
private ByteBuffer chunkBuffer;
|
||||||
private ByteBuffer contentBuffer;
|
private ByteBuffer contentBuffer;
|
||||||
private boolean lastContent;
|
private boolean lastContent;
|
||||||
private boolean generated;
|
private boolean generated;
|
||||||
|
|
||||||
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
|
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
|
||||||
{
|
{
|
||||||
super(false);
|
super(false);
|
||||||
this.exchange = exchange;
|
this.exchange = exchange;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
this.httpConnectionOverHTTP = httpConnectionOverHTTP;
|
|
||||||
|
|
||||||
HttpRequest request = exchange.getRequest();
|
HttpRequest request = exchange.getRequest();
|
||||||
ContentProvider requestContent = request.getContent();
|
ContentProvider requestContent = request.getContent();
|
||||||
|
@ -231,10 +214,10 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
|
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
|
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
|
||||||
headerBuffer == null ? -1 : headerBuffer.remaining(),
|
headerBuffer == null ? -1 : headerBuffer.remaining(),
|
||||||
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
|
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
|
||||||
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
contentBuffer == null ? -1 : contentBuffer.remaining(),
|
||||||
result, generator);
|
result, generator);
|
||||||
switch (result)
|
switch (result)
|
||||||
{
|
{
|
||||||
case NEED_HEADER:
|
case NEED_HEADER:
|
||||||
|
@ -249,7 +232,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
}
|
}
|
||||||
case NEED_CHUNK_TRAILER:
|
case NEED_CHUNK_TRAILER:
|
||||||
{
|
{
|
||||||
return Action.SUCCEEDED;
|
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
case FLUSH:
|
case FLUSH:
|
||||||
{
|
{
|
||||||
|
@ -260,11 +244,8 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
chunkBuffer = BufferUtil.EMPTY_BUFFER;
|
chunkBuffer = BufferUtil.EMPTY_BUFFER;
|
||||||
if (contentBuffer == null)
|
if (contentBuffer == null)
|
||||||
contentBuffer = BufferUtil.EMPTY_BUFFER;
|
contentBuffer = BufferUtil.EMPTY_BUFFER;
|
||||||
|
long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining();
|
||||||
httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer)
|
getHttpChannel().getHttpConnection().addBytesOut(bytes);
|
||||||
+ BufferUtil.length(chunkBuffer)
|
|
||||||
+ BufferUtil.length(contentBuffer));
|
|
||||||
|
|
||||||
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
|
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
|
||||||
generated = true;
|
generated = true;
|
||||||
return Action.SCHEDULED;
|
return Action.SCHEDULED;
|
||||||
|
@ -331,83 +312,6 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TrailersCallback extends IteratingCallback
|
|
||||||
{
|
|
||||||
private final Callback callback;
|
|
||||||
private ByteBuffer chunkBuffer;
|
|
||||||
|
|
||||||
public TrailersCallback(Callback callback)
|
|
||||||
{
|
|
||||||
this.callback = callback;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Action process() throws Throwable
|
|
||||||
{
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
HttpGenerator.Result result = generator.generateRequest(null, null, chunkBuffer, null, true);
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("Generated trailers {}/{}", result, generator);
|
|
||||||
switch (result)
|
|
||||||
{
|
|
||||||
case NEED_CHUNK_TRAILER:
|
|
||||||
{
|
|
||||||
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case FLUSH:
|
|
||||||
{
|
|
||||||
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
|
|
||||||
endPoint.write(this, chunkBuffer);
|
|
||||||
return Action.SCHEDULED;
|
|
||||||
}
|
|
||||||
case SHUTDOWN_OUT:
|
|
||||||
{
|
|
||||||
shutdownOutput();
|
|
||||||
return Action.SUCCEEDED;
|
|
||||||
}
|
|
||||||
case DONE:
|
|
||||||
{
|
|
||||||
return Action.SUCCEEDED;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
{
|
|
||||||
throw new IllegalStateException(result.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void succeeded()
|
|
||||||
{
|
|
||||||
release();
|
|
||||||
super.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failed(Throwable x)
|
|
||||||
{
|
|
||||||
release();
|
|
||||||
callback.failed(x);
|
|
||||||
super.failed(x);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onCompleteSuccess()
|
|
||||||
{
|
|
||||||
super.onCompleteSuccess();
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void release()
|
|
||||||
{
|
|
||||||
httpClient.getByteBufferPool().release(chunkBuffer);
|
|
||||||
chunkBuffer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class ByteBufferRecyclerCallback extends Callback.Nested
|
private class ByteBufferRecyclerCallback extends Callback.Nested
|
||||||
{
|
{
|
||||||
private final ByteBufferPool pool;
|
private final ByteBufferPool pool;
|
||||||
|
@ -435,7 +339,9 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
{
|
{
|
||||||
for (ByteBuffer buffer : buffers)
|
for (ByteBuffer buffer : buffers)
|
||||||
|
{
|
||||||
pool.release(buffer);
|
pool.release(buffer);
|
||||||
|
}
|
||||||
super.failed(x);
|
super.failed(x);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,10 +125,4 @@ public class HttpSenderOverFCGI extends HttpSender
|
||||||
getHttpChannel().flush(result);
|
getHttpChannel().flush(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
|
||||||
{
|
|
||||||
callback.succeeded();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,16 +18,6 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.http2.client;
|
package org.eclipse.jetty.http2.client;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -35,7 +25,6 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.servlet.ServletInputStream;
|
import javax.servlet.ServletInputStream;
|
||||||
import javax.servlet.http.HttpServlet;
|
import javax.servlet.http.HttpServlet;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
@ -58,10 +47,14 @@ import org.eclipse.jetty.server.Response;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.FuturePromise;
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
import org.eclipse.jetty.util.Promise;
|
import org.eclipse.jetty.util.Promise;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.StringUtil;
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TrailersTest extends AbstractTest
|
public class TrailersTest extends AbstractTest
|
||||||
{
|
{
|
||||||
|
@ -289,7 +282,7 @@ public class TrailersTest extends AbstractTest
|
||||||
|
|
||||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
assertTrue( frames.size()==3, frames.toString());
|
assertEquals(3, frames.size(), frames.toString());
|
||||||
|
|
||||||
HeadersFrame headers = (HeadersFrame)frames.get(0);
|
HeadersFrame headers = (HeadersFrame)frames.get(0);
|
||||||
DataFrame data = (DataFrame)frames.get(1);
|
DataFrame data = (DataFrame)frames.get(1);
|
||||||
|
@ -298,7 +291,7 @@ public class TrailersTest extends AbstractTest
|
||||||
assertFalse(headers.isEndStream());
|
assertFalse(headers.isEndStream());
|
||||||
assertFalse(data.isEndStream());
|
assertFalse(data.isEndStream());
|
||||||
assertTrue(trailers.isEndStream());
|
assertTrue(trailers.isEndStream());
|
||||||
assertTrue(trailers.getMetaData().getFields().get(trailerName).equals(trailerValue));
|
assertEquals(trailers.getMetaData().getFields().get(trailerName), trailerValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -358,6 +351,5 @@ public class TrailersTest extends AbstractTest
|
||||||
|
|
||||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,42 +56,57 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
||||||
String path = relativize(request.getPath());
|
String path = relativize(request.getPath());
|
||||||
HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null);
|
HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null);
|
||||||
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
|
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
|
||||||
Supplier<HttpFields> trailers = request.getTrailers();
|
Supplier<HttpFields> trailerSupplier = request.getTrailers();
|
||||||
metaData.setTrailerSupplier(trailers);
|
metaData.setTrailerSupplier(trailerSupplier);
|
||||||
HeadersFrame headersFrame = new HeadersFrame(metaData, null, trailers == null && !content.hasContent());
|
|
||||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
|
||||||
Promise<Stream> promise = new Promise<Stream>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void succeeded(Stream stream)
|
|
||||||
{
|
|
||||||
channel.setStream(stream);
|
|
||||||
((IStream)stream).setAttachment(channel);
|
|
||||||
long idleTimeout = request.getIdleTimeout();
|
|
||||||
if (idleTimeout >= 0)
|
|
||||||
stream.setIdleTimeout(idleTimeout);
|
|
||||||
|
|
||||||
if (content.hasContent() && !expects100Continue(request))
|
HeadersFrame headersFrame;
|
||||||
|
Promise<Stream> promise;
|
||||||
|
if (content.hasContent())
|
||||||
|
{
|
||||||
|
headersFrame = new HeadersFrame(metaData, null, false);
|
||||||
|
promise = new HeadersPromise(request, callback)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void succeeded(Stream stream)
|
||||||
{
|
{
|
||||||
boolean advanced = content.advance();
|
super.succeeded(stream);
|
||||||
boolean lastContent = trailers == null && content.isLast();
|
if (expects100Continue(request))
|
||||||
if (advanced || lastContent)
|
|
||||||
{
|
{
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent);
|
// Don't send the content yet.
|
||||||
stream.data(dataFrame, callback);
|
callback.succeeded();
|
||||||
return;
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
boolean advanced = content.advance();
|
||||||
|
boolean lastContent = content.isLast();
|
||||||
|
if (advanced || lastContent)
|
||||||
|
sendContent(stream, content, trailerSupplier, callback);
|
||||||
|
else
|
||||||
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
callback.succeeded();
|
};
|
||||||
}
|
}
|
||||||
|
else
|
||||||
@Override
|
{
|
||||||
public void failed(Throwable failure)
|
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||||
|
boolean endStream = trailers == null || trailers.size() == 0;
|
||||||
|
headersFrame = new HeadersFrame(metaData, null, endStream);
|
||||||
|
promise = new HeadersPromise(request, callback)
|
||||||
{
|
{
|
||||||
callback.failed(failure);
|
@Override
|
||||||
}
|
public void succeeded(Stream stream)
|
||||||
};
|
{
|
||||||
|
super.succeeded(stream);
|
||||||
|
if (endStream)
|
||||||
|
callback.succeeded();
|
||||||
|
else
|
||||||
|
sendTrailers(stream, trailers, callback);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
// TODO optimize the send of HEADERS and DATA frames.
|
// TODO optimize the send of HEADERS and DATA frames.
|
||||||
|
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||||
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
|
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,24 +133,67 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
||||||
{
|
{
|
||||||
if (content.isConsumed())
|
if (content.isConsumed())
|
||||||
{
|
{
|
||||||
|
// The superclass calls sendContent() one more time after the last content.
|
||||||
|
// This is necessary for HTTP/1.1 to generate the terminal chunk (with trailers),
|
||||||
|
// but it's not necessary for HTTP/2 so we just succeed the callback.
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Stream stream = getHttpChannel().getStream();
|
Stream stream = getHttpChannel().getStream();
|
||||||
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
|
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers();
|
||||||
DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), trailers == null && content.isLast());
|
sendContent(stream, content, trailerSupplier, callback);
|
||||||
stream.data(frame, callback);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void sendContent(Stream stream, HttpContent content, Supplier<HttpFields> trailerSupplier, Callback callback)
|
||||||
protected void sendTrailers(HttpExchange exchange, Callback callback)
|
|
||||||
{
|
{
|
||||||
Supplier<HttpFields> trailers = exchange.getRequest().getTrailers();
|
boolean lastContent = content.isLast();
|
||||||
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers.get());
|
HttpFields trailers = null;
|
||||||
Stream stream = getHttpChannel().getStream();
|
boolean endStream = false;
|
||||||
|
if (lastContent)
|
||||||
|
{
|
||||||
|
trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||||
|
endStream = trailers == null || trailers.size() == 0;
|
||||||
|
}
|
||||||
|
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream);
|
||||||
|
HttpFields fTrailers = trailers;
|
||||||
|
stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
|
||||||
|
{
|
||||||
|
MetaData metaData = new MetaData(HttpVersion.HTTP_2, trailers);
|
||||||
HeadersFrame trailersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame trailersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(trailersFrame, callback);
|
stream.headers(trailersFrame, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class HeadersPromise implements Promise<Stream>
|
||||||
|
{
|
||||||
|
private final HttpRequest request;
|
||||||
|
private final Callback callback;
|
||||||
|
|
||||||
|
private HeadersPromise(HttpRequest request, Callback callback)
|
||||||
|
{
|
||||||
|
this.request = request;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void succeeded(Stream stream)
|
||||||
|
{
|
||||||
|
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||||
|
channel.setStream(stream);
|
||||||
|
((IStream)stream).setAttachment(channel);
|
||||||
|
long idleTimeout = request.getIdleTimeout();
|
||||||
|
if (idleTimeout >= 0)
|
||||||
|
stream.setIdleTimeout(idleTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(Throwable x)
|
||||||
|
{
|
||||||
|
callback.failed(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.http2.client.http;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.HttpRequest;
|
||||||
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
|
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||||
|
import org.eclipse.jetty.client.util.StringContentProvider;
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.http.HttpVersion;
|
||||||
|
import org.eclipse.jetty.http.MetaData;
|
||||||
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
|
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||||
|
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class RequestTrailersTest extends AbstractTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testEmptyTrailersWithoutContent() throws Exception
|
||||||
|
{
|
||||||
|
testEmptyTrailers(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyTrailersWithEagerContent() throws Exception
|
||||||
|
{
|
||||||
|
testEmptyTrailers("eager_content");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testEmptyTrailers(String content) throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch trailersLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||||
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
|
return new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
trailersLatch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||||
|
HttpFields trailers = new HttpFields();
|
||||||
|
request.trailers(() -> trailers);
|
||||||
|
if (content != null)
|
||||||
|
request.content(new StringContentProvider(content));
|
||||||
|
|
||||||
|
ContentResponse response = request.send();
|
||||||
|
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
|
||||||
|
// The client must not send the trailers.
|
||||||
|
assertFalse(trailersLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyTrailersWithDeferredContent() throws Exception
|
||||||
|
{
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
return new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame dataFrame, Callback callback)
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
// We should not receive an empty HEADERS frame for the
|
||||||
|
// trailers, but instead a DATA frame with endStream=true.
|
||||||
|
if (dataFrame.isEndStream())
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||||
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||||
|
HttpFields trailers = new HttpFields();
|
||||||
|
request.trailers(() -> trailers);
|
||||||
|
DeferredContentProvider content = new DeferredContentProvider();
|
||||||
|
request.content(content);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
request.send(result ->
|
||||||
|
{
|
||||||
|
assertTrue(result.isSucceeded());
|
||||||
|
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send deferred content after a while.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
content.offer(ByteBuffer.wrap("deferred_content".getBytes(StandardCharsets.UTF_8)));
|
||||||
|
content.close();
|
||||||
|
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyTrailersWithEmptyDeferredContent() throws Exception
|
||||||
|
{
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
return new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame dataFrame, Callback callback)
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
// We should not receive an empty HEADERS frame for the
|
||||||
|
// trailers, but instead a DATA frame with endStream=true.
|
||||||
|
if (dataFrame.isEndStream())
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||||
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
|
||||||
|
HttpFields trailers = new HttpFields();
|
||||||
|
request.trailers(() -> trailers);
|
||||||
|
DeferredContentProvider content = new DeferredContentProvider();
|
||||||
|
request.content(content);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
request.send(result ->
|
||||||
|
{
|
||||||
|
assertTrue(result.isSucceeded());
|
||||||
|
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send deferred content after a while.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
content.close();
|
||||||
|
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue