move the execution strategy from connection to session

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-18 20:36:04 +01:00 committed by Simone Bordet
parent 6b07965b9a
commit eec497e0a9
2 changed files with 31 additions and 25 deletions

View File

@ -19,9 +19,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.http3.quiche.QuicheConfig; import org.eclipse.jetty.http3.quiche.QuicheConfig;
@ -34,10 +32,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,16 +45,12 @@ public class QuicConnection extends AbstractConnection
private final QuicheConfig quicheConfig; private final QuicheConfig quicheConfig;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher(); private final Flusher flusher = new Flusher();
private final ExecutionStrategy strategy;
private final Queue<Runnable> strategyQueue = new ConcurrentLinkedQueue<>();
public QuicConnection(Connector connector, ServerDatagramEndPoint endp) public QuicConnection(Connector connector, ServerDatagramEndPoint endp)
{ {
super(endp, connector.getExecutor()); super(endp, connector.getExecutor());
this.connector = connector; this.connector = connector;
this.byteBufferPool = connector.getByteBufferPool(); this.byteBufferPool = connector.getByteBufferPool();
this.strategy = new EatWhatYouKill(strategyQueue::poll, connector.getExecutor());
LifeCycle.start(strategy);
File[] files; File[] files;
try try
@ -88,19 +79,6 @@ public class QuicConnection extends AbstractConnection
quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0])); quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0]));
} }
public void dispatch(Runnable runnable)
{
strategyQueue.offer(runnable);
strategy.dispatch();
}
@Override
public void onClose(Throwable cause)
{
super.onClose(cause);
LifeCycle.stop(strategy);
}
void onClose(QuicheConnectionId quicheConnectionId) void onClose(QuicheConnectionId quicheConnectionId)
{ {
sessions.remove(quicheConnectionId); sessions.remove(quicheConnectionId);

View File

@ -16,7 +16,9 @@ package org.eclipse.jetty.http3.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,11 +35,15 @@ import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK strategy in this class public class QuicSession
{ {
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
@ -47,6 +53,10 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
private final QuicheConnection quicheConnection; private final QuicheConnection quicheConnection;
private final QuicConnection connection; private final QuicConnection connection;
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
private final ExecutionStrategy strategy;
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private InetSocketAddress remoteAddress; private InetSocketAddress remoteAddress;
QuicSession(Connector connector, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress) QuicSession(Connector connector, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
@ -57,6 +67,14 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
this.connection = connection; this.connection = connection;
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
this.flusher = new Flusher(connector.getScheduler()); this.flusher = new Flusher(connector.getScheduler());
this.strategy = new EatWhatYouKill(() ->
{
try (AutoLock l = strategyQueueLock.lock())
{
return strategyQueue.poll();
}
}, connector.getExecutor());
LifeCycle.start(strategy);
} }
public int fill(long streamId, ByteBuffer buffer) throws IOException public int fill(long streamId, ByteBuffer buffer) throws IOException
@ -124,7 +142,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
onWritable(writableStreamId); onWritable(writableStreamId);
} }
}; };
connection.dispatch(onWritable); dispatch(onWritable);
List<Long> readableStreamIds = quicheConnection.readableStreamIds(); List<Long> readableStreamIds = quicheConnection.readableStreamIds();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -132,7 +150,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
for (Long readableStreamId : readableStreamIds) for (Long readableStreamId : readableStreamIds)
{ {
Runnable onReadable = () -> onReadable(readableStreamId); Runnable onReadable = () -> onReadable(readableStreamId);
connection.dispatch(onReadable); dispatch(onReadable);
} }
} }
else else
@ -157,6 +175,15 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
streamEndPoint.onReadable(); streamEndPoint.onReadable();
} }
private void dispatch(Runnable runnable)
{
try (AutoLock l = strategyQueueLock.lock())
{
strategyQueue.offer(runnable);
}
strategy.dispatch();
}
void flush() void flush()
{ {
flusher.iterate(); flusher.iterate();
@ -206,6 +233,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s
endpoints.clear(); endpoints.clear();
flusher.close(); flusher.close();
connection.onClose(quicheConnectionId); connection.onClose(quicheConnectionId);
LifeCycle.stop(strategy);
} }
finally finally
{ {