Issue #6728 - QUIC and HTTP/3

- WIP on the client upper layer.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-19 18:12:11 +02:00
parent af885c3b49
commit 21464f85ff
4 changed files with 243 additions and 26 deletions

View File

@ -26,6 +26,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
private final HTTP3SessionClient session;
private final HttpSenderOverHTTP3 sender;
private final HttpReceiverOverHTTP3 receiver;
private Stream stream;
public HttpChannelOverHTTP3(HttpDestination destination, HTTP3SessionClient session)
{
@ -57,6 +58,16 @@ public class HttpChannelOverHTTP3 extends HttpChannel
return receiver;
}
public Stream getStream()
{
return stream;
}
public void setStream(Stream stream)
{
this.stream = stream;
}
@Override
public void send(HttpExchange exchange)
{

View File

@ -24,11 +24,17 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class);
private boolean notifySuccess;
protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
{
super(channel);
@ -40,6 +46,23 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
return (HttpChannelOverHTTP3)super.getHttpChannel();
}
@Override
protected void receive()
{
// Called when the application resumes demand of content.
if (LOG.isDebugEnabled())
LOG.debug("resuming response processing on {}", this);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
if (notifySuccess)
responseSuccess(exchange);
else
getHttpChannel().getStream().demand();
}
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
@ -73,23 +96,12 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
else
{
if (frame.isLast())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
// TODO
// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after headers on {}", this);
notifySuccess = frame.isLast();
}
}
}
}
@Override
protected void receive()
{
// Called when the application resumes demand of content.
// TODO: stream.demand() should be enough.
}
@Override
public void onDataAvailable(Stream stream)
@ -104,8 +116,12 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
// TODO: callback failure should invoke responseFailure().
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
@ -114,6 +130,12 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
else
stream.demand();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
notifySuccess = data.isLast();
}
}
else
{
@ -139,10 +161,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
HttpFields trailers = frame.getMetaData().getFields();
trailers.forEach(exchange.getResponse()::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
// TODO
// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
responseSuccess(exchange);
}
}

View File

@ -36,8 +36,6 @@ import org.eclipse.jetty.util.Callback;
public class HttpSenderOverHTTP3 extends HttpSender
{
private Stream stream;
public HttpSenderOverHTTP3(HttpChannelOverHTTP3 channel)
{
super(channel);
@ -140,7 +138,7 @@ public class HttpSenderOverHTTP3 extends HttpSender
private Stream onNewStream(Stream stream, HttpRequest request)
{
this.stream = stream;
getHttpChannel().setStream(stream);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout > 0)
((HTTP3Stream)stream).setIdleTimeout(idleTimeout);
@ -157,6 +155,7 @@ public class HttpSenderOverHTTP3 extends HttpSender
@Override
protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
{
Stream stream = getHttpChannel().getStream();
boolean hasContent = contentBuffer.hasRemaining();
if (lastContent)
{

View File

@ -14,21 +14,58 @@
package org.eclipse.jetty.http3.tests;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest
{
@Test
public void testRequestResponse() throws Exception
public void testRequestHasHTTP2Version() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
HttpVersion version = HttpVersion.fromString(request.getProtocol());
response.setStatus(version == HttpVersion.HTTP_3 ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500);
}
});
ContentResponse response = httpClient.newRequest("localhost", connector.getLocalPort())
.onRequestBegin(request ->
{
if (request.getVersion() != HttpVersion.HTTP_3)
request.abort(new Exception("Not an HTTP/3 request"));
})
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@Test
public void testRequestResponseWithSmallContent() throws Exception
{
String content = "Hello, World!";
start(new AbstractHandler()
@ -42,8 +79,160 @@ public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest
});
ContentResponse response = httpClient.newRequest("https://localhost:" + connector.getLocalPort())
.timeout(555, TimeUnit.SECONDS)
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(content, response.getContentAsString());
}
@Test
public void testDelayedClientRead() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
response.getOutputStream().write(new byte[10 * 1024]);
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);
AtomicInteger contentCount = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
httpClient.newRequest("https://localhost:" + connector.getLocalPort())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Do not demand.
demandRef.set(demand);
beforeContentLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
contentCount.incrementAndGet();
callback.succeeded();
demand.accept(1);
}
})
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS));
// Verify that onContent() is not called.
Thread.sleep(1000);
assertEquals(0, contentCount.get());
// Demand content.
demandRef.get().accept(1);
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testDelayDemandAfterHeaders() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);
AtomicInteger contentCount = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
httpClient.newRequest("localhost", connector.getLocalPort())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Do not demand.
demandRef.set(demand);
beforeContentLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
contentCount.incrementAndGet();
}
})
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS));
// Verify that the response is not completed yet.
assertFalse(latch.await(1, TimeUnit.SECONDS));
// Demand to succeed the response.
demandRef.get().accept(1);
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(0, contentCount.get());
}
@Test
public void testDelayDemandAfterLastContentChunk() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
response.getOutputStream().print("0");
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
httpClient.newRequest("localhost", connector.getLocalPort())
.onResponseContentDemanded((response, demand, content, callback) ->
{
callback.succeeded();
// Do not demand.
demandRef.set(demand);
contentLatch.countDown();
})
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Verify that the response is not completed yet.
assertFalse(latch.await(1, TimeUnit.SECONDS));
// Demand to succeed the response.
demandRef.get().accept(1);
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}