Fixes #12171 - QoSHandler does not resume on a virtual thread. (#12174)

Now QoSHandler resumes requests using Request.getComponents().getExecutor().
This Executor is configured to be the virtual thread executor, if present, otherwise the Server Executor.

Removed warn() from VirtualThreads.isVirtualThread(), as it was too verbose.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-08-27 11:03:48 +03:00 committed by GitHub
parent 394bc136f3
commit 0420e926a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 164 additions and 12 deletions

View File

@ -13,34 +13,55 @@
package org.eclipse.jetty.server;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
* Common components made available via a {@link Request}
* Common components made available via a {@link Request}.
*/
public interface Components
{
/**
* @return the {@link ByteBufferPool} associated with the {@link Request}
*/
ByteBufferPool getByteBufferPool();
/**
* @return the {@link Scheduler} associated with the {@link Request}
*/
Scheduler getScheduler();
/**
* @return the {@link ThreadPool} associated with the {@link Request}
* @deprecated use {@link #getExecutor()} instead
*/
@Deprecated(since = "12.0.13", forRemoval = true)
ThreadPool getThreadPool();
/**
* A Map which can be used as a cache for object (e.g. Cookie cache).
* The cache will have a life cycle limited by the connection, i.e. no cache map will live
* @return the {@link Executor} associated with the {@link Request}
*/
default Executor getExecutor()
{
return getThreadPool();
}
/**
* <p>A map-like object that can be used as a cache (for example, as a cookie cache).</p>
* <p>The cache will have a life cycle limited by the connection, i.e. no cache map will live
* longer that the connection associated with it. However, a cache may have a shorter life
* than a connection (e.g. it may be discarded for implementation reasons). A cache map is
* guaranteed to be given to only a single request concurrently (scoped by
* {@link org.eclipse.jetty.server.internal.HttpChannelState}), so objects saved there do not
* need to be made safe from access by simultaneous request.
* If the connection is known to be none-persistent then the cache may be a noop
* cache and discard all items set on it.
* If the connection is known to be non-persistent then the cache may be a noop
* cache and discard all items set on it.</p>
*
* @return A Map, which may be an empty map that discards all items.
* @return A map-like object, which may be an empty implementation that discards all items.
*/
Attributes getCache();
}

View File

@ -397,13 +397,18 @@ public class QoSHandler extends ConditionalHandler.Abstract
if (LOG.isDebugEnabled())
LOG.debug("{} resuming {}", this, entry.request);
// Always dispatch to avoid StackOverflowError.
getServer().getThreadPool().execute(entry);
execute(entry.request, entry);
return true;
}
}
return false;
}
private void execute(Request request, Runnable task)
{
request.getComponents().getExecutor().execute(task);
}
private class Entry implements CyclicTimeouts.Expirable, Runnable
{
private final Request request;
@ -458,7 +463,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
}
if (removed)
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
execute(request, () -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()));
}
@Override

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
@ -63,6 +64,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
@ -231,7 +233,18 @@ public class HttpChannelState implements HttpChannel, Components
@Override
public ThreadPool getThreadPool()
{
return getServer().getThreadPool();
Executor executor = getExecutor();
if (executor instanceof ThreadPool threadPool)
return threadPool;
return new ThreadPoolWrapper(executor);
}
@Override
public Executor getExecutor()
{
Executor executor = getServer().getThreadPool();
Executor virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(executor);
return virtualExecutor != null ? virtualExecutor : executor;
}
@Override
@ -1948,4 +1961,43 @@ public class HttpChannelState implements HttpChannel, Components
throw t;
}
}
private static class ThreadPoolWrapper implements ThreadPool
{
private final Executor _executor;
private ThreadPoolWrapper(Executor executor)
{
_executor = executor;
}
@Override
public void execute(Runnable command)
{
_executor.execute(command);
}
@Override
public void join()
{
}
@Override
public int getThreads()
{
return 0;
}
@Override
public int getIdleThreads()
{
return 0;
}
@Override
public boolean isLowOnThreads()
{
return false;
}
}
}

View File

@ -91,7 +91,7 @@ public class ServerTest
{
Runnable after = _afterHandle.getAndSet(null);
if (after != null)
getThreadPool().execute(after);
getExecutor().execute(after);
}
};
}

View File

@ -13,6 +13,9 @@
package org.eclipse.jetty.server.handler;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -28,10 +31,15 @@ import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -50,7 +58,8 @@ public class QoSHandlerTest
private void start(QoSHandler qosHandler) throws Exception
{
server = new Server();
if (server == null)
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
server.setHandler(qosHandler);
@ -483,4 +492,70 @@ public class QoSHandlerTest
}
});
}
@Test
@DisabledForJreRange(max = JRE.JAVA_20)
public void testRequestInVirtualThreadIsResumedInVirtualThread() throws Exception
{
QoSHandler qosHandler = new QoSHandler();
qosHandler.setMaxRequestCount(1);
List<Callback> callbacks = new ArrayList<>();
qosHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.setStatus(VirtualThreads.isVirtualThread() ? HttpStatus.OK_200 : HttpStatus.NOT_ACCEPTABLE_406);
// Save the callback but do not succeed it yet.
callbacks.add(callback);
return true;
}
});
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("st");
serverThreads.setVirtualThreadsExecutor(VirtualThreads.getNamedVirtualThreadsExecutor("vst"));
server = new Server(serverThreads);
ServerConnector networkConnector = new ServerConnector(server, 1, 1);
server.addConnector(networkConnector);
start(qosHandler);
// Send the first request that will not be completed yet.
try (SocketChannel client1 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client1.write(StandardCharsets.UTF_8.encode("""
GET /first HTTP/1.1
Host: localhost
"""));
// Wait that the request arrives at the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));
// Send the second request, it should be suspended by QoSHandler.
try (SocketChannel client2 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client2.write(StandardCharsets.UTF_8.encode("""
GET /second HTTP/1.1
Host: localhost
"""));
// Wait for the second request to be suspended.
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1));
// Finish the first request, so that the second can be resumed.
callbacks.remove(0).succeeded();
client1.socket().setSoTimeout(5000);
HttpTester.Response response1 = HttpTester.parseResponse(client1);
assertEquals(HttpStatus.OK_200, response1.getStatus());
// Wait for the second request to arrive to the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));
// Finish the second request.
callbacks.remove(0).succeeded();
client2.socket().setSoTimeout(5000);
HttpTester.Response response2 = HttpTester.parseResponse(client2);
assertEquals(HttpStatus.OK_200, response2.getStatus());
}
}
}
}

View File

@ -101,7 +101,6 @@ public class VirtualThreads
}
catch (Throwable x)
{
warn();
return false;
}
}