Implemented support for virtual threads for HTTP/1.1, HTTP/2 and HTTP/3. The virtual thread support is in AdaptiveExecutionStrategy. When virtual threads are supported and enabled, reserved threads are disabled and blocking tasks are run in a virtual thread instead that being executed by the Executor. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
50b7dc4bbc
commit
be3d16bdbb
|
@ -18,12 +18,13 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||
import org.eclipse.jetty.util.thread.TryExecutor;
|
||||
|
||||
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor
|
||||
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor, VirtualThreads.Configurable
|
||||
{
|
||||
private Executor _executor; // memory barrier provided by start/stop semantics
|
||||
private TryExecutor _tryExecutor;
|
||||
|
@ -61,6 +62,19 @@ public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPo
|
|||
return _tryExecutor.tryExecute(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUseVirtualThreads()
|
||||
{
|
||||
return VirtualThreads.isUseVirtualThreads(_executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
||||
{
|
||||
if (_executor instanceof VirtualThreads.Configurable)
|
||||
((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getIdleThreads()
|
||||
{
|
||||
|
|
|
@ -43,11 +43,11 @@ public abstract class AbstractConnection implements Connection
|
|||
private final Callback _readCallback;
|
||||
private int _inputBufferSize = 2048;
|
||||
|
||||
protected AbstractConnection(EndPoint endp, Executor executor)
|
||||
protected AbstractConnection(EndPoint endPoint, Executor executor)
|
||||
{
|
||||
if (executor == null)
|
||||
throw new IllegalArgumentException("Executor must not be null!");
|
||||
_endPoint = endp;
|
||||
_endPoint = endPoint;
|
||||
_executor = executor;
|
||||
_readCallback = new ReadCallback();
|
||||
}
|
||||
|
@ -135,11 +135,6 @@ public abstract class AbstractConnection implements Connection
|
|||
getEndPoint().fillInterested(_readCallback);
|
||||
}
|
||||
|
||||
public void tryFillInterested()
|
||||
{
|
||||
tryFillInterested(_readCallback);
|
||||
}
|
||||
|
||||
public void tryFillInterested(Callback callback)
|
||||
{
|
||||
getEndPoint().tryFillInterested(callback);
|
||||
|
@ -320,7 +315,7 @@ public abstract class AbstractConnection implements Connection
|
|||
}
|
||||
|
||||
@Override
|
||||
public void failed(final Throwable x)
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
onFillInterestedFailed(x);
|
||||
}
|
||||
|
@ -328,7 +323,7 @@ public abstract class AbstractConnection implements Connection
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this);
|
||||
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
|
||||
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
|
||||
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
|
||||
<Set name="useVirtualThreads" type="int"><Property name="jetty.threadPool.useVirtualThreads" default="false"/></Set>
|
||||
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
|
||||
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
|
||||
</New>
|
||||
|
|
|
@ -17,6 +17,9 @@ etc/jetty-threadpool.xml
|
|||
## Number of reserved threads (-1 for heuristic).
|
||||
#jetty.threadPool.reservedThreads=-1
|
||||
|
||||
## Whether to use virtual threads, if the runtime supports them.
|
||||
#jetty.threadPool.useVirtualThreads=false
|
||||
|
||||
## Thread idle timeout (in milliseconds).
|
||||
#jetty.threadPool.idleTimeout=60000
|
||||
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -34,18 +32,18 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
|
|||
{
|
||||
private final String _protocol;
|
||||
private final List<String> _protocols;
|
||||
private int _inputbufferSize = 8192;
|
||||
private int _inputBufferSize = 8192;
|
||||
|
||||
protected AbstractConnectionFactory(String protocol)
|
||||
{
|
||||
_protocol = protocol;
|
||||
_protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol}));
|
||||
_protocols = List.of(protocol);
|
||||
}
|
||||
|
||||
protected AbstractConnectionFactory(String... protocols)
|
||||
{
|
||||
_protocol = protocols[0];
|
||||
_protocols = Collections.unmodifiableList(Arrays.asList(protocols));
|
||||
_protocols = List.of(protocols);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,12 +62,12 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
|
|||
@ManagedAttribute("The buffer size used to read from the network")
|
||||
public int getInputBufferSize()
|
||||
{
|
||||
return _inputbufferSize;
|
||||
return _inputBufferSize;
|
||||
}
|
||||
|
||||
public void setInputBufferSize(int size)
|
||||
{
|
||||
_inputbufferSize = size;
|
||||
_inputBufferSize = size;
|
||||
}
|
||||
|
||||
protected String findNextProtocol(Connector connector)
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* <p>Utility class to use to query the runtime for virtual thread support,
|
||||
* and, if virtual threads are supported, to start virtual threads.</p>
|
||||
*
|
||||
* @see #areSupported()
|
||||
* @see #startVirtualThread(Runnable)
|
||||
* @see #isVirtualThread()
|
||||
*/
|
||||
public class VirtualThreads
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class);
|
||||
private static final Method startVirtualThread = probeStartVirtualThread();
|
||||
private static final Method isVirtualThread = probeIsVirtualThread();
|
||||
|
||||
private static Method probeStartVirtualThread()
|
||||
{
|
||||
try
|
||||
{
|
||||
return Thread.class.getMethod("startVirtualThread", Runnable.class);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static Method probeIsVirtualThread()
|
||||
{
|
||||
try
|
||||
{
|
||||
return Thread.class.getMethod("isVirtual");
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static void warn()
|
||||
{
|
||||
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the runtime supports virtual threads
|
||||
*/
|
||||
public static boolean areSupported()
|
||||
{
|
||||
return startVirtualThread != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Starts a virtual thread to execute the given task, or throws
|
||||
* {@link UnsupportedOperationException} if virtual threads are not
|
||||
* supported.</p>
|
||||
*
|
||||
* @param task the task to execute in a virtual thread
|
||||
* @see #areSupported()
|
||||
*/
|
||||
public static void startVirtualThread(Runnable task)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Starting in virtual thread: {}", task);
|
||||
startVirtualThread.invoke(null, task);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
warn();
|
||||
throw new UnsupportedOperationException(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the current thread is a virtual thread
|
||||
*/
|
||||
public static boolean isVirtualThread()
|
||||
{
|
||||
try
|
||||
{
|
||||
return (Boolean)isVirtualThread.invoke(Thread.currentThread());
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
warn();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Tests whether the given executor implements {@link Configurable} and
|
||||
* it has been configured to use virtual threads.</p>
|
||||
*
|
||||
* @param executor the Executor to test
|
||||
* @return whether the given executor implements {@link Configurable}
|
||||
* and it has been configured to use virtual threads
|
||||
*/
|
||||
public static boolean isUseVirtualThreads(Executor executor)
|
||||
{
|
||||
if (executor instanceof Configurable)
|
||||
return ((Configurable)executor).isUseVirtualThreads();
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Implementations of this interface can be configured to use virtual threads.</p>
|
||||
* <p>Whether virtual threads are actually used depends on whether the runtime
|
||||
* supports virtual threads and, if the runtime supports them, whether they are
|
||||
* configured to be used via {@link #setUseVirtualThreads(boolean)}.</p>
|
||||
*/
|
||||
public interface Configurable
|
||||
{
|
||||
/**
|
||||
* @return whether to use virtual threads
|
||||
*/
|
||||
default boolean isUseVirtualThreads()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param useVirtualThreads whether to use virtual threads
|
||||
* @throws UnsupportedOperationException if the runtime does not support virtual threads
|
||||
* @see #areSupported()
|
||||
*/
|
||||
default void setUseVirtualThreads(boolean useVirtualThreads)
|
||||
{
|
||||
if (useVirtualThreads && !VirtualThreads.areSupported())
|
||||
{
|
||||
warn();
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private VirtualThreads()
|
||||
{
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.util.ProcessorUtils;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
|
@ -34,7 +35,7 @@ import org.eclipse.jetty.util.component.DumpableCollection;
|
|||
* A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}.
|
||||
*/
|
||||
@ManagedObject("A thread pool")
|
||||
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor
|
||||
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor, VirtualThreads.Configurable
|
||||
{
|
||||
private final ThreadPoolExecutor _executor;
|
||||
private final ThreadPoolBudget _budget;
|
||||
|
@ -46,6 +47,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
|
|||
private int _priority = Thread.NORM_PRIORITY;
|
||||
private boolean _daemon;
|
||||
private boolean _detailedDump;
|
||||
private boolean _useVirtualThreads;
|
||||
|
||||
public ExecutorThreadPool()
|
||||
{
|
||||
|
@ -268,6 +270,25 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
|
|||
return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUseVirtualThreads()
|
||||
{
|
||||
return _useVirtualThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
||||
{
|
||||
try
|
||||
{
|
||||
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
|
||||
_useVirtualThreads = useVirtualThreads;
|
||||
}
|
||||
catch (UnsupportedOperationException ignored)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.eclipse.jetty.util.AtomicBiInteger;
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
@ -74,7 +75,7 @@ import org.slf4j.LoggerFactory;
|
|||
* </ul>
|
||||
*/
|
||||
@ManagedObject("A thread pool")
|
||||
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
|
||||
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class);
|
||||
private static final Runnable NOOP = () ->
|
||||
|
@ -109,6 +110,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
private int _lowThreadsThreshold = 1;
|
||||
private ThreadPoolBudget _budget;
|
||||
private long _stopTimeout;
|
||||
private boolean _useVirtualThreads;
|
||||
|
||||
public QueuedThreadPool()
|
||||
{
|
||||
|
@ -511,6 +513,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
_lowThreadsThreshold = lowThreadsThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUseVirtualThreads()
|
||||
{
|
||||
return _useVirtualThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
||||
{
|
||||
try
|
||||
{
|
||||
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
|
||||
_useVirtualThreads = useVirtualThreads;
|
||||
}
|
||||
catch (UnsupportedOperationException ignored)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of jobs in the queue waiting for a thread
|
||||
*/
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.eclipse.jetty.util.AtomicBiInteger;
|
||||
import org.eclipse.jetty.util.ProcessorUtils;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
|
@ -103,6 +104,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
{
|
||||
if (capacity >= 0)
|
||||
return capacity;
|
||||
if (VirtualThreads.isUseVirtualThreads(executor))
|
||||
return 0;
|
||||
int cpus = ProcessorUtils.availableProcessors();
|
||||
if (executor instanceof ThreadPool.SizedThreadPool)
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
|
|||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
@ -136,11 +137,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
|||
private final Executor _executor;
|
||||
private final TryExecutor _tryExecutor;
|
||||
private final Runnable _runPendingProducer = () -> tryProduce(true);
|
||||
private boolean _useVirtualThreads;
|
||||
private State _state = State.IDLE;
|
||||
private boolean _pending;
|
||||
|
||||
/**
|
||||
* @param producer The produce of tasks to be consumed.
|
||||
* @param producer The producer of tasks to be consumed.
|
||||
* @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy.
|
||||
*/
|
||||
public AdaptiveExecutionStrategy(Producer producer, Executor executor)
|
||||
|
@ -154,6 +156,13 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
|||
LOG.debug("{} created", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
_useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatch()
|
||||
{
|
||||
|
@ -462,7 +471,10 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
|||
{
|
||||
try
|
||||
{
|
||||
_executor.execute(task);
|
||||
if (isUseVirtualThreads())
|
||||
VirtualThreads.startVirtualThread(task);
|
||||
else
|
||||
_executor.execute(task);
|
||||
}
|
||||
catch (RejectedExecutionException e)
|
||||
{
|
||||
|
@ -476,6 +488,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
|||
}
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true)
|
||||
public boolean isUseVirtualThreads()
|
||||
{
|
||||
return _useVirtualThreads;
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
|
||||
public long getPCTasksConsumed()
|
||||
{
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ProduceConsume implements ExecutionStrategy, Runnable
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProduceConsume.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProduceConsume.class);
|
||||
|
||||
private final AutoLock _lock = new AutoLock();
|
||||
private final Producer _producer;
|
||||
|
|
|
@ -35,9 +35,9 @@
|
|||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>jdk17</id>
|
||||
<id>enable-incubator-foreign</id>
|
||||
<activation>
|
||||
<jdk>[17,)</jdk>
|
||||
<jdk>[17,19)</jdk>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
|
@ -55,6 +55,25 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>enable-foreign-and-virtual-threads-preview</id>
|
||||
<activation>
|
||||
<jdk>[19,)</jdk>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<argLine>
|
||||
@{argLine}
|
||||
--enable-preview
|
||||
</argLine>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -120,7 +120,7 @@ public class TransportScenario
|
|||
case H2C:
|
||||
case H2:
|
||||
case FCGI:
|
||||
return new ServerConnector(server, provideServerConnectionFactory(transport));
|
||||
return new ServerConnector(server, 1, 1, provideServerConnectionFactory(transport));
|
||||
case H3:
|
||||
return new HTTP3ServerConnector(server, sslContextFactory, provideServerConnectionFactory(transport));
|
||||
case UNIX_DOMAIN:
|
||||
|
@ -343,6 +343,12 @@ public class TransportScenario
|
|||
}
|
||||
|
||||
public void startServer(Handler handler) throws Exception
|
||||
{
|
||||
prepareServer(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
protected void prepareServer(Handler handler)
|
||||
{
|
||||
sslContextFactory = newServerSslContextFactory();
|
||||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
|
@ -353,23 +359,12 @@ public class TransportScenario
|
|||
server.addBean(mbeanContainer);
|
||||
connector = newServerConnector(server);
|
||||
server.addConnector(connector);
|
||||
|
||||
server.setRequestLog((request, response) ->
|
||||
{
|
||||
int status = response.getCommittedMetaData().getStatus();
|
||||
requestLog.offer(String.format("%s %s %s %03d", request.getMethod(), request.getRequestURI(), request.getProtocol(), status));
|
||||
});
|
||||
|
||||
server.setHandler(handler);
|
||||
|
||||
try
|
||||
{
|
||||
server.start();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
protected SslContextFactory.Server newServerSslContextFactory()
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.WriteListener;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.util.StringRequestContent;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||
import org.junit.jupiter.api.Assumptions;
|
||||
import org.junit.jupiter.api.condition.DisabledForJreRange;
|
||||
import org.junit.jupiter.api.condition.JRE;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@DisabledForJreRange(max = JRE.JAVA_18)
|
||||
public class VirtualThreadsTest extends AbstractTest<TransportScenario>
|
||||
{
|
||||
@Override
|
||||
public void init(Transport transport) throws IOException
|
||||
{
|
||||
setScenario(new TransportScenario(transport));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testServletInvokedOnVirtualThread(Transport transport) throws Exception
|
||||
{
|
||||
// No virtual thread support in FCGI server-side.
|
||||
Assumptions.assumeTrue(transport != Transport.FCGI);
|
||||
|
||||
init(transport);
|
||||
scenario.prepareServer(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (!VirtualThreads.isVirtualThread())
|
||||
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
|
||||
}
|
||||
});
|
||||
ThreadPool threadPool = scenario.server.getThreadPool();
|
||||
if (threadPool instanceof VirtualThreads.Configurable)
|
||||
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
|
||||
scenario.server.start();
|
||||
scenario.startClient();
|
||||
|
||||
ContentResponse response = scenario.client.newRequest(scenario.newURI())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transport);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testServletCallbacksInvokedOnVirtualThread(Transport transport) throws Exception
|
||||
{
|
||||
// No virtual thread support in FCGI server-side.
|
||||
Assumptions.assumeTrue(transport != Transport.FCGI);
|
||||
|
||||
init(transport);
|
||||
byte[] data = new byte[128 * 1024 * 1024];
|
||||
scenario.prepareServer(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
if (!VirtualThreads.isVirtualThread())
|
||||
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
|
||||
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
ServletInputStream input = request.getInputStream();
|
||||
ServletOutputStream output = response.getOutputStream();
|
||||
|
||||
input.setReadListener(new ReadListener()
|
||||
{
|
||||
@Override
|
||||
public void onDataAvailable() throws IOException
|
||||
{
|
||||
if (!VirtualThreads.isVirtualThread())
|
||||
throw new IOException("not a virtual thread");
|
||||
while (input.isReady())
|
||||
{
|
||||
int read = input.read();
|
||||
if (read < 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllDataRead() throws IOException
|
||||
{
|
||||
if (!VirtualThreads.isVirtualThread())
|
||||
throw new IOException("not a virtual thread");
|
||||
// Write a large response content to cause onWritePossible() to be called.
|
||||
output.write(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
}
|
||||
});
|
||||
|
||||
output.setWriteListener(new WriteListener()
|
||||
{
|
||||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
if (!VirtualThreads.isVirtualThread())
|
||||
throw new IOException("not a virtual thread");
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
ThreadPool threadPool = scenario.server.getThreadPool();
|
||||
if (threadPool instanceof VirtualThreads.Configurable)
|
||||
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
|
||||
scenario.server.start();
|
||||
scenario.startClient();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicInteger length = new AtomicInteger();
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.method(HttpMethod.POST)
|
||||
.body(new StringRequestContent("hello"))
|
||||
.onResponseContent((response, content) -> length.addAndGet(content.remaining()))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200)
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
assertTrue(latch.await(15, TimeUnit.SECONDS));
|
||||
assertEquals(length.get(), data.length);
|
||||
}
|
||||
}
|
|
@ -10,3 +10,5 @@ org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
|||
org.eclipse.jetty.http3.qpack.LEVEL=INFO
|
||||
#org.eclipse.jetty.quic.LEVEL=DEBUG
|
||||
org.eclipse.jetty.quic.quiche.LEVEL=INFO
|
||||
#org.eclipse.jetty.util.VirtualThreads.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.util.thread.strategy.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue