Issue #6728 - QUIC and HTTP/3

- Moved ExecutionStrategy from ServerQuicConnection to QuicConnection.
  For the server the produced task is declared as BLOCKING, but for the
  client the produced task is NON_BLOCKING.
- Fixed race condition in QuicSession.process(...).
- Updated quic-quiche pom.xml.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-21 19:23:53 +02:00
parent 5ca88d3231
commit 4eaff8e880
17 changed files with 173 additions and 99 deletions

View File

@ -33,7 +33,7 @@ public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpC
protected AbstractConnectorHttpClientTransport(ClientConnector connector) protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{ {
this.connector = Objects.requireNonNull(connector); this.connector = Objects.requireNonNull(connector);
addBean(connector, false); addBean(connector);
} }
public ClientConnector getClientConnector() public ClientConnector getClientConnector()

View File

@ -73,17 +73,16 @@ public interface Stream
* that the end of the read side of the stream has not yet been reached, which * that the end of the read side of the stream has not yet been reached, which
* may happen in these cases:</p> * may happen in these cases:</p>
* <ul> * <ul>
* <li>not all the bytes have been received so far, and a further attempt * <li>not all the bytes have been received so far, for example the remote
* to call this method returns {@code null} because the rest of the bytes * peer did not send them yet, or they are in-flight</li>
* are not yet available (for example, the remote peer did not send them
* yet, or they are in-flight)</li>
* <li>all the bytes have been received, but there is a trailer HEADERS * <li>all the bytes have been received, but there is a trailer HEADERS
* frame to be received to indicate the end of the read side of the * frame to be received to indicate the end of the read side of the
* stream.</li> * stream</li>
* </ul> * </ul>
* <p>When the returned {@link Stream.Data} object is not {@code null}, * <p>When the returned {@link Stream.Data} object is not {@code null},
* applications <em>must</em> call {@link Stream.Data#complete()} to * applications <em>must</em> call, either immediately or later (possibly
* notify the implementation that the bytes have been processed.</p> * asynchronously) {@link Stream.Data#complete()} to notify the
* implementation that the bytes have been processed.</p>
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous, * <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been * processing (for example, to process them only when all of them have been
* received).</p> * received).</p>
@ -163,8 +162,8 @@ public interface Stream
/** /**
* <p>Callback method invoked if the application has expressed * <p>Callback method invoked if the application has expressed
* {@link Stream#demand() demand} for content, and if there is * {@link Stream#demand() demand} for content, and if there may
* content available.</p> * be content available.</p>
* <p>A server application that wishes to handle request content * <p>A server application that wishes to handle request content
* should typically call {@link Stream#demand()} from * should typically call {@link Stream#demand()} from
* {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p> * {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
@ -175,7 +174,7 @@ public interface Stream
* cancelled; applications that implement this method should read * cancelled; applications that implement this method should read
* content calling {@link Stream#readData()}, and call * content calling {@link Stream#readData()}, and call
* {@link Stream#demand()} to signal to the implementation to call * {@link Stream#demand()} to signal to the implementation to call
* again this method when there is more content available.</p> * again this method when there may be more content available.</p>
* <p>Only one thread at a time invokes this method, although it * <p>Only one thread at a time invokes this method, although it
* may not be the same thread across different invocations.</p> * may not be the same thread across different invocations.</p>
* <p>It is always guaranteed that invoking {@link Stream#demand()} * <p>It is always guaranteed that invoking {@link Stream#demand()}

View File

@ -116,6 +116,7 @@ public class AbstractClientServerTest
@AfterEach @AfterEach
public void dispose() public void dispose()
{ {
LifeCycle.stop(http3Client);
LifeCycle.stop(httpClient); LifeCycle.stop(httpClient);
LifeCycle.stop(server); LifeCycle.stop(server);
} }

View File

@ -525,16 +525,36 @@ public class DataDemandTest extends AbstractClientServerTest
stream.demand(); stream.demand();
return new Stream.Listener() return new Stream.Listener()
{ {
private boolean firstData;
private boolean nullData;
@Override @Override
public void onDataAvailable(Stream stream) public void onDataAvailable(Stream stream)
{ {
while (!firstData)
{
Stream.Data data = stream.readData();
if (data == null)
continue;
firstData = true;
break;
}
if (!nullData)
{
assertNull(stream.readData());
// Verify that readData() is idempotent.
assertNull(stream.readData());
nullData = true;
nullDataLatch.countDown();
stream.demand();
return;
}
Stream.Data data = stream.readData(); Stream.Data data = stream.readData();
if (data == null) if (data == null)
{ {
// Second attempt to read still has no data, should be idempotent.
assertNull(stream.readData());
stream.demand(); stream.demand();
nullDataLatch.countDown();
} }
else else
{ {
@ -552,12 +572,13 @@ public class DataDemandTest extends AbstractClientServerTest
Session.Client session = newSession(new Session.Client.Listener() {}); Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false); HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); Stream stream = session.newRequest(request, new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
// Send a first chunk to trigger reads. // Send a first chunk to trigger reads.
stream.data(new DataFrame(ByteBuffer.allocate(16), false)); stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(nullDataLatch.await(555, TimeUnit.SECONDS)); assertTrue(nullDataLatch.await(5, TimeUnit.SECONDS));
stream.data(new DataFrame(ByteBuffer.allocate(4096), true)); stream.data(new DataFrame(ByteBuffer.allocate(4096), true));

View File

@ -1177,7 +1177,8 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
http3Client.stop(); // Stopping the HttpClient will also stop the HTTP3Client.
httpClient.stop();
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));

View File

@ -39,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest
{ {
@Test @Test
public void testRequestHasHTTP2Version() throws Exception public void testRequestHasHTTP3Version() throws Exception
{ {
start(new AbstractHandler() start(new AbstractHandler()
{ {

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.quic.client;
import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -23,6 +24,8 @@ public class ClientProtocolSession extends ProtocolSession
{ {
private static final Logger LOG = LoggerFactory.getLogger(ClientProtocolSession.class); private static final Logger LOG = LoggerFactory.getLogger(ClientProtocolSession.class);
private final Runnable producer = Invocable.from(Invocable.InvocationType.NON_BLOCKING, this::produce);
public ClientProtocolSession(ClientQuicSession session) public ClientProtocolSession(ClientQuicSession session)
{ {
super(session); super(session);
@ -49,6 +52,12 @@ public class ClientProtocolSession extends ProtocolSession
getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint); getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
} }
@Override
public Runnable getProducer()
{
return producer;
}
@Override @Override
protected boolean onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {

View File

@ -112,18 +112,6 @@ public class ClientQuicConnection extends QuicConnection
} }
} }
@Override
public void onFillable()
{
while (true)
{
Runnable task = receiveAndProcess();
if (task == null)
break;
task.run();
}
}
@Override @Override
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{ {

View File

@ -123,7 +123,7 @@ public class End2EndClientTest
{ {
for (int i = 0; i < 1000; i++) for (int i = 0; i < 1000; i++)
{ {
ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort()); ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort() + "/" + i);
assertThat(response.getStatus(), is(200)); assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString(); String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent)); assertThat(contentAsString, is(responseContent));
@ -137,11 +137,12 @@ public class End2EndClientTest
CompletableFuture<?>[] futures = new CompletableFuture[count]; CompletableFuture<?>[] futures = new CompletableFuture[count];
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)
{ {
String path = "/" + i;
futures[i] = CompletableFuture.runAsync(() -> futures[i] = CompletableFuture.runAsync(() ->
{ {
try try
{ {
ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort()); ContentResponse response = client.GET("https://localhost:" + connector.getLocalPort() + path);
assertThat(response.getStatus(), is(200)); assertThat(response.getStatus(), is(200));
String contentAsString = response.getContentAsString(); String contentAsString = response.getContentAsString();
assertThat(contentAsString, is(responseContent)); assertThat(contentAsString, is(responseContent));

View File

@ -49,7 +49,9 @@ public abstract class ProtocolSession extends ContainerLifeCycle
return session; return session;
} }
public void process() public abstract Runnable getProducer();
public void produce()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("processing {}", this); LOG.debug("processing {}", this);
@ -76,7 +78,10 @@ public abstract class ProtocolSession extends ContainerLifeCycle
List<Long> writableStreamIds = session.getWritableStreamIds(); List<Long> writableStreamIds = session.getWritableStreamIds();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("writable stream ids: {}", writableStreamIds); LOG.debug("writable stream ids: {}", writableStreamIds);
writableStreamIds.forEach(this::onWritable); for (long writableStreamId : writableStreamIds)
{
onWritable(writableStreamId);
}
} }
protected void onWritable(long writableStreamId) protected void onWritable(long writableStreamId)
@ -97,7 +102,8 @@ public abstract class ProtocolSession extends ContainerLifeCycle
boolean result = false; boolean result = false;
for (long readableStreamId : readableStreamIds) for (long readableStreamId : readableStreamIds)
{ {
result = result || onReadable(readableStreamId); // Bit-wise "or" because we do not want onReadable() to be short-circuited.
result |= onReadable(readableStreamId);
} }
return result; return result;
} }
@ -154,7 +160,7 @@ public abstract class ProtocolSession extends ContainerLifeCycle
public void offer(Runnable task) public void offer(Runnable task)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("enqueuing task {} on {}", task, ProtocolSession.this); LOG.debug("enqueuing stream task {} on {}", task, ProtocolSession.this);
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
tasks.offer(task); tasks.offer(task);
@ -174,7 +180,7 @@ public abstract class ProtocolSession extends ContainerLifeCycle
{ {
Runnable task = poll(); Runnable task = poll();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("dequeued existing task {} on {}", task, ProtocolSession.this); LOG.debug("dequeued existing stream task {} on {}", task, ProtocolSession.this);
if (task != null) if (task != null)
return task; return task;
@ -185,7 +191,7 @@ public abstract class ProtocolSession extends ContainerLifeCycle
task = poll(); task = poll();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("dequeued produced task {} on {}", task, ProtocolSession.this); LOG.debug("dequeued produced stream task {} on {}", task, ProtocolSession.this);
if (task != null) if (task != null)
return task; return task;
@ -197,7 +203,6 @@ public abstract class ProtocolSession extends ContainerLifeCycle
if (closeInfo != null) if (closeInfo != null)
onClose(closeInfo.error(), closeInfo.reason()); onClose(closeInfo.error(), closeInfo.reason());
getQuicSession().processingComplete();
return null; return null;
} }
} }

View File

@ -38,7 +38,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -58,6 +60,7 @@ public abstract class QuicConnection extends AbstractConnection
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler; private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private final AdaptiveExecutionStrategy strategy;
private final Flusher flusher = new Flusher(); private final Flusher flusher = new Flusher();
private int outputBufferSize = 2048; private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true; private boolean useInputDirectByteBuffers = true;
@ -68,6 +71,7 @@ public abstract class QuicConnection extends AbstractConnection
super(endPoint, executor); super(endPoint, executor);
this.scheduler = scheduler; this.scheduler = scheduler;
this.byteBufferPool = byteBufferPool; this.byteBufferPool = byteBufferPool;
this.strategy = new AdaptiveExecutionStrategy(new QuicProducer(), getExecutor());
} }
public Scheduler getScheduler() public Scheduler getScheduler()
@ -131,11 +135,33 @@ public abstract class QuicConnection extends AbstractConnection
listeners.remove((QuicSession.Listener)listener); listeners.remove((QuicSession.Listener)listener);
} }
public abstract void onFillable(); @Override
public void onOpen()
{
super.onOpen();
LifeCycle.start(strategy);
}
@Override
public void onClose(Throwable cause)
{
LifeCycle.stop(strategy);
super.onClose(cause);
}
public void onFillable()
{
produce();
}
@Override @Override
public abstract boolean onIdleExpired(); public abstract boolean onIdleExpired();
public void produce()
{
strategy.produce();
}
@Override @Override
public void close() public void close()
{ {
@ -172,12 +198,11 @@ public abstract class QuicConnection extends AbstractConnection
{ {
try try
{ {
if (isFillInterested()) boolean interested = isFillInterested();
{ if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled()) LOG.debug("receiveAndProcess() interested={}", interested);
LOG.debug("receiveAndProcess() idle"); if (interested)
return null; return null;
}
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true) while (true)
@ -249,7 +274,7 @@ public abstract class QuicConnection extends AbstractConnection
LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining()); LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining());
Runnable task = session.process(remoteAddress, cipherBuffer); Runnable task = session.process(remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("produced task {} on {}", task, session); LOG.debug("produced session task {} on {}", task, this);
if (task != null) if (task != null)
return task; return task;
} }
@ -263,6 +288,15 @@ public abstract class QuicConnection extends AbstractConnection
} }
} }
private class QuicProducer implements ExecutionStrategy.Producer
{
@Override
public Runnable produce()
{
return receiveAndProcess();
}
}
private class Flusher extends IteratingCallback private class Flusher extends IteratingCallback
{ {
private final AutoLock lock = new AutoLock(); private final AutoLock lock = new AutoLock();

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -59,7 +58,6 @@ public abstract class QuicSession extends ContainerLifeCycle
private final AtomicLong[] ids = new AtomicLong[StreamType.values().length]; private final AtomicLong[] ids = new AtomicLong[StreamType.values().length];
private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>();
private final AtomicBoolean processing = new AtomicBoolean();
private final Executor executor; private final Executor executor;
private final Scheduler scheduler; private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
@ -297,7 +295,7 @@ public abstract class QuicSession extends ContainerLifeCycle
if (accepted != remaining) if (accepted != remaining)
throw new IllegalStateException(); throw new IllegalStateException();
if (quicheConnection.isConnectionEstablished()) if (isConnectionEstablished())
{ {
// HTTP/1.1 // HTTP/1.1
// client1 -- sockEP1 -- H1Connection // client1 -- sockEP1 -- H1Connection
@ -330,10 +328,9 @@ public abstract class QuicSession extends ContainerLifeCycle
addManaged(session); addManaged(session);
} }
boolean process = processing.compareAndSet(false, true);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("processing={} on {}", process, session); LOG.debug("processing {}", session);
return process ? session::process : null; return session.getProducer();
} }
else else
{ {
@ -342,13 +339,6 @@ public abstract class QuicSession extends ContainerLifeCycle
} }
} }
void processingComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("processing complete on {}", protocolSession);
processing.set(false);
}
// TODO: this is ugly, is there a better solution? // TODO: this is ugly, is there a better solution?
protected Runnable pollTask() protected Runnable pollTask()
{ {

View File

@ -214,6 +214,19 @@ public class QuicStreamEndPoint extends AbstractEndPoint
*/ */
public boolean onReadable() public boolean onReadable()
{ {
// TODO: there is a race condition here.
// Thread T1 enters this method and sees that this endPoint
// is not interested, so it does not call onFillable().
// Thread T2 is in fillInterested() about to set interest.
// When this happens, this endPoint will miss a notification
// that there is data to read.
// The race condition is fixed by always calling produce()
// from fillInterested() methods below.
// TODO: an alternative way of avoid the race would be to emulate an NIO style
// notification, where onReadable() gets called only if there is interest.
// getQuicSession().setFillInterested(getStreamId(), false);
boolean interested = isFillInterested(); boolean interested = isFillInterested();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, interested); LOG.debug("stream {} is readable, processing: {}", streamId, interested);
@ -226,14 +239,20 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public void fillInterested(Callback callback) public void fillInterested(Callback callback)
{ {
super.fillInterested(callback); super.fillInterested(callback);
getQuicSession().getProtocolSession().process();
// TODO: see above
// getQuicSession().setFillInterested(getStreamId(), true);
// Method produce() could block, but it's called synchronously from the producer thread
// which calls onReadable() above, so it will just go into re-producing and never block.
getQuicSession().getProtocolSession().produce();
} }
@Override @Override
public boolean tryFillInterested(Callback callback) public boolean tryFillInterested(Callback callback)
{ {
boolean result = super.tryFillInterested(callback); boolean result = super.tryFillInterested(callback);
getQuicSession().getProtocolSession().process(); getQuicSession().getProtocolSession().produce();
return result; return result;
} }

View File

@ -19,20 +19,7 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId> <artifactId>maven-checkstyle-plugin</artifactId>
<configuration> <version>${maven.checkstyle.plugin.version}</version>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration> <configuration>
<skip>true</skip> <skip>true</skip>
</configuration> </configuration>

View File

@ -13,8 +13,11 @@
package org.eclipse.jetty.quic.server; package org.eclipse.jetty.quic.server;
import java.util.function.Consumer;
import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -22,6 +25,9 @@ public class ServerProtocolSession extends ProtocolSession
{ {
private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class); private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class);
private final Runnable producer = Invocable.from(Invocable.InvocationType.BLOCKING, this::produce);
private final Consumer<QuicStreamEndPoint> configureProtocolEndPoint = this::configureProtocolEndPoint;
public ServerProtocolSession(ServerQuicSession session) public ServerProtocolSession(ServerQuicSession session)
{ {
super(session); super(session);
@ -33,11 +39,17 @@ public class ServerProtocolSession extends ProtocolSession
return (ServerQuicSession)super.getQuicSession(); return (ServerQuicSession)super.getQuicSession();
} }
@Override
public Runnable getProducer()
{
return producer;
}
@Override @Override
protected boolean onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
// On the server, we need a get-or-create semantic in case of reads. // On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint); QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, configureProtocolEndPoint);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable(); return streamEndPoint.onReadable();

View File

@ -32,10 +32,7 @@ import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,7 +45,6 @@ public class ServerQuicConnection extends QuicConnection
private final QuicheConfig quicheConfig; private final QuicheConfig quicheConfig;
private final Connector connector; private final Connector connector;
private final AdaptiveExecutionStrategy strategy;
private final SessionTimeouts sessionTimeouts; private final SessionTimeouts sessionTimeouts;
protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig) protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig)
@ -56,7 +52,6 @@ public class ServerQuicConnection extends QuicConnection
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint); super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.quicheConfig = quicheConfig; this.quicheConfig = quicheConfig;
this.connector = connector; this.connector = connector;
this.strategy = new AdaptiveExecutionStrategy(new QuicProducer(), getExecutor());
this.sessionTimeouts = new SessionTimeouts(connector.getScheduler()); this.sessionTimeouts = new SessionTimeouts(connector.getScheduler());
} }
@ -64,21 +59,13 @@ public class ServerQuicConnection extends QuicConnection
public void onOpen() public void onOpen()
{ {
super.onOpen(); super.onOpen();
LifeCycle.start(strategy);
fillInterested(); fillInterested();
} }
@Override
public void onClose(Throwable cause)
{
LifeCycle.stop(strategy);
super.onClose(cause);
}
@Override @Override
public void onFillable() public void onFillable()
{ {
strategy.produce(); produce();
} }
@Override @Override
@ -137,15 +124,6 @@ public class ServerQuicConnection extends QuicConnection
// listening DatagramChannelEndPoint, so it must not be closed. // listening DatagramChannelEndPoint, so it must not be closed.
} }
private class QuicProducer implements ExecutionStrategy.Producer
{
@Override
public Runnable produce()
{
return receiveAndProcess();
}
}
private class SessionTimeouts extends CyclicTimeouts<ServerQuicSession> private class SessionTimeouts extends CyclicTimeouts<ServerQuicSession>
{ {
private SessionTimeouts(Scheduler scheduler) private SessionTimeouts(Scheduler scheduler)

View File

@ -125,6 +125,35 @@ public interface Invocable
return new ReadyTask(type, task); return new ReadyTask(type, task);
} }
public static Runnable from(InvocationType type, Runnable task)
{
class RunnableInvocable implements Runnable, Invocable
{
private final InvocationType type;
private final Runnable task;
RunnableInvocable(InvocationType type, Runnable task)
{
this.type = type;
this.task = task;
}
@Override
public void run()
{
task.run();
}
@Override
public InvocationType getInvocationType()
{
return type;
}
}
return new RunnableInvocable(type, task);
}
/** /**
* Test if the current thread has been tagged as non blocking * Test if the current thread has been tagged as non blocking
* *