diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3ClientConnectionFactory.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3ClientConnectionFactory.java index 4dc9c19675d..ff8b4f39497 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3ClientConnectionFactory.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3ClientConnectionFactory.java @@ -17,7 +17,7 @@ import java.util.Map; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.client.internal.ClientHTTP3Session; -import org.eclipse.jetty.http3.internal.HTTP3Connection; +import org.eclipse.jetty.http3.client.internal.ClientHTTP3StreamConnection; import org.eclipse.jetty.http3.internal.parser.MessageParser; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.Connection; @@ -76,8 +76,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint; long streamId = streamEndPoint.getStreamId(); ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession(); - // TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session. MessageParser parser = new MessageParser(http3Session.getSessionClient(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished); - return new HTTP3Connection(streamEndPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser); + return new ClientHTTP3StreamConnection(streamEndPoint, http3Session, parser); } } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 5ae8780d999..9b0c7e9789a 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -19,8 +19,8 @@ import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; -import org.eclipse.jetty.http3.internal.DecoderConnection; -import org.eclipse.jetty.http3.internal.EncoderConnection; +import org.eclipse.jetty.http3.internal.DecoderStreamConnection; +import org.eclipse.jetty.http3.internal.EncoderStreamConnection; import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; @@ -42,30 +42,28 @@ public class ClientHTTP3Session extends ClientProtocolSession private final QpackEncoder encoder; private final QpackDecoder decoder; - private final HTTP3SessionClient apiSession; - private final InstructionFlusher encoderInstructionFlusher; - private final InstructionFlusher decoderInstructionFlusher; + private final HTTP3SessionClient applicationSession; private final ControlFlusher controlFlusher; private final HTTP3Flusher messageFlusher; public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise promise, int maxBlockedStreams, int maxResponseHeadersSize) { super(session); - this.apiSession = new HTTP3SessionClient(this, listener, promise); + this.applicationSession = new HTTP3SessionClient(this, listener, promise); if (LOG.isDebugEnabled()) LOG.debug("initializing HTTP/3 streams"); long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId); - this.encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE); + InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams); if (LOG.isDebugEnabled()) LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId); - this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE); + InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize); if (LOG.isDebugEnabled()) LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); @@ -87,14 +85,14 @@ public class ClientHTTP3Session extends ClientProtocolSession public HTTP3SessionClient getSessionClient() { - return apiSession; + return applicationSession; } @Override public void onOpen() { // Queue the mandatory SETTINGS frame. - Map settings = apiSession.onPreface(); + Map settings = applicationSession.onPreface(); if (settings == null) settings = Map.of(); // TODO: add default settings. @@ -102,7 +100,7 @@ public class ClientHTTP3Session extends ClientProtocolSession controlFlusher.offer(frame, Callback.NOOP); controlFlusher.iterate(); - apiSession.onOpen(); + applicationSession.onOpen(); } private QuicStreamEndPoint configureInstructionEndPoint(long streamId) @@ -118,27 +116,27 @@ public class ClientHTTP3Session extends ClientProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read", readableStreamId); - super.onReadable(readableStreamId); + return super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); } } private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) { - UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession); + UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); endPoint.setConnection(connection); endPoint.onOpen(); connection.onOpen(); @@ -151,6 +149,11 @@ public class ClientHTTP3Session extends ClientProtocolSession messageFlusher.iterate(); } + public void onDataAvailable(long streamId) + { + applicationSession.onDataAvailable(streamId); + } + @Override public String toString() { diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3StreamConnection.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3StreamConnection.java new file mode 100644 index 00000000000..b9d131c4b8e --- /dev/null +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3StreamConnection.java @@ -0,0 +1,35 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client.internal; + +import org.eclipse.jetty.http3.internal.HTTP3StreamConnection; +import org.eclipse.jetty.http3.internal.parser.MessageParser; +import org.eclipse.jetty.quic.common.QuicStreamEndPoint; + +public class ClientHTTP3StreamConnection extends HTTP3StreamConnection +{ + private final ClientHTTP3Session http3Session; + + public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session http3Session, MessageParser parser) + { + super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser); + this.http3Session = http3Session; + } + + @Override + protected void onDataAvailable(long streamId) + { + http3Session.onDataAvailable(streamId); + } +} diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index 17ef7de4805..cd1ddfaa6c6 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -17,36 +17,73 @@ import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; -import org.eclipse.jetty.util.Callback; public interface Stream { public CompletableFuture respond(HeadersFrame frame); + public CompletableFuture data(DataFrame dataFrame); + + public Stream.Data readData(); + + public void demand(boolean enable); + + public CompletableFuture trailer(HeadersFrame frame); + public interface Listener { public default void onResponse(Stream stream, HeadersFrame frame) { - } - public default void onData(Stream stream, DataFrame frame, Callback callback) + public default void onDataAvailable(Stream stream) { - // TODO: alternative API -// public void onDataAvailable(Stream s) -// { -// while (true) -// { -// DataFrame frame = s.pollData(); -// if (frame == null) -// return; -// process(frame); -// } -// } } public default void onTrailer(Stream stream, HeadersFrame frame) { } } + + public static class Data + { + private final DataFrame frame; + private final CompletableFuture callback; + + public Data(DataFrame frame, CompletableFuture callback) + { + this.frame = frame; + this.callback = callback; + } + + public DataFrame frame() + { + return frame; + } + + public CompletableFuture callback() + { + return callback; + } + + public void complete(Object result, Throwable failure) + { + if (failure == null) + callback().complete(result); + else + callback().completeExceptionally(failure); + } + + public void completeAndDemand(Stream stream, Throwable failure) + { + complete(stream, failure); + if (failure == null) + stream.demand(true); + } + + public void succeed() + { + callback().complete(null); + } + } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java index e71143a0d7b..09600d376eb 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java @@ -42,9 +42,4 @@ public class DataFrame extends Frame { return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getData().remaining()); } - - public DataFrame withLast(boolean last) - { - return new DataFrame(data, last); - } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/HeadersFrame.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/HeadersFrame.java index b266b4f5cc3..36c1938cf57 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/HeadersFrame.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/HeadersFrame.java @@ -40,11 +40,6 @@ public class HeadersFrame extends Frame @Override public String toString() { - return String.format("%s[last=%b,%s]", super.toString(), isLast(), getMetaData()); - } - - public HeadersFrame withLast(boolean last) - { - return new HeadersFrame(metaData, last); + return String.format("%s[last=%b,{%s}]", super.toString(), isLast(), getMetaData()); } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlFlusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlFlusher.java index 19c6d6eae5b..c0c193e7ce2 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlFlusher.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlFlusher.java @@ -82,8 +82,8 @@ public class ControlFlusher extends IteratingCallback if (!initialized) { initialized = true; - ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE)); - VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE); + ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlStreamConnection.STREAM_TYPE)); + VarLenInt.generate(buffer, ControlStreamConnection.STREAM_TYPE); buffer.flip(); lease.insert(0, buffer, false); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java similarity index 93% rename from jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlConnection.java rename to jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java index 6524cec3315..df8dd17f7ed 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java @@ -25,18 +25,18 @@ import org.eclipse.jetty.util.BufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ControlConnection extends AbstractConnection implements Connection.UpgradeTo +public class ControlStreamConnection extends AbstractConnection implements Connection.UpgradeTo { // SPEC: Control Stream Type. public static final int STREAM_TYPE = 0x00; - private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class); + private static final Logger LOG = LoggerFactory.getLogger(ControlStreamConnection.class); private final ByteBufferPool byteBufferPool; private final ControlParser parser; private boolean useInputDirectByteBuffers = true; private ByteBuffer buffer; - public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser) + public ControlStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser) { super(endPoint, executor); this.byteBufferPool = byteBufferPool; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java similarity index 86% rename from jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderConnection.java rename to jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java index 434c2323ef9..694d93a487c 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java @@ -21,14 +21,14 @@ import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; -public class DecoderConnection extends InstructionConnection +public class DecoderStreamConnection extends InstructionStreamConnection { // SPEC: QPACK Encoder Stream Type. public static final int STREAM_TYPE = 0x03; private final QpackEncoder encoder; - public DecoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder) + public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder) { super(endPoint, executor, byteBufferPool); this.encoder = encoder; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java similarity index 86% rename from jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderConnection.java rename to jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java index 66a981bf53d..9a944d935d7 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java @@ -21,14 +21,14 @@ import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; -public class EncoderConnection extends InstructionConnection +public class EncoderStreamConnection extends InstructionStreamConnection { // SPEC: QPACK Encoder Stream Type. public static final int STREAM_TYPE = 0x02; private final QpackDecoder decoder; - public EncoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder) + public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder) { super(endPoint, executor, byteBufferPool); this.decoder = decoder; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java deleted file mode 100644 index 83de4429e7a..00000000000 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java +++ /dev/null @@ -1,96 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.http3.internal; - -import java.nio.ByteBuffer; -import java.util.concurrent.Executor; - -import org.eclipse.jetty.http3.internal.parser.MessageParser; -import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.quic.common.QuicStreamEndPoint; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HTTP3Connection extends AbstractConnection -{ - private static final Logger LOG = LoggerFactory.getLogger(HTTP3Connection.class); - - private final ByteBufferPool byteBufferPool; - private final MessageParser parser; - private boolean useInputDirectByteBuffers = true; - - public HTTP3Connection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser) - { - super(endPoint, executor); - this.byteBufferPool = byteBufferPool; - this.parser = parser; - } - - public boolean isUseInputDirectByteBuffers() - { - return useInputDirectByteBuffers; - } - - public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) - { - this.useInputDirectByteBuffers = useInputDirectByteBuffers; - } - - @Override - public void onOpen() - { - super.onOpen(); - fillInterested(); - } - - @Override - public void onFillable() - { - ByteBuffer buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); - try - { - while (true) - { - int filled = getEndPoint().fill(buffer); - if (LOG.isDebugEnabled()) - LOG.debug("filled {} on {}", filled, this); - - if (filled > 0) - { - parser.parse(buffer); - } - else if (filled == 0) - { - byteBufferPool.release(buffer); - fillInterested(); - break; - } - else - { - byteBufferPool.release(buffer); - getEndPoint().close(); - break; - } - } - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("could not process control stream {}", getEndPoint(), x); - byteBufferPool.release(buffer); - getEndPoint().close(x); - } - } -} diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index a9cc79efba1..01ae332ce0e 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -114,24 +114,27 @@ public abstract class HTTP3Session implements Session, ParserListener @Override public void onHeaders(long streamId, HeadersFrame frame) { - if (LOG.isDebugEnabled()) - LOG.debug("received {}#{} on {}", frame, streamId, this); - QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId); HTTP3Stream stream = getOrCreateStream(endPoint); MetaData metaData = frame.getMetaData(); if (metaData.isRequest()) { + if (LOG.isDebugEnabled()) + LOG.debug("received request {}#{} on {}", frame, streamId, this); Stream.Listener streamListener = notifyRequest(stream, frame); stream.setListener(streamListener); } else if (metaData.isResponse()) { + if (LOG.isDebugEnabled()) + LOG.debug("received response {}#{} on {}", frame, streamId, this); notifyResponse(stream, frame); } else { + if (LOG.isDebugEnabled()) + LOG.debug("received trailer {}#{} on {}", frame, streamId, this); notifyTrailer(stream, frame); } } @@ -153,7 +156,9 @@ public abstract class HTTP3Session implements Session, ParserListener { try { - stream.getListener().onResponse(stream, frame); + Stream.Listener listener = stream.getListener(); + if (listener != null) + listener.onResponse(stream, frame); } catch (Throwable x) { @@ -165,7 +170,9 @@ public abstract class HTTP3Session implements Session, ParserListener { try { - stream.getListener().onTrailer(stream, frame); + Stream.Listener listener = stream.getListener(); + if (listener != null) + listener.onTrailer(stream, frame); } catch (Throwable x) { @@ -178,22 +185,16 @@ public abstract class HTTP3Session implements Session, ParserListener { if (LOG.isDebugEnabled()) LOG.debug("received {}#{} on {}", frame, streamId, this); - - // The stream must already exist. - HTTP3Stream stream = getStream(streamId); - // TODO: handle null stream. - - // TODO: implement demand mechanism like in HTTP2Stream - // demand(n) should be on Stream, or on a LongConsumer parameter? - - // TODO: the callback in HTTP2 was only to notify of data consumption for flow control. - // Here, we don't have to do flow control, but what about retain()/release() for the network buffer? - notifyData(stream, frame, Callback.NOOP); } - private void notifyData(HTTP3Stream stream, DataFrame frame, Callback callback) + public void onDataAvailable(long streamId) { - stream.getListener().onData(stream, frame, callback); + if (LOG.isDebugEnabled()) + LOG.debug("notifying data available for stream #{} on {}", streamId, this); + HTTP3Stream stream = getStream(streamId); + Stream.Listener listener = stream.getListener(); + if (listener != null) + listener.onDataAvailable(stream); } @Override diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index be85e23c363..90b2780c260 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -16,6 +16,8 @@ package org.eclipse.jetty.http3.internal; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.util.Callback; @@ -46,6 +48,45 @@ public class HTTP3Stream implements Stream @Override public CompletableFuture respond(HeadersFrame frame) + { + return writeFrame(frame); + } + + @Override + public CompletableFuture data(DataFrame frame) + { + return writeFrame(frame); + } + + @Override + public Data readData() + { + HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); + return connection.readData(); + } + + @Override + public void demand(boolean enable) + { + HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); + connection.demand(enable); + } + + @Override + public CompletableFuture trailer(HeadersFrame frame) + { + if (!frame.isLast()) + throw new IllegalArgumentException("invalid trailer frame: property 'last' must be true"); + return writeFrame(frame); + } + + public boolean hasDemand() + { + HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); + return connection.hasDemand(); + } + + private Promise.Completable writeFrame(Frame frame) { Promise.Completable completable = new Promise.Completable<>(); session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed)); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java new file mode 100644 index 00000000000..73a9a18a61b --- /dev/null +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -0,0 +1,348 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.parser.MessageParser; +import org.eclipse.jetty.http3.internal.parser.ParserListener; +import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; +import org.eclipse.jetty.quic.common.QuicStreamEndPoint; +import org.eclipse.jetty.util.thread.AutoLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class HTTP3StreamConnection extends AbstractConnection +{ + private static final Logger LOG = LoggerFactory.getLogger(HTTP3StreamConnection.class); + // An empty DATA frame is the sequence of bytes [0x0, 0x0]. + private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2); + + private final AutoLock lock = new AutoLock(); + private final Queue dataFrames = new ArrayDeque<>(); + private final RetainableByteBufferPool buffers; + private final MessageParser parser; + private boolean useInputDirectByteBuffers = true; + private RetainableByteBuffer buffer; + private boolean dataMode; + private boolean dataDemand; + private boolean dataStalled; + private boolean dataLast; + private boolean remotelyClosed; + + public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser) + { + super(endPoint, executor); + this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool); + this.parser = parser; + parser.init(MessageListener::new); + // By default, invoke onDataAvailable() after onRequest()/onResponse(). + this.dataDemand = true; + } + + @Override + public QuicStreamEndPoint getEndPoint() + { + return (QuicStreamEndPoint)super.getEndPoint(); + } + + public boolean isUseInputDirectByteBuffers() + { + return useInputDirectByteBuffers; + } + + public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) + { + this.useInputDirectByteBuffers = useInputDirectByteBuffers; + } + + @Override + public void onOpen() + { + super.onOpen(); + fillInterested(); + } + + @Override + public void onFillable() + { + if (LOG.isDebugEnabled()) + LOG.debug("processing dataMode={} on {}", dataMode, this); + if (dataMode) + processDataFrames(); + else + processNonDataFrames(); + } + + private void processDataFrames() + { + processDataDemand(); + if (!dataMode) + { + if (buffer.hasRemaining()) + processNonDataFrames(); + else + fillInterested(); + } + } + + private void processNonDataFrames() + { + while (true) + { + if (parseAndFill() == MessageParser.Result.NO_FRAME) + break; + if (dataMode) + { + if (LOG.isDebugEnabled()) + LOG.debug("switching to dataMode=true on {}", this); + if (buffer.hasRemaining()) + processDataFrames(); + else + fillInterested(); + break; + } + } + } + + protected abstract void onDataAvailable(long streamId); + + public Stream.Data readData() + { + if (LOG.isDebugEnabled()) + LOG.debug("reading data on {}", this); + + switch (parseAndFill()) + { + case FRAME: + { + DataFrame frame = dataFrames.poll(); + if (LOG.isDebugEnabled()) + LOG.debug("read data {} on {}", frame, this); + if (frame == null) + return null; + + buffer.retain(); + + CompletableFuture completable = new CompletableFuture<>() + .whenComplete((r, x) -> buffer.release()); + return new Stream.Data(frame, completable); + } + case MODE_SWITCH: + { + if (LOG.isDebugEnabled()) + LOG.debug("switching to dataMode=false on {}", this); + dataLast = true; + dataMode = false; + parser.setDataMode(false); + return null; + } + case NO_FRAME: + { + return null; + } + default: + { + throw new IllegalStateException(); + } + } + } + + public void demand(boolean enable) + { + boolean process = false; + try (AutoLock l = lock.lock()) + { + dataDemand = enable; + if (dataStalled) + { + dataStalled = false; + process = true; + } + } + if (process) + processDataDemand(); + } + + public boolean hasDemand() + { + try (AutoLock l = lock.lock()) + { + return dataDemand; + } + } + + private boolean isStalled() + { + try (AutoLock l = lock.lock()) + { + return dataStalled; + } + } + + private void processDataDemand() + { + while (true) + { + boolean demand; + try (AutoLock l = lock.lock()) + { + if (dataDemand) + { + demand = !dataLast; + } + else + { + dataStalled = true; + demand = false; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("processing demand={} fillInterested={} on {}", demand, isFillInterested(), this); + + // Exit if there is no demand, or there is demand but no data. + if (!demand || isFillInterested()) + return; + + // We have demand, notify the application. + try (AutoLock l = lock.lock()) + { + dataDemand = false; + } + onDataAvailable(getEndPoint().getStreamId()); + } + } + + private MessageParser.Result parseAndFill() + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("parse+fill on {} with buffer {}", this, buffer); + + if (buffer == null) + buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + + while (true) + { + ByteBuffer byteBuffer = buffer.getBuffer(); + MessageParser.Result result = parser.parse(byteBuffer); + if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH) + return result; + + if (buffer.isRetained()) + { + buffer.release(); + buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + byteBuffer = buffer.getBuffer(); + } + + int filled = getEndPoint().fill(byteBuffer); + if (LOG.isDebugEnabled()) + LOG.debug("filled {} on {} with buffer {}", filled, this, buffer); + + if (filled > 0) + continue; + + if (!remotelyClosed && getEndPoint().isStreamFinished()) + { + if (LOG.isDebugEnabled()) + LOG.debug("detected end of stream on {}", this); + parser.parse(EMPTY_DATA_FRAME.slice()); + return MessageParser.Result.FRAME; + } + + if (filled == 0) + { + buffer.release(); + buffer = null; + fillInterested(); + break; + } + else + { + buffer.release(); + buffer = null; + break; + } + } + return MessageParser.Result.NO_FRAME; + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("could not process control stream {}", getEndPoint(), x); + if (buffer != null) + buffer.release(); + buffer = null; + getEndPoint().close(x); + return MessageParser.Result.NO_FRAME; + } + } + + @Override + public String toConnectionString() + { + return String.format("%s[demand=%b,stalled=%b,dataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), dataMode); + } + + private class MessageListener extends ParserListener.Wrapper + { + private MessageListener(ParserListener listener) + { + super(listener); + } + + @Override + public void onHeaders(long streamId, HeadersFrame frame) + { + remotelyClosed = frame.isLast(); + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest() || metaData.isResponse()) + { + // Expect DATA frames now. + dataMode = true; + parser.setDataMode(true); + } + else + { + // Trailer. + remotelyClosed = true; + if (!frame.isLast()) + frame = new HeadersFrame(metaData, true); + } + super.onHeaders(streamId, frame); + } + + @Override + public void onData(long streamId, DataFrame frame) + { + remotelyClosed = frame.isLast(); + dataLast = frame.isLast(); + dataFrames.offer(frame); + super.onData(streamId, frame); + } + } +} diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java similarity index 84% rename from jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionConnection.java rename to jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java index 9d398527a55..dc572cefa63 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java @@ -25,14 +25,14 @@ import org.eclipse.jetty.util.BufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class InstructionConnection extends AbstractConnection implements Connection.UpgradeTo +public abstract class InstructionStreamConnection extends AbstractConnection implements Connection.UpgradeTo { - private static final Logger LOG = LoggerFactory.getLogger(DecoderConnection.class); + private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class); private final ByteBufferPool byteBufferPool; private boolean useInputDirectByteBuffers = true; private ByteBuffer buffer; - public InstructionConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool) + public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool) { super(endPoint, executor); this.byteBufferPool = byteBufferPool; @@ -83,8 +83,8 @@ public abstract class InstructionConnection extends AbstractConnection implement // Then read from the EndPoint. int filled = getEndPoint().fill(buffer); - if (InstructionConnection.LOG.isDebugEnabled()) - InstructionConnection.LOG.debug("filled {} on {}", filled, this); + if (LOG.isDebugEnabled()) + LOG.debug("filled {} on {}", filled, this); if (filled == 0) { @@ -104,8 +104,8 @@ public abstract class InstructionConnection extends AbstractConnection implement } catch (Throwable x) { - if (InstructionConnection.LOG.isDebugEnabled()) - InstructionConnection.LOG.debug("could not process decoder stream {}", getEndPoint(), x); + if (LOG.isDebugEnabled()) + LOG.debug("could not process decoder stream {}", getEndPoint(), x); byteBufferPool.release(buffer); buffer = null; getEndPoint().close(x); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java index 0e44e35c21c..61ca2cd9476 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java @@ -125,10 +125,10 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement { switch (streamType) { - case ControlConnection.STREAM_TYPE: + case ControlStreamConnection.STREAM_TYPE: { ControlParser parser = new ControlParser(listener); - ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser); + ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser); newConnection.setInputBufferSize(getInputBufferSize()); newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) @@ -136,9 +136,9 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement getEndPoint().upgrade(newConnection); break; } - case EncoderConnection.STREAM_TYPE: + case EncoderStreamConnection.STREAM_TYPE: { - EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder); + EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder); newConnection.setInputBufferSize(getInputBufferSize()); newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) @@ -146,9 +146,9 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement getEndPoint().upgrade(newConnection); break; } - case DecoderConnection.STREAM_TYPE: + case DecoderStreamConnection.STREAM_TYPE: { - DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder); + DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder); newConnection.setInputBufferSize(getInputBufferSize()); newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java index 37bcebaa095..bd2a3b442ec 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java @@ -60,7 +60,7 @@ public abstract class BodyParser * @return true if all the frame body bytes were parsed; * false if not enough frame body bytes were present in the buffer */ - public abstract boolean parse(ByteBuffer buffer); + public abstract Result parse(ByteBuffer buffer); protected void emptyBody(ByteBuffer buffer) { @@ -108,4 +108,9 @@ public abstract class BodyParser LOG.info("failure while notifying listener {}", listener, x); } } + + public enum Result + { + NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME + } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/CancelPushBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/CancelPushBodyParser.java index 3154afc0342..c6abc3a6b69 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/CancelPushBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/CancelPushBodyParser.java @@ -23,7 +23,7 @@ public class CancelPushBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { throw new UnsupportedOperationException(); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java index 7e6260a1e5b..cbc38f73809 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java @@ -84,24 +84,31 @@ public class ControlParser // TODO: enforce only control frames, but ignore unknown. if (LOG.isDebugEnabled()) LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType)); - if (!unknownBodyParser.parse(buffer)) + BodyParser.Result result = unknownBodyParser.parse(buffer); + if (result == BodyParser.Result.NO_FRAME) return; - reset(); + if (result == BodyParser.Result.WHOLE_FRAME) + reset(); } else { if (headerParser.getFrameLength() == 0) { bodyParser.emptyBody(buffer); + if (LOG.isDebugEnabled()) + LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), buffer); + reset(); } else { - if (!bodyParser.parse(buffer)) + BodyParser.Result result = bodyParser.parse(buffer); + if (result == BodyParser.Result.NO_FRAME) return; + if (LOG.isDebugEnabled()) + LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer); + if (result == BodyParser.Result.WHOLE_FRAME) + reset(); } - if (LOG.isDebugEnabled()) - LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer); - reset(); } break; } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java index 3ec4c5df0f6..53b949b5658 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java @@ -50,7 +50,7 @@ public class DataBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -77,13 +77,13 @@ public class DataBodyParser extends BodyParser { reset(); onData(slice, false); - return true; + return Result.WHOLE_FRAME; } else { // We got partial data, simulate a smaller frame, and stay in DATA state. onData(slice, true); - break; + return Result.FRAGMENT_FRAME; } } default: @@ -92,7 +92,7 @@ public class DataBodyParser extends BodyParser } } } - return false; + return Result.NO_FRAME; } private void onData(ByteBuffer buffer, boolean fragment) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/GoAwayBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/GoAwayBodyParser.java index 2ca99ed55e5..8b40b42f21f 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/GoAwayBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/GoAwayBodyParser.java @@ -23,7 +23,7 @@ public class GoAwayBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { throw new UnsupportedOperationException(); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java index faf50706ed5..635ab0ab6ae 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java @@ -53,7 +53,7 @@ public class HeadersBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -74,7 +74,7 @@ public class HeadersBodyParser extends BodyParser length -= remaining; ByteBuffer copy = BufferUtil.copy(buffer); byteBuffers.add(copy); - return false; + return Result.NO_FRAME; } else { @@ -99,7 +99,7 @@ public class HeadersBodyParser extends BodyParser byteBuffers.clear(); } - return decode(encoded); + return decode(encoded) ? Result.WHOLE_FRAME : Result.NO_FRAME; } } default: @@ -108,7 +108,7 @@ public class HeadersBodyParser extends BodyParser } } } - return false; + return Result.NO_FRAME; } private boolean decode(ByteBuffer encoded) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MaxPushIdBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MaxPushIdBodyParser.java index 0ccb5c7ded5..20594c138a5 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MaxPushIdBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MaxPushIdBodyParser.java @@ -23,7 +23,7 @@ public class MaxPushIdBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { throw new UnsupportedOperationException(); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java index 10b47fadd9e..37da4ff9147 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java @@ -15,10 +15,12 @@ package org.eclipse.jetty.http3.internal.parser; import java.nio.ByteBuffer; import java.util.function.BooleanSupplier; +import java.util.function.UnaryOperator; import org.eclipse.jetty.http3.ErrorCode; import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.qpack.QpackDecoder; +import org.eclipse.jetty.util.BufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,14 +33,27 @@ public class MessageParser { private static final Logger LOG = LoggerFactory.getLogger(MessageParser.class); - private final HeaderParser headerParser; + private final HeaderParser headerParser = new HeaderParser(); private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1]; - private final BodyParser unknownBodyParser; + private final ParserListener listener; + private final QpackDecoder decoder; + private final long streamId; + private final BooleanSupplier isLast; + private BodyParser unknownBodyParser; private State state = State.HEADER; + protected boolean dataMode; public MessageParser(ParserListener listener, QpackDecoder decoder, long streamId, BooleanSupplier isLast) { - this.headerParser = new HeaderParser(); + this.listener = listener; + this.decoder = decoder; + this.streamId = streamId; + this.isLast = isLast; + } + + public void init(UnaryOperator wrapper) + { + ParserListener listener = wrapper.apply(this.listener); this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(headerParser, listener, streamId, isLast); this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(headerParser, listener, decoder, streamId, isLast); this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser, listener); @@ -51,12 +66,20 @@ public class MessageParser state = State.HEADER; } + public void setDataMode(boolean enable) + { + this.dataMode = enable; + } + /** *

Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.

+ *

Only the bytes of one frame are consumed, therefore when this method returns, + * the buffer may contain unconsumed bytes, for example for other frames.

* * @param buffer the buffer to parse + * @return the result of the parsing */ - public void parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { try { @@ -69,9 +92,12 @@ public class MessageParser if (headerParser.parse(buffer)) { state = State.BODY; + // If we are in data mode, but we did not parse a DATA frame, bail out. + if (dataMode && headerParser.getFrameType() != FrameType.DATA.type()) + return Result.MODE_SWITCH; break; } - return; + return Result.NO_FRAME; } case BODY: { @@ -83,27 +109,37 @@ public class MessageParser if (bodyParser == null) { // Unknown frame types must be ignored. + BodyParser.Result result = unknownBodyParser.parse(buffer); + if (result == BodyParser.Result.NO_FRAME) + return Result.NO_FRAME; if (LOG.isDebugEnabled()) - LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType)); - if (!unknownBodyParser.parse(buffer)) - return; + LOG.debug("Parsed unknown frame body for type {}", Integer.toHexString(frameType)); + if (result == BodyParser.Result.WHOLE_FRAME) + reset(); + break; } else { if (headerParser.getFrameLength() == 0) { bodyParser.emptyBody(buffer); + if (LOG.isDebugEnabled()) + LOG.debug("Parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + reset(); + return Result.FRAME; } else { - if (!bodyParser.parse(buffer)) - return; + BodyParser.Result result = bodyParser.parse(buffer); + if (result == BodyParser.Result.NO_FRAME) + return Result.NO_FRAME; + if (LOG.isDebugEnabled()) + LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + if (result == BodyParser.Result.WHOLE_FRAME) + reset(); + return Result.FRAME; } - if (LOG.isDebugEnabled()) - LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer); } - reset(); - break; } default: { @@ -118,6 +154,7 @@ public class MessageParser LOG.debug("parse failed", x); buffer.clear(); connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error"); + return Result.NO_FRAME; } } @@ -126,6 +163,11 @@ public class MessageParser unknownBodyParser.sessionFailure(buffer, error, reason); } + public enum Result + { + NO_FRAME, FRAME, MODE_SWITCH + } + private enum State { HEADER, BODY diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java index 43f7d27d40b..55de61ef460 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java @@ -38,4 +38,44 @@ public interface ParserListener public default void onSessionFailure(int error, String reason) { } + + public static class Wrapper implements ParserListener + { + protected final ParserListener listener; + + public Wrapper(ParserListener listener) + { + this.listener = listener; + } + + @Override + public void onHeaders(long streamId, HeadersFrame frame) + { + listener.onHeaders(streamId, frame); + } + + @Override + public void onData(long streamId, DataFrame frame) + { + listener.onData(streamId, frame); + } + + @Override + public void onSettings(SettingsFrame frame) + { + listener.onSettings(frame); + } + + @Override + public void onStreamFailure(long streamId, int error, String reason) + { + listener.onStreamFailure(streamId, error, reason); + } + + @Override + public void onSessionFailure(int error, String reason) + { + listener.onSessionFailure(error, reason); + } + } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/PushPromiseBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/PushPromiseBodyParser.java index 9d8dd6181f5..e3463b0a5d9 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/PushPromiseBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/PushPromiseBodyParser.java @@ -23,7 +23,7 @@ public class PushPromiseBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { throw new UnsupportedOperationException(); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/SettingsBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/SettingsBodyParser.java index 673723a0e14..f5d03465bb8 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/SettingsBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/SettingsBodyParser.java @@ -50,7 +50,7 @@ public class SettingsBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { @@ -74,12 +74,12 @@ public class SettingsBodyParser extends BodyParser if (settings.containsKey(key)) { sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate"); - return true; + return Result.NO_FRAME; } if (SettingsFrame.isReserved(key)) { sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_reserved"); - return true; + return Result.NO_FRAME; } if (length > 0) { @@ -88,11 +88,11 @@ public class SettingsBodyParser extends BodyParser else { sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format"); - return true; + return Result.NO_FRAME; } break; } - return false; + return Result.NO_FRAME; } case VALUE: { @@ -112,16 +112,16 @@ public class SettingsBodyParser extends BodyParser Map settings = this.settings; reset(); onSettings(settings); - return true; + return Result.WHOLE_FRAME; } else { sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format"); - return true; + return Result.NO_FRAME; } break; } - return false; + return Result.NO_FRAME; } default: { @@ -129,7 +129,7 @@ public class SettingsBodyParser extends BodyParser } } } - return false; + return Result.NO_FRAME; } private void onSettings(Map settings) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java index a84e5cc74dc..6609c964d43 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java @@ -23,7 +23,7 @@ public class UnknownBodyParser extends BodyParser } @Override - public boolean parse(ByteBuffer buffer) + public Result parse(ByteBuffer buffer) { throw new UnsupportedOperationException(); } diff --git a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java index a2f2b89aadc..a0d38dff54e 100644 --- a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java +++ b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java @@ -17,6 +17,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.function.UnaryOperator; import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.internal.generator.MessageGenerator; @@ -65,6 +66,7 @@ public class DataGenerateParseTest frames.add(frame); } }, null, 13, () -> true); + parser.init(UnaryOperator.identity()); for (ByteBuffer buffer : lease.getByteBuffers()) { parser.parse(buffer); diff --git a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java index d7bb091ca50..bf666ce3451 100644 --- a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java +++ b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.function.UnaryOperator; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; @@ -60,6 +61,7 @@ public class HeadersGenerateParseTest frames.add(frame); } }, decoder, 13, () -> true); + parser.init(UnaryOperator.identity()); for (ByteBuffer buffer : lease.getByteBuffers()) { parser.parse(buffer); diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/AbstractHTTP3ServerConnectionFactory.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/AbstractHTTP3ServerConnectionFactory.java index 096c7326743..4e318b2998d 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/AbstractHTTP3ServerConnectionFactory.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/AbstractHTTP3ServerConnectionFactory.java @@ -17,9 +17,9 @@ import java.util.Map; import java.util.Objects; import org.eclipse.jetty.http3.api.Session; -import org.eclipse.jetty.http3.internal.HTTP3Connection; import org.eclipse.jetty.http3.internal.parser.MessageParser; import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session; +import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.quic.common.ProtocolSession; @@ -98,7 +98,6 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne long streamId = streamEndPoint.getStreamId(); ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession(); MessageParser parser = new MessageParser(http3Session.getSessionServer(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished); - HTTP3Connection connection = new HTTP3Connection(streamEndPoint, connector.getExecutor(), connector.getByteBufferPool(), parser); - return connection; + return new ServerHTTP3StreamConnection(streamEndPoint, http3Session, parser); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 39ef82cebea..b22b4d3777f 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -19,8 +19,8 @@ import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; -import org.eclipse.jetty.http3.internal.DecoderConnection; -import org.eclipse.jetty.http3.internal.EncoderConnection; +import org.eclipse.jetty.http3.internal.DecoderStreamConnection; +import org.eclipse.jetty.http3.internal.EncoderStreamConnection; import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; @@ -59,14 +59,14 @@ public class ServerHTTP3Session extends ServerProtocolSession long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId); - this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE); + this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams); if (LOG.isDebugEnabled()) LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId); - this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE); + this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize); if (LOG.isDebugEnabled()) LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); @@ -125,22 +125,22 @@ public class ServerHTTP3Session extends ServerProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) - LOG.debug("stream #{} selected for read", readableStreamId); - super.onReadable(readableStreamId); + LOG.debug("bidirectional stream #{} selected for read", readableStreamId); + return super.onReadable(readableStreamId); } else { // On the server, we need a get-or-create semantic in case of reads. QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) - LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); + return streamEndPoint.onReadable(); } } @@ -158,4 +158,9 @@ public class ServerHTTP3Session extends ServerProtocolSession messageFlusher.offer(endPoint, frame, callback); messageFlusher.iterate(); } + + protected void onDataAvailable(long streamId) + { + apiSession.onDataAvailable(streamId); + } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java new file mode 100644 index 00000000000..93b1b7afe92 --- /dev/null +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java @@ -0,0 +1,35 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.server.internal; + +import org.eclipse.jetty.http3.internal.HTTP3StreamConnection; +import org.eclipse.jetty.http3.internal.parser.MessageParser; +import org.eclipse.jetty.quic.common.QuicStreamEndPoint; + +public class ServerHTTP3StreamConnection extends HTTP3StreamConnection +{ + private final ServerHTTP3Session http3Session; + + public ServerHTTP3StreamConnection(QuicStreamEndPoint endPoint, ServerHTTP3Session http3Session, MessageParser parser) + { + super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser); + this.http3Session = http3Session; + } + + @Override + protected void onDataAvailable(long streamId) + { + http3Session.onDataAvailable(streamId); + } +} diff --git a/jetty-http3/http3-tests/pom.xml b/jetty-http3/http3-tests/pom.xml index 62d12503c8c..87152aba90b 100644 --- a/jetty-http3/http3-tests/pom.xml +++ b/jetty-http3/http3-tests/pom.xml @@ -29,6 +29,11 @@ ${project.version} test + + org.awaitility + awaitility + test + org.junit.jupiter junit-jupiter diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractHTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractHTTP3ClientServerTest.java new file mode 100644 index 00000000000..a85ddd90370 --- /dev/null +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractHTTP3ClientServerTest.java @@ -0,0 +1,62 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.tests; + +import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.client.HTTP3Client; +import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory; +import org.eclipse.jetty.quic.server.ServerQuicConnector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class AbstractHTTP3ClientServerTest +{ + @RegisterExtension + final BeforeTestExecutionCallback printMethodName = context -> + System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName()); + protected ServerQuicConnector connector; + protected HTTP3Client client; + protected Server server; + + protected void startServer(Session.Server.Listener listener) throws Exception + { + SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); + sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12"); + sslContextFactory.setKeyStorePassword("storepwd"); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener)); + server.addConnector(connector); + server.start(); + } + + protected void startClient() throws Exception + { + client = new HTTP3Client(); + client.start(); + } + + @AfterEach + public void dispose() + { + LifeCycle.stop(client); + LifeCycle.stop(server); + } +} diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java index 8146c843eef..116e9168018 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java @@ -14,9 +14,12 @@ package org.eclipse.jetty.http3.tests; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; @@ -26,53 +29,17 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; -import org.eclipse.jetty.http3.client.HTTP3Client; +import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.SettingsFrame; -import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory; -import org.eclipse.jetty.quic.server.ServerQuicConnector; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class HTTP3ClientServerTest +public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest { - private Server server; - private ServerQuicConnector connector; - private HTTP3Client client; - - private void startServer(Session.Server.Listener listener) throws Exception - { - SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); - sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12"); - sslContextFactory.setKeyStorePassword("storepwd"); - QueuedThreadPool serverThreads = new QueuedThreadPool(); - serverThreads.setName("server"); - server = new Server(serverThreads); - connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener)); - server.addConnector(connector); - server.start(); - } - - private void startClient() throws Exception - { - client = new HTTP3Client(); - client.start(); - } - - @AfterEach - public void dispose() - { - LifeCycle.stop(client); - LifeCycle.stop(server); - } - @Test public void testConnectTriggersSettingsFrame() throws Exception { @@ -152,7 +119,155 @@ public class HTTP3ClientServerTest .get(5, TimeUnit.SECONDS); assertNotNull(stream); - assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS)); - assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS)); + assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testDiscardRequestContent() throws Exception + { + AtomicReference serverLatch = new AtomicReference<>(new CountDownLatch(1)); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + // Send the response. + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); + // Implicit demand, so onDataAvailable() will be called. + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + // FlowControl acknowledged already. + Stream.Data data = stream.readData(); + if (data == null) + { + // Call me again when you have data. + stream.demand(true); + return; + } + // Recycle the ByteBuffer in data.frame. + data.succeed(); + // Call me again immediately. + stream.demand(true); + if (data.frame().isLast()) + serverLatch.get().countDown(); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + AtomicReference clientLatch = new AtomicReference<>(new CountDownLatch(1)); + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.POST.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame frame = new HeadersFrame(metaData, false); + Stream.Listener streamListener = new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + clientLatch.get().countDown(); + } + }; + Stream stream1 = session.newRequest(frame, streamListener) + .get(5, TimeUnit.SECONDS); + stream1.data(new DataFrame(ByteBuffer.allocate(8192), true)); + + assertTrue(clientLatch.get().await(5, TimeUnit.SECONDS)); + assertTrue(serverLatch.get().await(5, TimeUnit.SECONDS)); + + // Send another request, but with 2 chunks of data separated by some time. + serverLatch.set(new CountDownLatch(1)); + clientLatch.set(new CountDownLatch(1)); + Stream stream2 = session.newRequest(frame, streamListener).get(5, TimeUnit.SECONDS); + stream2.data(new DataFrame(ByteBuffer.allocate(3 * 1024), false)); + // Wait some time before sending the second chunk. + Thread.sleep(500); + stream2.data(new DataFrame(ByteBuffer.allocate(5 * 1024), true)); + + assertTrue(clientLatch.get().await(555, TimeUnit.SECONDS)); + assertTrue(serverLatch.get().await(555, TimeUnit.SECONDS)); + } + + @Test + public void testEchoRequestContentAsResponseContent() throws Exception + { + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + // Send the response headers. + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + // Read data. + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(true); + return; + } + // Echo it back, then demand only when the write is finished. + stream.data(data.frame()) + .whenComplete(data::completeAndDemand); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + CountDownLatch clientResponseLatch = new CountDownLatch(1); + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame frame = new HeadersFrame(metaData, false); + byte[] bytesSent = new byte[8192]; + new Random().nextBytes(bytesSent); + byte[] bytesReceived = new byte[bytesSent.length]; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived); + CountDownLatch clientDataLatch = new CountDownLatch(1); + Stream stream = session.newRequest(frame, new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + clientResponseLatch.countDown(); + } + + @Override + public void onDataAvailable(Stream stream) + { + // Read data. + Stream.Data data = stream.readData(); + if (data != null) + { + // Consume data. + byteBuffer.put(data.frame().getData()); + data.callback().complete(null); + if (data.frame().isLast()) + clientDataLatch.countDown(); + } + // Demand more data. + stream.demand(true); + } + }) + .get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(ByteBuffer.wrap(bytesSent), true)); + + assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientDataLatch.await(5, TimeUnit.SECONDS)); + assertArrayEquals(bytesSent, bytesReceived); } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java new file mode 100644 index 00000000000..0f9860f01ed --- /dev/null +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java @@ -0,0 +1,463 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.tests; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3Stream; +import org.eclipse.jetty.util.BufferUtil; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest +{ + @Test + public void testOnDataAvailableThenExit() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverDataLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + if (serverStreamRef.compareAndSet(null, stream)) + { + // Do nothing on the first pass, with respect to demand and reading data. + serverStreamLatch.countDown(); + } + else + { + // When resumed, demand all content until the last. + Stream.Data data = stream.readData(); + if (data != null && data.frame().isLast()) + serverDataLatch.countDown(); + else + stream.demand(true); + } + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(ByteBuffer.allocate(8192), true)); + + assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS)); + // Wait a little to be sure we do not spin. + Thread.sleep(500); + assertEquals(1, onDataAvailableCalls.get()); + + // Resume processing of data, this should call onDataAvailable(). + serverStreamRef.get().demand(true); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOnDataAvailableThenReadDataThenExit() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverDataLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + if (serverStreamRef.compareAndSet(null, stream)) + { + serverStreamLatch.countDown(); + // Read only one chunk of data. + Stream.Data data = stream.readData(); + assertNotNull(data); + // Don't demand, just exit. + } + else + { + // When resumed, demand all content until the last. + Stream.Data data = stream.readData(); + if (data != null && data.frame().isLast()) + serverDataLatch.countDown(); + else + stream.demand(true); + } + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(ByteBuffer.allocate(16), false)); + + assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS)); + // Wait a little to be sure we do not spin. + Thread.sleep(500); + assertEquals(1, onDataAvailableCalls.get()); + + // Resume processing of data, this should call onDataAvailable(), but there is no data to read yet. + serverStreamRef.get().demand(true); + + await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)stream).hasDemand()); + + stream.data(new DataFrame(ByteBuffer.allocate(32), true)); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOnDataAvailableThenReadDataNullThenExit() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverDataLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + if (serverStreamRef.compareAndSet(null, stream)) + { + while (true) + { + Stream.Data data = stream.readData(); + if (data == null) + { + serverStreamLatch.countDown(); + break; + } + } + // Do not demand after reading null data. + } + else + { + // When resumed, demand all content until the last. + Stream.Data data = stream.readData(); + if (data != null && data.frame().isLast()) + serverDataLatch.countDown(); + else + stream.demand(true); + } + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(ByteBuffer.allocate(16), false)); + + assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS)); + // Wait a little to be sure we do not spin. + Thread.sleep(500); + assertEquals(1, onDataAvailableCalls.get()); + + // Send a last empty frame. + stream.data(new DataFrame(BufferUtil.EMPTY_BUFFER, true)); + + // Resume processing of data, this should call onDataAvailable(). + serverStreamRef.get().demand(true); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testHeadersNoDataThenTrailers() throws Exception + { + CountDownLatch serverDataLatch = new CountDownLatch(1); + CountDownLatch serverTrailerLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + // Must read to EOF to trigger fill+parse of the trailer. + Stream.Data data = stream.readData(); + assertNull(data); + // It's typical to demand after null data. + stream.demand(true); + serverDataLatch.countDown(); + } + + @Override + public void onTrailer(Stream stream, HeadersFrame frame) + { + serverTrailerLatch.countDown(); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + // Wait a little to be sure we do not spin. + Thread.sleep(500); + assertEquals(1, onDataAvailableCalls.get()); + + assertTrue(serverTrailerLatch.await(5, TimeUnit.SECONDS)); + assertEquals(1, onDataAvailableCalls.get()); + } + + @Test + public void testHeadersDataTrailers() throws Exception + { + int dataLength = 8192; + AtomicInteger dataRead = new AtomicInteger(); + CountDownLatch serverDataLatch = new CountDownLatch(1); + CountDownLatch serverTrailerLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + Stream.Data data = stream.readData(); + if (data != null) + { + if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength) + serverDataLatch.countDown(); + } + stream.demand(true); + } + + @Override + public void onTrailer(Stream stream, HeadersFrame frame) + { + serverTrailerLatch.countDown(); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + stream.data(new DataFrame(ByteBuffer.allocate(dataLength), false)); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + long calls = onDataAvailableCalls.get(); + + stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS); + + assertTrue(serverTrailerLatch.await(5, TimeUnit.SECONDS)); + // In order to detect that the trailer have arrived, we must call + // onDataAvailable() one more time, possibly two more times if an + // empty DATA frame was delivered to indicate the end of the stream. + assertThat(onDataAvailableCalls.get(), Matchers.lessThanOrEqualTo(calls + 2)); + } + + @Test + public void testRetainRelease() throws Exception + { + CountDownLatch serverDataLatch = new CountDownLatch(1); + List datas = new ArrayList<>(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + while (true) + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(true); + return; + } + // Store the Data away to be used later. + datas.add(data); + if (data.frame().isLast()) + serverDataLatch.countDown(); + } + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + byte[] bytesSent = new byte[16384]; + new Random().nextBytes(bytesSent); + stream.data(new DataFrame(ByteBuffer.wrap(bytesSent), true)); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + + assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.frame().getData().remaining()).sum()); + byte[] bytesReceived = new byte[bytesSent.length]; + ByteBuffer buffer = ByteBuffer.wrap(bytesReceived); + datas.forEach(d -> buffer.put(d.frame().getData())); + assertArrayEquals(bytesSent, bytesReceived); + } + + @Test + public void testDisableDemandOnRequest() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverRequestLatch = new CountDownLatch(1); + CountDownLatch serverDataLatch = new CountDownLatch(1); + AtomicLong onDataAvailableCalls = new AtomicLong(); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + serverRequestLatch.countDown(); + stream.demand(false); + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + onDataAvailableCalls.incrementAndGet(); + Stream.Data data = stream.readData(); + if (data != null && data.frame().isLast()) + serverDataLatch.countDown(); + stream.demand(true); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + stream.data(new DataFrame(ByteBuffer.allocate(4096), true)); + + assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS)); + + // Wait a little to verify that onDataAvailable() is not called. + Thread.sleep(500); + assertEquals(0, onDataAvailableCalls.get()); + + // Resume processing of data. + serverStreamRef.get().demand(true); + + assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java index c7c07bc94ed..150b9e25e91 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java @@ -44,13 +44,14 @@ public class ClientProtocolSession extends ProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { // On the client, we need a get-only semantic in case of reads. QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); if (streamEndPoint != null) - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); + return false; } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index 9c5125e15da..3c423109ed9 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -93,18 +93,19 @@ public abstract class ProtocolSession List readableStreamIds = session.getReadableStreamIds(); if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); - readableStreamIds.forEach(this::onReadable); // TODO: ExecutionStrategy plug-in point is here. - // this::onReadable() just feeds the decoder and the instruction streams. + // this.onReadable() just feeds the decoder and the instruction streams. // Note that req/rsp streams never eat DATA frame, it's a noop because they pull data // when they want to read data frames, either via Stream.readData() or ServletInputStream.read(). // Then here we ask decoder for tasks, and have the ExecutionStrategy process them. - return !readableStreamIds.isEmpty(); + return readableStreamIds.stream() + .map(this::onReadable) + .reduce(false, (result, interested) -> result || interested); } - protected abstract void onReadable(long readableStreamId); + protected abstract boolean onReadable(long readableStreamId); public void configureProtocolEndPoint(QuicStreamEndPoint endPoint) { diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index 58698b682d8..aefd321328e 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -125,7 +125,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint public int fill(ByteBuffer buffer) throws IOException { if (LOG.isDebugEnabled()) - LOG.debug("filling buffer from stream {}", streamId); + LOG.debug("filling buffer from stream {} finished={}", streamId, isStreamFinished()); int pos = BufferUtil.flipToFill(buffer); int drained = session.fill(streamId, buffer); BufferUtil.flipToFlush(buffer, pos); @@ -135,6 +135,8 @@ public class QuicStreamEndPoint extends AbstractEndPoint @Override public boolean flush(ByteBuffer... buffers) throws IOException { + // TODO: session.flush(streamId, buffer) feeds Quiche and then calls flush(). + // Can we call flush() only after the for loop below? if (LOG.isDebugEnabled()) LOG.debug("flushing {} buffer(s) to stream {}", buffers.length, streamId); for (ByteBuffer buffer : buffers) @@ -188,11 +190,12 @@ public class QuicStreamEndPoint extends AbstractEndPoint getWriteFlusher().completeWrite(); } - public void onReadable() + public boolean onReadable() { if (LOG.isDebugEnabled()) LOG.debug("stream {} is readable", streamId); getFillInterest().fillable(); + return isFillInterested(); } @Override diff --git a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java index 4aa9df1d17a..c79cb3154fe 100644 --- a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java +++ b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java @@ -586,12 +586,17 @@ public class QuicheConnection } public int feedClearTextForStream(long streamId, ByteBuffer buffer) throws IOException + { + return feedClearTextForStream(streamId, buffer, false); + } + + public int feedClearTextForStream(long streamId, ByteBuffer buffer, boolean last) throws IOException { try (AutoLock ignore = lock.lock()) { if (quicheConn == null) throw new IOException("Quiche connection was released"); - int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), false).intValue(); + int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), last).intValue(); if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE) return 0; if (written < 0L) diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java index c3334e3fed9..065b20bee90 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java @@ -39,12 +39,12 @@ public class ServerProtocolSession extends ProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { // On the server, we need a get-or-create semantic in case of reads. QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 22320f4378c..2b47e2d848a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -186,6 +186,18 @@ public interface Callback extends Invocable }; } + static Callback from(InvocationType invocationType, Runnable completed) + { + return new Completing(invocationType) + { + @Override + public void completed() + { + completed.run(); + } + }; + } + /** *

Creates a Callback with the given {@code invocationType}, * that runs the given {@code Runnable} when it succeeds or fails.