From 87edb609bdff1f0cdc8b3b8301ef967b88e41794 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 22 Sep 2021 12:34:57 +0200 Subject: [PATCH] Issue #6728 - QUIC and HTTP/3 - Updates to the data support after review. Signed-off-by: Simone Bordet --- .../client/internal/ClientHTTP3Session.java | 4 +- .../client/internal/HTTP3SessionClient.java | 1 + .../org/eclipse/jetty/http3/api/Stream.java | 32 ++++--------- .../jetty/http3/internal/HTTP3Session.java | 4 ++ .../jetty/http3/internal/HTTP3Stream.java | 4 +- .../http3/internal/HTTP3StreamConnection.java | 11 ++--- .../server/internal/ServerHTTP3Session.java | 47 +++++++++---------- .../http3/tests/HTTP3ClientServerTest.java | 21 +++++---- .../http3/tests/HTTP3DataDemandTest.java | 33 ++++++++----- 9 files changed, 75 insertions(+), 82 deletions(-) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 9b0c7e9789a..8032cdfe4f6 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -122,14 +122,14 @@ public class ClientHTTP3Session extends ClientProtocolSession if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) - LOG.debug("stream #{} selected for read", readableStreamId); + LOG.debug("bidirectional stream #{} selected for read", readableStreamId); return super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) - LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); + LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); return streamEndPoint.onReadable(); } } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index 63249f7f46c..1ced94e64c6 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -53,6 +53,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client getProtocolSession().writeFrame(streamId, frame, callback); } + @Override public void onOpen() { promise.succeeded(this); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index cd1ddfaa6c6..8f79bce356a 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -26,7 +26,7 @@ public interface Stream public Stream.Data readData(); - public void demand(boolean enable); + public void demand(); public CompletableFuture trailer(HeadersFrame frame); @@ -48,12 +48,12 @@ public interface Stream public static class Data { private final DataFrame frame; - private final CompletableFuture callback; + private final Runnable complete; - public Data(DataFrame frame, CompletableFuture callback) + public Data(DataFrame frame, Runnable complete) { this.frame = frame; - this.callback = callback; + this.complete = complete; } public DataFrame frame() @@ -61,29 +61,15 @@ public interface Stream return frame; } - public CompletableFuture callback() + public void complete() { - return callback; + complete.run(); } - public void complete(Object result, Throwable failure) + @Override + public String toString() { - if (failure == null) - callback().complete(result); - else - callback().completeExceptionally(failure); - } - - public void completeAndDemand(Stream stream, Throwable failure) - { - complete(stream, failure); - if (failure == null) - stream.demand(true); - } - - public void succeed() - { - callback().complete(null); + return String.format("%s[%s]", getClass().getSimpleName(), frame); } } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index 01ae332ce0e..3e9dc90d705 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -44,6 +44,10 @@ public abstract class HTTP3Session implements Session, ParserListener this.listener = listener; } + public void onOpen() + { + } + public ProtocolSession getProtocolSession() { return session; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index 90b2780c260..342b2d5938a 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -66,10 +66,10 @@ public class HTTP3Stream implements Stream } @Override - public void demand(boolean enable) + public void demand() { HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); - connection.demand(enable); + connection.demand(); } @Override diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 73a9a18a61b..54a3825736e 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -16,7 +16,6 @@ package org.eclipse.jetty.http3.internal; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.eclipse.jetty.http.MetaData; @@ -58,8 +57,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool); this.parser = parser; parser.init(MessageListener::new); - // By default, invoke onDataAvailable() after onRequest()/onResponse(). - this.dataDemand = true; } @Override @@ -146,9 +143,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection buffer.retain(); - CompletableFuture completable = new CompletableFuture<>() - .whenComplete((r, x) -> buffer.release()); - return new Stream.Data(frame, completable); + return new Stream.Data(frame, buffer::release); } case MODE_SWITCH: { @@ -170,12 +165,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } } - public void demand(boolean enable) + public void demand() { boolean process = false; try (AutoLock l = lock.lock()) { - dataDemand = enable; + dataDemand = true; if (dataStalled) { dataStalled = false; diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index b22b4d3777f..64a7c79d081 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -25,7 +25,6 @@ import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection; -import org.eclipse.jetty.http3.internal.generator.MessageGenerator; import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; @@ -42,37 +41,32 @@ public class ServerHTTP3Session extends ServerProtocolSession private final QpackEncoder encoder; private final QpackDecoder decoder; - private final HTTP3SessionServer apiSession; - private final InstructionFlusher encoderFlusher; - private final InstructionFlusher decoderFlusher; + private final HTTP3SessionServer applicationSession; private final ControlFlusher controlFlusher; - private final MessageGenerator generator; private final HTTP3Flusher messageFlusher; public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) { super(session); - this.apiSession = new HTTP3SessionServer(this, listener); + this.applicationSession = new HTTP3SessionServer(this, listener); if (LOG.isDebugEnabled()) LOG.debug("initializing HTTP/3 streams"); long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); - QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId); - this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); - this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams); + QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId); + InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); + this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams); if (LOG.isDebugEnabled()) LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); - QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId); - this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); - this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize); + QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId); + InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); + this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxRequestHeadersSize); if (LOG.isDebugEnabled()) LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); - // TODO: make parameters configurable. - this.generator = new MessageGenerator(encoder, 4096, true); long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId); this.controlFlusher = new ControlFlusher(session, controlEndPoint); @@ -90,29 +84,25 @@ public class ServerHTTP3Session extends ServerProtocolSession public HTTP3SessionServer getSessionServer() { - return apiSession; + return applicationSession; } @Override public void onOpen() { // Queue the mandatory SETTINGS frame. - Map settings = apiSession.onPreface(); + Map settings = applicationSession.onPreface(); if (settings == null) settings = Map.of(); // TODO: add default settings. SettingsFrame frame = new SettingsFrame(settings); controlFlusher.offer(frame, Callback.NOOP); controlFlusher.iterate(); + + applicationSession.onOpen(); } - private QuicStreamEndPoint configureEncoderEndPoint(long streamId) - { - // This is a write-only stream, so no need to link a Connection. - return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen); - } - - private QuicStreamEndPoint configureDecoderEndPoint(long streamId) + private QuicStreamEndPoint configureInstructionEndPoint(long streamId) { // This is a write-only stream, so no need to link a Connection. return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen); @@ -136,7 +126,6 @@ public class ServerHTTP3Session extends ServerProtocolSession } else { - // On the server, we need a get-or-create semantic in case of reads. QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); @@ -146,7 +135,7 @@ public class ServerHTTP3Session extends ServerProtocolSession private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) { - UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession); + UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); endPoint.setConnection(connection); endPoint.onOpen(); connection.onOpen(); @@ -161,6 +150,12 @@ public class ServerHTTP3Session extends ServerProtocolSession protected void onDataAvailable(long streamId) { - apiSession.onDataAvailable(streamId); + applicationSession.onDataAvailable(streamId); + } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java index 116e9168018..ff5ecbad237 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java @@ -134,7 +134,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest { // Send the response. stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); - // Implicit demand, so onDataAvailable() will be called. + stream.demand(); return new Stream.Listener() { @Override @@ -145,13 +145,13 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest if (data == null) { // Call me again when you have data. - stream.demand(true); + stream.demand(); return; } // Recycle the ByteBuffer in data.frame. - data.succeed(); + data.complete(); // Call me again immediately. - stream.demand(true); + stream.demand(); if (data.frame().isLast()) serverLatch.get().countDown(); } @@ -205,6 +205,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest { // Send the response headers. stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); + stream.demand(); return new Stream.Listener() { @Override @@ -214,12 +215,15 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest Stream.Data data = stream.readData(); if (data == null) { - stream.demand(true); + stream.demand(); return; } // Echo it back, then demand only when the write is finished. stream.data(data.frame()) - .whenComplete(data::completeAndDemand); + // Always complete. + .whenComplete((s, x) -> data.complete()) + // Demand only if successful. + .thenRun(stream::demand); } }; } @@ -244,6 +248,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest public void onResponse(Stream stream, HeadersFrame frame) { clientResponseLatch.countDown(); + stream.demand(); } @Override @@ -255,12 +260,12 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest { // Consume data. byteBuffer.put(data.frame().getData()); - data.callback().complete(null); + data.complete(); if (data.frame().isLast()) clientDataLatch.countDown(); } // Demand more data. - stream.demand(true); + stream.demand(); } }) .get(5, TimeUnit.SECONDS); diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java index 0f9860f01ed..1bfe0c42fec 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java @@ -60,6 +60,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -78,7 +79,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest if (data != null && data.frame().isLast()) serverDataLatch.countDown(); else - stream.demand(true); + stream.demand(); } } }; @@ -101,7 +102,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest assertEquals(1, onDataAvailableCalls.get()); // Resume processing of data, this should call onDataAvailable(). - serverStreamRef.get().demand(true); + serverStreamRef.get().demand(); assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); } @@ -118,6 +119,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -139,7 +141,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest if (data != null && data.frame().isLast()) serverDataLatch.countDown(); else - stream.demand(true); + stream.demand(); } } }; @@ -162,9 +164,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest assertEquals(1, onDataAvailableCalls.get()); // Resume processing of data, this should call onDataAvailable(), but there is no data to read yet. - serverStreamRef.get().demand(true); + Stream serverStream = serverStreamRef.get(); + serverStream.demand(); - await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)stream).hasDemand()); + await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)serverStream).hasDemand()); stream.data(new DataFrame(ByteBuffer.allocate(32), true)); @@ -183,6 +186,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -209,7 +213,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest if (data != null && data.frame().isLast()) serverDataLatch.countDown(); else - stream.demand(true); + stream.demand(); } } }; @@ -235,7 +239,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest stream.data(new DataFrame(BufferUtil.EMPTY_BUFFER, true)); // Resume processing of data, this should call onDataAvailable(). - serverStreamRef.get().demand(true); + serverStreamRef.get().demand(); assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); } @@ -251,6 +255,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -261,7 +266,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest Stream.Data data = stream.readData(); assertNull(data); // It's typical to demand after null data. - stream.demand(true); + stream.demand(); serverDataLatch.countDown(); } @@ -306,6 +311,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -318,7 +324,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength) serverDataLatch.countDown(); } - stream.demand(true); + stream.demand(); } @Override @@ -363,6 +369,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { + stream.demand(); return new Stream.Listener() { @Override @@ -373,7 +380,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest Stream.Data data = stream.readData(); if (data == null) { - stream.demand(true); + stream.demand(); return; } // Store the Data away to be used later. @@ -422,7 +429,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest { serverStreamRef.set(stream); serverRequestLatch.countDown(); - stream.demand(false); + // Do not demand here. return new Stream.Listener() { @Override @@ -432,7 +439,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest Stream.Data data = stream.readData(); if (data != null && data.frame().isLast()) serverDataLatch.countDown(); - stream.demand(true); + stream.demand(); } }; } @@ -456,7 +463,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest assertEquals(0, onDataAvailableCalls.get()); // Resume processing of data. - serverStreamRef.get().demand(true); + serverStreamRef.get().demand(); assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); }