diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index 99440d056e3..02d73de0478 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.api; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; @@ -26,6 +27,13 @@ import org.eclipse.jetty.http3.frames.HeadersFrame; *

Like {@link Session}, {@link Stream} is the active part and by calling its API applications * can generate events on the stream; conversely, {@link Stream.Listener} is the passive part, and * its callbacks are invoked when events happen on the stream.

+ *

The client initiates a stream by sending a HEADERS frame containing the HTTP/3 request URI + * and request headers, and zero or more DATA frames containing request content.

+ *

Similarly, the server responds by sending a HEADERS frame containing the HTTP/3 response + * status code and response headers, and zero or more DATA frames containing response content.

+ *

Both client and server can end their side of the stream by sending a final frame with + * the {@code last} flag set to {@code true}, see {@link HeadersFrame#HeadersFrame(MetaData, boolean)} + * and {@link DataFrame#DataFrame(ByteBuffer, boolean)}.

* * @see Stream.Listener */ @@ -64,15 +72,14 @@ public interface Stream * stream. * *

When the returned {@link Stream.Data} object is not {@code null}, - * applications should call {@link Stream.Data#complete()} to - * notify the implementation that the bytes have been processed. - * This allows the implementation to perform better, for example by - * recycling the {@link Stream.Data} object's {@link ByteBuffer}.

+ * applications must call {@link Stream.Data#complete()} to + * notify the implementation that the bytes have been processed.

*

{@link Stream.Data} objects may be stored away for later, asynchronous, * processing (for example, to process them only when all of them have been * received).

* - * @return a {@link Stream.Data} object containing the request bytes or the response bytes + * @return a {@link Stream.Data} object containing the request bytes or + * the response bytes, or null if no bytes are available * @see Stream.Listener#onDataAvailable(Stream) */ public Stream.Data readData(); @@ -195,8 +202,12 @@ public interface Stream } /** - * *

The returned {@link Stream.Data} object associates the - * * {@link ByteBuffer} containing the bytes with a completion

+ *

A {@link Stream.Data} instance associates a {@link ByteBuffer} + * containing request bytes or response bytes with a completion event + * that applications must trigger when the bytes have been + * processed.

+ * + * @see Stream#readData() */ public static class Data { @@ -209,16 +220,27 @@ public interface Stream this.complete = complete; } + /** + * @return the {@link ByteBuffer} containing the bytes + */ public ByteBuffer getByteBuffer() { return frame.getByteBuffer(); } + /** + * @return whether this is the instance that ends + * the stream of bytes received from the remote peer + */ public boolean isLast() { return frame.isLast(); } + /** + *

The method that applications must invoke to + * signal that the bytes have been processed.

+ */ public void complete() { complete.run(); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java index e1997d5a616..e8ace7c0422 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/DataBodyParser.java @@ -103,7 +103,7 @@ public class DataBodyParser extends BodyParser { DataFrame frame = new DataFrame(buffer, last); if (LOG.isDebugEnabled()) - LOG.debug("notifying fragment={} {}#{} remaining={}", fragment, frame, streamId, length); + LOG.debug("notifying fragment={} {}#{} left={}", fragment, frame, streamId, length); notifyData(frame); } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java index 75b4706b9cb..54ea3abeb93 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java @@ -33,6 +33,8 @@ import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -81,6 +83,9 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest }) .get(5, TimeUnit.SECONDS); assertNotNull(session); + + assertTrue(serverSettingsLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS)); } @Test @@ -191,12 +196,13 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest Thread.sleep(500); stream2.data(new DataFrame(ByteBuffer.allocate(5 * 1024), true)); - assertTrue(clientLatch.get().await(555, TimeUnit.SECONDS)); - assertTrue(serverLatch.get().await(555, TimeUnit.SECONDS)); + assertTrue(clientLatch.get().await(5, TimeUnit.SECONDS)); + assertTrue(serverLatch.get().await(5, TimeUnit.SECONDS)); } - @Test - public void testEchoRequestContentAsResponseContent() throws Exception + @ParameterizedTest + @ValueSource(ints = {1024, 128 * 1024, 1024 * 1024}) + public void testEchoRequestContentAsResponseContent(int length) throws Exception { startServer(new Session.Server.Listener() { @@ -237,7 +243,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); HeadersFrame frame = new HeadersFrame(metaData, false); - byte[] bytesSent = new byte[8192]; + byte[] bytesSent = new byte[length]; new Random().nextBytes(bytesSent); byte[] bytesReceived = new byte[bytesSent.length]; ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived); diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index d1ffe9cbb3d..d92d24908ac 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -130,7 +130,9 @@ public abstract class QuicSession public int fill(long streamId, ByteBuffer buffer) throws IOException { - return quicheConnection.drainClearTextForStream(streamId, buffer); + int drained = quicheConnection.drainClearTextForStream(streamId, buffer); + flush(); + return drained; } public int flush(long streamId, ByteBuffer buffer) throws IOException @@ -377,7 +379,7 @@ public abstract class QuicSession boolean connectionClosed = quicheConnection.isConnectionClosed(); Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE; if (LOG.isDebugEnabled()) - LOG.debug("connection is closed? {} -> action = {}", connectionClosed, action); + LOG.debug("connection closed={}, action={}", connectionClosed, action); return action; } BufferUtil.flipToFlush(cipherBuffer, pos); diff --git a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java index c79cb3154fe..bf14e7efbfb 100644 --- a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java +++ b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java @@ -476,24 +476,6 @@ public class QuicheConnection } } - public String statistics() - { - try (AutoLock ignore = lock.lock()) - { - if (quicheConn == null) - throw new IllegalStateException("Quiche connection was released"); - LibQuiche.quiche_stats stats = new LibQuiche.quiche_stats(); - LibQuiche.INSTANCE.quiche_conn_stats(quicheConn, stats); - return "[recv: " + stats.recv + - " sent: " + stats.sent + - " lost: " + stats.lost + - " rtt: " + stats.rtt + - " rate: " + stats.delivery_rate + - " window: " + stats.cwnd + - "]"; - } - } - public boolean close() throws IOException { try (AutoLock ignore = lock.lock()) diff --git a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/ffi/LibQuiche.java b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/ffi/LibQuiche.java index deb10516aef..6e2fb5bdacd 100644 --- a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/ffi/LibQuiche.java +++ b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/ffi/LibQuiche.java @@ -28,7 +28,7 @@ public interface LibQuiche extends Library { // This interface is a translation of the quiche.h header of a specific version. // It needs to be reviewed each time the native lib version changes. - String EXPECTED_QUICHE_VERSION = "0.9.0"; + String EXPECTED_QUICHE_VERSION = "0.10.0"; // load the native lib LibQuiche INSTANCE = Native.load("quiche", LibQuiche.class); @@ -109,6 +109,12 @@ public interface LibQuiche extends Library // The peer violated the local stream limits. QUICHE_ERR_STREAM_LIMIT = -12, + // The specified stream was stopped by the peer. + QUICHE_ERR_STREAM_STOPPED = -15, + + // The specified stream was reset by the peer. + QUICHE_ERR_STREAM_RESET = -16, + // The received data exceeds the stream's final size. QUICHE_ERR_FINAL_SIZE = -13, @@ -145,6 +151,10 @@ public interface LibQuiche extends Library return "QUICHE_ERR_FINAL_SIZE"; if (err == QUICHE_ERR_CONGESTION_CONTROL) return "QUICHE_ERR_CONGESTION_CONTROL"; + if (err == QUICHE_ERR_STREAM_STOPPED) + return "QUICHE_ERR_STREAM_STOPPED"; + if (err == QUICHE_ERR_STREAM_RESET) + return "QUICHE_ERR_STREAM_RESET"; return "?? " + err; } } @@ -220,7 +230,13 @@ public interface LibQuiche extends Library public byte dummy; } - @Structure.FieldOrder({"recv", "sent", "lost", "rtt", "cwnd", "delivery_rate"}) + @Structure.FieldOrder({"recv", "sent", "lost", "retrans", "rtt", "cwnd", "sent_bytes", "recv_bytes", "lost_bytes", + "stream_retrans_bytes", "pmtu", "delivery_rate", "peer_max_idle_timeout", + "peer_max_udp_payload_size", "peer_initial_max_data", "peer_initial_max_stream_data_bidi_local", + "peer_initial_max_stream_data_bidi_remote", "peer_initial_max_stream_data_uni", + "peer_initial_max_streams_bidi", "peer_initial_max_streams_uni", "peer_ack_delay_exponent", + "peer_ack_delay_exponent", "peer_max_ack_delay", "peer_disable_active_migration", + "peer_active_conn_id_limit", "peer_max_datagram_frame_size"}) class quiche_stats extends Structure { // The number of QUIC packets received on this connection. @@ -232,14 +248,71 @@ public interface LibQuiche extends Library // The number of QUIC packets that were lost. public size_t lost; + // The number of sent QUIC packets with retranmitted data. + public size_t retrans; + // The estimated round-trip time of the connection (in nanoseconds). public uint64_t rtt; // The size of the connection's congestion window in bytes. public size_t cwnd; - // The estimated data delivery rate in bytes/s. + // The number of sent bytes. + public uint64_t sent_bytes; + + // The number of recevied bytes. + public uint64_t recv_bytes; + + // The number of bytes lost. + public uint64_t lost_bytes; + + // The number of stream bytes retransmitted. + public uint64_t stream_retrans_bytes; + + // The current PMTU for the connection. + public size_t pmtu; + + // The most recent data delivery rate estimate in bytes/s. public uint64_t delivery_rate; + + // The maximum idle timeout. + public uint64_t peer_max_idle_timeout; + + // The maximum UDP payload size. + public uint64_t peer_max_udp_payload_size; + + // The initial flow control maximum data for the connection. + public uint64_t peer_initial_max_data; + + // The initial flow control maximum data for local bidirectional streams. + public uint64_t peer_initial_max_stream_data_bidi_local; + + // The initial flow control maximum data for remote bidirectional streams. + public uint64_t peer_initial_max_stream_data_bidi_remote; + + // The initial flow control maximum data for unidirectional streams. + public uint64_t peer_initial_max_stream_data_uni; + + // The initial maximum bidirectional streams. + public uint64_t peer_initial_max_streams_bidi; + + // The initial maximum unidirectional streams. + public uint64_t peer_initial_max_streams_uni; + + // The ACK delay exponent. + public uint64_t peer_ack_delay_exponent; + + // The max ACK delay. + public uint64_t peer_max_ack_delay; + + // Whether active migration is disabled. + public boolean peer_disable_active_migration; + + // The active connection ID limit. + public uint64_t peer_active_conn_id_limit; + + // DATAGRAM frame extension parameter, if any. + public ssize_t peer_max_datagram_frame_size; } interface LoggingCallback extends Callback @@ -352,6 +425,25 @@ public interface LibQuiche extends Library // Returns true if the connection is closed. boolean quiche_conn_is_closed(quiche_conn conn); + // Returns true if the connection was closed due to the idle timeout. + boolean quiche_conn_is_timed_out(quiche_conn conn); + + // Returns true if a connection error was received, and updates the provided + // parameters accordingly. + boolean quiche_conn_peer_error(quiche_conn conn, + bool_pointer is_app, + uint64_t_pointer error_code, + Pointer/*const uint8_t ***/ reason, + size_t_pointer reason_len); + + // Returns true if a connection error was queued or sent, and updates the provided + // parameters accordingly. + boolean quiche_conn_local_error(quiche_conn conn, + bool_pointer is_app, + uint64_t_pointer error_code, + Pointer/*const uint8_t ***/ reason, + size_t_pointer reason_len); + // Closes the connection with the given error and reason. int quiche_conn_close(quiche_conn conn, boolean app, uint64_t err, String reason, size_t reason_len);