From eec497e0a9d191e45b5bf91409d2d6912c8a43a1 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Thu, 18 Mar 2021 20:36:04 +0100 Subject: [PATCH] move the execution strategy from connection to session Signed-off-by: Ludovic Orban --- .../jetty/http3/server/QuicConnection.java | 22 ------------ .../jetty/http3/server/QuicSession.java | 34 +++++++++++++++++-- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java index 4f5ebf964a9..7d9731602af 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicConnection.java @@ -19,9 +19,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Collection; import java.util.List; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.eclipse.jetty.http3.quiche.QuicheConfig; @@ -34,10 +32,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; -import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.AutoLock; -import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,16 +45,12 @@ public class QuicConnection extends AbstractConnection private final QuicheConfig quicheConfig; private final ByteBufferPool byteBufferPool; private final Flusher flusher = new Flusher(); - private final ExecutionStrategy strategy; - private final Queue strategyQueue = new ConcurrentLinkedQueue<>(); public QuicConnection(Connector connector, ServerDatagramEndPoint endp) { super(endp, connector.getExecutor()); this.connector = connector; this.byteBufferPool = connector.getByteBufferPool(); - this.strategy = new EatWhatYouKill(strategyQueue::poll, connector.getExecutor()); - LifeCycle.start(strategy); File[] files; try @@ -88,19 +79,6 @@ public class QuicConnection extends AbstractConnection quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0])); } - public void dispatch(Runnable runnable) - { - strategyQueue.offer(runnable); - strategy.dispatch(); - } - - @Override - public void onClose(Throwable cause) - { - super.onClose(cause); - LifeCycle.stop(strategy); - } - void onClose(QuicheConnectionId quicheConnectionId) { sessions.remove(quicheConnectionId); diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java index 08a0deec63d..fe07aae0150 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/QuicSession.java @@ -16,7 +16,9 @@ package org.eclipse.jetty.http3.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.List; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -33,11 +35,15 @@ import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK strategy in this class +public class QuicSession { private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); @@ -47,6 +53,10 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s private final QuicheConnection quicheConnection; private final QuicConnection connection; private final ConcurrentMap endpoints = new ConcurrentHashMap<>(); + private final ExecutionStrategy strategy; + private final AutoLock strategyQueueLock = new AutoLock(); + private final Queue strategyQueue = new ArrayDeque<>(); + private InetSocketAddress remoteAddress; QuicSession(Connector connector, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress) @@ -57,6 +67,14 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s this.connection = connection; this.remoteAddress = remoteAddress; this.flusher = new Flusher(connector.getScheduler()); + this.strategy = new EatWhatYouKill(() -> + { + try (AutoLock l = strategyQueueLock.lock()) + { + return strategyQueue.poll(); + } + }, connector.getExecutor()); + LifeCycle.start(strategy); } public int fill(long streamId, ByteBuffer buffer) throws IOException @@ -124,7 +142,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s onWritable(writableStreamId); } }; - connection.dispatch(onWritable); + dispatch(onWritable); List readableStreamIds = quicheConnection.readableStreamIds(); if (LOG.isDebugEnabled()) @@ -132,7 +150,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s for (Long readableStreamId : readableStreamIds) { Runnable onReadable = () -> onReadable(readableStreamId); - connection.dispatch(onReadable); + dispatch(onReadable); } } else @@ -157,6 +175,15 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s streamEndPoint.onReadable(); } + private void dispatch(Runnable runnable) + { + try (AutoLock l = strategyQueueLock.lock()) + { + strategyQueue.offer(runnable); + } + strategy.dispatch(); + } + void flush() { flusher.iterate(); @@ -206,6 +233,7 @@ public class QuicSession // TODO: extends ContainerLifeCycle and move the EWYK s endpoints.clear(); flusher.close(); connection.onClose(quicheConnectionId); + LifeCycle.stop(strategy); } finally {