Issue #6728 - QUIC and HTTP/3

- Added javadocs.
- Fixed race condition in processDataDemand().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-24 15:22:20 +02:00
parent c044400d80
commit 4bee790c04
15 changed files with 459 additions and 97 deletions

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -97,10 +98,13 @@ public class ClientHTTP3Session extends ClientProtocolSession
settings = Map.of(); settings = Map.of();
// TODO: add default settings. // TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings); SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP); controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail));
controlFlusher.iterate(); controlFlusher.iterate();
}
applicationSession.onOpen(); private void fail(Throwable failure)
{
// TODO: must close the connection.
} }
private QuicStreamEndPoint configureInstructionEndPoint(long streamId) private QuicStreamEndPoint configureInstructionEndPoint(long streamId)

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.client.internal;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
@ -47,18 +48,44 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
return (ClientHTTP3Session)super.getProtocolSession(); return (ClientHTTP3Session)super.getProtocolSession();
} }
@Override
protected void writeFrame(long streamId, Frame frame, Callback callback)
{
getProtocolSession().writeFrame(streamId, frame, callback);
}
@Override @Override
public void onOpen() public void onOpen()
{ {
promise.succeeded(this); promise.succeeded(this);
} }
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isResponse())
{
if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this);
notifyResponse(stream, frame);
}
else
{
super.onHeaders(streamId, frame);
}
}
private void notifyResponse(HTTP3Stream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
try
{
if (listener != null)
listener.onResponse(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override @Override
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener) public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener)
{ {
@ -76,4 +103,10 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
session.writeFrame(streamId, frame, callback); session.writeFrame(streamId, frame, callback);
return promise; return promise;
} }
@Override
protected void writeFrame(long streamId, Frame frame, Callback callback)
{
getProtocolSession().writeFrame(streamId, frame, callback);
}
} }

View File

@ -13,45 +13,162 @@
package org.eclipse.jetty.http3.api; package org.eclipse.jetty.http3.api;
import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
/**
* <p>The low-level HTTP/3 API representing a connection with a remote peer.</p>
* <p>A {@link Session} is the active part of the connection, and by calling its APIs
* applications can generate events on the connection.</p>
* <p>Conversely, {@link Session.Listener} is the passive part of the connection,
* and has callback methods that are invoked when events happen on the connection.</p>
*
* @see Client
* @see Server
* @see Listener
*/
public interface Session public interface Session
{ {
/**
* @return the local socket address this session is bound to
*/
public default SocketAddress getLocalSocketAddress()
{
return null;
}
/**
* @return the remote socket address this session is connected to
*/
public default SocketAddress getRemoteSocketAddress()
{
return null;
}
/**
* <p>The client-side HTTP/3 API representing a connection with a server.</p>
* <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p>
* <pre>
* Session session = ...;
* HeadersFrame headersFrame = ...;
* session.newRequest(headersFrame, new Stream.Listener()
* {
* &#64;Override
* public void onResponse(Stream stream, HeadersFrame frame)
* {
* // Response headers received.
* }
* });
* </pre>
*
* @see Stream
* @see Stream.Listener
*/
public interface Client public interface Client
{ {
/**
* <p>Makes a request by creating a HTTP/3 stream and sending the given HEADERS frame.</p>
*
* @param frame the HEADERS frame containing the HTTP request headers
* @param listener the listener that gets notified of stream events
* @return a CompletableFuture that is notified of the stream creation
*/
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener); public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener);
/**
* <p>The client-side specific {@link Session.Listener}.</p>
*/
public interface Listener extends Session.Listener public interface Listener extends Session.Listener
{ {
} }
} }
/**
* <p>The server-side HTTP/3 API representing a connection with a client.</p>
* <p>To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
*/
public interface Server public interface Server
{ {
/**
* <p>The server-side specific {@link Session.Listener}.</p>
*/
public interface Listener extends Session.Listener public interface Listener extends Session.Listener
{ {
// TODO: accept event. /**
* <p>Callback method invoked when a connection has been accepted by the server.</p>
*
* @param session the session
*/
public default void onAccept(Session session)
{
}
/**
* <p>Callback method invoked when a request is received.</p>
* <p>Applications should implement this method to process HTTP/3 requests,
* typically providing an HTTP/3 response via {@link Stream#respond(HeadersFrame)}:</p>
* <pre>
* class MyServer implements Session.Server.Listener
* {
* &#64;Override
* public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
* {
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* }
* }
* </pre>
* <p>To read request content, applications should call
* {@link Stream#demand()} and return a {@link Stream.Listener} that overrides
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
* @return a {@link Stream.Listener} that will be notified of stream events
* @see Stream.Listener#onDataAvailable(Stream)
*/
public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return null;
}
} }
} }
/**
* <p>A {@link Listener} is the passive counterpart of a {@link Session} and
* receives events happening on an HTTP/3 connection.</p>
*
* @see Session
*/
public interface Listener public interface Listener
{ {
/**
* <p>Callback method invoked just before the initial SETTINGS frame is sent
* to the remote peer, to gather the configuration settings that the local
* peer wants to send to the remote peer.</p>
*
* @param session the session
* @return a (possibly empty or null) map containing configuration
* settings to send to the remote peer.
*/
public default Map<Long, Long> onPreface(Session session) public default Map<Long, Long> onPreface(Session session)
{ {
return null; return null;
} }
/**
* <p>Callback method invoked when a SETTINGS frame has been received.</p>
*
* @param session the session
* @param frame the SETTINGS frame received
*/
public default void onSettings(Session session, SettingsFrame frame) public default void onSettings(Session session, SettingsFrame frame)
{ {
} }
public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return null;
}
} }
} }

View File

@ -13,38 +13,191 @@
package org.eclipse.jetty.http3.api; package org.eclipse.jetty.http3.api;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
/**
* <p>A {@link Stream} represents a bidirectional exchange of data within a {@link Session}.</p>
* <p>A {@link Stream} maps to an HTTP/3 request/response cycle, and after the request/response
* cycle is completed, the stream is closed and removed from the {@link Session}.</p>
* <p>Like {@link Session}, {@link Stream} is the active part and by calling its API applications
* can generate events on the stream; conversely, {@link Stream.Listener} is the passive part, and
* its callbacks are invoked when events happen on the stream.</p>
*
* @see Stream.Listener
*/
public interface Stream public interface Stream
{ {
/**
* <p>Responds to a request performed via {@link Session.Client#newRequest(HeadersFrame, Listener)},
* sending the given HEADERS frame containing the response status code and response headers.</p>
*
* @param frame the HEADERS frame containing the response headers
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public CompletableFuture<Stream> respond(HeadersFrame frame); public CompletableFuture<Stream> respond(HeadersFrame frame);
public CompletableFuture<Stream> data(DataFrame dataFrame); /**
* <p>Sends the given DATA frame containing some or all the bytes
* of the request content or of the response content.</p>
*
* @param frame the DATA frame containing some or all the bytes of the request or of the response.
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public CompletableFuture<Stream> data(DataFrame frame);
/**
* <p>Reads request content bytes or response content bytes.</p>
* <p>The returned {@link Stream.Data} object may be {@code null}, indicating
* that the end of the read side of the stream has not yet been reached, which
* may happen in these cases:</p>
* <ul>
* <li>not all the bytes have been received so far, and a further attempt
* to call this method returns {@code null} because the rest of the bytes
* are not yet available (for example, the remote peer did not send them
* yet, or they are in-flight)</li>
* <li>all the bytes have been received, but there is a trailer HEADERS
* frame to be received to indicate the end of the read side of the
* stream.</li>
* </ul>
* <p>When the returned {@link Stream.Data} object is not {@code null},
* applications <em>should</em> call {@link Stream.Data#complete()} to
* notify the implementation that the bytes have been processed.
* This allows the implementation to perform better, for example by
* recycling the {@link Stream.Data} object's {@link ByteBuffer}.</p>
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been
* received).</p>
*
* @return a {@link Stream.Data} object containing the request bytes or the response bytes
* @see Stream.Listener#onDataAvailable(Stream)
*/
public Stream.Data readData(); public Stream.Data readData();
/**
* <p>Causes {@link Stream.Listener#onDataAvailable(Stream)} to be invoked,
* possibly at a later time, when the stream has data to be read.</p>
* <p>This method is idempotent: calling it when there already is an
* outstanding demand to invoke {@link Stream.Listener#onDataAvailable(Stream)}
* is a no-operation.</p>
* <p>The thread invoking this method may invoke directly
* {@link Stream.Listener#onDataAvailable(Stream)}, unless another thread
* that must invoke {@link Stream.Listener#onDataAvailable(Stream)}
* notices the outstanding demand first.</p>
* <p>When all bytes have been read (via {@link #readData()}), further
* invocations of this method are a no-operation.</p>
* <p>It is always guaranteed that invoking this method from within
* {@link Stream.Listener#onDataAvailable(Stream)} will not cause a
* {@link StackOverflowError}.</p>
*
* @see #readData()
* @see Stream.Listener#onDataAvailable(Stream)
*/
public void demand(); public void demand();
/**
* <p> Sends the given HEADERS frame containing the trailer headers.</p>
*
* @param frame the HEADERS frame containing the trailer headers
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public CompletableFuture<Stream> trailer(HeadersFrame frame); public CompletableFuture<Stream> trailer(HeadersFrame frame);
/**
* <p>A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives
* events happening on an HTTP/3 stream.</p>
*
* @see Stream
*/
public interface Listener public interface Listener
{ {
/**
* <p>Callback method invoked when a response is received.</p>
* <p>To read response content, applications should call
* {@link Stream#demand()} and override
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the response headers
* @see Stream.Listener#onDataAvailable(Stream)
*/
public default void onResponse(Stream stream, HeadersFrame frame) public default void onResponse(Stream stream, HeadersFrame frame)
{ {
} }
/**
* <p>Callback method invoked if the application has expressed
* {@link Stream#demand() demand} for content, and if there is
* content available.</p>
* <p>A server application that wishes to handle request content
* should typically call {@link Stream#demand()} from
* {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
* <p>A client application that wishes to handle response content
* should typically call {@link Stream#demand()} from
* {@link #onResponse(Stream, HeadersFrame)}.</p>
* <p>Just prior calling this method, the outstanding demand is
* cancelled; applications that implement this method should read
* content calling {@link Stream#readData()}, and call
* {@link Stream#demand()} to signal to the implementation to call
* again this method when there is more content available.</p>
* <p>Only one thread at a time invokes this method, although it
* may not be the same thread across different invocations.</p>
* <p>It is always guaranteed that invoking {@link Stream#demand()}
* from within this method will not cause a {@link StackOverflowError}.</p>
* <p>Typical usage:</p>
* <pre>
* class MyStreamListener implements Stream.Listener
* {
* &#64;Override
* public void onDataAvailable(Stream stream)
* {
* // Read a chunk of the content.
* Stream.Data data = stream.readData();
* if (data == null)
* {
* // No data available now, demand to be called back.
* stream.demand();
* }
* else
* {
* // Process the content.
* process(data.getByteBuffer());
* // Notify that the content has been consumed.
* data.complete();
* if (!data.isLast())
* {
* // Demand to be called back.
* stream.demand();
* }
* }
* }
* }
* </pre>
*
* @param stream the stream
*/
public default void onDataAvailable(Stream stream) public default void onDataAvailable(Stream stream)
{ {
} }
/**
* <p>Callback method invoked when a trailer is received.</p>
*
* @param stream the stream
* @param frame the HEADERS frame containing the trailer headers
*/
public default void onTrailer(Stream stream, HeadersFrame frame) public default void onTrailer(Stream stream, HeadersFrame frame)
{ {
} }
} }
/**
* * <p>The returned {@link Stream.Data} object associates the
* * {@link ByteBuffer} containing the bytes with a completion </p>
*/
public static class Data public static class Data
{ {
private final DataFrame frame; private final DataFrame frame;
@ -56,9 +209,14 @@ public interface Stream
this.complete = complete; this.complete = complete;
} }
public DataFrame frame() public ByteBuffer getByteBuffer()
{ {
return frame; return frame.getByteBuffer();
}
public boolean isLast()
{
return frame.isLast();
} }
public void complete() public void complete()

View File

@ -27,7 +27,7 @@ public class DataFrame extends Frame
this.last = last; this.last = last;
} }
public ByteBuffer getData() public ByteBuffer getByteBuffer()
{ {
return data; return data;
} }
@ -40,6 +40,6 @@ public class DataFrame extends Frame
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getData().remaining()); return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getByteBuffer().remaining());
} }
} }

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.internal; package org.eclipse.jetty.http3.internal;
import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -44,13 +45,30 @@ public abstract class HTTP3Session implements Session, ParserListener
this.listener = listener; this.listener = listener;
} }
public ProtocolSession getProtocolSession()
{
return session;
}
public Listener getListener()
{
return listener;
}
public void onOpen() public void onOpen()
{ {
} }
public ProtocolSession getProtocolSession() @Override
public SocketAddress getLocalSocketAddress()
{ {
return session; return getProtocolSession().getQuicSession().getLocalAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return getProtocolSession().getQuicSession().getRemoteAddress();
} }
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint) protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
@ -120,20 +138,10 @@ public abstract class HTTP3Session implements Session, ParserListener
{ {
QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId); QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint); HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData(); MetaData metaData = frame.getMetaData();
if (metaData.isRequest()) if (metaData.isRequest() || metaData.isResponse())
{ {
if (LOG.isDebugEnabled()) throw new IllegalStateException("invalid metadata");
LOG.debug("received request {}#{} on {}", frame, streamId, this);
Stream.Listener streamListener = notifyRequest(stream, frame);
stream.setListener(streamListener);
}
else if (metaData.isResponse())
{
if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this);
notifyResponse(stream, frame);
} }
else else
{ {
@ -143,33 +151,6 @@ public abstract class HTTP3Session implements Session, ParserListener
} }
} }
private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame)
{
try
{
return listener.onRequest(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
private void notifyResponse(HTTP3Stream stream, HeadersFrame frame)
{
try
{
Stream.Listener listener = stream.getListener();
if (listener != null)
listener.onResponse(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame) private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame)
{ {
try try

View File

@ -177,6 +177,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
process = true; process = true;
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("demand, wasStalled={} on {}", process, this);
if (process) if (process)
processDataDemand(); processDataDemand();
} }
@ -201,31 +203,30 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{ {
while (true) while (true)
{ {
boolean demand; boolean process = true;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
if (LOG.isDebugEnabled())
LOG.debug("processing demand={}, last={} fillInterested={} on {}", dataDemand, dataLast, isFillInterested(), this);
if (dataDemand) if (dataDemand)
{ {
demand = !dataLast; // Do not process if the last frame was already
// notified, or if there is demand but no data.
if (dataLast || isFillInterested())
process = false;
else
dataDemand = false;
} }
else else
{ {
dataStalled = true; dataStalled = true;
demand = false; process = false;
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("processing demand={} fillInterested={} on {}", demand, isFillInterested(), this);
// Exit if there is no demand, or there is demand but no data. if (!process)
if (!demand || isFillInterested())
return; return;
// We have demand, notify the application.
try (AutoLock l = lock.lock())
{
dataDemand = false;
}
onDataAvailable(getEndPoint().getStreamId()); onDataAvailable(getEndPoint().getStreamId());
} }
} }
@ -244,6 +245,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{ {
ByteBuffer byteBuffer = buffer.getBuffer(); ByteBuffer byteBuffer = buffer.getBuffer();
MessageParser.Result result = parser.parse(byteBuffer); MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, buffer);
if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH) if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH)
return result; return result;

View File

@ -32,7 +32,7 @@ public class DataGenerator extends FrameGenerator
private int generateDataFrame(ByteBufferPool.Lease lease, DataFrame frame) private int generateDataFrame(ByteBufferPool.Lease lease, DataFrame frame)
{ {
ByteBuffer data = frame.getData(); ByteBuffer data = frame.getByteBuffer();
int dataLength = data.remaining(); int dataLength = data.remaining();
int headerLength = VarLenInt.length(FrameType.DATA.type()) + VarLenInt.length(dataLength); int headerLength = VarLenInt.length(FrameType.DATA.type()) + VarLenInt.length(dataLength);
ByteBuffer header = ByteBuffer.allocate(headerLength); ByteBuffer header = ByteBuffer.allocate(headerLength);

View File

@ -99,7 +99,7 @@ public class DataBodyParser extends BodyParser
{ {
DataFrame frame = new DataFrame(buffer, isLast.getAsBoolean()); DataFrame frame = new DataFrame(buffer, isLast.getAsBoolean());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("notifying synthetic={} {}#{}", fragment, frame, streamId); LOG.debug("notifying fragment={} {}#{} remaining={}", fragment, frame, streamId, length);
notifyData(frame); notifyData(frame);
} }

View File

@ -113,7 +113,7 @@ public class MessageParser
if (result == BodyParser.Result.NO_FRAME) if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME; return Result.NO_FRAME;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Parsed unknown frame body for type {}", Integer.toHexString(frameType)); LOG.debug("parsed unknown frame body for type {}", Integer.toHexString(frameType));
if (result == BodyParser.Result.WHOLE_FRAME) if (result == BodyParser.Result.WHOLE_FRAME)
reset(); reset();
break; break;
@ -124,7 +124,7 @@ public class MessageParser
{ {
bodyParser.emptyBody(buffer); bodyParser.emptyBody(buffer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
reset(); reset();
return Result.FRAME; return Result.FRAME;
} }
@ -134,7 +134,7 @@ public class MessageParser
if (result == BodyParser.Result.NO_FRAME) if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME; return Result.NO_FRAME;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
if (result == BodyParser.Result.WHOLE_FRAME) if (result == BodyParser.Result.WHOLE_FRAME)
reset(); reset();
return Result.FRAME; return Result.FRAME;

View File

@ -75,8 +75,8 @@ public class DataGenerateParseTest
assertEquals(1, frames.size()); assertEquals(1, frames.size());
DataFrame output = frames.get(0); DataFrame output = frames.get(0);
byte[] outputBytes = new byte[output.getData().remaining()]; byte[] outputBytes = new byte[output.getByteBuffer().remaining()];
output.getData().get(outputBytes); output.getByteBuffer().get(outputBytes);
assertArrayEquals(inputBytes, outputBytes); assertArrayEquals(inputBytes, outputBytes);
} }
} }

View File

@ -13,9 +13,14 @@
package org.eclipse.jetty.http3.server.internal; package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -29,15 +34,74 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
super(session, listener); super(session, listener);
} }
@Override
public void onOpen()
{
super.onOpen();
notifyAccept();
}
@Override @Override
public ServerHTTP3Session getProtocolSession() public ServerHTTP3Session getProtocolSession()
{ {
return (ServerHTTP3Session)super.getProtocolSession(); return (ServerHTTP3Session)super.getProtocolSession();
} }
@Override
public Session.Server.Listener getListener()
{
return (Session.Server.Listener)super.getListener();
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
if (LOG.isDebugEnabled())
LOG.debug("received request {}#{} on {}", frame, streamId, this);
Stream.Listener streamListener = notifyRequest(stream, frame);
stream.setListener(streamListener);
}
else
{
super.onHeaders(streamId, frame);
}
}
private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame)
{
Server.Listener listener = getListener();
try
{
return listener.onRequest(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
@Override @Override
protected void writeFrame(long streamId, Frame frame, Callback callback) protected void writeFrame(long streamId, Frame frame, Callback callback)
{ {
getProtocolSession().writeFrame(streamId, frame, callback); getProtocolSession().writeFrame(streamId, frame, callback);
} }
private void notifyAccept()
{
Server.Listener listener = getListener();
try
{
listener.onAccept(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
} }

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerProtocolSession;
import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -96,10 +97,13 @@ public class ServerHTTP3Session extends ServerProtocolSession
settings = Map.of(); settings = Map.of();
// TODO: add default settings. // TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings); SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP); controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail));
controlFlusher.iterate(); controlFlusher.iterate();
}
applicationSession.onOpen(); private void fail(Throwable failure)
{
// TODO: must close the connection.
} }
private QuicStreamEndPoint configureInstructionEndPoint(long streamId) private QuicStreamEndPoint configureInstructionEndPoint(long streamId)

View File

@ -152,7 +152,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
data.complete(); data.complete();
// Call me again immediately. // Call me again immediately.
stream.demand(); stream.demand();
if (data.frame().isLast()) if (data.isLast())
serverLatch.get().countDown(); serverLatch.get().countDown();
} }
}; };
@ -219,7 +219,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
return; return;
} }
// Echo it back, then demand only when the write is finished. // Echo it back, then demand only when the write is finished.
stream.data(data.frame()) stream.data(new DataFrame(data.getByteBuffer(), data.isLast()))
// Always complete. // Always complete.
.whenComplete((s, x) -> data.complete()) .whenComplete((s, x) -> data.complete())
// Demand only if successful. // Demand only if successful.
@ -259,9 +259,9 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
if (data != null) if (data != null)
{ {
// Consume data. // Consume data.
byteBuffer.put(data.frame().getData()); byteBuffer.put(data.getByteBuffer());
data.complete(); data.complete();
if (data.frame().isLast()) if (data.isLast())
clientDataLatch.countDown(); clientDataLatch.countDown();
} }
// Demand more data. // Demand more data.

View File

@ -42,7 +42,6 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -76,7 +75,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
{ {
// When resumed, demand all content until the last. // When resumed, demand all content until the last.
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data != null && data.frame().isLast()) if (data != null && data.isLast())
serverDataLatch.countDown(); serverDataLatch.countDown();
else else
stream.demand(); stream.demand();
@ -128,17 +127,16 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
onDataAvailableCalls.incrementAndGet(); onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream)) if (serverStreamRef.compareAndSet(null, stream))
{ {
serverStreamLatch.countDown();
// Read only one chunk of data. // Read only one chunk of data.
Stream.Data data = stream.readData(); await().atMost(1, TimeUnit.SECONDS).until(() -> stream.readData() != null);
assertNotNull(data); serverStreamLatch.countDown();
// Don't demand, just exit. // Don't demand, just exit.
} }
else else
{ {
// When resumed, demand all content until the last. // When resumed, demand all content until the last.
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data != null && data.frame().isLast()) if (data != null && data.isLast())
serverDataLatch.countDown(); serverDataLatch.countDown();
else else
stream.demand(); stream.demand();
@ -210,7 +208,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
{ {
// When resumed, demand all content until the last. // When resumed, demand all content until the last.
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data != null && data.frame().isLast()) if (data != null && data.isLast())
serverDataLatch.countDown(); serverDataLatch.countDown();
else else
stream.demand(); stream.demand();
@ -321,7 +319,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data != null) if (data != null)
{ {
if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength) if (dataRead.addAndGet(data.getByteBuffer().remaining()) == dataLength)
serverDataLatch.countDown(); serverDataLatch.countDown();
} }
stream.demand(); stream.demand();
@ -385,7 +383,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
} }
// Store the Data away to be used later. // Store the Data away to be used later.
datas.add(data); datas.add(data);
if (data.frame().isLast()) if (data.isLast())
serverDataLatch.countDown(); serverDataLatch.countDown();
} }
} }
@ -408,10 +406,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.frame().getData().remaining()).sum()); assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.getByteBuffer().remaining()).sum());
byte[] bytesReceived = new byte[bytesSent.length]; byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer buffer = ByteBuffer.wrap(bytesReceived); ByteBuffer buffer = ByteBuffer.wrap(bytesReceived);
datas.forEach(d -> buffer.put(d.frame().getData())); datas.forEach(d -> buffer.put(d.getByteBuffer()));
assertArrayEquals(bytesSent, bytesReceived); assertArrayEquals(bytesSent, bytesReceived);
} }
@ -437,7 +435,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
{ {
onDataAvailableCalls.incrementAndGet(); onDataAvailableCalls.incrementAndGet();
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data != null && data.frame().isLast()) if (data != null && data.isLast())
serverDataLatch.countDown(); serverDataLatch.countDown();
stream.demand(); stream.demand();
} }