From 4bee790c045536e4530b2ec6383de03aa62f2d4b Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 24 Sep 2021 15:22:20 +0200 Subject: [PATCH] Issue #6728 - QUIC and HTTP/3 - Added javadocs. - Fixed race condition in processDataDemand(). Signed-off-by: Simone Bordet --- .../client/internal/ClientHTTP3Session.java | 8 +- .../client/internal/HTTP3SessionClient.java | 45 ++++- .../org/eclipse/jetty/http3/api/Session.java | 129 +++++++++++++- .../org/eclipse/jetty/http3/api/Stream.java | 164 +++++++++++++++++- .../eclipse/jetty/http3/frames/DataFrame.java | 4 +- .../jetty/http3/internal/HTTP3Session.java | 63 +++---- .../http3/internal/HTTP3StreamConnection.java | 27 +-- .../internal/generator/DataGenerator.java | 2 +- .../http3/internal/parser/DataBodyParser.java | 2 +- .../http3/internal/parser/MessageParser.java | 6 +- .../http3/internal/DataGenerateParseTest.java | 4 +- .../server/internal/HTTP3SessionServer.java | 64 +++++++ .../server/internal/ServerHTTP3Session.java | 8 +- .../http3/tests/HTTP3ClientServerTest.java | 8 +- .../http3/tests/HTTP3DataDemandTest.java | 22 ++- 15 files changed, 459 insertions(+), 97 deletions(-) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 8032cdfe4f6..8dc4cde4ae8 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,10 +98,13 @@ public class ClientHTTP3Session extends ClientProtocolSession settings = Map.of(); // TODO: add default settings. SettingsFrame frame = new SettingsFrame(settings); - controlFlusher.offer(frame, Callback.NOOP); + controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail)); controlFlusher.iterate(); + } - applicationSession.onOpen(); + private void fail(Throwable failure) + { + // TODO: must close the connection. } private QuicStreamEndPoint configureInstructionEndPoint(long streamId) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index 1ced94e64c6..9389a23e3b7 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.client.internal; import java.util.concurrent.CompletableFuture; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.Frame; @@ -47,18 +48,44 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client return (ClientHTTP3Session)super.getProtocolSession(); } - @Override - protected void writeFrame(long streamId, Frame frame, Callback callback) - { - getProtocolSession().writeFrame(streamId, frame, callback); - } - @Override public void onOpen() { promise.succeeded(this); } + @Override + public void onHeaders(long streamId, HeadersFrame frame) + { + QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId); + HTTP3Stream stream = getOrCreateStream(endPoint); + MetaData metaData = frame.getMetaData(); + if (metaData.isResponse()) + { + if (LOG.isDebugEnabled()) + LOG.debug("received response {}#{} on {}", frame, streamId, this); + notifyResponse(stream, frame); + } + else + { + super.onHeaders(streamId, frame); + } + } + + private void notifyResponse(HTTP3Stream stream, HeadersFrame frame) + { + Stream.Listener listener = stream.getListener(); + try + { + if (listener != null) + listener.onResponse(stream, frame); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + } + } + @Override public CompletableFuture newRequest(HeadersFrame frame, Stream.Listener listener) { @@ -76,4 +103,10 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client session.writeFrame(streamId, frame, callback); return promise; } + + @Override + protected void writeFrame(long streamId, Frame frame, Callback callback) + { + getProtocolSession().writeFrame(streamId, frame, callback); + } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java index 0ea724d70b0..45c88263742 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java @@ -13,45 +13,162 @@ package org.eclipse.jetty.http3.api; +import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.SettingsFrame; +/** + *

The low-level HTTP/3 API representing a connection with a remote peer.

+ *

A {@link Session} is the active part of the connection, and by calling its APIs + * applications can generate events on the connection.

+ *

Conversely, {@link Session.Listener} is the passive part of the connection, + * and has callback methods that are invoked when events happen on the connection.

+ * + * @see Client + * @see Server + * @see Listener + */ public interface Session { + /** + * @return the local socket address this session is bound to + */ + public default SocketAddress getLocalSocketAddress() + { + return null; + } + + /** + * @return the remote socket address this session is connected to + */ + public default SocketAddress getRemoteSocketAddress() + { + return null; + } + + /** + *

The client-side HTTP/3 API representing a connection with a server.

+ *

Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:

+ *
+     * Session session = ...;
+     * HeadersFrame headersFrame = ...;
+     * session.newRequest(headersFrame, new Stream.Listener()
+     * {
+     *     @Override
+     *     public void onResponse(Stream stream, HeadersFrame frame)
+     *     {
+     *         // Response headers received.
+     *     }
+     * });
+     * 
+ * + * @see Stream + * @see Stream.Listener + */ public interface Client { + /** + *

Makes a request by creating a HTTP/3 stream and sending the given HEADERS frame.

+ * + * @param frame the HEADERS frame containing the HTTP request headers + * @param listener the listener that gets notified of stream events + * @return a CompletableFuture that is notified of the stream creation + */ public CompletableFuture newRequest(HeadersFrame frame, Stream.Listener listener); + /** + *

The client-side specific {@link Session.Listener}.

+ */ public interface Listener extends Session.Listener { } } + /** + *

The server-side HTTP/3 API representing a connection with a client.

+ *

To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.

+ */ public interface Server { + /** + *

The server-side specific {@link Session.Listener}.

+ */ public interface Listener extends Session.Listener { - // TODO: accept event. + /** + *

Callback method invoked when a connection has been accepted by the server.

+ * + * @param session the session + */ + public default void onAccept(Session session) + { + } + + /** + *

Callback method invoked when a request is received.

+ *

Applications should implement this method to process HTTP/3 requests, + * typically providing an HTTP/3 response via {@link Stream#respond(HeadersFrame)}:

+ *
+             * class MyServer implements Session.Server.Listener
+             * {
+             *     @Override
+             *     public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
+             *     {
+             *         // Send a response.
+             *         var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
+             *         stream.respond(new HeadersFrame(response, true));
+             *     }
+             * }
+             * 
+ *

To read request content, applications should call + * {@link Stream#demand()} and return a {@link Stream.Listener} that overrides + * {@link Stream.Listener#onDataAvailable(Stream)}.

+ * + * @param stream the stream associated with the request + * @param frame the HEADERS frame containing the request headers + * @return a {@link Stream.Listener} that will be notified of stream events + * @see Stream.Listener#onDataAvailable(Stream) + */ + public default Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + return null; + } } } + /** + *

A {@link Listener} is the passive counterpart of a {@link Session} and + * receives events happening on an HTTP/3 connection.

+ * + * @see Session + */ public interface Listener { + /** + *

Callback method invoked just before the initial SETTINGS frame is sent + * to the remote peer, to gather the configuration settings that the local + * peer wants to send to the remote peer.

+ * + * @param session the session + * @return a (possibly empty or null) map containing configuration + * settings to send to the remote peer. + */ public default Map onPreface(Session session) { return null; } + /** + *

Callback method invoked when a SETTINGS frame has been received.

+ * + * @param session the session + * @param frame the SETTINGS frame received + */ public default void onSettings(Session session, SettingsFrame frame) { } - - public default Stream.Listener onRequest(Stream stream, HeadersFrame frame) - { - return null; - } } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index 8f79bce356a..99440d056e3 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -13,38 +13,191 @@ package org.eclipse.jetty.http3.api; +import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; +/** + *

A {@link Stream} represents a bidirectional exchange of data within a {@link Session}.

+ *

A {@link Stream} maps to an HTTP/3 request/response cycle, and after the request/response + * cycle is completed, the stream is closed and removed from the {@link Session}.

+ *

Like {@link Session}, {@link Stream} is the active part and by calling its API applications + * can generate events on the stream; conversely, {@link Stream.Listener} is the passive part, and + * its callbacks are invoked when events happen on the stream.

+ * + * @see Stream.Listener + */ public interface Stream { + /** + *

Responds to a request performed via {@link Session.Client#newRequest(HeadersFrame, Listener)}, + * sending the given HEADERS frame containing the response status code and response headers.

+ * + * @param frame the HEADERS frame containing the response headers + * @return the {@link CompletableFuture} that gets notified when the frame has been sent + */ public CompletableFuture respond(HeadersFrame frame); - public CompletableFuture data(DataFrame dataFrame); + /** + *

Sends the given DATA frame containing some or all the bytes + * of the request content or of the response content.

+ * + * @param frame the DATA frame containing some or all the bytes of the request or of the response. + * @return the {@link CompletableFuture} that gets notified when the frame has been sent + */ + public CompletableFuture data(DataFrame frame); + /** + *

Reads request content bytes or response content bytes.

+ *

The returned {@link Stream.Data} object may be {@code null}, indicating + * that the end of the read side of the stream has not yet been reached, which + * may happen in these cases:

+ *
    + *
  • not all the bytes have been received so far, and a further attempt + * to call this method returns {@code null} because the rest of the bytes + * are not yet available (for example, the remote peer did not send them + * yet, or they are in-flight)
  • + *
  • all the bytes have been received, but there is a trailer HEADERS + * frame to be received to indicate the end of the read side of the + * stream.
  • + *
+ *

When the returned {@link Stream.Data} object is not {@code null}, + * applications should call {@link Stream.Data#complete()} to + * notify the implementation that the bytes have been processed. + * This allows the implementation to perform better, for example by + * recycling the {@link Stream.Data} object's {@link ByteBuffer}.

+ *

{@link Stream.Data} objects may be stored away for later, asynchronous, + * processing (for example, to process them only when all of them have been + * received).

+ * + * @return a {@link Stream.Data} object containing the request bytes or the response bytes + * @see Stream.Listener#onDataAvailable(Stream) + */ public Stream.Data readData(); + /** + *

Causes {@link Stream.Listener#onDataAvailable(Stream)} to be invoked, + * possibly at a later time, when the stream has data to be read.

+ *

This method is idempotent: calling it when there already is an + * outstanding demand to invoke {@link Stream.Listener#onDataAvailable(Stream)} + * is a no-operation.

+ *

The thread invoking this method may invoke directly + * {@link Stream.Listener#onDataAvailable(Stream)}, unless another thread + * that must invoke {@link Stream.Listener#onDataAvailable(Stream)} + * notices the outstanding demand first.

+ *

When all bytes have been read (via {@link #readData()}), further + * invocations of this method are a no-operation.

+ *

It is always guaranteed that invoking this method from within + * {@link Stream.Listener#onDataAvailable(Stream)} will not cause a + * {@link StackOverflowError}.

+ * + * @see #readData() + * @see Stream.Listener#onDataAvailable(Stream) + */ public void demand(); + /** + *

Sends the given HEADERS frame containing the trailer headers.

+ * + * @param frame the HEADERS frame containing the trailer headers + * @return the {@link CompletableFuture} that gets notified when the frame has been sent + */ public CompletableFuture trailer(HeadersFrame frame); + /** + *

A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives + * events happening on an HTTP/3 stream.

+ * + * @see Stream + */ public interface Listener { + /** + *

Callback method invoked when a response is received.

+ *

To read response content, applications should call + * {@link Stream#demand()} and override + * {@link Stream.Listener#onDataAvailable(Stream)}.

+ * + * @param stream the stream + * @param frame the HEADERS frame containing the response headers + * @see Stream.Listener#onDataAvailable(Stream) + */ public default void onResponse(Stream stream, HeadersFrame frame) { } + /** + *

Callback method invoked if the application has expressed + * {@link Stream#demand() demand} for content, and if there is + * content available.

+ *

A server application that wishes to handle request content + * should typically call {@link Stream#demand()} from + * {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.

+ *

A client application that wishes to handle response content + * should typically call {@link Stream#demand()} from + * {@link #onResponse(Stream, HeadersFrame)}.

+ *

Just prior calling this method, the outstanding demand is + * cancelled; applications that implement this method should read + * content calling {@link Stream#readData()}, and call + * {@link Stream#demand()} to signal to the implementation to call + * again this method when there is more content available.

+ *

Only one thread at a time invokes this method, although it + * may not be the same thread across different invocations.

+ *

It is always guaranteed that invoking {@link Stream#demand()} + * from within this method will not cause a {@link StackOverflowError}.

+ *

Typical usage:

+ *
+         * class MyStreamListener implements Stream.Listener
+         * {
+         *     @Override
+         *     public void onDataAvailable(Stream stream)
+         *     {
+         *         // Read a chunk of the content.
+         *         Stream.Data data = stream.readData();
+         *         if (data == null)
+         *         {
+         *             // No data available now, demand to be called back.
+         *             stream.demand();
+         *         }
+         *         else
+         *         {
+         *             // Process the content.
+         *             process(data.getByteBuffer());
+         *             // Notify that the content has been consumed.
+         *             data.complete();
+         *             if (!data.isLast())
+         *             {
+         *                 // Demand to be called back.
+         *                 stream.demand();
+         *             }
+         *         }
+         *     }
+         * }
+         * 
+ * + * @param stream the stream + */ public default void onDataAvailable(Stream stream) { } + /** + *

Callback method invoked when a trailer is received.

+ * + * @param stream the stream + * @param frame the HEADERS frame containing the trailer headers + */ public default void onTrailer(Stream stream, HeadersFrame frame) { } } + /** + * *

The returned {@link Stream.Data} object associates the + * * {@link ByteBuffer} containing the bytes with a completion

+ */ public static class Data { private final DataFrame frame; @@ -56,9 +209,14 @@ public interface Stream this.complete = complete; } - public DataFrame frame() + public ByteBuffer getByteBuffer() { - return frame; + return frame.getByteBuffer(); + } + + public boolean isLast() + { + return frame.isLast(); } public void complete() diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java index 09600d376eb..6565c52a09b 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/DataFrame.java @@ -27,7 +27,7 @@ public class DataFrame extends Frame this.last = last; } - public ByteBuffer getData() + public ByteBuffer getByteBuffer() { return data; } @@ -40,6 +40,6 @@ public class DataFrame extends Frame @Override public String toString() { - return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getData().remaining()); + return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getByteBuffer().remaining()); } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index 3e9dc90d705..64780790bf0 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http3.internal; +import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -44,13 +45,30 @@ public abstract class HTTP3Session implements Session, ParserListener this.listener = listener; } + public ProtocolSession getProtocolSession() + { + return session; + } + + public Listener getListener() + { + return listener; + } + public void onOpen() { } - public ProtocolSession getProtocolSession() + @Override + public SocketAddress getLocalSocketAddress() { - return session; + return getProtocolSession().getQuicSession().getLocalAddress(); + } + + @Override + public SocketAddress getRemoteSocketAddress() + { + return getProtocolSession().getQuicSession().getRemoteAddress(); } protected HTTP3Stream createStream(QuicStreamEndPoint endPoint) @@ -120,20 +138,10 @@ public abstract class HTTP3Session implements Session, ParserListener { QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId); HTTP3Stream stream = getOrCreateStream(endPoint); - MetaData metaData = frame.getMetaData(); - if (metaData.isRequest()) + if (metaData.isRequest() || metaData.isResponse()) { - if (LOG.isDebugEnabled()) - LOG.debug("received request {}#{} on {}", frame, streamId, this); - Stream.Listener streamListener = notifyRequest(stream, frame); - stream.setListener(streamListener); - } - else if (metaData.isResponse()) - { - if (LOG.isDebugEnabled()) - LOG.debug("received response {}#{} on {}", frame, streamId, this); - notifyResponse(stream, frame); + throw new IllegalStateException("invalid metadata"); } else { @@ -143,33 +151,6 @@ public abstract class HTTP3Session implements Session, ParserListener } } - private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame) - { - try - { - return listener.onRequest(stream, frame); - } - catch (Throwable x) - { - LOG.info("failure notifying listener {}", listener, x); - return null; - } - } - - private void notifyResponse(HTTP3Stream stream, HeadersFrame frame) - { - try - { - Stream.Listener listener = stream.getListener(); - if (listener != null) - listener.onResponse(stream, frame); - } - catch (Throwable x) - { - LOG.info("failure notifying listener {}", listener, x); - } - } - private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame) { try diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 54a3825736e..0e5a55feefd 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -177,6 +177,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection process = true; } } + if (LOG.isDebugEnabled()) + LOG.debug("demand, wasStalled={} on {}", process, this); if (process) processDataDemand(); } @@ -201,31 +203,30 @@ public abstract class HTTP3StreamConnection extends AbstractConnection { while (true) { - boolean demand; + boolean process = true; try (AutoLock l = lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("processing demand={}, last={} fillInterested={} on {}", dataDemand, dataLast, isFillInterested(), this); if (dataDemand) { - demand = !dataLast; + // Do not process if the last frame was already + // notified, or if there is demand but no data. + if (dataLast || isFillInterested()) + process = false; + else + dataDemand = false; } else { dataStalled = true; - demand = false; + process = false; } } - if (LOG.isDebugEnabled()) - LOG.debug("processing demand={} fillInterested={} on {}", demand, isFillInterested(), this); - // Exit if there is no demand, or there is demand but no data. - if (!demand || isFillInterested()) + if (!process) return; - // We have demand, notify the application. - try (AutoLock l = lock.lock()) - { - dataDemand = false; - } onDataAvailable(getEndPoint().getStreamId()); } } @@ -244,6 +245,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection { ByteBuffer byteBuffer = buffer.getBuffer(); MessageParser.Result result = parser.parse(byteBuffer); + if (LOG.isDebugEnabled()) + LOG.debug("parsed {} on {} with buffer {}", result, this, buffer); if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH) return result; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/DataGenerator.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/DataGenerator.java index 9994d28b57d..dff775646ab 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/DataGenerator.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/DataGenerator.java @@ -32,7 +32,7 @@ public class DataGenerator extends FrameGenerator private int generateDataFrame(ByteBufferPool.Lease lease, DataFrame frame) { - ByteBuffer data = frame.getData(); + ByteBuffer data = frame.getByteBuffer(); int dataLength = data.remaining(); int headerLength = VarLenInt.length(FrameType.DATA.type()) + VarLenInt.length(dataLength); ByteBuffer header = ByteBuffer.allocate(headerLength); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java index 53b949b5658..9763b2bdff7 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java @@ -99,7 +99,7 @@ public class DataBodyParser extends BodyParser { DataFrame frame = new DataFrame(buffer, isLast.getAsBoolean()); if (LOG.isDebugEnabled()) - LOG.debug("notifying synthetic={} {}#{}", fragment, frame, streamId); + LOG.debug("notifying fragment={} {}#{} remaining={}", fragment, frame, streamId, length); notifyData(frame); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java index 37da4ff9147..c73171f95d2 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java @@ -113,7 +113,7 @@ public class MessageParser if (result == BodyParser.Result.NO_FRAME) return Result.NO_FRAME; if (LOG.isDebugEnabled()) - LOG.debug("Parsed unknown frame body for type {}", Integer.toHexString(frameType)); + LOG.debug("parsed unknown frame body for type {}", Integer.toHexString(frameType)); if (result == BodyParser.Result.WHOLE_FRAME) reset(); break; @@ -124,7 +124,7 @@ public class MessageParser { bodyParser.emptyBody(buffer); if (LOG.isDebugEnabled()) - LOG.debug("Parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); reset(); return Result.FRAME; } @@ -134,7 +134,7 @@ public class MessageParser if (result == BodyParser.Result.NO_FRAME) return Result.NO_FRAME; if (LOG.isDebugEnabled()) - LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); if (result == BodyParser.Result.WHOLE_FRAME) reset(); return Result.FRAME; diff --git a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java index a0d38dff54e..d0a28ccbec3 100644 --- a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java +++ b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java @@ -75,8 +75,8 @@ public class DataGenerateParseTest assertEquals(1, frames.size()); DataFrame output = frames.get(0); - byte[] outputBytes = new byte[output.getData().remaining()]; - output.getData().get(outputBytes); + byte[] outputBytes = new byte[output.getByteBuffer().remaining()]; + output.getByteBuffer().get(outputBytes); assertArrayEquals(inputBytes, outputBytes); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java index 0058af3035b..06e430e89ab 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java @@ -13,9 +13,14 @@ package org.eclipse.jetty.http3.server.internal; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.Frame; +import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.internal.HTTP3Session; +import org.eclipse.jetty.http3.internal.HTTP3Stream; +import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,15 +34,74 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server super(session, listener); } + @Override + public void onOpen() + { + super.onOpen(); + notifyAccept(); + } + @Override public ServerHTTP3Session getProtocolSession() { return (ServerHTTP3Session)super.getProtocolSession(); } + @Override + public Session.Server.Listener getListener() + { + return (Session.Server.Listener)super.getListener(); + } + + @Override + public void onHeaders(long streamId, HeadersFrame frame) + { + QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId); + HTTP3Stream stream = getOrCreateStream(endPoint); + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest()) + { + if (LOG.isDebugEnabled()) + LOG.debug("received request {}#{} on {}", frame, streamId, this); + Stream.Listener streamListener = notifyRequest(stream, frame); + stream.setListener(streamListener); + } + else + { + super.onHeaders(streamId, frame); + } + } + + private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame) + { + Server.Listener listener = getListener(); + try + { + return listener.onRequest(stream, frame); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + return null; + } + } + @Override protected void writeFrame(long streamId, Frame frame, Callback callback) { getProtocolSession().writeFrame(streamId, frame, callback); } + + private void notifyAccept() + { + Server.Listener listener = getListener(); + try + { + listener.onAccept(this); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + } + } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 64a7c79d081..675f7b48bd7 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,10 +97,13 @@ public class ServerHTTP3Session extends ServerProtocolSession settings = Map.of(); // TODO: add default settings. SettingsFrame frame = new SettingsFrame(settings); - controlFlusher.offer(frame, Callback.NOOP); + controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail)); controlFlusher.iterate(); + } - applicationSession.onOpen(); + private void fail(Throwable failure) + { + // TODO: must close the connection. } private QuicStreamEndPoint configureInstructionEndPoint(long streamId) diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java index ff5ecbad237..75b4706b9cb 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java @@ -152,7 +152,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest data.complete(); // Call me again immediately. stream.demand(); - if (data.frame().isLast()) + if (data.isLast()) serverLatch.get().countDown(); } }; @@ -219,7 +219,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest return; } // Echo it back, then demand only when the write is finished. - stream.data(data.frame()) + stream.data(new DataFrame(data.getByteBuffer(), data.isLast())) // Always complete. .whenComplete((s, x) -> data.complete()) // Demand only if successful. @@ -259,9 +259,9 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest if (data != null) { // Consume data. - byteBuffer.put(data.frame().getData()); + byteBuffer.put(data.getByteBuffer()); data.complete(); - if (data.frame().isLast()) + if (data.isLast()) clientDataLatch.countDown(); } // Demand more data. diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java index 1bfe0c42fec..9e5f9680e04 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java @@ -42,7 +42,6 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,7 +75,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest { // When resumed, demand all content until the last. Stream.Data data = stream.readData(); - if (data != null && data.frame().isLast()) + if (data != null && data.isLast()) serverDataLatch.countDown(); else stream.demand(); @@ -128,17 +127,16 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest onDataAvailableCalls.incrementAndGet(); if (serverStreamRef.compareAndSet(null, stream)) { - serverStreamLatch.countDown(); // Read only one chunk of data. - Stream.Data data = stream.readData(); - assertNotNull(data); + await().atMost(1, TimeUnit.SECONDS).until(() -> stream.readData() != null); + serverStreamLatch.countDown(); // Don't demand, just exit. } else { // When resumed, demand all content until the last. Stream.Data data = stream.readData(); - if (data != null && data.frame().isLast()) + if (data != null && data.isLast()) serverDataLatch.countDown(); else stream.demand(); @@ -210,7 +208,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest { // When resumed, demand all content until the last. Stream.Data data = stream.readData(); - if (data != null && data.frame().isLast()) + if (data != null && data.isLast()) serverDataLatch.countDown(); else stream.demand(); @@ -321,7 +319,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest Stream.Data data = stream.readData(); if (data != null) { - if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength) + if (dataRead.addAndGet(data.getByteBuffer().remaining()) == dataLength) serverDataLatch.countDown(); } stream.demand(); @@ -385,7 +383,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest } // Store the Data away to be used later. datas.add(data); - if (data.frame().isLast()) + if (data.isLast()) serverDataLatch.countDown(); } } @@ -408,10 +406,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); - assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.frame().getData().remaining()).sum()); + assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.getByteBuffer().remaining()).sum()); byte[] bytesReceived = new byte[bytesSent.length]; ByteBuffer buffer = ByteBuffer.wrap(bytesReceived); - datas.forEach(d -> buffer.put(d.frame().getData())); + datas.forEach(d -> buffer.put(d.getByteBuffer())); assertArrayEquals(bytesSent, bytesReceived); } @@ -437,7 +435,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest { onDataAvailableCalls.incrementAndGet(); Stream.Data data = stream.readData(); - if (data != null && data.frame().isLast()) + if (data != null && data.isLast()) serverDataLatch.countDown(); stream.demand(); }