Issue #6728 - QUIC and HTTP/3

- WIP on the threading model.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-18 18:25:15 +02:00
parent a803dfa44f
commit dd4b970357
11 changed files with 182 additions and 117 deletions

View File

@ -128,21 +128,21 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
@Override
protected boolean onReadable(long readableStreamId)
protected void onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
return super.onReadable(readableStreamId);
super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
streamEndPoint.onReadable();
}
}

View File

@ -13,9 +13,7 @@
package org.eclipse.jetty.http3.server.internal;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.HTTP3Configuration;
@ -36,9 +34,7 @@ import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,8 +47,6 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final HTTP3SessionServer session;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
private final AdaptiveExecutionStrategy strategy;
private final HTTP3Producer producer = new HTTP3Producer();
public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener)
{
@ -89,9 +83,6 @@ public class ServerHTTP3Session extends ServerProtocolSession
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor());
addBean(strategy);
}
public QpackDecoder getQpackDecoder()
@ -104,24 +95,6 @@ public class ServerHTTP3Session extends ServerProtocolSession
return session;
}
public void offer(Runnable task)
{
producer.offer(task);
}
@Override
protected boolean processReadableStreams()
{
// Calling super.processReadableStreams() is going to fill and parse HEADERS frames on the current thread,
// so the QPACK decoder is not accessed concurrently.
// The processing of HEADERS frames will produce Runnable tasks and offer them to this instance (via calls
// to offer(Runnable)) so that the execution strategy can consume them.
boolean result = super.processReadableStreams();
strategy.produce();
return result;
}
@Override
protected void doStart() throws Exception
{
@ -154,21 +127,21 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
@Override
protected boolean onReadable(long readableStreamId)
protected void onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
if (LOG.isDebugEnabled())
LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
return super.onReadable(readableStreamId);
super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
streamEndPoint.onReadable();
}
}
@ -227,25 +200,4 @@ public class ServerHTTP3Session extends ServerProtocolSession
{
session.onDataAvailable(streamId);
}
private class HTTP3Producer implements ExecutionStrategy.Producer
{
private final Queue<Runnable> tasks = new ArrayDeque<>();
public void offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("enqueuing task {} on {}", task, ServerHTTP3Session.this);
tasks.offer(task);
}
@Override
public Runnable produce()
{
Runnable task = tasks.poll();
if (LOG.isDebugEnabled())
LOG.debug("dequeued task {} on {}", task, ServerHTTP3Session.this);
return task;
}
}
}

View File

@ -45,15 +45,14 @@ public class ClientProtocolSession extends ProtocolSession
}
@Override
protected boolean onReadable(long readableStreamId)
protected void onReadable(long readableStreamId)
{
// On the client, we need a get-only semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
if (streamEndPoint != null)
return streamEndPoint.onReadable();
return false;
streamEndPoint.onReadable();
}
@Override

View File

@ -112,6 +112,14 @@ public class ClientQuicConnection extends QuicConnection
}
}
@Override
public void onFillable()
{
Runnable task = receiveAndProcess();
if (task != null)
task.run();
}
@Override
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{

View File

@ -13,15 +13,19 @@
package org.eclipse.jetty.quic.common;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.quic.common.internal.QuicErrorCode;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,12 +33,15 @@ public abstract class ProtocolSession extends ContainerLifeCycle
{
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class);
private final AtomicInteger active = new AtomicInteger();
private final StreamsProducer producer = new StreamsProducer();
private final AdaptiveExecutionStrategy strategy;
private final QuicSession session;
public ProtocolSession(QuicSession session)
{
this.session = session;
this.strategy = new AdaptiveExecutionStrategy(producer, session.getExecutor());
addBean(strategy);
}
public QuicSession getQuicSession()
@ -44,36 +51,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle
public void process()
{
// This method is called by the network thread and
// dispatches to one, per-session, processing thread.
// The active counter counts up to 2, with the meanings:
// 0=idle, 1=process, 2=re-process, where re-process is
// necessary to close race between the processing thread
// seeing active=1 and about to exit, and the network
// thread also seeing active=1 and not dispatching,
// leaving unprocessed data in the session.
if (active.getAndUpdate(count -> count <= 1 ? count + 1 : count) == 0)
session.getExecutor().execute(this::processSession);
strategy.produce();
}
private void processSession()
public void offer(Runnable task)
{
while (true)
{
processWritableStreams();
if (processReadableStreams())
continue;
// Exit if did not process any stream and we are idle.
if (active.decrementAndGet() == 0)
{
CloseInfo closeInfo = session.getRemoteCloseInfo();
if (closeInfo != null)
onClose(closeInfo.error(), closeInfo.reason());
break;
}
}
producer.offer(task);
}
public QuicStreamEndPoint getStreamEndPoint(long streamId)
@ -104,18 +87,15 @@ public abstract class ProtocolSession extends ContainerLifeCycle
streamEndPoint.onWritable();
}
protected boolean processReadableStreams()
protected void processReadableStreams()
{
List<Long> readableStreamIds = session.getReadableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds);
return readableStreamIds.stream()
.map(this::onReadable)
.reduce(false, (result, interested) -> result || interested);
readableStreamIds.forEach(this::onReadable);
}
protected abstract boolean onReadable(long readableStreamId);
protected abstract void onReadable(long readableStreamId);
public void configureProtocolEndPoint(QuicStreamEndPoint endPoint)
{
@ -158,4 +138,54 @@ public abstract class ProtocolSession extends ContainerLifeCycle
{
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context);
}
private class StreamsProducer implements ExecutionStrategy.Producer
{
private final AutoLock lock = new AutoLock();
private final Queue<Runnable> tasks = new ArrayDeque<>();
public void offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("enqueuing task {} on {}", task, ProtocolSession.this);
try (AutoLock l = lock.lock())
{
tasks.offer(task);
}
}
private Runnable poll()
{
try (AutoLock l = lock.lock())
{
return tasks.poll();
}
}
@Override
public Runnable produce()
{
Runnable task = poll();
if (LOG.isDebugEnabled())
LOG.debug("dequeued task {} on {}", task, ProtocolSession.this);
if (task != null)
return task;
processWritableStreams();
processReadableStreams();
task = poll();
if (LOG.isDebugEnabled())
LOG.debug("dequeued produced task {} on {}", task, ProtocolSession.this);
if (task != null)
return task;
CloseInfo closeInfo = session.getRemoteCloseInfo();
if (closeInfo != null)
onClose(closeInfo.error(), closeInfo.reason());
getQuicSession().processingComplete();
return null;
}
}
}

View File

@ -131,6 +131,8 @@ public abstract class QuicConnection extends AbstractConnection
listeners.remove((QuicSession.Listener)listener);
}
public abstract void onFillable();
@Override
public abstract boolean onIdleExpired();
@ -159,8 +161,14 @@ public abstract class QuicConnection extends AbstractConnection
}
}
@Override
public void onFillable()
protected abstract QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException;
public void write(Callback callback, SocketAddress remoteAddress, ByteBuffer... buffers)
{
flusher.offer(callback, remoteAddress, buffers);
}
protected Runnable receiveAndProcess()
{
try
{
@ -172,18 +180,18 @@ public abstract class QuicConnection extends AbstractConnection
int fill = remoteAddress == DatagramChannelEndPoint.EOF ? -1 : cipherBuffer.remaining();
if (LOG.isDebugEnabled())
LOG.debug("filled cipher buffer with {} byte(s)", fill);
// ServerDatagramEndPoint will only return -1 if input is shut down.
// DatagramChannelEndPoint will only return -1 if input is shut down.
if (fill < 0)
{
byteBufferPool.release(cipherBuffer);
getEndPoint().shutdownOutput();
return;
return null;
}
if (fill == 0)
{
byteBufferPool.release(cipherBuffer);
fillInterested();
return;
return null;
}
if (LOG.isDebugEnabled())
@ -224,24 +232,23 @@ public abstract class QuicConnection extends AbstractConnection
}
if (LOG.isDebugEnabled())
LOG.debug("packet is for existing session with connection ID {}, processing it ({} byte(s))", quicheConnectionId, cipherBuffer.remaining());
session.process(remoteAddress, cipherBuffer);
LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining());
Runnable task = session.process(remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled())
LOG.debug("processing session {} produced task {}", session, task);
if (task != null)
return task;
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("caught exception in onFillable loop", x);
LOG.debug("exception in receiveAndProcess()", x);
// TODO: close?
return null;
}
}
protected abstract QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException;
public void write(Callback callback, SocketAddress remoteAddress, ByteBuffer... buffers)
{
flusher.offer(callback, remoteAddress, buffers);
}
private class Flusher extends IteratingCallback
{
private final AutoLock lock = new AutoLock();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -58,6 +59,7 @@ public abstract class QuicSession extends ContainerLifeCycle
private final AtomicLong[] ids = new AtomicLong[StreamType.values().length];
private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>();
private final AtomicBoolean processing = new AtomicBoolean();
private final Executor executor;
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
@ -282,7 +284,7 @@ public abstract class QuicSession extends ContainerLifeCycle
this.quicheConnectionId = quicheConnectionId;
}
public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
// While the connection ID remains the same,
// the remote address may change so store it again.
@ -327,14 +329,23 @@ public abstract class QuicSession extends ContainerLifeCycle
protocolSession = session = createProtocolSession();
addManaged(session);
}
session.process();
if (processing.compareAndSet(false, true))
return session::process;
return null;
}
else
{
flush();
return null;
}
}
void processingComplete()
{
processing.set(false);
}
protected abstract ProtocolSession createProtocolSession();
List<Long> getWritableStreamIds()

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,14 +41,17 @@ public class QuicStreamEndPoint extends AbstractEndPoint
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0);
private final AutoLock lock = new AutoLock();
private final QuicSession session;
private final long streamId;
private boolean readable;
public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
this.readable = true;
}
public QuicSession getQuicSession()
@ -209,12 +213,38 @@ public class QuicStreamEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite();
}
public boolean onReadable()
public void onReadable()
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable", streamId);
// TODO: use AtomicBoolean.
try (AutoLock l = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, readable);
if (!readable)
return;
readable = false;
}
getFillInterest().fillable();
return isFillInterested();
}
@Override
public void fillInterested(Callback callback)
{
try (AutoLock l = lock.lock())
{
readable = true;
}
super.fillInterested(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
try (AutoLock l = lock.lock())
{
readable = true;
}
return super.tryFillInterested(callback);
}
@Override

View File

@ -34,13 +34,13 @@ public class ServerProtocolSession extends ProtocolSession
}
@Override
protected boolean onReadable(long readableStreamId)
protected void onReadable(long readableStreamId)
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable();
streamEndPoint.onReadable();
}
@Override

View File

@ -32,7 +32,10 @@ import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +48,7 @@ public class ServerQuicConnection extends QuicConnection
private final QuicheConfig quicheConfig;
private final Connector connector;
private final AdaptiveExecutionStrategy strategy;
private final SessionTimeouts sessionTimeouts;
protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig)
@ -52,6 +56,7 @@ public class ServerQuicConnection extends QuicConnection
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.quicheConfig = quicheConfig;
this.connector = connector;
this.strategy = new AdaptiveExecutionStrategy(new QuicProducer(), getExecutor());
this.sessionTimeouts = new SessionTimeouts(connector.getScheduler());
}
@ -59,9 +64,23 @@ public class ServerQuicConnection extends QuicConnection
public void onOpen()
{
super.onOpen();
LifeCycle.start(strategy);
fillInterested();
}
@Override
public void onClose(Throwable cause)
{
LifeCycle.stop(strategy);
super.onClose(cause);
}
@Override
public void onFillable()
{
strategy.produce();
}
@Override
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{
@ -118,6 +137,15 @@ public class ServerQuicConnection extends QuicConnection
// listening DatagramChannelEndPoint, so it must not be closed.
}
private class QuicProducer implements ExecutionStrategy.Producer
{
@Override
public Runnable produce()
{
return receiveAndProcess();
}
}
private class SessionTimeouts extends CyclicTimeouts<ServerQuicSession>
{
private SessionTimeouts(Scheduler scheduler)

View File

@ -103,10 +103,10 @@ public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Exp
}
@Override
public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
public Runnable process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
notIdle();
super.process(remoteAddress, cipherBufferIn);
return super.process(remoteAddress, cipherBufferIn);
}
@Override