diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java index 7cf9de6c08a..24abf2f8adc 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java @@ -16,9 +16,12 @@ package org.eclipse.jetty.http3.server; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Stream; +import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session; import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.server.HttpConfiguration; public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory @@ -38,9 +41,14 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { - HTTP3StreamListener listener = new HTTP3StreamListener(((HTTP3Stream)stream).getEndPoint()); - listener.onRequest(stream, frame); - // TODO get a runnable to feed EWYK? See ProtocolSession.processReadableStreams() + HTTP3Stream http3Stream = (HTTP3Stream)stream; + HTTP3StreamListener listener = new HTTP3StreamListener(http3Stream.getEndPoint()); + Runnable runnable = listener.onRequest(stream, frame); + if (runnable != null) + { + ServerHTTP3Session protocolSession = (ServerHTTP3Session)((HTTP3Session)http3Stream.getSession()).getProtocolSession(); + protocolSession.offer(runnable); + } return listener; } } @@ -59,9 +67,9 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF return (ServerHTTP3StreamConnection)endPoint.getConnection(); } - public void onRequest(Stream stream, HeadersFrame frame) + public Runnable onRequest(Stream stream, HeadersFrame frame) { - getConnection().onRequest((HTTP3Stream)stream, frame); + return getConnection().onRequest((HTTP3Stream)stream, frame); } @Override diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index f9fa6328a38..adb6766c115 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -13,7 +13,9 @@ package org.eclipse.jetty.http3.server.internal; +import java.util.ArrayDeque; import java.util.Map; +import java.util.Queue; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; @@ -32,7 +34,10 @@ import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +50,8 @@ public class ServerHTTP3Session extends ServerProtocolSession private final HTTP3SessionServer session; private final ControlFlusher controlFlusher; private final HTTP3Flusher messageFlusher; + private final AdaptiveExecutionStrategy strategy; + private final HTTP3Producer producer = new HTTP3Producer(); public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) { @@ -76,6 +83,9 @@ public class ServerHTTP3Session extends ServerProtocolSession // TODO: make parameters configurable. this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true); + this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor()); + // TODO: call addBean instead + LifeCycle.start(strategy); } public QpackDecoder getQpackDecoder() @@ -93,6 +103,24 @@ public class ServerHTTP3Session extends ServerProtocolSession return session.getStreamIdleTimeout(); } + public void offer(Runnable task) + { + producer.offer(task); + } + + @Override + protected boolean processReadableStreams() + { + // Calling super.processReadableStreams() is going to fill and parse HEADERS frames on the current thread, + // so the QPACK decoder is not accessed concurrently. + // The processing of HEADERS frames will produce Runnable tasks and offer them to this instance (via calls + // to offer(Runnable)) so that the execution strategy can consume them. + + boolean result = super.processReadableStreams(); + strategy.produce(); + return result; + } + public void setStreamIdleTimeout(long streamIdleTimeout) { session.setStreamIdleTimeout(streamIdleTimeout); @@ -197,4 +225,25 @@ public class ServerHTTP3Session extends ServerProtocolSession { session.onDataAvailable(streamId); } + + private class HTTP3Producer implements ExecutionStrategy.Producer + { + private final Queue tasks = new ArrayDeque<>(); + + public void offer(Runnable task) + { + if (LOG.isDebugEnabled()) + LOG.debug("enqueuing task {} on {}", task, ServerHTTP3Session.this); + tasks.offer(task); + } + + @Override + public Runnable produce() + { + Runnable task = tasks.poll(); + if (LOG.isDebugEnabled()) + LOG.debug("dequeued task {} on {}", task, ServerHTTP3Session.this); + return task; + } + } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java index e7323d61bda..7d351d98f16 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java @@ -44,14 +44,13 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection http3Session.onDataAvailable(streamId); } - public void onRequest(HTTP3Stream stream, HeadersFrame frame) + public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame) { HttpTransport transport = new HttpTransportOverHTTP3(stream); HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this); stream.setAttachment(channel); - // TODO create a Runnable and feed EWYK channel.onRequest(((MetaData.Request)frame.getMetaData())); - channel.handle(); + return channel; } public void onDataAvailable(HTTP3Stream stream) diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index da9faec77c3..d0bc1a4a054 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -103,18 +103,12 @@ public abstract class ProtocolSession streamEndPoint.onWritable(); } - private boolean processReadableStreams() + protected boolean processReadableStreams() { List readableStreamIds = session.getReadableStreamIds(); if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); - // TODO: ExecutionStrategy plug-in point is here. - // this.onReadable() just feeds the decoder and the instruction streams. - // Note that req/rsp streams never eat DATA frame, it's a noop because they pull data - // when they want to read data frames, either via Stream.readData() or ServletInputStream.read(). - // Then here we ask decoder for tasks, and have the ExecutionStrategy process them. - return readableStreamIds.stream() .map(this::onReadable) .reduce(false, (result, interested) -> result || interested);