Issue #6728 - QUIC and HTTP/3

- Updates to the data support after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-22 12:34:57 +02:00
parent 43551487ee
commit 87edb609bd
9 changed files with 75 additions and 82 deletions

View File

@ -122,14 +122,14 @@ public class ClientHTTP3Session extends ClientProtocolSession
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read", readableStreamId);
LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
return super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
}
}

View File

@ -53,6 +53,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
getProtocolSession().writeFrame(streamId, frame, callback);
}
@Override
public void onOpen()
{
promise.succeeded(this);

View File

@ -26,7 +26,7 @@ public interface Stream
public Stream.Data readData();
public void demand(boolean enable);
public void demand();
public CompletableFuture<Stream> trailer(HeadersFrame frame);
@ -48,12 +48,12 @@ public interface Stream
public static class Data
{
private final DataFrame frame;
private final CompletableFuture<Object> callback;
private final Runnable complete;
public Data(DataFrame frame, CompletableFuture<Object> callback)
public Data(DataFrame frame, Runnable complete)
{
this.frame = frame;
this.callback = callback;
this.complete = complete;
}
public DataFrame frame()
@ -61,29 +61,15 @@ public interface Stream
return frame;
}
public CompletableFuture<Object> callback()
public void complete()
{
return callback;
complete.run();
}
public void complete(Object result, Throwable failure)
@Override
public String toString()
{
if (failure == null)
callback().complete(result);
else
callback().completeExceptionally(failure);
}
public void completeAndDemand(Stream stream, Throwable failure)
{
complete(stream, failure);
if (failure == null)
stream.demand(true);
}
public void succeed()
{
callback().complete(null);
return String.format("%s[%s]", getClass().getSimpleName(), frame);
}
}
}

View File

@ -44,6 +44,10 @@ public abstract class HTTP3Session implements Session, ParserListener
this.listener = listener;
}
public void onOpen()
{
}
public ProtocolSession getProtocolSession()
{
return session;

View File

@ -66,10 +66,10 @@ public class HTTP3Stream implements Stream
}
@Override
public void demand(boolean enable)
public void demand()
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
connection.demand(enable);
connection.demand();
}
@Override

View File

@ -16,7 +16,6 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.MetaData;
@ -58,8 +57,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.parser = parser;
parser.init(MessageListener::new);
// By default, invoke onDataAvailable() after onRequest()/onResponse().
this.dataDemand = true;
}
@Override
@ -146,9 +143,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
buffer.retain();
CompletableFuture<Object> completable = new CompletableFuture<>()
.whenComplete((r, x) -> buffer.release());
return new Stream.Data(frame, completable);
return new Stream.Data(frame, buffer::release);
}
case MODE_SWITCH:
{
@ -170,12 +165,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
public void demand(boolean enable)
public void demand()
{
boolean process = false;
try (AutoLock l = lock.lock())
{
dataDemand = enable;
dataDemand = true;
if (dataStalled)
{
dataStalled = false;

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -42,37 +41,32 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionServer apiSession;
private final InstructionFlusher encoderFlusher;
private final InstructionFlusher decoderFlusher;
private final HTTP3SessionServer applicationSession;
private final ControlFlusher controlFlusher;
private final MessageGenerator generator;
private final HTTP3Flusher messageFlusher;
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{
super(session);
this.apiSession = new HTTP3SessionServer(this, listener);
this.applicationSession = new HTTP3SessionServer(this, listener);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxRequestHeadersSize);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
// TODO: make parameters configurable.
this.generator = new MessageGenerator(encoder, 4096, true);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
@ -90,29 +84,25 @@ public class ServerHTTP3Session extends ServerProtocolSession
public HTTP3SessionServer getSessionServer()
{
return apiSession;
return applicationSession;
}
@Override
public void onOpen()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
Map<Long, Long> settings = applicationSession.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
applicationSession.onOpen();
}
private QuicStreamEndPoint configureEncoderEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
@ -136,7 +126,6 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
else
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
@ -146,7 +135,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, apiSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
@ -161,6 +150,12 @@ public class ServerHTTP3Session extends ServerProtocolSession
protected void onDataAvailable(long streamId)
{
apiSession.onDataAvailable(streamId);
applicationSession.onDataAvailable(streamId);
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -134,7 +134,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
{
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
// Implicit demand, so onDataAvailable() will be called.
stream.demand();
return new Stream.Listener()
{
@Override
@ -145,13 +145,13 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
if (data == null)
{
// Call me again when you have data.
stream.demand(true);
stream.demand();
return;
}
// Recycle the ByteBuffer in data.frame.
data.succeed();
data.complete();
// Call me again immediately.
stream.demand(true);
stream.demand();
if (data.frame().isLast())
serverLatch.get().countDown();
}
@ -205,6 +205,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
{
// Send the response headers.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
stream.demand();
return new Stream.Listener()
{
@Override
@ -214,12 +215,15 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand(true);
stream.demand();
return;
}
// Echo it back, then demand only when the write is finished.
stream.data(data.frame())
.whenComplete(data::completeAndDemand);
// Always complete.
.whenComplete((s, x) -> data.complete())
// Demand only if successful.
.thenRun(stream::demand);
}
};
}
@ -244,6 +248,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
public void onResponse(Stream stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
stream.demand();
}
@Override
@ -255,12 +260,12 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
{
// Consume data.
byteBuffer.put(data.frame().getData());
data.callback().complete(null);
data.complete();
if (data.frame().isLast())
clientDataLatch.countDown();
}
// Demand more data.
stream.demand(true);
stream.demand();
}
})
.get(5, TimeUnit.SECONDS);

View File

@ -60,6 +60,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -78,7 +79,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
stream.demand();
}
}
};
@ -101,7 +102,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
assertEquals(1, onDataAvailableCalls.get());
// Resume processing of data, this should call onDataAvailable().
serverStreamRef.get().demand(true);
serverStreamRef.get().demand();
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@ -118,6 +119,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -139,7 +141,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
stream.demand();
}
}
};
@ -162,9 +164,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
assertEquals(1, onDataAvailableCalls.get());
// Resume processing of data, this should call onDataAvailable(), but there is no data to read yet.
serverStreamRef.get().demand(true);
Stream serverStream = serverStreamRef.get();
serverStream.demand();
await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)stream).hasDemand());
await().atMost(1, TimeUnit.SECONDS).until(() -> onDataAvailableCalls.get() == 2 && ((HTTP3Stream)serverStream).hasDemand());
stream.data(new DataFrame(ByteBuffer.allocate(32), true));
@ -183,6 +186,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -209,7 +213,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
else
stream.demand(true);
stream.demand();
}
}
};
@ -235,7 +239,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
stream.data(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
// Resume processing of data, this should call onDataAvailable().
serverStreamRef.get().demand(true);
serverStreamRef.get().demand();
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@ -251,6 +255,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -261,7 +266,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
Stream.Data data = stream.readData();
assertNull(data);
// It's typical to demand after null data.
stream.demand(true);
stream.demand();
serverDataLatch.countDown();
}
@ -306,6 +311,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -318,7 +324,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
if (dataRead.addAndGet(data.frame().getData().remaining()) == dataLength)
serverDataLatch.countDown();
}
stream.demand(true);
stream.demand();
}
@Override
@ -363,6 +369,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
@ -373,7 +380,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand(true);
stream.demand();
return;
}
// Store the Data away to be used later.
@ -422,7 +429,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
{
serverStreamRef.set(stream);
serverRequestLatch.countDown();
stream.demand(false);
// Do not demand here.
return new Stream.Listener()
{
@Override
@ -432,7 +439,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
Stream.Data data = stream.readData();
if (data != null && data.frame().isLast())
serverDataLatch.countDown();
stream.demand(true);
stream.demand();
}
};
}
@ -456,7 +463,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
assertEquals(0, onDataAvailableCalls.get());
// Resume processing of data.
serverStreamRef.get().demand(true);
serverStreamRef.get().demand();
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}