Issue #6728 - QUIC and HTTP/3

- More javadocs.
- Fixed QuicSession.fill() to also flush() to send to the remote peer the information that the data has been consumed.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-25 18:27:05 +02:00
parent 539fee7f79
commit 8b3ae8a661
6 changed files with 140 additions and 36 deletions

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.api;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
@ -26,6 +27,13 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
* <p>Like {@link Session}, {@link Stream} is the active part and by calling its API applications
* can generate events on the stream; conversely, {@link Stream.Listener} is the passive part, and
* its callbacks are invoked when events happen on the stream.</p>
* <p>The client initiates a stream by sending a HEADERS frame containing the HTTP/3 request URI
* and request headers, and zero or more DATA frames containing request content.</p>
* <p>Similarly, the server responds by sending a HEADERS frame containing the HTTP/3 response
* status code and response headers, and zero or more DATA frames containing response content.</p>
* <p>Both client and server can end their side of the stream by sending a final frame with
* the {@code last} flag set to {@code true}, see {@link HeadersFrame#HeadersFrame(MetaData, boolean)}
* and {@link DataFrame#DataFrame(ByteBuffer, boolean)}.</p>
*
* @see Stream.Listener
*/
@ -64,15 +72,14 @@ public interface Stream
* stream.</li>
* </ul>
* <p>When the returned {@link Stream.Data} object is not {@code null},
* applications <em>should</em> call {@link Stream.Data#complete()} to
* notify the implementation that the bytes have been processed.
* This allows the implementation to perform better, for example by
* recycling the {@link Stream.Data} object's {@link ByteBuffer}.</p>
* applications <em>must</em> call {@link Stream.Data#complete()} to
* notify the implementation that the bytes have been processed.</p>
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been
* received).</p>
*
* @return a {@link Stream.Data} object containing the request bytes or the response bytes
* @return a {@link Stream.Data} object containing the request bytes or
* the response bytes, or null if no bytes are available
* @see Stream.Listener#onDataAvailable(Stream)
*/
public Stream.Data readData();
@ -195,8 +202,12 @@ public interface Stream
}
/**
* * <p>The returned {@link Stream.Data} object associates the
* * {@link ByteBuffer} containing the bytes with a completion </p>
* <p>A {@link Stream.Data} instance associates a {@link ByteBuffer}
* containing request bytes or response bytes with a completion event
* that applications <em>must</em> trigger when the bytes have been
* processed.</p>
*
* @see Stream#readData()
*/
public static class Data
{
@ -209,16 +220,27 @@ public interface Stream
this.complete = complete;
}
/**
* @return the {@link ByteBuffer} containing the bytes
*/
public ByteBuffer getByteBuffer()
{
return frame.getByteBuffer();
}
/**
* @return whether this is the instance that ends
* the stream of bytes received from the remote peer
*/
public boolean isLast()
{
return frame.isLast();
}
/**
* <p>The method that applications <em>must</em> invoke to
* signal that the bytes have been processed.</p>
*/
public void complete()
{
complete.run();

View File

@ -103,7 +103,7 @@ public class DataBodyParser extends BodyParser
{
DataFrame frame = new DataFrame(buffer, last);
if (LOG.isDebugEnabled())
LOG.debug("notifying fragment={} {}#{} remaining={}", fragment, frame, streamId, length);
LOG.debug("notifying fragment={} {}#{} left={}", fragment, frame, streamId, length);
notifyData(frame);
}

View File

@ -33,6 +33,8 @@ import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -81,6 +83,9 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
})
.get(5, TimeUnit.SECONDS);
assertNotNull(session);
assertTrue(serverSettingsLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
@Test
@ -191,12 +196,13 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
Thread.sleep(500);
stream2.data(new DataFrame(ByteBuffer.allocate(5 * 1024), true));
assertTrue(clientLatch.get().await(555, TimeUnit.SECONDS));
assertTrue(serverLatch.get().await(555, TimeUnit.SECONDS));
assertTrue(clientLatch.get().await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.get().await(5, TimeUnit.SECONDS));
}
@Test
public void testEchoRequestContentAsResponseContent() throws Exception
@ParameterizedTest
@ValueSource(ints = {1024, 128 * 1024, 1024 * 1024})
public void testEchoRequestContentAsResponseContent(int length) throws Exception
{
startServer(new Session.Server.Listener()
{
@ -237,7 +243,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, false);
byte[] bytesSent = new byte[8192];
byte[] bytesSent = new byte[length];
new Random().nextBytes(bytesSent);
byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived);

View File

@ -130,7 +130,9 @@ public abstract class QuicSession
public int fill(long streamId, ByteBuffer buffer) throws IOException
{
return quicheConnection.drainClearTextForStream(streamId, buffer);
int drained = quicheConnection.drainClearTextForStream(streamId, buffer);
flush();
return drained;
}
public int flush(long streamId, ByteBuffer buffer) throws IOException
@ -377,7 +379,7 @@ public abstract class QuicSession
boolean connectionClosed = quicheConnection.isConnectionClosed();
Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE;
if (LOG.isDebugEnabled())
LOG.debug("connection is closed? {} -> action = {}", connectionClosed, action);
LOG.debug("connection closed={}, action={}", connectionClosed, action);
return action;
}
BufferUtil.flipToFlush(cipherBuffer, pos);

View File

@ -476,24 +476,6 @@ public class QuicheConnection
}
}
public String statistics()
{
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
LibQuiche.quiche_stats stats = new LibQuiche.quiche_stats();
LibQuiche.INSTANCE.quiche_conn_stats(quicheConn, stats);
return "[recv: " + stats.recv +
" sent: " + stats.sent +
" lost: " + stats.lost +
" rtt: " + stats.rtt +
" rate: " + stats.delivery_rate +
" window: " + stats.cwnd +
"]";
}
}
public boolean close() throws IOException
{
try (AutoLock ignore = lock.lock())

View File

@ -28,7 +28,7 @@ public interface LibQuiche extends Library
{
// This interface is a translation of the quiche.h header of a specific version.
// It needs to be reviewed each time the native lib version changes.
String EXPECTED_QUICHE_VERSION = "0.9.0";
String EXPECTED_QUICHE_VERSION = "0.10.0";
// load the native lib
LibQuiche INSTANCE = Native.load("quiche", LibQuiche.class);
@ -109,6 +109,12 @@ public interface LibQuiche extends Library
// The peer violated the local stream limits.
QUICHE_ERR_STREAM_LIMIT = -12,
// The specified stream was stopped by the peer.
QUICHE_ERR_STREAM_STOPPED = -15,
// The specified stream was reset by the peer.
QUICHE_ERR_STREAM_RESET = -16,
// The received data exceeds the stream's final size.
QUICHE_ERR_FINAL_SIZE = -13,
@ -145,6 +151,10 @@ public interface LibQuiche extends Library
return "QUICHE_ERR_FINAL_SIZE";
if (err == QUICHE_ERR_CONGESTION_CONTROL)
return "QUICHE_ERR_CONGESTION_CONTROL";
if (err == QUICHE_ERR_STREAM_STOPPED)
return "QUICHE_ERR_STREAM_STOPPED";
if (err == QUICHE_ERR_STREAM_RESET)
return "QUICHE_ERR_STREAM_RESET";
return "?? " + err;
}
}
@ -220,7 +230,13 @@ public interface LibQuiche extends Library
public byte dummy;
}
@Structure.FieldOrder({"recv", "sent", "lost", "rtt", "cwnd", "delivery_rate"})
@Structure.FieldOrder({"recv", "sent", "lost", "retrans", "rtt", "cwnd", "sent_bytes", "recv_bytes", "lost_bytes",
"stream_retrans_bytes", "pmtu", "delivery_rate", "peer_max_idle_timeout",
"peer_max_udp_payload_size", "peer_initial_max_data", "peer_initial_max_stream_data_bidi_local",
"peer_initial_max_stream_data_bidi_remote", "peer_initial_max_stream_data_uni",
"peer_initial_max_streams_bidi", "peer_initial_max_streams_uni", "peer_ack_delay_exponent",
"peer_ack_delay_exponent", "peer_max_ack_delay", "peer_disable_active_migration",
"peer_active_conn_id_limit", "peer_max_datagram_frame_size"})
class quiche_stats extends Structure
{
// The number of QUIC packets received on this connection.
@ -232,14 +248,71 @@ public interface LibQuiche extends Library
// The number of QUIC packets that were lost.
public size_t lost;
// The number of sent QUIC packets with retranmitted data.
public size_t retrans;
// The estimated round-trip time of the connection (in nanoseconds).
public uint64_t rtt;
// The size of the connection's congestion window in bytes.
public size_t cwnd;
// The estimated data delivery rate in bytes/s.
// The number of sent bytes.
public uint64_t sent_bytes;
// The number of recevied bytes.
public uint64_t recv_bytes;
// The number of bytes lost.
public uint64_t lost_bytes;
// The number of stream bytes retransmitted.
public uint64_t stream_retrans_bytes;
// The current PMTU for the connection.
public size_t pmtu;
// The most recent data delivery rate estimate in bytes/s.
public uint64_t delivery_rate;
// The maximum idle timeout.
public uint64_t peer_max_idle_timeout;
// The maximum UDP payload size.
public uint64_t peer_max_udp_payload_size;
// The initial flow control maximum data for the connection.
public uint64_t peer_initial_max_data;
// The initial flow control maximum data for local bidirectional streams.
public uint64_t peer_initial_max_stream_data_bidi_local;
// The initial flow control maximum data for remote bidirectional streams.
public uint64_t peer_initial_max_stream_data_bidi_remote;
// The initial flow control maximum data for unidirectional streams.
public uint64_t peer_initial_max_stream_data_uni;
// The initial maximum bidirectional streams.
public uint64_t peer_initial_max_streams_bidi;
// The initial maximum unidirectional streams.
public uint64_t peer_initial_max_streams_uni;
// The ACK delay exponent.
public uint64_t peer_ack_delay_exponent;
// The max ACK delay.
public uint64_t peer_max_ack_delay;
// Whether active migration is disabled.
public boolean peer_disable_active_migration;
// The active connection ID limit.
public uint64_t peer_active_conn_id_limit;
// DATAGRAM frame extension parameter, if any.
public ssize_t peer_max_datagram_frame_size;
}
interface LoggingCallback extends Callback
@ -352,6 +425,25 @@ public interface LibQuiche extends Library
// Returns true if the connection is closed.
boolean quiche_conn_is_closed(quiche_conn conn);
// Returns true if the connection was closed due to the idle timeout.
boolean quiche_conn_is_timed_out(quiche_conn conn);
// Returns true if a connection error was received, and updates the provided
// parameters accordingly.
boolean quiche_conn_peer_error(quiche_conn conn,
bool_pointer is_app,
uint64_t_pointer error_code,
Pointer/*const uint8_t ***/ reason,
size_t_pointer reason_len);
// Returns true if a connection error was queued or sent, and updates the provided
// parameters accordingly.
boolean quiche_conn_local_error(quiche_conn conn,
bool_pointer is_app,
uint64_t_pointer error_code,
Pointer/*const uint8_t ***/ reason,
size_t_pointer reason_len);
// Closes the connection with the given error and reason.
int quiche_conn_close(quiche_conn conn, boolean app, uint64_t err,
String reason, size_t reason_len);