dispatch session readable events to EWYK

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-18 10:31:46 +01:00 committed by Simone Bordet
parent 55657e3ea1
commit 432eddf583
2 changed files with 23 additions and 2 deletions

View File

@ -19,7 +19,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
@ -32,7 +34,10 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,12 +50,16 @@ public class QuicConnection extends AbstractConnection
private final QuicheConfig quicheConfig;
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
private final ExecutionStrategy strategy;
private final Queue<Runnable> strategyQueue = new ConcurrentLinkedQueue<>();
public QuicConnection(Connector connector, ServerDatagramEndPoint endp)
{
super(endp, connector.getExecutor());
this.connector = connector;
this.byteBufferPool = connector.getByteBufferPool();
this.strategy = new EatWhatYouKill(strategyQueue::poll, connector.getExecutor());
LifeCycle.start(strategy);
File[] files;
try
@ -79,6 +88,19 @@ public class QuicConnection extends AbstractConnection
quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0]));
}
public void dispatch(Runnable runnable)
{
strategyQueue.offer(runnable);
strategy.dispatch();
}
@Override
public void onClose(Throwable cause)
{
super.onClose(cause);
LifeCycle.stop(strategy);
}
void onClose(QuicheConnectionId quicheConnectionId)
{
sessions.remove(quicheConnectionId);

View File

@ -146,8 +146,7 @@ public class QuicSession
if (LOG.isDebugEnabled())
LOG.debug("selected endpoint for read: {}", streamEndPoint);
Runnable runnable = streamEndPoint.onReadable();
// TODO: run with EWYK
runnable.run();
connection.dispatch(runnable);
}
void flush()