Issue #6728 - QUIC and HTTP/3

- Changed HTTP/3 parsers to return the Frame rather than notifying a listener.
- Correctly linked encoder and decoder streams to respectively QpackEncoder and QpackDecoder.
- Fixed buffer release.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-16 10:40:04 +02:00
parent f63a7efc5a
commit 7cb32b2d98
45 changed files with 660 additions and 397 deletions

View File

@ -77,7 +77,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
// TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session.
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session.getSessionClient());
return new HTTP3Connection(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder());
return new HTTP3Connection(streamEndPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser, http3Session.getSessionClient());
}
}

View File

@ -19,10 +19,12 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.client.ClientProtocolSession;
@ -51,19 +53,28 @@ public class ClientHTTP3Session extends ClientProtocolSession
super(session);
this.apiSession = new HTTP3SessionClient(this, listener, promise);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
this.encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint);
this.encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
// TODO: make parameters configurable.
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
@ -112,20 +123,22 @@ public class ClientHTTP3Session extends ClientProtocolSession
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureStreamEndPoint);
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
}
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), apiSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();

View File

@ -21,6 +21,7 @@ import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -62,9 +63,12 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{
ClientHTTP3Session session = getProtocolSession();
long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("created request/response stream #{} on {}", streamId, streamEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
HTTP3Stream stream = createStream(streamId);
HTTP3Stream stream = createStream(streamEndPoint);
stream.setListener(listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed);

View File

@ -27,10 +27,22 @@ public interface Stream
{
public default void onResponse(Stream stream, HeadersFrame frame)
{
}
public default void onData(Stream stream, DataFrame frame, Callback callback)
{
// TODO: alternative API
// public void onDataAvailable(Stream s)
// {
// while (true)
// {
// DataFrame frame = s.pollData();
// if (frame == null)
// return;
// process(frame);
// }
// }
}
public default void onTrailer(Stream stream, HeadersFrame frame)

View File

@ -18,11 +18,13 @@ import java.nio.ByteBuffer;
public class DataFrame extends Frame
{
private final ByteBuffer data;
private final boolean last;
public DataFrame(ByteBuffer data)
public DataFrame(ByteBuffer data, boolean last)
{
super(FrameType.DATA);
this.data = data;
this.last = last;
}
public ByteBuffer getData()
@ -30,9 +32,19 @@ public class DataFrame extends Frame
return data;
}
public boolean isLast()
{
return last;
}
@Override
public String toString()
{
return String.format("%s[length:%d]", super.toString(), getData().remaining());
return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getData().remaining());
}
public DataFrame withLast(boolean last)
{
return new DataFrame(data, last);
}
}

View File

@ -18,11 +18,13 @@ import org.eclipse.jetty.http.MetaData;
public class HeadersFrame extends Frame
{
private final MetaData metaData;
private final boolean last;
public HeadersFrame(MetaData metaData)
public HeadersFrame(MetaData metaData, boolean last)
{
super(FrameType.HEADERS);
this.metaData = metaData;
this.last = last;
}
public MetaData getMetaData()
@ -30,9 +32,19 @@ public class HeadersFrame extends Frame
return metaData;
}
public boolean isLast()
{
return last;
}
@Override
public String toString()
{
return String.format("%s[%s]", super.toString(), getMetaData());
return String.format("%s[last=%b,%s]", super.toString(), isLast(), getMetaData());
}
public HeadersFrame withLast(boolean last)
{
return new HeadersFrame(metaData, last);
}
}

View File

@ -16,7 +16,11 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -33,14 +37,16 @@ public class ControlConnection extends AbstractConnection implements Connection.
private final ByteBufferPool byteBufferPool;
private final ControlParser parser;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser)
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -80,10 +86,17 @@ public class ControlConnection extends AbstractConnection implements Connection.
{
if (buffer == null)
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
// Parse first in case of bytes from the upgrade.
parser.parse(buffer);
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame == null)
break;
notifyFrame(frame);
}
// Then read from the EndPoint.
int filled = getEndPoint().fill(buffer);
@ -93,6 +106,7 @@ public class ControlConnection extends AbstractConnection implements Connection.
if (filled == 0)
{
byteBufferPool.release(buffer);
buffer = null;
fillInterested();
break;
}
@ -114,4 +128,33 @@ public class ControlConnection extends AbstractConnection implements Connection.
getEndPoint().close(x);
}
}
private void notifyFrame(Frame frame)
{
FrameType frameType = frame.getFrameType();
switch (frameType)
{
case SETTINGS:
{
notifySettings((SettingsFrame)frame);
break;
}
default:
{
throw new UnsupportedOperationException("unsupported frame type " + frameType);
}
}
}
private void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -99,7 +99,7 @@ public class ControlFlusher extends IteratingCallback
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to flush {} on {}", entries, this);
LOG.debug("succeeded to write {} on {}", entries, this);
lease.recycle();
@ -115,7 +115,7 @@ public class ControlFlusher extends IteratingCallback
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} on {}", entries, this, failure);
LOG.debug("failed to write {} on {}", entries, this, failure);
lease.recycle();

View File

@ -13,42 +13,30 @@
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
public class DecoderConnection extends AbstractConnection
public class DecoderConnection extends InstructionConnection
{
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x03;
private boolean useInputDirectByteBuffers = true;
private final QpackEncoder encoder;
public DecoderConnection(EndPoint endPoint, Executor executor)
public DecoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder)
{
super(endPoint, executor);
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
super(endPoint, executor, byteBufferPool);
this.encoder = encoder;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
protected void parseInstruction(ByteBuffer buffer) throws QpackException
{
encoder.parseInstructionBuffer(buffer);
}
}

View File

@ -13,42 +13,30 @@
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
public class EncoderConnection extends AbstractConnection
public class EncoderConnection extends InstructionConnection
{
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x02;
private boolean useInputDirectByteBuffers = true;
private final QpackDecoder decoder;
public EncoderConnection(EndPoint endPoint, Executor executor)
public EncoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder)
{
super(endPoint, executor);
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
super(endPoint, executor, byteBufferPool);
this.decoder = decoder;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
protected void parseInstruction(ByteBuffer buffer) throws QpackException
{
decoder.parseInstructionBuffer(buffer);
}
}

View File

@ -16,10 +16,15 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,14 +34,15 @@ public class HTTP3Connection extends AbstractConnection
private final ByteBufferPool byteBufferPool;
private final MessageParser parser;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public HTTP3Connection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
public HTTP3Connection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -59,10 +65,9 @@ public class HTTP3Connection extends AbstractConnection
@Override
public void onFillable()
{
ByteBuffer buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
try
{
if (buffer == null)
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
int filled = getEndPoint().fill(buffer);
@ -71,7 +76,17 @@ public class HTTP3Connection extends AbstractConnection
if (filled > 0)
{
parser.parse(buffer);
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame == null)
break;
if (frame instanceof HeadersFrame)
frame = ((HeadersFrame)frame).withLast(getEndPoint().isInputShutdown());
else if (frame instanceof DataFrame)
frame = ((DataFrame)frame).withLast(getEndPoint().isInputShutdown());
notifyFrame(frame);
}
}
else if (filled == 0)
{
@ -82,7 +97,6 @@ public class HTTP3Connection extends AbstractConnection
else
{
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close();
break;
}
@ -93,8 +107,55 @@ public class HTTP3Connection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
}
}
private void notifyFrame(Frame frame)
{
FrameType frameType = frame.getFrameType();
switch (frameType)
{
case HEADERS:
{
notifyHeaders((HeadersFrame)frame);
break;
}
case DATA:
{
notifyData((DataFrame)frame);
break;
}
default:
{
throw new UnsupportedOperationException("unsupported frame type " + frameType);
}
}
}
private void notifyHeaders(HeadersFrame frame)
{
try
{
long streamId = ((QuicStreamEndPoint)getEndPoint()).getStreamId();
listener.onHeaders(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private void notifyData(DataFrame frame)
{
try
{
long streamId = ((QuicStreamEndPoint)getEndPoint()).getStreamId();
listener.onData(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -18,7 +18,9 @@ import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
@ -71,7 +73,8 @@ public class HTTP3Flusher extends IteratingCallback
QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
LOG.debug("writing {} buffers ({} bytes) for stream #{} on {}", buffers.size(), lease.getTotalLength(), endPoint.getStreamId(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@ -79,12 +82,35 @@ public class HTTP3Flusher extends IteratingCallback
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this);
// TODO: this is inefficient, as it will write
// an empty DATA frame with the FIN flag.
// Could be coalesced with the write above,
// but needs an additional boolean parameter.
if (entry.last)
{
QuicStreamEndPoint endPoint = entry.endPoint;
if (LOG.isDebugEnabled())
LOG.debug("last frame on stream #{} on {}", endPoint.getStreamId(), this);
endPoint.shutdownOutput();
}
lease.recycle();
entry.callback.succeeded();
entry = null;
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", entry, this, failure);
// TODO
}
@Override
public InvocationType getInvocationType()
{
@ -96,12 +122,15 @@ public class HTTP3Flusher extends IteratingCallback
private final QuicStreamEndPoint endPoint;
private final Frame frame;
private final Callback callback;
private final boolean last;
private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
this.endPoint = endPoint;
this.frame = frame;
this.callback = callback;
this.last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() ||
frame instanceof DataFrame && ((DataFrame)frame).isLast();
}
@Override

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,17 +49,18 @@ public abstract class HTTP3Session implements Session, ParserListener
return session;
}
protected HTTP3Stream createStream(long streamId)
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
{
HTTP3Stream stream = new HTTP3Stream(this, streamId);
long streamId = endPoint.getStreamId();
HTTP3Stream stream = new HTTP3Stream(this, endPoint);
if (streams.put(streamId, stream) != null)
throw new IllegalStateException("duplicate stream id " + streamId);
return stream;
}
protected HTTP3Stream getOrCreateStream(long streamId)
protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint)
{
return streams.computeIfAbsent(streamId, id -> new HTTP3Stream(this, streamId));
return streams.computeIfAbsent(endPoint.getStreamId(), id -> new HTTP3Stream(this, endPoint));
}
protected HTTP3Stream getStream(long streamId)
@ -115,7 +117,8 @@ public abstract class HTTP3Session implements Session, ParserListener
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this);
HTTP3Stream stream = getOrCreateStream(streamId);
QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
@ -192,4 +195,10 @@ public abstract class HTTP3Session implements Session, ParserListener
{
stream.getListener().onData(stream, frame, callback);
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -17,6 +17,7 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
@ -24,13 +25,13 @@ import org.eclipse.jetty.util.thread.Invocable;
public class HTTP3Stream implements Stream
{
private final HTTP3Session session;
private final long streamId;
private final QuicStreamEndPoint endPoint;
private Listener listener;
public HTTP3Stream(HTTP3Session session, long streamId)
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint)
{
this.session = session;
this.streamId = streamId;
this.endPoint = endPoint;
}
public Listener getListener()
@ -47,7 +48,7 @@ public class HTTP3Stream implements Stream
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
Promise.Completable<Stream> completable = new Promise.Completable<>();
session.writeFrame(streamId, frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
return completable;
}
}

View File

@ -0,0 +1,103 @@
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class InstructionConnection extends AbstractConnection implements Connection.UpgradeTo
{
private static final Logger LOG = LoggerFactory.getLogger(DecoderConnection.class);
private final ByteBufferPool byteBufferPool;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public InstructionConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onUpgradeTo(ByteBuffer upgrade)
{
int capacity = Math.max(upgrade.remaining(), getInputBufferSize());
buffer = byteBufferPool.acquire(capacity, isUseInputDirectByteBuffers());
int position = BufferUtil.flipToFill(buffer);
buffer.put(upgrade);
BufferUtil.flipToFlush(buffer, position);
}
@Override
public void onOpen()
{
super.onOpen();
if (BufferUtil.hasContent(buffer))
onFillable();
else
fillInterested();
}
@Override
public void onFillable()
{
try
{
if (buffer == null)
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
// Parse first in case of bytes from the upgrade.
parseInstruction(buffer);
// Then read from the EndPoint.
int filled = getEndPoint().fill(buffer);
if (InstructionConnection.LOG.isDebugEnabled())
InstructionConnection.LOG.debug("filled {} on {}", filled, this);
if (filled == 0)
{
byteBufferPool.release(buffer);
buffer = null;
fillInterested();
break;
}
else if (filled < 0)
{
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close();
break;
}
}
}
catch (Throwable x)
{
if (InstructionConnection.LOG.isDebugEnabled())
InstructionConnection.LOG.debug("could not process decoder stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
}
}
protected abstract void parseInstruction(ByteBuffer buffer) throws QpackException;
}

View File

@ -28,6 +28,10 @@ import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO: see QPACK spec "Avoiding Flow Control Deadlocks"
// We would need to check the flow control window before writing.
// However, if we do, then we need a mechanism to wakeup again this flusher
// when Quiche tells us that the stream is writable again (right now we only do completeWrite()).
public class InstructionFlusher extends IteratingCallback
{
private static final Logger LOG = LoggerFactory.getLogger(InstructionFlusher.class);
@ -36,12 +40,14 @@ public class InstructionFlusher extends IteratingCallback
private final Queue<Instruction> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final QuicStreamEndPoint endPoint;
private final int streamType;
private boolean initialized;
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint)
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, int streamType)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.endPoint = endPoint;
this.streamType = streamType;
}
public void offer(List<Instruction> instructions)
@ -72,8 +78,8 @@ public class InstructionFlusher extends IteratingCallback
if (!initialized)
{
initialized = true;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(EncoderConnection.STREAM_TYPE));
VarLenInt.generate(buffer, EncoderConnection.STREAM_TYPE);
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(streamType));
VarLenInt.generate(buffer, streamType);
buffer.flip();
lease.insert(0, buffer, false);
}
@ -88,10 +94,20 @@ public class InstructionFlusher extends IteratingCallback
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", lease.getByteBuffers(), this);
lease.recycle();
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", lease.getByteBuffers(), this, failure);
// TODO
}
@Override
public InvocationType getInvocationType()
{

View File

@ -16,9 +16,13 @@ package org.eclipse.jetty.http3.internal;
import java.util.List;
import org.eclipse.jetty.http3.qpack.Instruction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InstructionHandler implements Instruction.Handler
{
private static final Logger LOG = LoggerFactory.getLogger(InstructionHandler.class);
private final InstructionFlusher encoderFlusher;
public InstructionHandler(InstructionFlusher encoderFlusher)
@ -29,6 +33,8 @@ public class InstructionHandler implements Instruction.Handler
@Override
public void onInstructions(List<Instruction> instructions)
{
if (LOG.isDebugEnabled())
LOG.debug("processing {}", instructions);
encoderFlusher.offer(instructions);
encoderFlusher.iterate();
}

View File

@ -18,6 +18,8 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -25,20 +27,24 @@ import org.eclipse.jetty.io.EndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamConnection extends AbstractConnection implements Connection.UpgradeFrom
public class UnidirectionalStreamConnection extends AbstractConnection implements Connection.UpgradeFrom
{
private static final Logger LOG = LoggerFactory.getLogger(StreamConnection.class);
private static final Logger LOG = LoggerFactory.getLogger(UnidirectionalStreamConnection.class);
private final ByteBufferPool byteBufferPool;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final ParserListener listener;
private final VarLenInt parser = new VarLenInt();
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public StreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ParserListener listener)
public UnidirectionalStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, QpackDecoder decoder, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.encoder = encoder;
this.decoder = decoder;
this.listener = listener;
}
@ -121,8 +127,8 @@ public class StreamConnection extends AbstractConnection implements Connection.U
{
case ControlConnection.STREAM_TYPE:
{
ControlParser parser = new ControlParser(listener);
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
ControlParser parser = new ControlParser();
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser, listener);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -132,7 +138,7 @@ public class StreamConnection extends AbstractConnection implements Connection.U
}
case EncoderConnection.STREAM_TYPE:
{
EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor());
EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -142,7 +148,7 @@ public class StreamConnection extends AbstractConnection implements Connection.U
}
case DecoderConnection.STREAM_TYPE:
{
DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor());
DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
public class HeadersGenerator extends FrameGenerator
{
@ -45,11 +46,14 @@ public class HeadersGenerator extends FrameGenerator
private int generateHeadersFrame(ByteBufferPool.Lease lease, long streamId, HeadersFrame frame)
{
// TODO: 2 buffers are inefficient because they are sent in different datagrams.
// We can allocate a max size N for the header, encode from position=N, then
// write the H header bytes from N-H to N-1, so the buffer will start at position=N-H.
try
{
ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers);
BufferUtil.clear(buffer);
encoder.encode(buffer, streamId, frame.getMetaData());
buffer.flip();
int dataLength = buffer.remaining();
int headerLength = VarLenInt.length(FrameType.HEADERS.type()) + VarLenInt.length(dataLength);
ByteBuffer header = ByteBuffer.allocate(headerLength);

View File

@ -16,12 +16,7 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.http3.frames.Frame;
/**
* <p>The base parser for the frame body of HTTP/3 frames.</p>
@ -32,17 +27,13 @@ import org.slf4j.LoggerFactory;
*/
public abstract class BodyParser
{
private static final Logger LOG = LoggerFactory.getLogger(BodyParser.class);
private final long streamId;
private final HeaderParser headerParser;
private final ParserListener listener;
protected BodyParser(long streamId, HeaderParser headerParser, ParserListener listener)
protected BodyParser(long streamId, HeaderParser headerParser)
{
this.streamId = streamId;
this.headerParser = headerParser;
this.listener = listener;
}
protected long getStreamId()
@ -56,84 +47,18 @@ public abstract class BodyParser
}
/**
* <p>Parses the frame body bytes in the given {@code buffer}; only the body
* bytes are consumed, therefore when this method returns, the buffer
* <p>Parses the frame body bytes in the given {@code buffer}, producing a {@link Frame}.</p>
* <p>Only the frame body bytes are consumed, therefore when this method returns, the buffer
* may contain unconsumed bytes, for example for other frames.</p>
*
* @param buffer the buffer to parse
* @return true if all the frame body bytes were parsed, false if not enough
* frame body bytes were present in the buffer
* @return the parsed frame if all the frame body bytes were parsed, or an error frame,
* or null if not enough frame body bytes were present in the buffer
*/
public abstract boolean parse(ByteBuffer buffer);
public abstract Frame parse(ByteBuffer buffer) throws ParseException;
protected void emptyBody(ByteBuffer buffer)
protected Frame emptyBody(ByteBuffer buffer) throws ParseException
{
sessionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
}
protected void sessionFailure(ByteBuffer buffer, int error, String reason)
{
BufferUtil.clear(buffer);
notifySessionFailure(error, reason);
}
protected void notifySessionFailure(int error, String reason)
{
try
{
listener.onSessionFailure(error, reason);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifyStreamFailure(long streamId, int error, String reason)
{
try
{
listener.onStreamFailure(streamId, error, reason);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifyData(DataFrame frame)
{
try
{
listener.onData(getStreamId(), frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifyHeaders(HeadersFrame frame)
{
try
{
listener.onHeaders(getStreamId(), frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
throw new ParseException(ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
}
}

View File

@ -15,15 +15,17 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class CancelPushBodyParser extends BodyParser
{
public CancelPushBodyParser(HeaderParser headerParser, ParserListener listener)
public CancelPushBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,18 +33,16 @@ public class ControlParser
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final ParserListener listener;
private State state = State.HEADER;
public ControlParser(ParserListener listener)
public ControlParser()
{
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser, listener);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser, listener);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser, listener);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
this.listener = listener;
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser);
this.unknownBodyParser = new UnknownBodyParser(headerParser);
}
private void reset()
@ -57,7 +56,7 @@ public class ControlParser
*
* @param buffer the buffer to parse
*/
public void parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer) throws ParseException
{
try
{
@ -72,7 +71,7 @@ public class ControlParser
state = State.BODY;
break;
}
return;
return null;
}
case BODY:
{
@ -83,29 +82,28 @@ public class ControlParser
if (bodyParser == null)
{
// Unknown frame types must be ignored.
// TODO: enforce only control frames, but ignore unknown.
if (LOG.isDebugEnabled())
LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType));
if (!unknownBodyParser.parse(buffer))
return;
LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType));
Frame frame = unknownBodyParser.parse(buffer);
if (frame == null)
return null;
reset();
break;
}
else
{
Frame frame;
if (headerParser.getFrameLength() == 0)
{
bodyParser.emptyBody(buffer);
}
frame = bodyParser.emptyBody(buffer);
else
{
if (!bodyParser.parse(buffer))
return;
}
frame = bodyParser.parse(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer);
if (frame != null)
reset();
return frame;
}
break;
}
default:
{
@ -114,20 +112,22 @@ public class ControlParser
}
}
}
catch (ParseException x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw x;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Parse failed", x);
LOG.debug("parse failed", x);
buffer.clear();
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "parser_error", true, x);
}
}
private void connectionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
private enum State
{
HEADER, BODY

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,9 +28,9 @@ public class DataBodyParser extends BodyParser
private State state = State.INIT;
private long length;
public DataBodyParser(long streamId, HeaderParser headerParser, ParserListener listener)
public DataBodyParser(long streamId, HeaderParser headerParser)
{
super(streamId, headerParser, listener);
super(streamId, headerParser);
}
private void reset()
@ -39,13 +40,13 @@ public class DataBodyParser extends BodyParser
}
@Override
protected void emptyBody(ByteBuffer buffer)
protected Frame emptyBody(ByteBuffer buffer)
{
onData(BufferUtil.EMPTY_BUFFER, false);
return onData(BufferUtil.EMPTY_BUFFER, false);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -71,15 +72,13 @@ public class DataBodyParser extends BodyParser
if (length == 0)
{
reset();
onData(slice, false);
return true;
return onData(slice, false);
}
else
{
// We got partial data, simulate a smaller frame, and stay in DATA state.
onData(slice, true);
return onData(slice, true);
}
break;
}
default:
{
@ -87,15 +86,15 @@ public class DataBodyParser extends BodyParser
}
}
}
return false;
return null;
}
private void onData(ByteBuffer buffer, boolean fragment)
private DataFrame onData(ByteBuffer buffer, boolean fragment)
{
DataFrame frame = new DataFrame(buffer);
DataFrame frame = new DataFrame(buffer, true);
if (LOG.isDebugEnabled())
LOG.debug("notifying synthetic={} {}#{}", fragment, frame, getStreamId());
notifyData(frame);
return frame;
}
private enum State

View File

@ -15,15 +15,17 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class GoAwayBodyParser extends BodyParser
{
public GoAwayBodyParser(HeaderParser headerParser, ParserListener listener)
public GoAwayBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -19,6 +19,7 @@ import java.util.List;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
@ -34,10 +35,11 @@ public class HeadersBodyParser extends BodyParser
private final QpackDecoder decoder;
private State state = State.INIT;
private long length;
private Frame frame;
public HeadersBodyParser(long streamId, HeaderParser headerParser, ParserListener listener, QpackDecoder decoder)
public HeadersBodyParser(long streamId, HeaderParser headerParser, QpackDecoder decoder)
{
super(streamId, headerParser, listener);
super(streamId, headerParser);
this.decoder = decoder;
}
@ -45,10 +47,11 @@ public class HeadersBodyParser extends BodyParser
{
state = State.INIT;
length = 0;
frame = null;
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer) throws ParseException
{
while (buffer.hasRemaining())
{
@ -69,7 +72,7 @@ public class HeadersBodyParser extends BodyParser
length -= remaining;
ByteBuffer copy = BufferUtil.copy(buffer);
byteBuffers.add(copy);
return false;
return null;
}
else
{
@ -91,9 +94,10 @@ public class HeadersBodyParser extends BodyParser
byteBuffers.add(slice);
int capacity = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
encoded = byteBuffers.stream().reduce(ByteBuffer.allocate(capacity), ByteBuffer::put);
byteBuffers.clear();
}
return decode(buffer, encoded);
return decode(encoded);
}
}
default:
@ -102,50 +106,45 @@ public class HeadersBodyParser extends BodyParser
}
}
}
return false;
return null;
}
private boolean decode(ByteBuffer buffer, ByteBuffer encoded)
private Frame decode(ByteBuffer encoded) throws ParseException
{
try
{
return decoder.decode(getStreamId(), encoded, (streamId, metaData) ->
// TODO: do a proper reset when the lambda is notified asynchronously.
if (decoder.decode(getStreamId(), encoded, (streamId, metaData) -> this.frame = onHeaders(metaData)))
{
Frame frame = this.frame;
reset();
onHeaders(metaData);
});
return frame;
}
return null;
}
catch (QpackException.StreamException x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
// TODO: exception should carry error code.
notifyStreamFailure(getStreamId(), ErrorCode.FRAME_ERROR.code(), "invalid_qpack");
return true;
throw new ParseException(x.getErrorCode(), x.getMessage());
}
catch (QpackException.SessionException x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
// TODO: exception should carry error code.
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "invalid_qpack");
return true;
throw new ParseException(x.getErrorCode(), x.getMessage(), true);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
sessionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "internal_error");
return true;
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "internal_error", true, x);
}
}
private void onHeaders(MetaData metaData)
private Frame onHeaders(MetaData metaData)
{
HeadersFrame frame = new HeadersFrame(metaData);
if (LOG.isDebugEnabled())
LOG.debug("notifying {}#{}", frame, getStreamId());
notifyHeaders(frame);
return new HeadersFrame(metaData, false);
}
private enum State

View File

@ -15,15 +15,17 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class MaxPushIdBodyParser extends BodyParser
{
public MaxPushIdBodyParser(HeaderParser headerParser, ParserListener listener)
public MaxPushIdBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.slf4j.Logger;
@ -33,19 +34,15 @@ public class MessageParser
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final long streamId;
private final ParserListener listener;
private State state = State.HEADER;
public MessageParser(long streamId, QpackDecoder decoder, ParserListener listener)
public MessageParser(long streamId, QpackDecoder decoder)
{
this.streamId = streamId;
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(streamId, headerParser, listener);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(streamId, headerParser, listener, decoder);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
this.listener = listener;
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(streamId, headerParser);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(streamId, headerParser, decoder);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser);
this.unknownBodyParser = new UnknownBodyParser(headerParser);
}
private void reset()
@ -55,11 +52,12 @@ public class MessageParser
}
/**
* <p>Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.</p>
* <p>Parses the given {@code buffer} bytes and returns parsed frames.</p>
*
* @param buffer the buffer to parse
* @return a parsed frame, or null if not enough bytes were provided to parse a frame
*/
public void parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer) throws ParseException
{
try
{
@ -74,7 +72,7 @@ public class MessageParser
state = State.BODY;
break;
}
return;
return null;
}
case BODY:
{
@ -88,26 +86,27 @@ public class MessageParser
// Unknown frame types must be ignored.
if (LOG.isDebugEnabled())
LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType));
if (!unknownBodyParser.parse(buffer))
return;
Frame frame = unknownBodyParser.parse(buffer);
if (frame == null)
return null;
reset();
break;
}
else
{
Frame frame;
if (headerParser.getFrameLength() == 0)
{
bodyParser.emptyBody(buffer);
}
frame = bodyParser.emptyBody(buffer);
else
frame = bodyParser.parse(buffer);
if (frame != null)
{
if (!bodyParser.parse(buffer))
return;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
return frame;
}
break;
}
default:
{
@ -116,20 +115,22 @@ public class MessageParser
}
}
}
catch (ParseException x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw x;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Parse failed", x);
LOG.debug("parse failed", x);
buffer.clear();
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "parser_error", true, x);
}
}
private void connectionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
private enum State
{
HEADER, BODY

View File

@ -0,0 +1,34 @@
package org.eclipse.jetty.http3.internal.parser;
public class ParseException extends Exception
{
private final int error;
private final boolean fatal;
public ParseException(int error, String message)
{
this(error, message, false);
}
public ParseException(int error, String message, boolean fatal)
{
this(error, message, fatal, null);
}
public ParseException(int error, String message, boolean fatal, Throwable cause)
{
super(message, cause);
this.error = error;
this.fatal = fatal;
}
public int getErrorCode()
{
return error;
}
public boolean isFatal()
{
return fatal;
}
}

View File

@ -15,15 +15,17 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class PushPromiseBodyParser extends BodyParser
{
public PushPromiseBodyParser(HeaderParser headerParser, ParserListener listener)
public PushPromiseBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -18,6 +18,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.VarLenInt;
@ -29,9 +30,9 @@ public class SettingsBodyParser extends BodyParser
private long key;
private Map<Long, Long> settings;
public SettingsBodyParser(HeaderParser headerParser, ParserListener listener)
public SettingsBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
private void reset()
@ -44,13 +45,13 @@ public class SettingsBodyParser extends BodyParser
}
@Override
protected void emptyBody(ByteBuffer buffer)
protected Frame emptyBody(ByteBuffer buffer)
{
onSettings(Map.of());
return onSettings(Map.of());
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer) throws ParseException
{
while (buffer.hasRemaining())
{
@ -72,27 +73,16 @@ public class SettingsBodyParser extends BodyParser
}))
{
if (settings.containsKey(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
return true;
}
throw new ParseException(ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
if (SettingsFrame.isReserved(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
return true;
}
throw new ParseException(ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
if (length > 0)
{
state = State.VALUE;
}
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
}
throw new ParseException(ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
break;
}
return false;
return null;
}
case VALUE:
{
@ -111,17 +101,15 @@ public class SettingsBodyParser extends BodyParser
{
Map<Long, Long> settings = this.settings;
reset();
onSettings(settings);
return true;
return onSettings(settings);
}
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
throw new ParseException(ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
}
break;
}
return false;
return null;
}
default:
{
@ -129,13 +117,12 @@ public class SettingsBodyParser extends BodyParser
}
}
}
return false;
return null;
}
private void onSettings(Map<Long, Long> settings)
private SettingsFrame onSettings(Map<Long, Long> settings)
{
SettingsFrame frame = new SettingsFrame(settings);
notifySettings(frame);
return new SettingsFrame(settings);
}
private enum State

View File

@ -15,15 +15,17 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class UnknownBodyParser extends BodyParser
{
public UnknownBodyParser(HeaderParser headerParser, ParserListener listener)
public UnknownBodyParser(HeaderParser headerParser)
{
super(1, headerParser, listener);
super(1, headerParser);
}
@Override
public boolean parse(ByteBuffer buffer)
public Frame parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -19,9 +19,9 @@ import java.util.List;
import java.util.Random;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
@ -29,50 +29,46 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class DataGenerateParseTest
{
@Test
public void testGenerateParseEmpty()
public void testGenerateParseEmpty() throws Exception
{
testGenerateParse(BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParse()
public void testGenerateParse() throws Exception
{
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
testGenerateParse(ByteBuffer.wrap(bytes));
}
private void testGenerateParse(ByteBuffer byteBuffer)
private void testGenerateParse(ByteBuffer byteBuffer) throws Exception
{
byte[] inputBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(inputBytes);
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes));
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(null, 8192, true).generate(lease, 0, input);
List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null, new ParserListener()
{
@Override
public void onData(long streamId, DataFrame frame)
{
frames.add(frame);
}
});
List<Frame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null);
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
}
assertEquals(1, frames.size());
DataFrame output = frames.get(0);
DataFrame output = (DataFrame)frames.get(0);
byte[] outputBytes = new byte[output.getData().remaining()];
output.getData().get(outputBytes);
assertArrayEquals(inputBytes, outputBytes);

View File

@ -22,10 +22,10 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
@ -33,41 +33,37 @@ import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class HeadersGenerateParseTest
{
@Test
public void testGenerateParse()
public void testGenerateParse() throws Exception
{
HttpURI uri = HttpURI.from("http://host:1234/path?a=b");
HttpFields fields = HttpFields.build()
.put("User-Agent", "Jetty")
.put("Cookie", "c=d");
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields));
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields), true);
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
List<HeadersFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, decoder, new ParserListener()
{
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
frames.add(frame);
}
});
List<Frame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, decoder);
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
}
assertEquals(1, frames.size());
HeadersFrame output = frames.get(0);
HeadersFrame output = (HeadersFrame)frames.get(0);
MetaData.Request inputMetaData = (MetaData.Request)input.getMetaData();
MetaData.Request outputMetaData = (MetaData.Request)output.getMetaData();

View File

@ -18,55 +18,51 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class SettingsGenerateParseTest
{
@Test
public void testGenerateParseEmpty()
public void testGenerateParseEmpty() throws Exception
{
testGenerateParse(Map.of());
}
@Test
public void testGenerateParse()
public void testGenerateParse() throws Exception
{
testGenerateParse(Map.of(13L, 7L, 31L, 29L));
}
private void testGenerateParse(Map<Long, Long> settings)
private void testGenerateParse(Map<Long, Long> settings) throws Exception
{
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new ControlGenerator().generate(lease, 0, input);
List<SettingsFrame> frames = new ArrayList<>();
ControlParser parser = new ControlParser(new ParserListener()
{
@Override
public void onSettings(SettingsFrame frame)
{
frames.add(frame);
}
});
List<Frame> frames = new ArrayList<>();
ControlParser parser = new ControlParser();
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
}
assertEquals(1, frames.size());
SettingsFrame output = frames.get(0);
SettingsFrame output = (SettingsFrame)frames.get(0);
assertEquals(input.getSettings(), output.getSettings());
}

View File

@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
*/
public class QpackContext
{
public static final Logger LOG = LoggerFactory.getLogger(QpackContext.class);
private static final Logger LOG = LoggerFactory.getLogger(QpackContext.class);
private static final StaticTable __staticTable = new StaticTable();
private final DynamicTable _dynamicTable;

View File

@ -34,7 +34,7 @@ import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_DECOMPRESSION_F
public class EncodedFieldSection
{
public static final Logger LOG = LoggerFactory.getLogger(EncodedFieldSection.class);
private static final Logger LOG = LoggerFactory.getLogger(EncodedFieldSection.class);
private final NBitIntegerParser _integerParser = new NBitIntegerParser();
private final NBitStringParser _stringParser = new NBitStringParser();

View File

@ -97,8 +97,8 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session.getSessionServer());
HTTP3Connection connection = new HTTP3Connection(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder());
HTTP3Connection connection = new HTTP3Connection(streamEndPoint, connector.getExecutor(), connector.getByteBufferPool(), parser, http3Session.getSessionServer());
return connection;
}
}

View File

@ -19,10 +19,12 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
@ -52,21 +54,30 @@ public class ServerHTTP3Session extends ServerProtocolSession
super(session);
this.apiSession = new HTTP3SessionServer(this, listener);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
// TODO: make parameters configurable.
this.generator = new MessageGenerator(encoder, 4096, true);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
// TODO: make parameters configurable.
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
@ -119,21 +130,23 @@ public class ServerHTTP3Session extends ServerProtocolSession
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId);
}
else
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureStreamEndPoint);
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
}
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), apiSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();

View File

@ -127,7 +127,7 @@ public class HTTP3ClientServerTest
{
serverRequestLatch.countDown();
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY)));
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
// Not interested in request data.
return null;
}
@ -140,7 +140,7 @@ public class HTTP3ClientServerTest
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData);
HeadersFrame frame = new HeadersFrame(metaData, true);
Stream stream = session.newRequest(frame, new Stream.Listener()
{
@Override
@ -149,10 +149,10 @@ public class HTTP3ClientServerTest
clientResponseLatch.countDown();
}
})
.get(555, TimeUnit.SECONDS);
.get(5, TimeUnit.SECONDS);
assertNotNull(stream);
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS));
}
}

View File

@ -49,7 +49,7 @@ public class ClientProtocolSession extends ProtocolSession
// On the client, we need a get-only semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
if (streamEndPoint != null)
streamEndPoint.onReadable();
}

View File

@ -60,7 +60,7 @@ public abstract class ProtocolSession
}
}
protected QuicStreamEndPoint getStreamEndPoint(long streamId)
public QuicStreamEndPoint getStreamEndPoint(long streamId)
{
return session.getStreamEndPoint(streamId);
}

View File

@ -212,7 +212,7 @@ public abstract class QuicSession
// HTTP/3
// client1
// \ /- ConnectionStream0 - ConnectionParser for SETTINGS frames, etc.
// \ /- ControlStream0 - ControlParser for SETTINGS frames, etc.
// dataEP - QuicConnection -* QuicSession -# H3QuicSession -* RequestStreamsEP - H3Connection - HEADERSParser -# HTTP Handler
// / `- InstructionStream - InstructionConnection/Parser
// client2

View File

@ -124,7 +124,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
int pos = BufferUtil.flipToFill(buffer);
int drained = session.fill(streamId, buffer);
BufferUtil.flipToFlush(buffer, pos);
if (session.isFinished(streamId))
if (drained < 0)
shutdownInput();
return drained;
}

View File

@ -516,13 +516,13 @@ public class QuicheConnection
public synchronized int drainClearTextForStream(long streamId, ByteBuffer buffer) throws IOException
{
bool_pointer fin = new bool_pointer();
int written = libQuiche().quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue();
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
buffer.position(buffer.position() + written);
return written;
int read = libQuiche().quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue();
if (read == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return fin.getValue() ? -1 : 0;
if (read < 0L)
throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(read));
buffer.position(buffer.position() + read);
return read;
}
public synchronized boolean isStreamFinished(long streamId)

View File

@ -44,7 +44,7 @@ public class ServerProtocolSession extends ProtocolSession
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
}