Merged branch 'jetty-10.0.x' into 'jetty-11.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-08-10 14:56:43 +02:00
commit 1e778bd97b
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
15 changed files with 461 additions and 36 deletions

View File

@ -18,12 +18,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.TryExecutor;
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor
public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPool, TryExecutor, VirtualThreads.Configurable
{
private Executor _executor; // memory barrier provided by start/stop semantics
private TryExecutor _tryExecutor;
@ -61,6 +62,19 @@ public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPo
return _tryExecutor.tryExecute(task);
}
@Override
public boolean isUseVirtualThreads()
{
return VirtualThreads.isUseVirtualThreads(_executor);
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
{
if (_executor instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads);
}
@Override
public int getIdleThreads()
{

View File

@ -43,11 +43,11 @@ public abstract class AbstractConnection implements Connection
private final Callback _readCallback;
private int _inputBufferSize = 2048;
protected AbstractConnection(EndPoint endp, Executor executor)
protected AbstractConnection(EndPoint endPoint, Executor executor)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_endPoint = endPoint;
_executor = executor;
_readCallback = new ReadCallback();
}
@ -135,11 +135,6 @@ public abstract class AbstractConnection implements Connection
getEndPoint().fillInterested(_readCallback);
}
public void tryFillInterested()
{
tryFillInterested(_readCallback);
}
public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
@ -320,7 +315,7 @@ public abstract class AbstractConnection implements Connection
}
@Override
public void failed(final Throwable x)
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}
@ -328,7 +323,7 @@ public abstract class AbstractConnection implements Connection
@Override
public String toString()
{
return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this);
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
}
}

View File

@ -23,6 +23,7 @@
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="useVirtualThreads" type="boolean"><Property name="jetty.threadPool.useVirtualThreads" default="false"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</New>

View File

@ -17,6 +17,9 @@ etc/jetty-threadpool.xml
## Number of reserved threads (-1 for heuristic).
#jetty.threadPool.reservedThreads=-1
## Whether to use virtual threads, if the runtime supports them.
#jetty.threadPool.useVirtualThreads=false
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000

View File

@ -13,8 +13,6 @@
package org.eclipse.jetty.server;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -34,18 +32,18 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
{
private final String _protocol;
private final List<String> _protocols;
private int _inputbufferSize = 8192;
private int _inputBufferSize = 8192;
protected AbstractConnectionFactory(String protocol)
{
_protocol = protocol;
_protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol}));
_protocols = List.of(protocol);
}
protected AbstractConnectionFactory(String... protocols)
{
_protocol = protocols[0];
_protocols = Collections.unmodifiableList(Arrays.asList(protocols));
_protocols = List.of(protocols);
}
@Override
@ -64,12 +62,12 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
@ManagedAttribute("The buffer size used to read from the network")
public int getInputBufferSize()
{
return _inputbufferSize;
return _inputBufferSize;
}
public void setInputBufferSize(int size)
{
_inputbufferSize = size;
_inputBufferSize = size;
}
protected String findNextProtocol(Connector connector)

View File

@ -0,0 +1,161 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.util;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Utility class to use to query the runtime for virtual thread support,
* and, if virtual threads are supported, to start virtual threads.</p>
*
* @see #areSupported()
* @see #startVirtualThread(Runnable)
* @see #isVirtualThread()
*/
public class VirtualThreads
{
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class);
private static final Method startVirtualThread = probeStartVirtualThread();
private static final Method isVirtualThread = probeIsVirtualThread();
private static Method probeStartVirtualThread()
{
try
{
return Thread.class.getMethod("startVirtualThread", Runnable.class);
}
catch (Throwable x)
{
return null;
}
}
private static Method probeIsVirtualThread()
{
try
{
return Thread.class.getMethod("isVirtual");
}
catch (Throwable x)
{
return null;
}
}
private static void warn()
{
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
}
/**
* @return whether the runtime supports virtual threads
*/
public static boolean areSupported()
{
return startVirtualThread != null;
}
/**
* <p>Starts a virtual thread to execute the given task, or throws
* {@link UnsupportedOperationException} if virtual threads are not
* supported.</p>
*
* @param task the task to execute in a virtual thread
* @see #areSupported()
*/
public static void startVirtualThread(Runnable task)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Starting in virtual thread: {}", task);
startVirtualThread.invoke(null, task);
}
catch (Throwable x)
{
warn();
throw new UnsupportedOperationException(x);
}
}
/**
* @return whether the current thread is a virtual thread
*/
public static boolean isVirtualThread()
{
try
{
return (Boolean)isVirtualThread.invoke(Thread.currentThread());
}
catch (Throwable x)
{
warn();
return false;
}
}
/**
* <p>Tests whether the given executor implements {@link Configurable} and
* it has been configured to use virtual threads.</p>
*
* @param executor the Executor to test
* @return whether the given executor implements {@link Configurable}
* and it has been configured to use virtual threads
*/
public static boolean isUseVirtualThreads(Executor executor)
{
if (executor instanceof Configurable)
return ((Configurable)executor).isUseVirtualThreads();
return false;
}
/**
* <p>Implementations of this interface can be configured to use virtual threads.</p>
* <p>Whether virtual threads are actually used depends on whether the runtime
* supports virtual threads and, if the runtime supports them, whether they are
* configured to be used via {@link #setUseVirtualThreads(boolean)}.</p>
*/
public interface Configurable
{
/**
* @return whether to use virtual threads
*/
default boolean isUseVirtualThreads()
{
return false;
}
/**
* @param useVirtualThreads whether to use virtual threads
* @throws UnsupportedOperationException if the runtime does not support virtual threads
* @see #areSupported()
*/
default void setUseVirtualThreads(boolean useVirtualThreads)
{
if (useVirtualThreads && !VirtualThreads.areSupported())
{
warn();
throw new UnsupportedOperationException();
}
}
}
private VirtualThreads()
{
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -34,7 +35,7 @@ import org.eclipse.jetty.util.component.DumpableCollection;
* A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}.
*/
@ManagedObject("A thread pool")
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor, VirtualThreads.Configurable
{
private final ThreadPoolExecutor _executor;
private final ThreadPoolBudget _budget;
@ -46,6 +47,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon;
private boolean _detailedDump;
private boolean _useVirtualThreads;
public ExecutorThreadPool()
{
@ -268,6 +270,25 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads();
}
@Override
public boolean isUseVirtualThreads()
{
return _useVirtualThreads;
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
{
try
{
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
_useVirtualThreads = useVirtualThreads;
}
catch (UnsupportedOperationException ignored)
{
}
}
@Override
protected void doStart() throws Exception
{

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -74,7 +75,7 @@ import org.slf4j.LoggerFactory;
* </ul>
*/
@ManagedObject("A thread pool")
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable
{
private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class);
private static final Runnable NOOP = () ->
@ -109,6 +110,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private int _lowThreadsThreshold = 1;
private ThreadPoolBudget _budget;
private long _stopTimeout;
private boolean _useVirtualThreads;
public QueuedThreadPool()
{
@ -511,6 +513,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
_lowThreadsThreshold = lowThreadsThreshold;
}
@Override
public boolean isUseVirtualThreads()
{
return _useVirtualThreads;
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
{
try
{
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
_useVirtualThreads = useVirtualThreads;
}
catch (UnsupportedOperationException ignored)
{
}
}
/**
* @return the number of jobs in the queue waiting for a thread
*/

View File

@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -103,6 +104,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{
if (capacity >= 0)
return capacity;
if (VirtualThreads.isUseVirtualThreads(executor))
return 0;
int cpus = ProcessorUtils.availableProcessors();
if (executor instanceof ThreadPool.SizedThreadPool)
{

View File

@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -136,11 +137,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
private final Executor _executor;
private final TryExecutor _tryExecutor;
private final Runnable _runPendingProducer = () -> tryProduce(true);
private boolean _useVirtualThreads;
private State _state = State.IDLE;
private boolean _pending;
/**
* @param producer The produce of tasks to be consumed.
* @param producer The producer of tasks to be consumed.
* @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy.
*/
public AdaptiveExecutionStrategy(Producer producer, Executor executor)
@ -154,6 +156,13 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
LOG.debug("{} created", this);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor);
}
@Override
public void dispatch()
{
@ -462,7 +471,10 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
{
try
{
_executor.execute(task);
if (isUseVirtualThreads())
VirtualThreads.startVirtualThread(task);
else
_executor.execute(task);
}
catch (RejectedExecutionException e)
{
@ -476,6 +488,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
}
}
@ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true)
public boolean isUseVirtualThreads()
{
return _useVirtualThreads;
}
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
public long getPCTasksConsumed()
{

View File

@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
*/
public class ProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProduceConsume.class);
private static final Logger LOG = LoggerFactory.getLogger(ProduceConsume.class);
private final AutoLock _lock = new AutoLock();
private final Producer _producer;

View File

@ -35,9 +35,9 @@
<profiles>
<profile>
<id>jdk17</id>
<id>enable-incubator-foreign</id>
<activation>
<jdk>[17,)</jdk>
<jdk>[17,19)</jdk>
</activation>
<build>
<plugins>
@ -55,6 +55,25 @@
</plugins>
</build>
</profile>
<profile>
<id>enable-foreign-and-virtual-threads-preview</id>
<activation>
<jdk>[19,)</jdk>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>
@{argLine}
--enable-preview
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>

View File

@ -120,7 +120,7 @@ public class TransportScenario
case H2C:
case H2:
case FCGI:
return new ServerConnector(server, provideServerConnectionFactory(transport));
return new ServerConnector(server, 1, 1, provideServerConnectionFactory(transport));
case H3:
return new HTTP3ServerConnector(server, sslContextFactory, provideServerConnectionFactory(transport));
case UNIX_DOMAIN:
@ -343,6 +343,12 @@ public class TransportScenario
}
public void startServer(Handler handler) throws Exception
{
prepareServer(handler);
server.start();
}
protected void prepareServer(Handler handler)
{
sslContextFactory = newServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
@ -353,23 +359,12 @@ public class TransportScenario
server.addBean(mbeanContainer);
connector = newServerConnector(server);
server.addConnector(connector);
server.setRequestLog((request, response) ->
{
int status = response.getCommittedMetaData().getStatus();
requestLog.offer(String.format("%s %s %s %03d", request.getMethod(), request.getRequestURI(), request.getProtocol(), status));
});
server.setHandler(handler);
try
{
server.start();
}
catch (Exception e)
{
e.printStackTrace();
}
}
protected SslContextFactory.Server newServerSslContextFactory()

View File

@ -0,0 +1,174 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledForJreRange(max = JRE.JAVA_18)
public class VirtualThreadsTest extends AbstractTest<TransportScenario>
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testServletInvokedOnVirtualThread(Transport transport) throws Exception
{
// No virtual thread support in FCGI server-side.
Assumptions.assumeTrue(transport != Transport.FCGI);
init(transport);
scenario.prepareServer(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
}
});
ThreadPool threadPool = scenario.server.getThreadPool();
if (threadPool instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
scenario.server.start();
scenario.startClient();
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transport);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testServletCallbacksInvokedOnVirtualThread(Transport transport) throws Exception
{
// No virtual thread support in FCGI server-side.
Assumptions.assumeTrue(transport != Transport.FCGI);
init(transport);
byte[] data = new byte[128 * 1024 * 1024];
scenario.prepareServer(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
ServletOutputStream output = response.getOutputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
if (!VirtualThreads.isVirtualThread())
throw new IOException("not a virtual thread");
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
}
@Override
public void onAllDataRead() throws IOException
{
if (!VirtualThreads.isVirtualThread())
throw new IOException("not a virtual thread");
// Write a large response content to cause onWritePossible() to be called.
output.write(data);
}
@Override
public void onError(Throwable t)
{
}
});
output.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
if (!VirtualThreads.isVirtualThread())
throw new IOException("not a virtual thread");
asyncContext.complete();
}
@Override
public void onError(Throwable t)
{
}
});
}
});
ThreadPool threadPool = scenario.server.getThreadPool();
if (threadPool instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
scenario.server.start();
scenario.startClient();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger length = new AtomicInteger();
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.body(new StringRequestContent("hello"))
.onResponseContent((response, content) -> length.addAndGet(content.remaining()))
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
assertTrue(latch.await(15, TimeUnit.SECONDS));
assertEquals(length.get(), data.length);
}
}

View File

@ -10,3 +10,5 @@ org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http3.qpack.LEVEL=INFO
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO
#org.eclipse.jetty.util.VirtualThreads.LEVEL=DEBUG
#org.eclipse.jetty.util.thread.strategy.LEVEL=DEBUG