Issue #6728 - QUIC and HTTP/3

- Split Stream in Stream.Client and Server.Server, so segregate client-specific actions and events.
Now, only Stream.Server has method respond(), and only Stream.Client.Listener has method onResponse().
- Improved javadocs, and updated javadoc module to create javadocs for both http3 and quic modules.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-11-29 20:09:27 +01:00
parent 279eb5a39e
commit 64296f76f1
35 changed files with 1036 additions and 555 deletions

View File

@ -45,6 +45,10 @@
</goals>
<configuration>
<rules>
<requireJavaVersion>
<version>[17,)</version>
<message>[ERROR] Old Java [${java.version}] in use. Jetty javadocs ${project.version} MUST use Java 17 or newer</message>
</requireJavaVersion>
<requireUpperBoundDeps>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
@ -94,8 +98,10 @@
org.eclipse.jetty.fcgi,
org.eclipse.jetty.gcloud,
org.eclipse.jetty.http2,
org.eclipse.jetty.http3,
org.eclipse.jetty.memcached,
org.eclipse.jetty.osgi,
org.eclipse.jetty.quic,
org.eclipse.jetty.websocket
</includeGroupIds>
<excludeGroupIds>
@ -137,6 +143,23 @@
<configuration>
<doctitle>Eclipse Jetty API Doc - v${project.version}</doctitle>
<windowtitle>Eclipse Jetty API Doc - v${project.version}</windowtitle>
<additionalJOptions>
<option>--add-modules</option>
<option>jdk.incubator.foreign</option>
</additionalJOptions>
<excludePackageNames>
org.eclipse.jetty.http3.client.http.internal;
org.eclipse.jetty.http3.client.internal;
org.eclipse.jetty.http3.internal;
org.eclipse.jetty.http3.internal.*;
org.eclipse.jetty.http3.qpack.internal;
org.eclipse.jetty.http3.qpack.internal.*;
org.eclipse.jetty.http3.server.internal;
org.eclipse.jetty.quic.common.internal;
org.eclipse.jetty.quic.quiche;
org.eclipse.jetty.quic.quiche.*;
org.eclipse.jetty.quic.server.internal;
</excludePackageNames>
</configuration>
</execution>
</executions>
@ -227,12 +250,32 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<artifactId>http2-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
<artifactId>http2-http-client-transport</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-http-client-transport</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -265,6 +308,16 @@
<artifactId>jetty-proxy</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-quickstart</artifactId>

View File

@ -21,8 +21,12 @@ import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.internal.ClientHTTP3Session;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.quic.client.ClientQuicConnection;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.client.QuicClientConnectorConfigurator;
import org.eclipse.jetty.quic.common.QuicConfiguration;
import org.eclipse.jetty.quic.common.QuicConnection;
@ -33,10 +37,92 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>HTTP3Client provides an asynchronous, non-blocking implementation to send
* HTTP/3 frames to a server.</p>
* <p>Typical usage:</p>
* <pre>
* / dgramEP1 - ClientQuiConnection -* ClientQuicSession - ClientProtocolSession
* HTTP3Client / ControlStream
* \ dgramEP3 - ClientQuiConnection -* ClientQuicSession - ClientHTTP3Session -* HTTP3Streams
* // HTTP3Client setup.
*
* HTTP3Client client = new HTTP3Client();
*
* // To configure QUIC properties.
* QuicConfiguration quicConfig = client.getQuicConfiguration();
*
* // To configure HTTP/3 properties.
* HTTP3Configuration h3Config = client.getHTTP3Configuration();
*
* client.start();
*
* // HTTP3Client request/response usage.
*
* // Connect to host.
* String host = "webtide.com";
* int port = 443;
* Session.Client session = client
* .connect(new InetSocketAddress(host, port), new Session.Client.Listener() {})
* .get(5, TimeUnit.SECONDS);
*
* // Prepare the HTTP request headers.
* HttpFields requestFields = new HttpFields();
* requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
*
* // Prepare the HTTP request object.
* MetaData.Request request = new MetaData.Request("PUT", HttpURI.from("https://" + host + ":" + port + "/"), HttpVersion.HTTP_3, requestFields);
*
* // Create the HTTP/3 HEADERS frame representing the HTTP request.
* HeadersFrame headersFrame = new HeadersFrame(request, false);
*
* // Send the HEADERS frame to create a request stream.
* Stream stream = session.newRequest(headersFrame, new Stream.Listener()
* {
* &#64;Override
* public void onResponse(Stream stream, HeadersFrame frame)
* {
* // Inspect the response status and headers.
* MetaData.Response response = (MetaData.Response)frame.getMetaData();
*
* // Demand for response content.
* stream.demand();
* }
*
* &#64;Override
* public void onDataAvailable(Stream stream)
* {
* Stream.Data data = stream.readData();
* if (data != null)
* {
* // Process the response content chunk.
* }
* // Demand for more response content.
* stream.demand();
* }
* }).get(5, TimeUnit.SECONDS);
*
* // Use the Stream object to send request content, if any, using a DATA frame.
* ByteBuffer requestChunk1 = ...;
* stream.data(new DataFrame(requestChunk1, false))
* // Subsequent sends must wait for previous sends to complete.
* .thenCompose(s ->
* {
* ByteBuffer requestChunk2 = ...;
* s.data(new DataFrame(requestChunk2, true)));
* }
* </pre>
*
* <p>IMPLEMENTATION NOTES.</p>
* <p>Each call to {@link #connect(SocketAddress, Session.Client.Listener)} creates a new
* {@link DatagramChannelEndPoint} with the correspondent {@link ClientQuicConnection}.</p>
* <p>Each {@link ClientQuicConnection} manages one {@link ClientQuicSession} with the
* corresponding {@link ClientHTTP3Session}.</p>
* <p>Each {@link ClientHTTP3Session} manages the mandatory encoder, decoder and control
* streams, plus zero or more request/response streams.</p>
* <pre>
* GENERIC, TCP-LIKE, SETUP FOR HTTP/1.1 AND HTTP/2
* HTTP3Client - dgramEP - ClientQuiConnection - ClientQuicSession - ClientProtocolSession - TCPLikeStream
*
* SPECIFIC SETUP FOR HTTP/3
* /- [Control|Decoder|Encoder]Stream
* HTTP3Client - dgramEP - ClientQuiConnection - ClientQuicSession - ClientHTTP3Session -* HTTP3Streams
* </pre>
*/
public class HTTP3Client extends ContainerLifeCycle

View File

@ -30,6 +30,10 @@ import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link ClientConnectionFactory} implementation that creates HTTP/3 specific
* {@link Connection} objects to be linked to a {@link QuicStreamEndPoint}.</p>
*/
public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, ProtocolSession.Factory
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3ClientConnectionFactory.class);

View File

@ -20,8 +20,9 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
@ -55,13 +56,19 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
promise.succeeded(this);
}
@Override
protected HTTP3StreamClient newHTTP3Stream(QuicStreamEndPoint endPoint, boolean local)
{
return new HTTP3StreamClient(this, endPoint, local);
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
if (frame.getMetaData().isResponse())
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
HTTP3StreamClient stream = (HTTP3StreamClient)getOrCreateStream(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("received response {} on {}", frame, stream);
if (stream != null)
@ -74,12 +81,52 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
}
@Override
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener)
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Client.Listener listener)
{
long streamId = getProtocolSession().getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
return newRequest(streamId, frame, listener);
}
private CompletableFuture<Stream> newRequest(long streamId, HeadersFrame frame, Stream.Client.Listener listener)
{
if (LOG.isDebugEnabled())
LOG.debug("new request stream #{} with {} on {}", streamId, frame, this);
ProtocolSession session = getProtocolSession();
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::openProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
promise.whenComplete((s, x) ->
{
if (x != null)
endPoint.close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
HTTP3StreamClient stream = (HTTP3StreamClient)createStream(endPoint, promise::failed);
if (stream == null)
return promise;
stream.setListener(listener);
stream.writeFrame(frame)
.whenComplete((r, x) ->
{
if (x == null)
{
if (listener == null)
endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
stream.updateClose(frame.isLast(), true);
promise.succeeded(stream);
}
else
{
removeStream(stream, x);
promise.failed(x);
}
});
return promise;
}
@Override
public void writeControlFrame(Frame frame, Callback callback)
{

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.internal;
import java.util.EnumSet;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3StreamClient extends HTTP3Stream implements Stream.Client
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3StreamClient.class);
private Stream.Client.Listener listener;
public HTTP3StreamClient(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
{
super(session, endPoint, local);
}
public Stream.Client.Listener getListener()
{
return listener;
}
public void setListener(Stream.Client.Listener listener)
{
this.listener = listener;
}
public void onResponse(HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
boolean valid;
if (response.getStatus() == HttpStatus.CONTINUE_100)
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.CONTINUE);
else
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL, FrameState.CONTINUE), FrameState.HEADER);
if (valid)
{
notIdle();
notifyResponse(frame);
updateClose(frame.isLast(), false);
}
}
private void notifyResponse(HeadersFrame frame)
{
Stream.Client.Listener listener = getListener();
try
{
if (listener != null)
listener.onResponse(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
protected void notifyDataAvailable()
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onDataAvailable(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
protected void notifyTrailer(HeadersFrame frame)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onTrailer(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
protected boolean notifyIdleTimeout(TimeoutException timeout)
{
Listener listener = getListener();
try
{
if (listener != null)
return listener.onIdleTimeout(this, timeout);
return true;
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return true;
}
}
@Override
protected void notifyFailure(long error, Throwable failure)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onFailure(this, error, failure);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -16,6 +16,9 @@ package org.eclipse.jetty.http3;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>A record that captures HTTP/3 configuration parameters.</p>
*/
@ManagedObject
public class HTTP3Configuration
{

View File

@ -88,7 +88,6 @@ public interface Session
* </pre>
*
* @see Stream
* @see Stream.Listener
*/
public interface Client extends Session
{
@ -99,7 +98,7 @@ public interface Session
* @param listener the listener that gets notified of stream events
* @return a CompletableFuture that is notified of the stream creation
*/
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener);
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Client.Listener listener);
/**
* <p>The client-side specific {@link Session.Listener}.</p>
@ -111,7 +110,7 @@ public interface Session
/**
* <p>The server-side HTTP/3 API representing a connection with a client.</p>
* <p>To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
* <p>To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream.Server, HeadersFrame)}.</p>
*/
public interface Server extends Session
{
@ -128,6 +127,45 @@ public interface Session
public default void onAccept(Session session)
{
}
/**
* <p>Callback method invoked when a request is received.</p>
* <p>Applications should implement this method to process HTTP/3 requests,
* typically providing an HTTP/3 response via {@link Stream.Server#respond(HeadersFrame)}:</p>
* <pre>
* class MyServer implements Session.Server.Listener
* {
* &#64;Override
* public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
* {
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* if (!frame.isLast())
* stream.demand();
* return null;
* }
* }
* </pre>
* <p>If there is request content (indicated by the fact that the HEADERS frame
* is not the last in the stream), then applications either:</p>
* <ul>
* <li>return {@code null} to indicate that they are not interested in
* reading the content</li>
* <li><em>must</em> call {@link Stream#demand()} and return a {@link Stream.Server.Listener}
* that overrides {@link Stream.Server.Listener#onDataAvailable(Stream.Server)} that reads
* and consumes the content.</li>
* </ul>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
* @return a {@link Stream.Server.Listener} that will be notified of stream events
* @see Stream.Server.Listener#onDataAvailable(Stream.Server)
*/
public default Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
return null;
}
}
}
@ -195,45 +233,6 @@ public interface Session
{
}
/**
* <p>Callback method invoked when a request is received.</p>
* <p>Applications should implement this method to process HTTP/3 requests,
* typically providing an HTTP/3 response via {@link Stream#respond(HeadersFrame)}:</p>
* <pre>
* class MyServer implements Session.Server.Listener
* {
* &#64;Override
* public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
* {
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* if (!frame.isLast())
* stream.demand();
* return null;
* }
* }
* </pre>
* <p>If there is request content (indicated by the fact that the HEADERS frame
* is not the last in the stream), then applications either:</p>
* <ul>
* <li>return {@code null} to indicate that they are not interested in
* reading the content</li>
* <li><em>must</em> call {@link Stream#demand()} and return a {@link Stream.Listener}
* that overrides {@link Stream.Listener#onDataAvailable(Stream)} that reads
* and consumes the content.</li>
* </ul>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
* @return a {@link Stream.Listener} that will be notified of stream events
* @see Stream.Listener#onDataAvailable(Stream)
*/
public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return null;
}
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*

View File

@ -26,8 +26,9 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
* <p>A {@link Stream} maps to an HTTP/3 request/response cycle, and after the request/response
* cycle is completed, the stream is closed and removed from the {@link Session}.</p>
* <p>Like {@link Session}, {@link Stream} is the active part and by calling its API applications
* can generate events on the stream; conversely, {@link Stream.Listener} is the passive part, and
* its callbacks are invoked when events happen on the stream.</p>
* can generate events on the stream; conversely, {@link Stream.Client.Listener} and
* {@link Stream.Server.Listener} are the passive part, and their callbacks are invoked when
* events happen on the stream.</p>
* <p>The client initiates a stream by sending a HEADERS frame containing the HTTP/3 request URI
* and request headers, and zero or more DATA frames containing request content.</p>
* <p>Similarly, the server responds by sending a HEADERS frame containing the HTTP/3 response
@ -35,8 +36,6 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
* <p>Both client and server can end their side of the stream by sending a final frame with
* the {@code last} flag set to {@code true}, see {@link HeadersFrame#HeadersFrame(MetaData, boolean)}
* and {@link DataFrame#DataFrame(ByteBuffer, boolean)}.</p>
*
* @see Stream.Listener
*/
public interface Stream
{
@ -50,15 +49,6 @@ public interface Stream
*/
public Session getSession();
/**
* <p>Responds to a request performed via {@link Session.Client#newRequest(HeadersFrame, Listener)},
* sending the given HEADERS frame containing the response status code and response headers.</p>
*
* @param frame the HEADERS frame containing the response headers
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public CompletableFuture<Stream> respond(HeadersFrame frame);
/**
* <p>Sends the given DATA frame containing some or all the bytes
* of the request content or of the response content.</p>
@ -90,28 +80,32 @@ public interface Stream
*
* @return a {@link Stream.Data} object containing the request bytes or
* the response bytes, or null if no bytes are available
* @see Stream.Listener#onDataAvailable(Stream)
* @see Stream.Client.Listener#onDataAvailable(Stream.Client)
* @see Stream.Server.Listener#onDataAvailable(Stream.Server)
*/
public Stream.Data readData();
/**
* <p>Causes {@link Stream.Listener#onDataAvailable(Stream)} to be invoked,
* possibly at a later time, when the stream has data to be read.</p>
* <p>Causes {@link Stream.Client.Listener#onDataAvailable(Stream.Client)}
* on the client, or {@link Stream.Server.Listener#onDataAvailable(Stream.Server)}
* on the server, to be invoked, possibly at a later time, when the stream
* has data to be read.</p>
* <p>This method is idempotent: calling it when there already is an
* outstanding demand to invoke {@link Stream.Listener#onDataAvailable(Stream)}
* outstanding demand to invoke {@code onDataAvailable(Stream)}
* is a no-operation.</p>
* <p>The thread invoking this method may invoke directly
* {@link Stream.Listener#onDataAvailable(Stream)}, unless another thread
* that must invoke {@link Stream.Listener#onDataAvailable(Stream)}
* {@code onDataAvailable(Stream)}, unless another thread
* that must invoke {@code onDataAvailable(Stream)}
* notices the outstanding demand first.</p>
* <p>When all bytes have been read (via {@link #readData()}), further
* invocations of this method are a no-operation.</p>
* <p>It is always guaranteed that invoking this method from within
* {@link Stream.Listener#onDataAvailable(Stream)} will not cause a
* {@code onDataAvailable(Stream)} will not cause a
* {@link StackOverflowError}.</p>
*
* @see #readData()
* @see Stream.Listener#onDataAvailable(Stream)
* @see Stream.Client.Listener#onDataAvailable(Stream.Client)
* @see Stream.Server.Listener#onDataAvailable(Stream.Server)
*/
public void demand();
@ -132,116 +126,237 @@ public interface Stream
public void reset(long error, Throwable failure);
/**
* <p>A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives
* events happening on an HTTP/3 stream.</p>
*
* @see Stream
* <p>The client side version of {@link Stream}.</p>
*/
public interface Listener
public interface Client extends Stream
{
/**
* <p>Callback method invoked when a response is received.</p>
* <p>To read response content, applications should call
* {@link Stream#demand()} and override
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
* <p>A {@link Stream.Client.Listener} is the passive counterpart of a {@link Stream.Client}
* and receives client-side events happening on an HTTP/3 stream.</p>
*
* @see Stream.Client
*/
public interface Listener
{
/**
* <p>Callback method invoked when a response is received.</p>
* <p>To read response content, applications should call
* {@link Stream#demand()} and override
* {@link Stream.Client.Listener#onDataAvailable(Client)}.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the response headers
* @see Stream.Client.Listener#onDataAvailable(Client)
*/
public default void onResponse(Stream.Client stream, HeadersFrame frame)
{
}
/**
* <p>Callback method invoked if the application has expressed
* {@link Stream#demand() demand} for content, and if there may
* be content available.</p>
* <p>A server application that wishes to handle request content
* should typically call {@link Stream#demand()} from
* {@link Session.Server.Listener#onRequest(Server, HeadersFrame)}.</p>
* <p>A client application that wishes to handle response content
* should typically call {@link Stream#demand()} from
* {@link Stream.Client.Listener#onResponse(Client, HeadersFrame)}.</p>
* <p>Just prior calling this method, the outstanding demand is
* cancelled; applications that implement this method should read
* content calling {@link Stream#readData()}, and call
* {@link Stream#demand()} to signal to the implementation to call
* again this method when there may be more content available.</p>
* <p>Only one thread at a time invokes this method, although it
* may not be the same thread across different invocations.</p>
* <p>It is always guaranteed that invoking {@link Stream#demand()}
* from within this method will not cause a {@link StackOverflowError}.</p>
* <p>Typical usage:</p>
* <pre>
* class MyStreamListener implements Stream.Client.Listener
* {
* &#64;Override
* public void onDataAvailable(Stream.Client stream)
* {
* // Read a chunk of the content.
* Stream.Data data = stream.readData();
* if (data == null)
* {
* // No data available now, demand to be called back.
* stream.demand();
* }
* else
* {
* // Process the content.
* process(data.getByteBuffer());
* // Notify that the content has been consumed.
* data.complete();
* if (!data.isLast())
* {
* // Demand to be called back.
* stream.demand();
* }
* }
* }
* }
* </pre>
*
* @param stream the stream
*/
public default void onDataAvailable(Stream.Client stream)
{
}
/**
* <p>Callback method invoked when a trailer is received.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the trailer headers
*/
public default void onTrailer(Stream.Client stream, HeadersFrame frame)
{
}
/**
* <p>Callback method invoked when the stream idle timeout elapses.</p>
*
* @param stream the stream
* @param failure the timeout failure
* @return true to reset the stream, false to ignore the idle timeout
*/
public default boolean onIdleTimeout(Stream.Client stream, Throwable failure)
{
return true;
}
/**
* <p>Callback method invoked when a stream failure occurred.</p>
* <p>Typical stream failures, among others, are failures to
* decode a HEADERS frame, or failures to read bytes because
* the stream has been reset.</p>
*
* @param stream the stream
* @param error the failure error
* @param failure the cause of the failure
*/
public default void onFailure(Stream.Client stream, long error, Throwable failure)
{
}
}
}
/**
* <p>The server side version of {@link Stream}.</p>
*/
public interface Server extends Stream
{
/**
* <p>Responds to a request performed via {@link Session.Client#newRequest(HeadersFrame, Client.Listener)},
* sending the given HEADERS frame containing the response status code and response headers.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the response headers
* @see Stream.Listener#onDataAvailable(Stream)
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public default void onResponse(Stream stream, HeadersFrame frame)
{
}
public CompletableFuture<Stream> respond(HeadersFrame frame);
/**
* <p>Callback method invoked if the application has expressed
* {@link Stream#demand() demand} for content, and if there may
* be content available.</p>
* <p>A server application that wishes to handle request content
* should typically call {@link Stream#demand()} from
* {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
* <p>A client application that wishes to handle response content
* should typically call {@link Stream#demand()} from
* {@link #onResponse(Stream, HeadersFrame)}.</p>
* <p>Just prior calling this method, the outstanding demand is
* cancelled; applications that implement this method should read
* content calling {@link Stream#readData()}, and call
* {@link Stream#demand()} to signal to the implementation to call
* again this method when there may be more content available.</p>
* <p>Only one thread at a time invokes this method, although it
* may not be the same thread across different invocations.</p>
* <p>It is always guaranteed that invoking {@link Stream#demand()}
* from within this method will not cause a {@link StackOverflowError}.</p>
* <p>Typical usage:</p>
* <pre>
* class MyStreamListener implements Stream.Listener
* {
* &#64;Override
* public void onDataAvailable(Stream stream)
* {
* // Read a chunk of the content.
* Stream.Data data = stream.readData();
* if (data == null)
* {
* // No data available now, demand to be called back.
* stream.demand();
* }
* else
* {
* // Process the content.
* process(data.getByteBuffer());
* // Notify that the content has been consumed.
* data.complete();
* if (!data.isLast())
* {
* // Demand to be called back.
* stream.demand();
* }
* }
* }
* }
* </pre>
* <p>A {@link Stream.Server.Listener} is the passive counterpart of a {@link Stream.Server}
* and receives server-side events happening on an HTTP/3 stream.</p>
*
* @param stream the stream
* @see Stream.Server
*/
public default void onDataAvailable(Stream stream)
public interface Listener
{
}
/**
* <p>Callback method invoked if the application has expressed
* {@link Stream#demand() demand} for content, and if there may
* be content available.</p>
* <p>A server application that wishes to handle request content
* should typically call {@link Stream#demand()} from
* {@link Session.Server.Listener#onRequest(Server, HeadersFrame)}.</p>
* <p>A client application that wishes to handle response content
* should typically call {@link Stream#demand()} from
* {@link Stream.Client.Listener#onResponse(Client, HeadersFrame)}.</p>
* <p>Just prior calling this method, the outstanding demand is
* cancelled; applications that implement this method should read
* content calling {@link Stream#readData()}, and call
* {@link Stream#demand()} to signal to the implementation to call
* again this method when there may be more content available.</p>
* <p>Only one thread at a time invokes this method, although it
* may not be the same thread across different invocations.</p>
* <p>It is always guaranteed that invoking {@link Stream#demand()}
* from within this method will not cause a {@link StackOverflowError}.</p>
* <p>Typical usage:</p>
* <pre>
* class MyStreamListener implements Stream.Server.Listener
* {
* &#64;Override
* public void onDataAvailable(Stream.Server stream)
* {
* // Read a chunk of the content.
* Stream.Data data = stream.readData();
* if (data == null)
* {
* // No data available now, demand to be called back.
* stream.demand();
* }
* else
* {
* // Process the content.
* process(data.getByteBuffer());
* // Notify that the content has been consumed.
* data.complete();
* if (!data.isLast())
* {
* // Demand to be called back.
* stream.demand();
* }
* }
* }
* }
* </pre>
*
* @param stream the stream
*/
public default void onDataAvailable(Stream.Server stream)
{
}
/**
* <p>Callback method invoked when a trailer is received.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the trailer headers
*/
public default void onTrailer(Stream stream, HeadersFrame frame)
{
}
/**
* <p>Callback method invoked when a trailer is received.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the trailer headers
*/
public default void onTrailer(Stream.Server stream, HeadersFrame frame)
{
}
/**
* <p>Callback method invoked when the stream idle timeout elapses.</p>
*
* @param stream the stream
* @param failure the timeout failure
* @return true to reset the stream, false to ignore the idle timeout
*/
public default boolean onIdleTimeout(Stream stream, Throwable failure)
{
return true;
}
/**
* <p>Callback method invoked when the stream idle timeout elapses.</p>
*
* @param stream the stream
* @param failure the timeout failure
* @return true to reset the stream, false to ignore the idle timeout
*/
public default boolean onIdleTimeout(Stream.Server stream, Throwable failure)
{
return true;
}
/**
* <p>Callback method invoked when a stream failure occurred.</p>
* <p>Typical stream failures, among others, are failures to
* decode a HEADERS frame, or failures to read bytes because
* the stream has been reset.</p>
*
* @param stream the stream
* @param error the failure error
* @param failure the cause of the failure
*/
public default void onFailure(Stream stream, long error, Throwable failure)
{
/**
* <p>Callback method invoked when a stream failure occurred.</p>
* <p>Typical stream failures, among others, are failures to
* decode a HEADERS frame, or failures to read bytes because
* the stream has been reset.</p>
*
* @param stream the stream
* @param error the failure error
* @param failure the cause of the failure
*/
public default void onFailure(Stream.Server stream, long error, Throwable failure)
{
}
}
}

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.AutoLock;
@ -267,45 +266,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
streamTimeouts.schedule(stream);
}
protected CompletableFuture<Stream> newRequest(long streamId, HeadersFrame frame, Stream.Listener listener)
{
if (LOG.isDebugEnabled())
LOG.debug("new request stream #{} with {} on {}", streamId, frame, this);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::openProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
promise.whenComplete((s, x) ->
{
if (x != null)
endPoint.close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
HTTP3Stream stream = createStream(endPoint, promise::failed);
if (stream == null)
return promise;
stream.setListener(listener);
stream.writeFrame(frame)
.whenComplete((r, x) ->
{
if (x == null)
{
if (listener == null)
endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
stream.updateClose(frame.isLast(), true);
promise.succeeded(stream);
}
else
{
removeStream(stream, x);
promise.failed(x);
}
});
return promise;
}
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint, Consumer<Throwable> fail)
{
long streamId = endPoint.getStreamId();
@ -337,7 +297,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (failure == null)
{
HTTP3Stream stream = new HTTP3Stream(this, endPoint, local);
HTTP3Stream stream = newHTTP3Stream(endPoint, local);
long idleTimeout = getStreamIdleTimeout();
if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout);
@ -357,6 +317,8 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
}
}
protected abstract HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint, boolean local);
protected HTTP3Stream getStream(long streamId)
{
return streams.get(streamId);

View File

@ -18,9 +18,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
@ -34,7 +31,7 @@ import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Stream.class);
@ -42,7 +39,6 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
private final QuicStreamEndPoint endPoint;
private final boolean local;
private CloseState closeState = CloseState.NOT_CLOSED;
private Listener listener;
private FrameState frameState = FrameState.INITIAL;
private long idleTimeout;
private long expireNanoTime;
@ -89,16 +85,6 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return local;
}
public Listener getListener()
{
return listener;
}
public void setListener(Listener listener)
{
this.listener = listener;
}
public long getIdleTimeout()
{
return idleTimeout;
@ -119,7 +105,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return expireNanoTime;
}
private void notIdle()
protected void notIdle()
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0)
@ -136,19 +122,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return close;
}
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
return write(frame);
}
@Override
public CompletableFuture<Stream> data(DataFrame frame)
{
return write(frame);
}
private CompletableFuture<Stream> write(Frame frame)
protected CompletableFuture<Stream> write(Frame frame)
{
return writeFrame(frame)
.whenComplete((s, x) ->
@ -202,66 +182,6 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return connection.hasDemand();
}
public void onRequest(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
notIdle();
Listener listener = notifyRequest(frame);
setListener(listener);
if (listener == null)
{
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code()));
session.writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), callback);
}
updateClose(frame.isLast(), false);
}
}
private Listener notifyRequest(HeadersFrame frame)
{
Session.Listener listener = session.getListener();
try
{
return listener.onRequest(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
public void onResponse(HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
boolean valid;
if (response.getStatus() == HttpStatus.CONTINUE_100)
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.CONTINUE);
else
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL, FrameState.CONTINUE), FrameState.HEADER);
if (valid)
{
notIdle();
notifyResponse(frame);
updateClose(frame.isLast(), false);
}
}
private void notifyResponse(HeadersFrame frame)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onResponse(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
public void onData(DataFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.DATA))
@ -273,19 +193,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
notifyDataAvailable();
}
private void notifyDataAvailable()
{
Stream.Listener listener = getListener();
try
{
if (listener != null)
listener.onDataAvailable(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
protected abstract void notifyDataAvailable();
public void onTrailer(HeadersFrame frame)
{
@ -297,35 +205,9 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
}
}
private void notifyTrailer(HeadersFrame frame)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onTrailer(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
protected abstract void notifyTrailer(HeadersFrame frame);
private boolean notifyIdleTimeout(TimeoutException timeout)
{
Listener listener = getListener();
try
{
if (listener != null)
return listener.onIdleTimeout(this, timeout);
return true;
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return true;
}
}
protected abstract boolean notifyIdleTimeout(TimeoutException timeout);
public void onFailure(long error, Throwable failure)
{
@ -333,21 +215,9 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
session.removeStream(this, failure);
}
private void notifyFailure(long error, Throwable failure)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onFailure(this, error, failure);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
protected abstract void notifyFailure(long error, Throwable failure);
private boolean validateAndUpdate(EnumSet<FrameState> allowed, FrameState target)
protected boolean validateAndUpdate(EnumSet<FrameState> allowed, FrameState target)
{
if (allowed.contains(frameState))
{
@ -366,7 +236,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
}
}
Promise.Completable<Stream> writeFrame(Frame frame)
public Promise.Completable<Stream> writeFrame(Frame frame)
{
notIdle();
Promise.Completable<Stream> completable = new Promise.Completable<>();
@ -379,7 +249,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return closeState == CloseState.CLOSED;
}
void updateClose(boolean update, boolean local)
public void updateClose(boolean update, boolean local)
{
if (update)
{
@ -443,7 +313,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
);
}
private enum FrameState
protected enum FrameState
{
INITIAL, CONTINUE, HEADER, DATA, TRAILER, FAILED
}

View File

@ -45,7 +45,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
return session;
}
public Stream.Listener getStreamListener()
public Stream.Client.Listener getStreamListener()
{
return receiver;
}

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listener
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class);
private boolean notifySuccess;
@ -65,7 +65,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
@ -105,7 +105,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
@ -164,7 +164,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
public void onTrailer(Stream.Client stream, HeadersFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
@ -176,7 +176,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
public boolean onIdleTimeout(Stream.Client stream, Throwable failure)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
@ -186,7 +186,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
}
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
responseFailure(failure);
}

View File

@ -19,6 +19,7 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.server.internal.HTTP3StreamServer;
import org.eclipse.jetty.http3.server.internal.HttpChannelOverHTTP3;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
@ -44,7 +45,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionListener.class);
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
HTTP3StreamListener listener = new HTTP3StreamListener(http3Stream.getEndPoint());
@ -75,7 +76,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
}
private static class HTTP3StreamListener implements Stream.Listener
private static class HTTP3StreamListener implements Stream.Server.Listener
{
private final EndPoint endPoint;
@ -91,7 +92,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
public void onRequest(Stream stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
HTTP3StreamServer http3Stream = (HTTP3StreamServer)stream;
Runnable task = getConnection().onRequest(http3Stream, frame);
if (task != null)
{
@ -101,7 +102,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onDataAvailable(http3Stream);
@ -113,7 +114,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onTrailer(http3Stream, frame);
@ -125,7 +126,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
public boolean onIdleTimeout(Stream.Server stream, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
return getConnection().onIdleTimeout((HTTP3Stream)stream, failure, task ->
@ -139,7 +140,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);

View File

@ -22,6 +22,9 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>A HTTP/3 specific {@link QuicServerConnector} that configures QUIC parameters according to HTTP/3 requirements.</p>
*/
public class HTTP3ServerConnector extends QuicServerConnector
{
public HTTP3ServerConnector(Server server, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)

View File

@ -18,7 +18,6 @@ import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
@ -52,13 +51,19 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
notifyAccept();
}
@Override
protected HTTP3StreamServer newHTTP3Stream(QuicStreamEndPoint endPoint, boolean local)
{
return new HTTP3StreamServer(this, endPoint, local);
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
if (frame.getMetaData().isRequest())
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
HTTP3StreamServer stream = (HTTP3StreamServer)getOrCreateStream(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("received request {} on {}", frame, stream);
if (stream != null)

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server.internal;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.internal.MessageFlusher;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3StreamServer extends HTTP3Stream implements Stream.Server
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3StreamServer.class);
private Stream.Server.Listener listener;
public HTTP3StreamServer(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
{
super(session, endPoint, local);
}
public void onRequest(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
notIdle();
Listener listener = this.listener = notifyRequest(frame);
if (listener == null)
{
QuicStreamEndPoint endPoint = getEndPoint();
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code()));
getSession().writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), callback);
}
updateClose(frame.isLast(), false);
}
}
private Listener notifyRequest(HeadersFrame frame)
{
Session.Server.Listener listener = (Session.Server.Listener)getSession().getListener();
try
{
return listener.onRequest(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
return write(frame);
}
protected void notifyDataAvailable()
{
Stream.Server.Listener listener = this.listener;
try
{
if (listener != null)
listener.onDataAvailable(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
protected void notifyTrailer(HeadersFrame frame)
{
Listener listener = this.listener;
try
{
if (listener != null)
listener.onTrailer(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
protected boolean notifyIdleTimeout(TimeoutException timeout)
{
Listener listener = this.listener;
try
{
if (listener != null)
return listener.onIdleTimeout(this, timeout);
return true;
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return true;
}
}
@Override
protected void notifyFailure(long error, Throwable failure)
{
Listener listener = this.listener;
try
{
if (listener != null)
listener.onFailure(this, error, failure);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -44,10 +43,10 @@ public class HttpTransportOverHTTP3 implements HttpTransport
private final AtomicBoolean commit = new AtomicBoolean();
private final TransportCallback transportCallback = new TransportCallback();
private final HTTP3Stream stream;
private final HTTP3StreamServer stream;
private MetaData.Response metaData;
public HttpTransportOverHTTP3(HTTP3Stream stream)
public HttpTransportOverHTTP3(HTTP3StreamServer stream)
{
this.stream = stream;
}

View File

@ -43,7 +43,7 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
session.onDataAvailable(streamId);
}
public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame)
public Runnable onRequest(HTTP3StreamServer stream, HeadersFrame frame)
{
HttpTransportOverHTTP3 transport = new HttpTransportOverHTTP3(stream);
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);

View File

@ -98,7 +98,7 @@ public class ClientServerTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set((HTTP3Session)stream.getSession());
serverRequestLatch.countDown();
@ -113,10 +113,10 @@ public class ClientServerTest extends AbstractClientServerTest
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest("/"), true);
Stream stream = clientSession.newRequest(frame, new Stream.Listener()
Stream stream = clientSession.newRequest(frame, new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
}
@ -147,15 +147,15 @@ public class ClientServerTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
// FlowControl acknowledged already.
Stream.Data data = stream.readData();
@ -180,10 +180,10 @@ public class ClientServerTest extends AbstractClientServerTest
AtomicReference<CountDownLatch> clientLatch = new AtomicReference<>(new CountDownLatch(1));
HeadersFrame frame = new HeadersFrame(newRequest(HttpMethod.POST, "/"), false);
Stream.Listener streamListener = new Stream.Listener()
Stream.Client.Listener streamListener = new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientLatch.get().countDown();
}
@ -216,16 +216,16 @@ public class ClientServerTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set((HTTP3Session)stream.getSession());
// Send the response headers.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
// Read data.
Stream.Data data = stream.readData();
@ -254,17 +254,17 @@ public class ClientServerTest extends AbstractClientServerTest
byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Stream stream = clientSession.newRequest(frame, new Stream.Listener()
Stream stream = clientSession.newRequest(frame, new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
// Read data.
Stream.Data data = stream.readData();
@ -306,7 +306,7 @@ public class ClientServerTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
return null;
@ -319,7 +319,7 @@ public class ClientServerTest extends AbstractClientServerTest
CountDownLatch requestFailureLatch = new CountDownLatch(1);
HttpFields largeHeaders = HttpFields.build().put("too-large", "x".repeat(2 * maxRequestHeadersSize));
clientSession.newRequest(new HeadersFrame(newRequest(HttpMethod.GET, "/", largeHeaders), true), new Stream.Listener() {})
clientSession.newRequest(new HeadersFrame(newRequest(HttpMethod.GET, "/", largeHeaders), true), new Stream.Client.Listener() {})
.whenComplete((s, x) ->
{
// The HTTP3Stream was created, but the application cannot access
@ -334,10 +334,10 @@ public class ClientServerTest extends AbstractClientServerTest
// Verify that the connection is still good.
CountDownLatch responseLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
}
@ -355,7 +355,7 @@ public class ClientServerTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
MetaData.Request request = (MetaData.Request)frame.getMetaData();
@ -390,10 +390,10 @@ public class ClientServerTest extends AbstractClientServerTest
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
streamFailureLatch.countDown();
}
@ -406,10 +406,10 @@ public class ClientServerTest extends AbstractClientServerTest
// Verify that the connection is still good.
CountDownLatch responseLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
}

View File

@ -55,13 +55,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
@ -86,7 +86,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(8192), true));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
@ -110,13 +110,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
@ -143,7 +143,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
@ -172,13 +172,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
@ -211,7 +211,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
@ -237,13 +237,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
// Must read to EOF to trigger fill+parse of the trailer.
@ -255,7 +255,7 @@ public class DataDemandTest extends AbstractClientServerTest
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
serverTrailerLatch.countDown();
}
@ -266,7 +266,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
@ -289,13 +289,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
Stream.Data data = stream.readData();
@ -308,7 +308,7 @@ public class DataDemandTest extends AbstractClientServerTest
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
serverTrailerLatch.countDown();
}
@ -319,7 +319,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(dataLength), false));
@ -343,13 +343,13 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
while (true)
{
@ -372,7 +372,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
byte[] bytesSent = new byte[16384];
new Random().nextBytes(bytesSent);
@ -397,15 +397,15 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
serverRequestLatch.countDown();
// Do not demand here.
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
onDataAvailableCalls.incrementAndGet();
Stream.Data data = stream.readData();
@ -420,7 +420,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(4096), true));
@ -444,7 +444,7 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
@ -484,10 +484,10 @@ public class DataDemandTest extends AbstractClientServerTest
}
}).start();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
semaphore.release();
}
@ -498,7 +498,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
// Send a first chunk of data.
stream.data(new DataFrame(ByteBuffer.allocate(16 * 1024), false));
@ -520,16 +520,16 @@ public class DataDemandTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
private boolean firstData;
private boolean nullData;
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
while (!firstData)
{
@ -572,7 +572,7 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {})
Stream stream = session.newRequest(request, new Stream.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
// Send a first chunk to trigger reads.

View File

@ -54,10 +54,10 @@ public class ExternalServerTest
CountDownLatch requestLatch = new CountDownLatch(1);
HttpURI uri = HttpURI.from(String.format("https://%s/", hostPort));
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request, true), new Stream.Listener()
session.newRequest(new HeadersFrame(request, true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
System.err.println("RESPONSE HEADER = " + frame);
if (frame.isLast())
@ -69,7 +69,7 @@ public class ExternalServerTest
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
Stream.Data data = stream.readData();
System.err.println("RESPONSE DATA = " + data);
@ -86,7 +86,7 @@ public class ExternalServerTest
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
public void onTrailer(Stream.Client stream, HeadersFrame frame)
{
System.err.println("RESPONSE TRAILER = " + frame);
requestLatch.countDown();

View File

@ -58,7 +58,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set((HTTP3SessionServer)stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
@ -95,10 +95,10 @@ public class GoAwayTest extends AbstractClientServerTest
clientDisconnectLatch.countDown();
}
});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
@ -138,7 +138,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
@ -177,20 +177,20 @@ public class GoAwayTest extends AbstractClientServerTest
});
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/1"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/1"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
// Simulate the server sending a GOAWAY while the client sends a second request.
// The server sends a lastStreamId for the first request, and discards the second.
serverSessionRef.get().goAway(false);
// The client sends the second request and should eventually fail it
// locally since it has a larger streamId, and the server discarded it.
clientSession.newRequest(new HeadersFrame(newRequest("/2"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/2"), true), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
streamFailureLatch.countDown();
}
@ -217,7 +217,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
@ -259,10 +259,10 @@ public class GoAwayTest extends AbstractClientServerTest
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
@ -292,11 +292,11 @@ public class GoAwayTest extends AbstractClientServerTest
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
AtomicReference<Stream.Server> serverStreamRef = new AtomicReference<>();
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
Session session = stream.getSession();
@ -342,10 +342,10 @@ public class GoAwayTest extends AbstractClientServerTest
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
@ -364,7 +364,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS));
// Previous streams must complete successfully.
Stream serverStream = serverStreamRef.get();
Stream.Server serverStream = serverStreamRef.get();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
serverStream.respond(new HeadersFrame(response, true));
@ -384,14 +384,14 @@ public class GoAwayTest extends AbstractClientServerTest
@Test
public void testClientGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
AtomicReference<Stream.Server> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
serverStreamLatch.countDown();
@ -429,10 +429,10 @@ public class GoAwayTest extends AbstractClientServerTest
});
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
@ -451,7 +451,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
// Complete the stream.
Stream serverStream = serverStreamRef.get();
Stream.Server serverStream = serverStreamRef.get();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
serverStream.respond(new HeadersFrame(response, true));
@ -467,14 +467,14 @@ public class GoAwayTest extends AbstractClientServerTest
@Test
public void testServerGracefulGoAwayWithStreamsClientGoAwayServerClosesWhenLastStreamCloses() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
AtomicReference<Stream.Server> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
serverStreamLatch.countDown();
@ -524,10 +524,10 @@ public class GoAwayTest extends AbstractClientServerTest
});
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
@ -539,7 +539,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
// Complete the stream, the server should send the non-graceful GOAWAY.
Stream serverStream = serverStreamRef.get();
Stream.Server serverStream = serverStreamRef.get();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
serverStream.respond(new HeadersFrame(response, true));
@ -566,15 +566,15 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server serverStream, HeadersFrame frame)
{
serverStreamRef.set(stream);
stream.demand();
serverStreamRef.set(serverStream);
serverStream.demand();
serverRequestLatch.countDown();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
Stream.Data data = stream.readData();
if (data != null)
@ -582,7 +582,7 @@ public class GoAwayTest extends AbstractClientServerTest
if (data != null && data.isLast())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
stream.respond(new HeadersFrame(response, true));
serverStream.respond(new HeadersFrame(response, true));
}
else
{
@ -633,7 +633,7 @@ public class GoAwayTest extends AbstractClientServerTest
clientDisconnectLatch.countDown();
}
});
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener() {})
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
@ -902,7 +902,7 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
// Send a graceful GOAWAY.
stream.getSession().goAway(true);
@ -945,10 +945,10 @@ public class GoAwayTest extends AbstractClientServerTest
});
CountDownLatch clientFailureLatch = new CountDownLatch(1);
// Send request headers but not data.
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
clientFailureLatch.countDown();
}
@ -997,7 +997,7 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverRequestLatch.countDown();
return null;
@ -1035,10 +1035,10 @@ public class GoAwayTest extends AbstractClientServerTest
}
});
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
streamFailureLatch.countDown();
}
@ -1068,7 +1068,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
// Don't reply, don't reset the stream, just send the GOAWAY.
@ -1107,10 +1107,10 @@ public class GoAwayTest extends AbstractClientServerTest
});
CountDownLatch clientFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
clientFailureLatch.countDown();
}
@ -1251,7 +1251,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set((HTTP3Stream)stream);
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
@ -1262,17 +1262,17 @@ public class GoAwayTest extends AbstractClientServerTest
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
Stream.Data data = stream.readData();
if (data != null)
@ -1306,7 +1306,7 @@ public class GoAwayTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
serverStreamRef.set((HTTP3Stream)stream);
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
@ -1317,17 +1317,17 @@ public class GoAwayTest extends AbstractClientServerTest
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
Stream.Data data = stream.readData();
if (data != null)

View File

@ -21,7 +21,6 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -52,7 +51,7 @@ public class HandlerClientServerTest extends AbstractClientServerTest
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
serverLatch.countDown();
@ -63,10 +62,10 @@ public class HandlerClientServerTest extends AbstractClientServerTest
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest("/"), true);
session.newRequest(frame, new Stream.Listener()
session.newRequest(frame, new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertThat(response.getStatus(), is(HttpStatus.OK_200));
@ -101,10 +100,10 @@ public class HandlerClientServerTest extends AbstractClientServerTest
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest(HttpMethod.POST, "/"), false);
Stream stream = session.newRequest(frame, new Stream.Listener()
Stream stream = session.newRequest(frame, new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertThat(response.getStatus(), is(HttpStatus.OK_200));
@ -112,7 +111,7 @@ public class HandlerClientServerTest extends AbstractClientServerTest
}
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Client stream)
{
Stream.Data data = stream.readData();
if (data == null)
@ -137,9 +136,9 @@ public class HandlerClientServerTest extends AbstractClientServerTest
stream.demand();
}
})
.get(555, TimeUnit.SECONDS);
.get(5, TimeUnit.SECONDS);
byte[] bytes = new byte[1 * 1024];
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false))
.thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length / 2), true)))

View File

@ -50,17 +50,17 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("/idle".equals(request.getURI().getPath()))
{
assertFalse(frame.isLast());
stream.demand();
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream stream)
public void onDataAvailable(Stream.Server stream)
{
// When the client closes the stream, the server
// may either receive an empty, last, DATA frame, or
@ -102,10 +102,10 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch clientIdleLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Client.Listener()
{
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
public boolean onIdleTimeout(Stream.Client stream, Throwable failure)
{
clientIdleLatch.countDown();
// Signal to close the stream.
@ -122,10 +122,10 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
// The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientLatch.countDown();
}
@ -152,15 +152,15 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("/idle".equals(request.getURI().getPath()))
{
return new Stream.Listener()
return new Stream.Server.Listener()
{
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
public boolean onIdleTimeout(Stream.Server stream, Throwable failure)
{
serverIdleLatch.countDown();
return true;
@ -183,10 +183,10 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
.get(5, TimeUnit.SECONDS);
CountDownLatch clientFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Client.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream.Client stream, long error, Throwable failure)
{
// The server idle times out, but did not send any data back.
// However, the stream is readable and the implementation
@ -203,10 +203,10 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
// The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
clientLatch.countDown();
}

View File

@ -21,6 +21,9 @@ import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Client specific implementation of {@link ProtocolSession}.</p>
*/
public class ClientProtocolSession extends ProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ClientProtocolSession.class);

View File

@ -18,6 +18,7 @@ import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;
@ -27,8 +28,17 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.quic.common.QuicConfiguration;
/**
* <p>A QUIC specific {@link ClientConnector.Configurator}.</p>
* <p>Since QUIC is based on UDP, this class creates {@link DatagramChannel}s instead of
* {@link SocketChannel}s, and {@link DatagramChannelEndPoint}s instead of
* {@link SocketChannelEndPoint}s.</p>
*
* @see QuicConfiguration
*/
public class QuicClientConnectorConfigurator extends ClientConnector.Configurator
{
private final QuicConfiguration configuration = new QuicConfiguration();

View File

@ -13,6 +13,9 @@
package org.eclipse.jetty.quic.common;
/**
* <p>A record that captures error code and error reason.</p>
*/
public class CloseInfo
{
private final long error;

View File

@ -29,6 +29,14 @@ import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Represents an <em>established</em> stateful connection with a remote peer for a specific QUIC connection ID.</p>
* <p>Differently from {@link QuicSession}, which is created during the initial phases of connection establishment
* and it is not specific to a protocol, {@link ProtocolSession} is created only when the connection is established,
* and it is protocol specific, depending on the protocol negotiated during the connection establishment.</p>
*
* @see QuicSession
*/
public abstract class ProtocolSession extends ContainerLifeCycle
{
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class);
@ -164,6 +172,9 @@ public abstract class ProtocolSession extends ContainerLifeCycle
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), getQuicSession());
}
/**
* <p>A factory for protocol specific instances of {@link ProtocolSession}.</p>
*/
public interface Factory
{
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context);

View File

@ -15,6 +15,9 @@ package org.eclipse.jetty.quic.common;
import java.util.List;
/**
* <p>A record that captures QUIC configuration parameters.</p>
*/
public class QuicConfiguration
{
public static final String CONTEXT_KEY = QuicConfiguration.class.getName();

View File

@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
* have pending I/O events, either read-ready or write-ready.</p>
* <p>On the receive side, a QuicSession <em>fans-out</em> to multiple {@link QuicStreamEndPoint}s.</p>
* <p>On the send side, many {@link QuicStreamEndPoint}s <em>fan-in</em> to a QuicSession.</p>
*
* @see ProtocolSession
*/
public abstract class QuicSession extends ContainerLifeCycle
{
@ -309,30 +311,6 @@ public abstract class QuicSession extends ContainerLifeCycle
if (isConnectionEstablished())
{
// HTTP/1.1
// client1 -- sockEP1 -- H1Connection
// HTTP/2
// client1 -- sockEP1 -> H2Connection - HEADERSParser - H2Session -* RequestStreams -# HTTP Handler
// client2 -- sockEP2 -> H2Connection - HEADERSParser - H2Session -* RequestStreams -# HTTP Handler
// HTTP/1 on QUIC
// client1
// \
// dataEP - QuicConnection -* QuicSession -# ProtocolSession -* RequestStreamN - HttpConnection - HTTP Handler
// /
// client2
// HTTP/3
// client1
// \ /- ControlStream0 - ControlParser for SETTINGS frames, etc.
// dataEP - QuicConnection -* QuicSession -# H3QuicSession -* RequestStreamsEP - H3Connection - HEADERSParser -# HTTP Handler
// / `- InstructionStream - InstructionConnection/Parser
// client2
// H3ProtoSession - QpackEncoder
// H3ProtoSession - QpackDecoder
// H3ProtoSession -* request streams
ProtocolSession protocol = protocolSession;
if (protocol == null)
{
@ -549,12 +527,25 @@ public abstract class QuicSession extends ContainerLifeCycle
}
}
/**
* <p>A listener for {@link QuicSession} events.</p>
*/
public interface Listener extends EventListener
{
/**
* <p>Callback method invoked when a {@link QuicSession} is opened.</p>
*
* @param session the session
*/
public default void onOpened(QuicSession session)
{
}
/**
* <p>Callback method invoked when a {@link QuicSession} is closed.</p>
*
* @param session the session
*/
public default void onClosed(QuicSession session)
{
}

View File

@ -24,6 +24,9 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.component.Graceful;
/**
* <p>A container that tracks {@link QuicSession} instances.</p>
*/
public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Graceful, Dumpable
{
private final Set<QuicSession> sessions = ConcurrentHashMap.newKeySet();

View File

@ -16,6 +16,9 @@ package org.eclipse.jetty.quic.common;
import java.util.HashMap;
import java.util.Map;
/**
* <p>The QUIC stream type, either client or server initiated, and either unidirectional or bidirectional.</p>
*/
public enum StreamType
{
CLIENT_BIDIRECTIONAL(0x00),
@ -29,16 +32,6 @@ public enum StreamType
return Types.types.get(type);
}
public static boolean isUnidirectional(long streamId)
{
return (streamId & 0b01) == 0b01;
}
public static boolean isBidirectional(long streamId)
{
return (streamId & 0b01) == 0b00;
}
public static boolean isReserved(long streamType)
{
// SPEC: reserved stream types follow the formula: 0x1F * N + 0x21.

View File

@ -25,29 +25,53 @@
* <p>The {@link org.eclipse.jetty.io.Connection} associated with each {@link org.eclipse.jetty.quic.common.QuicStreamEndPoint}
* parses the bytes received on the QUIC stream represented by the {@link org.eclipse.jetty.quic.common.QuicStreamEndPoint},
* and generates the bytes to send on that QUIC stream.</p>
* <p>On the client side, the layout of the components in case of HTTP/1.1 could be the following:</p>
* <p>On the client side, the layout of the components in case of HTTP/1.1 is the following:</p>
* <pre>
* DatagramChannelEndPoint -- QuicConnection -- QuicSession -- ProtocolSession -- QuicStreamEndPoint -- HttpConnectionOverHTTP
* DatagramChannelEndPoint -- ClientQuicConnection -- ClientQuicSession -- ClientProtocolSession -- QuicStreamEndPoint -- HttpConnectionOverHTTP
* </pre>
* <p>The client-specific {@link org.eclipse.jetty.quic.common.ProtocolSession} creates a bidirectional QUIC stream
* <p>The client specific {@link org.eclipse.jetty.quic.common.ProtocolSession} creates one bidirectional QUIC stream
* that represent the same transport as a TCP stream, over which HTTP/1.1 bytes are exchanged by the two peers.</p>
* <p>On the server side, the layout of the components in case of HTTP/1.1 could be the following:</p>
* <p>On the client side, the layout of the components in case of HTTP/3 is the following:</p>
* <pre>
* DatagramChannelEndPoint -- ClientQuicConnection -- ClientQuicSession -- ClientHTTP3Session -* QuicStreamEndPoint -- ClientHTTP3StreamConnection
* </pre>
* <p>In this case, the client specific, HTTP/3 specific, {@link org.eclipse.jetty.quic.common.ProtocolSession} creates
* and manages zero or more bidirectional QUIC streams, over which HTTP/3 bytes are exchanged by the two peers.</p>
* <p>On the server side, the layout of the components in case of HTTP/1.1 is the following:</p>
* <pre>
* CLIENT | SERVER
*
* clientA QuicSessionA -- ProtocolSessionA -- QuicStreamEndPointA -- HttpConnection
* \ /
* DatagramChannelEndPoint -- QuicConnection
* / \
* clientB QuicSessionB -- ProtocolSessionB -- QuicStreamEndPointB -- HttpConnection
* clientA ServerQuicSessionA -- ServerProtocolSessionA -- QuicStreamEndPointA -- HttpConnection
* \ /
* DatagramChannelEndPoint -- ServerQuicConnection
* / \
* clientB ServerQuicSessionB -- ServerProtocolSessionB -- QuicStreamEndPointB -- HttpConnection
* </pre>
* <p>The {@code DatagramChannelEndPoint} receives UDP datagrams from clients.</p>
* <p>{@code QuicConnection} processes the incoming datagram bytes creating a {@code QuicSession} for every
* QUIC connection ID sent by the clients.</p>
* <p>The {@code DatagramChannelEndPoint} listens on the server port and receives UDP datagrams from all clients.</p>
* <p>The server side {@code QuicConnection} processes the incoming datagram bytes creating a {@code QuicSession} for
* every QUIC connection ID sent by the clients.</p>
* <p>The clients have created a single QUIC stream to send HTTP/1.1 requests, which results in the
* {@code QuicSession}s to create a correspondent {@code QuicStreamEndPoint} with its associated {@code HttpConnection}.</p>
* <p>The path {@code DatagramChannelEndPoint - QuicConnection - QuicSession - QuicStreamEndPoint}
* <p>The path {@code DatagramChannelEndPoint - ServerQuicConnection - ServerQuicSession - ServerProtocolSession - QuicStreamEndPoint}
* behaves exactly like a TCP {@link org.eclipse.jetty.io.SocketChannelEndPoint} for the associated
* {@code HttpConnection}.</p>
* <p>On the server side, the layout of the components in case of HTTP/3 is the following:</p>
* <pre>
* CLIENT | SERVER
* clientA ServerQuicSessionA -# ServerProtocolSessionA -- QuicStreamEndPointA1 -- ServerHTTP3StreamConnection
* \ / \ QuicStreamEndPointA2 -- ServerHTTP3StreamConnection
* DatagramChannelEndPoint -- ServerQuicConnection
* / \ / QuicStreamEndPointB1 -- ServerHTTP3StreamConnection
* clientB ServerQuicSessionB -# ServerProtocolSessionB -- QuicStreamEndPointB2 -- ServerHTTP3StreamConnection
* </pre>
* <p>In this case, the server specific, HTTP/3 specific, {@link org.eclipse.jetty.quic.common.ProtocolSession} creates
* and manages zero or more bidirectional QUIC streams, created by the clients, over which HTTP/3 bytes are exchanged
* by the two peers.</p>
* <p>In a more compact representation, the server side layout is the following:</p>
* <pre>
* DatagramChannelEndPoint -- ServerQuicConnection -* ServerQuicSession -# ServerProtocolSession -* QuicStreamEndPoint -- ServerHTTP3StreamConnection
* </pre>
* where {@code --} represents a 1-1 relationship, {@code -*} represents a 1-N relationship, and {@code -#} represents the
* place where a new thread is dispatched to process different QUIC connection IDs so that they can be processed in parallel,
* as it would naturally happen with TCP (which has a "thread per active connection" model).
*/
package org.eclipse.jetty.quic.common;

View File

@ -32,7 +32,9 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.quic.common.QuicConfiguration;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicSessionContainer;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConfig;
import org.eclipse.jetty.quic.quiche.SSLKeyPair;
import org.eclipse.jetty.server.AbstractNetworkConnector;
@ -42,6 +44,15 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>A server side network connector that uses a {@link DatagramChannel} to listen on a network port for QUIC traffic.</p>
* <p>This connector uses {@link ConnectionFactory}s to configure the protocols to support.
* The protocol is negotiated during the connection establishment by {@link QuicSession}, and for each QUIC stream
* managed by a {@link QuicSession} a {@link ConnectionFactory} is used to create a {@link Connection} for the
* correspondent {@link QuicStreamEndPoint}.</p>
*
* @see QuicConfiguration
*/
public class QuicServerConnector extends AbstractNetworkConnector
{
private final QuicConfiguration quicConfiguration = new QuicConfiguration();

View File

@ -21,6 +21,9 @@ import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Server specific implementation of {@link ProtocolSession}.</p>
*/
public class ServerProtocolSession extends ProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class);