implement adaptive execution strategy for simple requests

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-10-12 16:40:28 +02:00 committed by Simone Bordet
parent c430226bfa
commit 0ea4b3f7c4
4 changed files with 65 additions and 15 deletions

View File

@ -16,9 +16,12 @@ package org.eclipse.jetty.http3.server;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection; import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
@ -38,9 +41,14 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
@Override @Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame) public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{ {
HTTP3StreamListener listener = new HTTP3StreamListener(((HTTP3Stream)stream).getEndPoint()); HTTP3Stream http3Stream = (HTTP3Stream)stream;
listener.onRequest(stream, frame); HTTP3StreamListener listener = new HTTP3StreamListener(http3Stream.getEndPoint());
// TODO get a runnable to feed EWYK? See ProtocolSession.processReadableStreams() Runnable runnable = listener.onRequest(stream, frame);
if (runnable != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)((HTTP3Session)http3Stream.getSession()).getProtocolSession();
protocolSession.offer(runnable);
}
return listener; return listener;
} }
} }
@ -59,9 +67,9 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
return (ServerHTTP3StreamConnection)endPoint.getConnection(); return (ServerHTTP3StreamConnection)endPoint.getConnection();
} }
public void onRequest(Stream stream, HeadersFrame frame) public Runnable onRequest(Stream stream, HeadersFrame frame)
{ {
getConnection().onRequest((HTTP3Stream)stream, frame); return getConnection().onRequest((HTTP3Stream)stream, frame);
} }
@Override @Override

View File

@ -13,7 +13,9 @@
package org.eclipse.jetty.http3.server.internal; package org.eclipse.jetty.http3.server.internal;
import java.util.ArrayDeque;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
@ -32,7 +34,10 @@ import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerProtocolSession;
import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,6 +50,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final HTTP3SessionServer session; private final HTTP3SessionServer session;
private final ControlFlusher controlFlusher; private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher; private final HTTP3Flusher messageFlusher;
private final AdaptiveExecutionStrategy strategy;
private final HTTP3Producer producer = new HTTP3Producer();
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{ {
@ -76,6 +83,9 @@ public class ServerHTTP3Session extends ServerProtocolSession
// TODO: make parameters configurable. // TODO: make parameters configurable.
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true); this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor());
// TODO: call addBean instead
LifeCycle.start(strategy);
} }
public QpackDecoder getQpackDecoder() public QpackDecoder getQpackDecoder()
@ -93,6 +103,24 @@ public class ServerHTTP3Session extends ServerProtocolSession
return session.getStreamIdleTimeout(); return session.getStreamIdleTimeout();
} }
public void offer(Runnable task)
{
producer.offer(task);
}
@Override
protected boolean processReadableStreams()
{
// Calling super.processReadableStreams() is going to fill and parse HEADERS frames on the current thread,
// so the QPACK decoder is not accessed concurrently.
// The processing of HEADERS frames will produce Runnable tasks and offer them to this instance (via calls
// to offer(Runnable)) so that the execution strategy can consume them.
boolean result = super.processReadableStreams();
strategy.produce();
return result;
}
public void setStreamIdleTimeout(long streamIdleTimeout) public void setStreamIdleTimeout(long streamIdleTimeout)
{ {
session.setStreamIdleTimeout(streamIdleTimeout); session.setStreamIdleTimeout(streamIdleTimeout);
@ -197,4 +225,25 @@ public class ServerHTTP3Session extends ServerProtocolSession
{ {
session.onDataAvailable(streamId); session.onDataAvailable(streamId);
} }
private class HTTP3Producer implements ExecutionStrategy.Producer
{
private final Queue<Runnable> tasks = new ArrayDeque<>();
public void offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("enqueuing task {} on {}", task, ServerHTTP3Session.this);
tasks.offer(task);
}
@Override
public Runnable produce()
{
Runnable task = tasks.poll();
if (LOG.isDebugEnabled())
LOG.debug("dequeued task {} on {}", task, ServerHTTP3Session.this);
return task;
}
}
} }

View File

@ -44,14 +44,13 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
http3Session.onDataAvailable(streamId); http3Session.onDataAvailable(streamId);
} }
public void onRequest(HTTP3Stream stream, HeadersFrame frame) public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame)
{ {
HttpTransport transport = new HttpTransportOverHTTP3(stream); HttpTransport transport = new HttpTransportOverHTTP3(stream);
HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this); HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
stream.setAttachment(channel); stream.setAttachment(channel);
// TODO create a Runnable and feed EWYK
channel.onRequest(((MetaData.Request)frame.getMetaData())); channel.onRequest(((MetaData.Request)frame.getMetaData()));
channel.handle(); return channel;
} }
public void onDataAvailable(HTTP3Stream stream) public void onDataAvailable(HTTP3Stream stream)

View File

@ -103,18 +103,12 @@ public abstract class ProtocolSession
streamEndPoint.onWritable(); streamEndPoint.onWritable();
} }
private boolean processReadableStreams() protected boolean processReadableStreams()
{ {
List<Long> readableStreamIds = session.getReadableStreamIds(); List<Long> readableStreamIds = session.getReadableStreamIds();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds); LOG.debug("readable stream ids: {}", readableStreamIds);
// TODO: ExecutionStrategy plug-in point is here.
// this.onReadable() just feeds the decoder and the instruction streams.
// Note that req/rsp streams never eat DATA frame, it's a noop because they pull data
// when they want to read data frames, either via Stream.readData() or ServletInputStream.read().
// Then here we ask decoder for tasks, and have the ExecutionStrategy process them.
return readableStreamIds.stream() return readableStreamIds.stream()
.map(this::onReadable) .map(this::onReadable)
.reduce(false, (result, interested) -> result || interested); .reduce(false, (result, interested) -> result || interested);