Issue #6728 - QUIC and HTTP/3

- Implemented data support.
- Implemented trailer support.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-21 20:10:57 +02:00
parent 8bf9227db2
commit 43551487ee
43 changed files with 1448 additions and 286 deletions

View File

@ -17,7 +17,7 @@ import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.internal.ClientHTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Connection;
import org.eclipse.jetty.http3.client.internal.ClientHTTP3StreamConnection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
@ -76,8 +76,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
// TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session.
MessageParser parser = new MessageParser(http3Session.getSessionClient(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
return new HTTP3Connection(streamEndPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
return new ClientHTTP3StreamConnection(streamEndPoint, http3Session, parser);
}
}

View File

@ -19,8 +19,8 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
@ -42,30 +42,28 @@ public class ClientHTTP3Session extends ClientProtocolSession
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionClient apiSession;
private final InstructionFlusher encoderInstructionFlusher;
private final InstructionFlusher decoderInstructionFlusher;
private final HTTP3SessionClient applicationSession;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
{
super(session);
this.apiSession = new HTTP3SessionClient(this, listener, promise);
this.applicationSession = new HTTP3SessionClient(this, listener, promise);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
this.encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
@ -87,14 +85,14 @@ public class ClientHTTP3Session extends ClientProtocolSession
public HTTP3SessionClient getSessionClient()
{
return apiSession;
return applicationSession;
}
@Override
public void onOpen()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
Map<Long, Long> settings = applicationSession.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
@ -102,7 +100,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
apiSession.onOpen();
applicationSession.onOpen();
}
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
@ -118,27 +116,27 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
@Override
protected void onReadable(long readableStreamId)
protected boolean onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId);
return super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
return streamEndPoint.onReadable();
}
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
@ -151,6 +149,11 @@ public class ClientHTTP3Session extends ClientProtocolSession
messageFlusher.iterate();
}
public void onDataAvailable(long streamId)
{
applicationSession.onDataAvailable(streamId);
}
@Override
public String toString()
{

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.internal;
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
public class ClientHTTP3StreamConnection extends HTTP3StreamConnection
{
private final ClientHTTP3Session http3Session;
public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session http3Session, MessageParser parser)
{
super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
this.http3Session = http3Session;
}
@Override
protected void onDataAvailable(long streamId)
{
http3Session.onDataAvailable(streamId);
}
}

View File

@ -17,36 +17,73 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
public interface Stream
{
public CompletableFuture<Stream> respond(HeadersFrame frame);
public CompletableFuture<Stream> data(DataFrame dataFrame);
public Stream.Data readData();
public void demand(boolean enable);
public CompletableFuture<Stream> trailer(HeadersFrame frame);
public interface Listener
{
public default void onResponse(Stream stream, HeadersFrame frame)
{
}
public default void onData(Stream stream, DataFrame frame, Callback callback)
public default void onDataAvailable(Stream stream)
{
// TODO: alternative API
// public void onDataAvailable(Stream s)
// {
// while (true)
// {
// DataFrame frame = s.pollData();
// if (frame == null)
// return;
// process(frame);
// }
// }
}
public default void onTrailer(Stream stream, HeadersFrame frame)
{
}
}
public static class Data
{
private final DataFrame frame;
private final CompletableFuture<Object> callback;
public Data(DataFrame frame, CompletableFuture<Object> callback)
{
this.frame = frame;
this.callback = callback;
}
public DataFrame frame()
{
return frame;
}
public CompletableFuture<Object> callback()
{
return callback;
}
public void complete(Object result, Throwable failure)
{
if (failure == null)
callback().complete(result);
else
callback().completeExceptionally(failure);
}
public void completeAndDemand(Stream stream, Throwable failure)
{
complete(stream, failure);
if (failure == null)
stream.demand(true);
}
public void succeed()
{
callback().complete(null);
}
}
}

View File

@ -42,9 +42,4 @@ public class DataFrame extends Frame
{
return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getData().remaining());
}
public DataFrame withLast(boolean last)
{
return new DataFrame(data, last);
}
}

View File

@ -40,11 +40,6 @@ public class HeadersFrame extends Frame
@Override
public String toString()
{
return String.format("%s[last=%b,%s]", super.toString(), isLast(), getMetaData());
}
public HeadersFrame withLast(boolean last)
{
return new HeadersFrame(metaData, last);
return String.format("%s[last=%b,{%s}]", super.toString(), isLast(), getMetaData());
}
}

View File

@ -82,8 +82,8 @@ public class ControlFlusher extends IteratingCallback
if (!initialized)
{
initialized = true;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlStreamConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlStreamConnection.STREAM_TYPE);
buffer.flip();
lease.insert(0, buffer, false);
}

View File

@ -25,18 +25,18 @@ import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ControlConnection extends AbstractConnection implements Connection.UpgradeTo
public class ControlStreamConnection extends AbstractConnection implements Connection.UpgradeTo
{
// SPEC: Control Stream Type.
public static final int STREAM_TYPE = 0x00;
private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class);
private static final Logger LOG = LoggerFactory.getLogger(ControlStreamConnection.class);
private final ByteBufferPool byteBufferPool;
private final ControlParser parser;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser)
public ControlStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;

View File

@ -21,14 +21,14 @@ import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
public class DecoderConnection extends InstructionConnection
public class DecoderStreamConnection extends InstructionStreamConnection
{
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x03;
private final QpackEncoder encoder;
public DecoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder)
public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder)
{
super(endPoint, executor, byteBufferPool);
this.encoder = encoder;

View File

@ -21,14 +21,14 @@ import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
public class EncoderConnection extends InstructionConnection
public class EncoderStreamConnection extends InstructionStreamConnection
{
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x02;
private final QpackDecoder decoder;
public EncoderConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder)
public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder)
{
super(endPoint, executor, byteBufferPool);
this.decoder = decoder;

View File

@ -1,96 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Connection extends AbstractConnection
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Connection.class);
private final ByteBufferPool byteBufferPool;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
public HTTP3Connection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{
ByteBuffer buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
try
{
while (true)
{
int filled = getEndPoint().fill(buffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", filled, this);
if (filled > 0)
{
parser.parse(buffer);
}
else if (filled == 0)
{
byteBufferPool.release(buffer);
fillInterested();
break;
}
else
{
byteBufferPool.release(buffer);
getEndPoint().close();
break;
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
getEndPoint().close(x);
}
}
}

View File

@ -114,24 +114,27 @@ public abstract class HTTP3Session implements Session, ParserListener
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this);
QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
if (LOG.isDebugEnabled())
LOG.debug("received request {}#{} on {}", frame, streamId, this);
Stream.Listener streamListener = notifyRequest(stream, frame);
stream.setListener(streamListener);
}
else if (metaData.isResponse())
{
if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this);
notifyResponse(stream, frame);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("received trailer {}#{} on {}", frame, streamId, this);
notifyTrailer(stream, frame);
}
}
@ -153,7 +156,9 @@ public abstract class HTTP3Session implements Session, ParserListener
{
try
{
stream.getListener().onResponse(stream, frame);
Stream.Listener listener = stream.getListener();
if (listener != null)
listener.onResponse(stream, frame);
}
catch (Throwable x)
{
@ -165,7 +170,9 @@ public abstract class HTTP3Session implements Session, ParserListener
{
try
{
stream.getListener().onTrailer(stream, frame);
Stream.Listener listener = stream.getListener();
if (listener != null)
listener.onTrailer(stream, frame);
}
catch (Throwable x)
{
@ -178,22 +185,16 @@ public abstract class HTTP3Session implements Session, ParserListener
{
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this);
// The stream must already exist.
HTTP3Stream stream = getStream(streamId);
// TODO: handle null stream.
// TODO: implement demand mechanism like in HTTP2Stream
// demand(n) should be on Stream, or on a LongConsumer parameter?
// TODO: the callback in HTTP2 was only to notify of data consumption for flow control.
// Here, we don't have to do flow control, but what about retain()/release() for the network buffer?
notifyData(stream, frame, Callback.NOOP);
}
private void notifyData(HTTP3Stream stream, DataFrame frame, Callback callback)
public void onDataAvailable(long streamId)
{
stream.getListener().onData(stream, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("notifying data available for stream #{} on {}", streamId, this);
HTTP3Stream stream = getStream(streamId);
Stream.Listener listener = stream.getListener();
if (listener != null)
listener.onDataAvailable(stream);
}
@Override

View File

@ -16,6 +16,8 @@ package org.eclipse.jetty.http3.internal;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
@ -46,6 +48,45 @@ public class HTTP3Stream implements Stream
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
return writeFrame(frame);
}
@Override
public CompletableFuture<Stream> data(DataFrame frame)
{
return writeFrame(frame);
}
@Override
public Data readData()
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
return connection.readData();
}
@Override
public void demand(boolean enable)
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
connection.demand(enable);
}
@Override
public CompletableFuture<Stream> trailer(HeadersFrame frame)
{
if (!frame.isLast())
throw new IllegalArgumentException("invalid trailer frame: property 'last' must be true");
return writeFrame(frame);
}
public boolean hasDemand()
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
return connection.hasDemand();
}
private Promise.Completable<Stream> writeFrame(Frame frame)
{
Promise.Completable<Stream> completable = new Promise.Completable<>();
session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));

View File

@ -0,0 +1,348 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class HTTP3StreamConnection extends AbstractConnection
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3StreamConnection.class);
// An empty DATA frame is the sequence of bytes [0x0, 0x0].
private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2);
private final AutoLock lock = new AutoLock();
private final Queue<DataFrame> dataFrames = new ArrayDeque<>();
private final RetainableByteBufferPool buffers;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private RetainableByteBuffer buffer;
private boolean dataMode;
private boolean dataDemand;
private boolean dataStalled;
private boolean dataLast;
private boolean remotelyClosed;
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.parser = parser;
parser.init(MessageListener::new);
// By default, invoke onDataAvailable() after onRequest()/onResponse().
this.dataDemand = true;
}
@Override
public QuicStreamEndPoint getEndPoint()
{
return (QuicStreamEndPoint)super.getEndPoint();
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("processing dataMode={} on {}", dataMode, this);
if (dataMode)
processDataFrames();
else
processNonDataFrames();
}
private void processDataFrames()
{
processDataDemand();
if (!dataMode)
{
if (buffer.hasRemaining())
processNonDataFrames();
else
fillInterested();
}
}
private void processNonDataFrames()
{
while (true)
{
if (parseAndFill() == MessageParser.Result.NO_FRAME)
break;
if (dataMode)
{
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true on {}", this);
if (buffer.hasRemaining())
processDataFrames();
else
fillInterested();
break;
}
}
}
protected abstract void onDataAvailable(long streamId);
public Stream.Data readData()
{
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
switch (parseAndFill())
{
case FRAME:
{
DataFrame frame = dataFrames.poll();
if (LOG.isDebugEnabled())
LOG.debug("read data {} on {}", frame, this);
if (frame == null)
return null;
buffer.retain();
CompletableFuture<Object> completable = new CompletableFuture<>()
.whenComplete((r, x) -> buffer.release());
return new Stream.Data(frame, completable);
}
case MODE_SWITCH:
{
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=false on {}", this);
dataLast = true;
dataMode = false;
parser.setDataMode(false);
return null;
}
case NO_FRAME:
{
return null;
}
default:
{
throw new IllegalStateException();
}
}
}
public void demand(boolean enable)
{
boolean process = false;
try (AutoLock l = lock.lock())
{
dataDemand = enable;
if (dataStalled)
{
dataStalled = false;
process = true;
}
}
if (process)
processDataDemand();
}
public boolean hasDemand()
{
try (AutoLock l = lock.lock())
{
return dataDemand;
}
}
private boolean isStalled()
{
try (AutoLock l = lock.lock())
{
return dataStalled;
}
}
private void processDataDemand()
{
while (true)
{
boolean demand;
try (AutoLock l = lock.lock())
{
if (dataDemand)
{
demand = !dataLast;
}
else
{
dataStalled = true;
demand = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("processing demand={} fillInterested={} on {}", demand, isFillInterested(), this);
// Exit if there is no demand, or there is demand but no data.
if (!demand || isFillInterested())
return;
// We have demand, notify the application.
try (AutoLock l = lock.lock())
{
dataDemand = false;
}
onDataAvailable(getEndPoint().getStreamId());
}
}
private MessageParser.Result parseAndFill()
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill on {} with buffer {}", this, buffer);
if (buffer == null)
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
ByteBuffer byteBuffer = buffer.getBuffer();
MessageParser.Result result = parser.parse(byteBuffer);
if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH)
return result;
if (buffer.isRetained())
{
buffer.release();
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
byteBuffer = buffer.getBuffer();
}
int filled = getEndPoint().fill(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {} with buffer {}", filled, this, buffer);
if (filled > 0)
continue;
if (!remotelyClosed && getEndPoint().isStreamFinished())
{
if (LOG.isDebugEnabled())
LOG.debug("detected end of stream on {}", this);
parser.parse(EMPTY_DATA_FRAME.slice());
return MessageParser.Result.FRAME;
}
if (filled == 0)
{
buffer.release();
buffer = null;
fillInterested();
break;
}
else
{
buffer.release();
buffer = null;
break;
}
}
return MessageParser.Result.NO_FRAME;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
if (buffer != null)
buffer.release();
buffer = null;
getEndPoint().close(x);
return MessageParser.Result.NO_FRAME;
}
}
@Override
public String toConnectionString()
{
return String.format("%s[demand=%b,stalled=%b,dataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), dataMode);
}
private class MessageListener extends ParserListener.Wrapper
{
private MessageListener(ParserListener listener)
{
super(listener);
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
remotelyClosed = frame.isLast();
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
{
// Expect DATA frames now.
dataMode = true;
parser.setDataMode(true);
}
else
{
// Trailer.
remotelyClosed = true;
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
super.onHeaders(streamId, frame);
}
@Override
public void onData(long streamId, DataFrame frame)
{
remotelyClosed = frame.isLast();
dataLast = frame.isLast();
dataFrames.offer(frame);
super.onData(streamId, frame);
}
}
}

View File

@ -25,14 +25,14 @@ import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class InstructionConnection extends AbstractConnection implements Connection.UpgradeTo
public abstract class InstructionStreamConnection extends AbstractConnection implements Connection.UpgradeTo
{
private static final Logger LOG = LoggerFactory.getLogger(DecoderConnection.class);
private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class);
private final ByteBufferPool byteBufferPool;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public InstructionConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool)
public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
@ -83,8 +83,8 @@ public abstract class InstructionConnection extends AbstractConnection implement
// Then read from the EndPoint.
int filled = getEndPoint().fill(buffer);
if (InstructionConnection.LOG.isDebugEnabled())
InstructionConnection.LOG.debug("filled {} on {}", filled, this);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", filled, this);
if (filled == 0)
{
@ -104,8 +104,8 @@ public abstract class InstructionConnection extends AbstractConnection implement
}
catch (Throwable x)
{
if (InstructionConnection.LOG.isDebugEnabled())
InstructionConnection.LOG.debug("could not process decoder stream {}", getEndPoint(), x);
if (LOG.isDebugEnabled())
LOG.debug("could not process decoder stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);

View File

@ -125,10 +125,10 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
{
switch (streamType)
{
case ControlConnection.STREAM_TYPE:
case ControlStreamConnection.STREAM_TYPE:
{
ControlParser parser = new ControlParser(listener);
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -136,9 +136,9 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
getEndPoint().upgrade(newConnection);
break;
}
case EncoderConnection.STREAM_TYPE:
case EncoderStreamConnection.STREAM_TYPE:
{
EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -146,9 +146,9 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
getEndPoint().upgrade(newConnection);
break;
}
case DecoderConnection.STREAM_TYPE:
case DecoderStreamConnection.STREAM_TYPE:
{
DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())

View File

@ -60,7 +60,7 @@ public abstract class BodyParser
* @return true if all the frame body bytes were parsed;
* false if not enough frame body bytes were present in the buffer
*/
public abstract boolean parse(ByteBuffer buffer);
public abstract Result parse(ByteBuffer buffer);
protected void emptyBody(ByteBuffer buffer)
{
@ -108,4 +108,9 @@ public abstract class BodyParser
LOG.info("failure while notifying listener {}", listener, x);
}
}
public enum Result
{
NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
}
}

View File

@ -23,7 +23,7 @@ public class CancelPushBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -84,24 +84,31 @@ public class ControlParser
// TODO: enforce only control frames, but ignore unknown.
if (LOG.isDebugEnabled())
LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType));
if (!unknownBodyParser.parse(buffer))
BodyParser.Result result = unknownBodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return;
reset();
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
}
else
{
if (headerParser.getFrameLength() == 0)
{
bodyParser.emptyBody(buffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), buffer);
reset();
}
else
{
if (!bodyParser.parse(buffer))
BodyParser.Result result = bodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return;
if (LOG.isDebugEnabled())
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer);
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
}
if (LOG.isDebugEnabled())
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
}
break;
}

View File

@ -50,7 +50,7 @@ public class DataBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -77,13 +77,13 @@ public class DataBodyParser extends BodyParser
{
reset();
onData(slice, false);
return true;
return Result.WHOLE_FRAME;
}
else
{
// We got partial data, simulate a smaller frame, and stay in DATA state.
onData(slice, true);
break;
return Result.FRAGMENT_FRAME;
}
}
default:
@ -92,7 +92,7 @@ public class DataBodyParser extends BodyParser
}
}
}
return false;
return Result.NO_FRAME;
}
private void onData(ByteBuffer buffer, boolean fragment)

View File

@ -23,7 +23,7 @@ public class GoAwayBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -53,7 +53,7 @@ public class HeadersBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -74,7 +74,7 @@ public class HeadersBodyParser extends BodyParser
length -= remaining;
ByteBuffer copy = BufferUtil.copy(buffer);
byteBuffers.add(copy);
return false;
return Result.NO_FRAME;
}
else
{
@ -99,7 +99,7 @@ public class HeadersBodyParser extends BodyParser
byteBuffers.clear();
}
return decode(encoded);
return decode(encoded) ? Result.WHOLE_FRAME : Result.NO_FRAME;
}
}
default:
@ -108,7 +108,7 @@ public class HeadersBodyParser extends BodyParser
}
}
}
return false;
return Result.NO_FRAME;
}
private boolean decode(ByteBuffer encoded)

View File

@ -23,7 +23,7 @@ public class MaxPushIdBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -15,10 +15,12 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import java.util.function.BooleanSupplier;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,14 +33,27 @@ public class MessageParser
{
private static final Logger LOG = LoggerFactory.getLogger(MessageParser.class);
private final HeaderParser headerParser;
private final HeaderParser headerParser = new HeaderParser();
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final ParserListener listener;
private final QpackDecoder decoder;
private final long streamId;
private final BooleanSupplier isLast;
private BodyParser unknownBodyParser;
private State state = State.HEADER;
protected boolean dataMode;
public MessageParser(ParserListener listener, QpackDecoder decoder, long streamId, BooleanSupplier isLast)
{
this.headerParser = new HeaderParser();
this.listener = listener;
this.decoder = decoder;
this.streamId = streamId;
this.isLast = isLast;
}
public void init(UnaryOperator<ParserListener> wrapper)
{
ParserListener listener = wrapper.apply(this.listener);
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(headerParser, listener, streamId, isLast);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(headerParser, listener, decoder, streamId, isLast);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser, listener);
@ -51,12 +66,20 @@ public class MessageParser
state = State.HEADER;
}
public void setDataMode(boolean enable)
{
this.dataMode = enable;
}
/**
* <p>Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.</p>
* <p>Only the bytes of one frame are consumed, therefore when this method returns,
* the buffer may contain unconsumed bytes, for example for other frames.</p>
*
* @param buffer the buffer to parse
* @return the result of the parsing
*/
public void parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
try
{
@ -69,9 +92,12 @@ public class MessageParser
if (headerParser.parse(buffer))
{
state = State.BODY;
// If we are in data mode, but we did not parse a DATA frame, bail out.
if (dataMode && headerParser.getFrameType() != FrameType.DATA.type())
return Result.MODE_SWITCH;
break;
}
return;
return Result.NO_FRAME;
}
case BODY:
{
@ -83,27 +109,37 @@ public class MessageParser
if (bodyParser == null)
{
// Unknown frame types must be ignored.
BodyParser.Result result = unknownBodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME;
if (LOG.isDebugEnabled())
LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType));
if (!unknownBodyParser.parse(buffer))
return;
LOG.debug("Parsed unknown frame body for type {}", Integer.toHexString(frameType));
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
break;
}
else
{
if (headerParser.getFrameLength() == 0)
{
bodyParser.emptyBody(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
reset();
return Result.FRAME;
}
else
{
if (!bodyParser.parse(buffer))
return;
BodyParser.Result result = bodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
return Result.FRAME;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
}
reset();
break;
}
default:
{
@ -118,6 +154,7 @@ public class MessageParser
LOG.debug("parse failed", x);
buffer.clear();
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
return Result.NO_FRAME;
}
}
@ -126,6 +163,11 @@ public class MessageParser
unknownBodyParser.sessionFailure(buffer, error, reason);
}
public enum Result
{
NO_FRAME, FRAME, MODE_SWITCH
}
private enum State
{
HEADER, BODY

View File

@ -38,4 +38,44 @@ public interface ParserListener
public default void onSessionFailure(int error, String reason)
{
}
public static class Wrapper implements ParserListener
{
protected final ParserListener listener;
public Wrapper(ParserListener listener)
{
this.listener = listener;
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
listener.onHeaders(streamId, frame);
}
@Override
public void onData(long streamId, DataFrame frame)
{
listener.onData(streamId, frame);
}
@Override
public void onSettings(SettingsFrame frame)
{
listener.onSettings(frame);
}
@Override
public void onStreamFailure(long streamId, int error, String reason)
{
listener.onStreamFailure(streamId, error, reason);
}
@Override
public void onSessionFailure(int error, String reason)
{
listener.onSessionFailure(error, reason);
}
}
}

View File

@ -23,7 +23,7 @@ public class PushPromiseBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -50,7 +50,7 @@ public class SettingsBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -74,12 +74,12 @@ public class SettingsBodyParser extends BodyParser
if (settings.containsKey(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
return true;
return Result.NO_FRAME;
}
if (SettingsFrame.isReserved(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
return true;
return Result.NO_FRAME;
}
if (length > 0)
{
@ -88,11 +88,11 @@ public class SettingsBodyParser extends BodyParser
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
return Result.NO_FRAME;
}
break;
}
return false;
return Result.NO_FRAME;
}
case VALUE:
{
@ -112,16 +112,16 @@ public class SettingsBodyParser extends BodyParser
Map<Long, Long> settings = this.settings;
reset();
onSettings(settings);
return true;
return Result.WHOLE_FRAME;
}
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
return Result.NO_FRAME;
}
break;
}
return false;
return Result.NO_FRAME;
}
default:
{
@ -129,7 +129,7 @@ public class SettingsBodyParser extends BodyParser
}
}
}
return false;
return Result.NO_FRAME;
}
private void onSettings(Map<Long, Long> settings)

View File

@ -23,7 +23,7 @@ public class UnknownBodyParser extends BodyParser
}
@Override
public boolean parse(ByteBuffer buffer)
public Result parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -17,6 +17,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
@ -65,6 +66,7 @@ public class DataGenerateParseTest
frames.add(frame);
}
}, null, 13, () -> true);
parser.init(UnaryOperator.identity());
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
@ -60,6 +61,7 @@ public class HeadersGenerateParseTest
frames.add(frame);
}
}, decoder, 13, () -> true);
parser.init(UnaryOperator.identity());
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);

View File

@ -17,9 +17,9 @@ import java.util.Map;
import java.util.Objects;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.HTTP3Connection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolSession;
@ -98,7 +98,6 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
long streamId = streamEndPoint.getStreamId();
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(http3Session.getSessionServer(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
HTTP3Connection connection = new HTTP3Connection(streamEndPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
return connection;
return new ServerHTTP3StreamConnection(streamEndPoint, http3Session, parser);
}
}

View File

@ -19,8 +19,8 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
@ -59,14 +59,14 @@ public class ServerHTTP3Session extends ServerProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderConnection.STREAM_TYPE);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderConnection.STREAM_TYPE);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
@ -125,22 +125,22 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
@Override
protected void onReadable(long readableStreamId)
protected boolean onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId);
LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
return super.onReadable(readableStreamId);
}
else
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
}
}
@ -158,4 +158,9 @@ public class ServerHTTP3Session extends ServerProtocolSession
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
}
protected void onDataAvailable(long streamId)
{
apiSession.onDataAvailable(streamId);
}
}

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
{
private final ServerHTTP3Session http3Session;
public ServerHTTP3StreamConnection(QuicStreamEndPoint endPoint, ServerHTTP3Session http3Session, MessageParser parser)
{
super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
this.http3Session = http3Session;
}
@Override
protected void onDataAvailable(long streamId)
{
http3Session.onDataAvailable(streamId);
}
}

View File

@ -29,6 +29,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>

View File

@ -0,0 +1,62 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.RegisterExtension;
public class AbstractHTTP3ClientServerTest
{
@RegisterExtension
final BeforeTestExecutionCallback printMethodName = context ->
System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName());
protected ServerQuicConnector connector;
protected HTTP3Client client;
protected Server server;
protected void startServer(Session.Server.Listener listener) throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
server.addConnector(connector);
server.start();
}
protected void startClient() throws Exception
{
client = new HTTP3Client();
client.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
}

View File

@ -14,9 +14,12 @@
package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
@ -26,53 +29,17 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HTTP3ClientServerTest
public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
{
private Server server;
private ServerQuicConnector connector;
private HTTP3Client client;
private void startServer(Session.Server.Listener listener) throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
server.addConnector(connector);
server.start();
}
private void startClient() throws Exception
{
client = new HTTP3Client();
client.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
@Test
public void testConnectTriggersSettingsFrame() throws Exception
{
@ -152,7 +119,155 @@ public class HTTP3ClientServerTest
.get(5, TimeUnit.SECONDS);
assertNotNull(stream);
assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testDiscardRequestContent() throws Exception
{
AtomicReference<CountDownLatch> serverLatch = new AtomicReference<>(new CountDownLatch(1));
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
// Implicit demand, so onDataAvailable() will be called.
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
// FlowControl acknowledged already.
Stream.Data data = stream.readData();
if (data == null)
{
// Call me again when you have data.
stream.demand(true);
return;
}
// Recycle the ByteBuffer in data.frame.
data.succeed();
// Call me again immediately.
stream.demand(true);
if (data.frame().isLast())
serverLatch.get().countDown();
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
AtomicReference<CountDownLatch> clientLatch = new AtomicReference<>(new CountDownLatch(1));
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.POST.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, false);
Stream.Listener streamListener = new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
clientLatch.get().countDown();
}
};
Stream stream1 = session.newRequest(frame, streamListener)
.get(5, TimeUnit.SECONDS);
stream1.data(new DataFrame(ByteBuffer.allocate(8192), true));
assertTrue(clientLatch.get().await(5, TimeUnit.SECONDS));
assertTrue(serverLatch.get().await(5, TimeUnit.SECONDS));
// Send another request, but with 2 chunks of data separated by some time.
serverLatch.set(new CountDownLatch(1));
clientLatch.set(new CountDownLatch(1));
Stream stream2 = session.newRequest(frame, streamListener).get(5, TimeUnit.SECONDS);
stream2.data(new DataFrame(ByteBuffer.allocate(3 * 1024), false));
// Wait some time before sending the second chunk.
Thread.sleep(500);
stream2.data(new DataFrame(ByteBuffer.allocate(5 * 1024), true));
assertTrue(clientLatch.get().await(555, TimeUnit.SECONDS));
assertTrue(serverLatch.get().await(555, TimeUnit.SECONDS));
}
@Test
public void testEchoRequestContentAsResponseContent() throws Exception
{
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
// Send the response headers.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
// Read data.
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand(true);
return;
}
// Echo it back, then demand only when the write is finished.
stream.data(data.frame())
.whenComplete(data::completeAndDemand);
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, false);
byte[] bytesSent = new byte[8192];
new Random().nextBytes(bytesSent);
byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytesReceived);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Stream stream = session.newRequest(frame, new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
}
@Override
public void onDataAvailable(Stream stream)
{
// Read data.
Stream.Data data = stream.readData();
if (data != null)
{
// Consume data.
byteBuffer.put(data.frame().getData());
data.callback().complete(null);
if (data.frame().isLast())
clientDataLatch.countDown();
}
// Demand more data.
stream.demand(true);
}
})
.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.wrap(bytesSent), true));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDataLatch.await(5, TimeUnit.SECONDS));
assertArrayEquals(bytesSent, bytesReceived);
}
}

View File

@ -0,0 +1,463 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.util.BufferUtil;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
{
@Test
public void testOnDataAvailableThenExit() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverDataLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
{
// Do nothing on the first pass, with respect to demand and reading data.
serverStreamLatch.countDown();
}
else
{
// When resumed, demand all content until the last.
Stream.Data data = stream.readData();
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
}
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(8192), true));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
// Wait a little to be sure we do not spin.
Thread.sleep(500);
assertEquals(1, onDataAvailableCalls.get());
// Resume processing of data, this should call onDataAvailable().
serverStreamRef.get().demand(true);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testOnDataAvailableThenReadDataThenExit() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverDataLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
{
serverStreamLatch.countDown();
// Read only one chunk of data.
Stream.Data data = stream.readData();
assertNotNull(data);
// Don't demand, just exit.
}
else
{
// When resumed, demand all content until the last.
Stream.Data data = stream.readData();
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
}
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
// Wait a little to be sure we do not spin.
Thread.sleep(500);
assertEquals(1, onDataAvailableCalls.get());
// Resume processing of data, this should call onDataAvailable(), but there is no data to read yet.
serverStreamRef.get().demand(true);
await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)stream).hasDemand());
stream.data(new DataFrame(ByteBuffer.allocate(32), true));
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testOnDataAvailableThenReadDataNullThenExit() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverDataLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
if (serverStreamRef.compareAndSet(null, stream))
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
serverStreamLatch.countDown();
break;
}
}
// Do not demand after reading null data.
}
else
{
// When resumed, demand all content until the last.
Stream.Data data = stream.readData();
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
}
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
// Wait a little to be sure we do not spin.
Thread.sleep(500);
assertEquals(1, onDataAvailableCalls.get());
// Send a last empty frame.
stream.data(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
// Resume processing of data, this should call onDataAvailable().
serverStreamRef.get().demand(true);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testHeadersNoDataThenTrailers() throws Exception
{
CountDownLatch serverDataLatch = new CountDownLatch(1);
CountDownLatch serverTrailerLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
// Must read to EOF to trigger fill+parse of the trailer.
Stream.Data data = stream.readData();
assertNull(data);
// It's typical to demand after null data.
stream.demand(true);
serverDataLatch.countDown();
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
{
serverTrailerLatch.countDown();
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
// Wait a little to be sure we do not spin.
Thread.sleep(500);
assertEquals(1, onDataAvailableCalls.get());
assertTrue(serverTrailerLatch.await(5, TimeUnit.SECONDS));
assertEquals(1, onDataAvailableCalls.get());
}
@Test
public void testHeadersDataTrailers() throws Exception
{
int dataLength = 8192;
AtomicInteger dataRead = new AtomicInteger();
CountDownLatch serverDataLatch = new CountDownLatch(1);
CountDownLatch serverTrailerLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
Stream.Data data = stream.readData();
if (data != null)
{
if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength)
serverDataLatch.countDown();
}
stream.demand(true);
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
{
serverTrailerLatch.countDown();
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(dataLength), false));
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
long calls = onDataAvailableCalls.get();
stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS);
assertTrue(serverTrailerLatch.await(5, TimeUnit.SECONDS));
// In order to detect that the trailer have arrived, we must call
// onDataAvailable() one more time, possibly two more times if an
// empty DATA frame was delivered to indicate the end of the stream.
assertThat(onDataAvailableCalls.get(), Matchers.lessThanOrEqualTo(calls + 2));
}
@Test
public void testRetainRelease() throws Exception
{
CountDownLatch serverDataLatch = new CountDownLatch(1);
List<Stream.Data> datas = new ArrayList<>();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand(true);
return;
}
// Store the Data away to be used later.
datas.add(data);
if (data.frame().isLast())
serverDataLatch.countDown();
}
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
byte[] bytesSent = new byte[16384];
new Random().nextBytes(bytesSent);
stream.data(new DataFrame(ByteBuffer.wrap(bytesSent), true));
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
assertEquals(bytesSent.length, datas.stream().mapToInt(d -> d.frame().getData().remaining()).sum());
byte[] bytesReceived = new byte[bytesSent.length];
ByteBuffer buffer = ByteBuffer.wrap(bytesReceived);
datas.forEach(d -> buffer.put(d.frame().getData()));
assertArrayEquals(bytesSent, bytesReceived);
}
@Test
public void testDisableDemandOnRequest() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverRequestLatch = new CountDownLatch(1);
CountDownLatch serverDataLatch = new CountDownLatch(1);
AtomicLong onDataAvailableCalls = new AtomicLong();
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
serverRequestLatch.countDown();
stream.demand(false);
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
onDataAvailableCalls.incrementAndGet();
Stream.Data data = stream.readData();
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
stream.demand(true);
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(4096), true));
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
// Wait a little to verify that onDataAvailable() is not called.
Thread.sleep(500);
assertEquals(0, onDataAvailableCalls.get());
// Resume processing of data.
serverStreamRef.get().demand(true);
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -44,13 +44,14 @@ public class ClientProtocolSession extends ProtocolSession
}
@Override
protected void onReadable(long readableStreamId)
protected boolean onReadable(long readableStreamId)
{
// On the client, we need a get-only semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
if (streamEndPoint != null)
streamEndPoint.onReadable();
return streamEndPoint.onReadable();
return false;
}
}

View File

@ -93,18 +93,19 @@ public abstract class ProtocolSession
List<Long> readableStreamIds = session.getReadableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds);
readableStreamIds.forEach(this::onReadable);
// TODO: ExecutionStrategy plug-in point is here.
// this::onReadable() just feeds the decoder and the instruction streams.
// this.onReadable() just feeds the decoder and the instruction streams.
// Note that req/rsp streams never eat DATA frame, it's a noop because they pull data
// when they want to read data frames, either via Stream.readData() or ServletInputStream.read().
// Then here we ask decoder for tasks, and have the ExecutionStrategy process them.
return !readableStreamIds.isEmpty();
return readableStreamIds.stream()
.map(this::onReadable)
.reduce(false, (result, interested) -> result || interested);
}
protected abstract void onReadable(long readableStreamId);
protected abstract boolean onReadable(long readableStreamId);
public void configureProtocolEndPoint(QuicStreamEndPoint endPoint)
{

View File

@ -125,7 +125,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public int fill(ByteBuffer buffer) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("filling buffer from stream {}", streamId);
LOG.debug("filling buffer from stream {} finished={}", streamId, isStreamFinished());
int pos = BufferUtil.flipToFill(buffer);
int drained = session.fill(streamId, buffer);
BufferUtil.flipToFlush(buffer, pos);
@ -135,6 +135,8 @@ public class QuicStreamEndPoint extends AbstractEndPoint
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
// TODO: session.flush(streamId, buffer) feeds Quiche and then calls flush().
// Can we call flush() only after the for loop below?
if (LOG.isDebugEnabled())
LOG.debug("flushing {} buffer(s) to stream {}", buffers.length, streamId);
for (ByteBuffer buffer : buffers)
@ -188,11 +190,12 @@ public class QuicStreamEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite();
}
public void onReadable()
public boolean onReadable()
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable", streamId);
getFillInterest().fillable();
return isFillInterested();
}
@Override

View File

@ -586,12 +586,17 @@ public class QuicheConnection
}
public int feedClearTextForStream(long streamId, ByteBuffer buffer) throws IOException
{
return feedClearTextForStream(streamId, buffer, false);
}
public int feedClearTextForStream(long streamId, ByteBuffer buffer, boolean last) throws IOException
{
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), false).intValue();
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), last).intValue();
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)

View File

@ -39,12 +39,12 @@ public class ServerProtocolSession extends ProtocolSession
}
@Override
protected void onReadable(long readableStreamId)
protected boolean onReadable(long readableStreamId)
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
return streamEndPoint.onReadable();
}
}

View File

@ -186,6 +186,18 @@ public interface Callback extends Invocable
};
}
static Callback from(InvocationType invocationType, Runnable completed)
{
return new Completing(invocationType)
{
@Override
public void completed()
{
completed.run();
}
};
}
/**
* <p>Creates a Callback with the given {@code invocationType},
* that runs the given {@code Runnable} when it succeeds or fails.</p>