[AMQ-9394] Tech Preview: Virtual Thread support

This commit is contained in:
Matt Pavlovich 2024-03-05 14:51:57 -06:00
parent 317d3542b3
commit 9d9fb74d4c
No known key found for this signature in database
16 changed files with 796 additions and 6 deletions

View File

@ -50,6 +50,7 @@ import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.annotation.Experimental;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@ -223,6 +224,7 @@ public class BrokerService implements Service {
private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
private boolean virtualThreadTaskRunner;
private boolean cacheTempDestinations = false;// useful for failover
private int timeBeforePurgeTempDestinations = 5000;
private final List<Runnable> shutdownHooks = new ArrayList<>();
@ -1269,7 +1271,7 @@ public class BrokerService implements Service {
public TaskRunnerFactory getTaskRunnerFactory() {
if (this.taskRunnerFactory == null) {
this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
isDedicatedTaskRunner());
isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
}
return this.taskRunnerFactory;
@ -1280,9 +1282,10 @@ public class BrokerService implements Service {
}
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
// [AMQ-9394] TODO: Should we have a separate config flag for virtualThread for persistence task runner?
if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
true, 1000, isDedicatedTaskRunner());
true, 1000, isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
}
return persistenceTaskRunnerFactory;
}
@ -1891,6 +1894,15 @@ public class BrokerService implements Service {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}
public boolean isVirtualThreadTaskRunner() {
return virtualThreadTaskRunner;
}
@Experimental("Tech Preview for Virtaul Thread support")
public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
this.virtualThreadTaskRunner = virtualThreadTaskRunner;
}
public boolean isCacheTempDestinations() {
return cacheTempDestinations;
}

View File

@ -517,6 +517,16 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.isSlave();
}
@Override
public boolean isDedicatedTaskRunner() {
return brokerService.isDedicatedTaskRunner();
}
@Override
public boolean isVirtualThreadTaskRunner() {
return brokerService.isVirtualThreadTaskRunner();
}
private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");

View File

@ -335,4 +335,11 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations")
long getTotalMaxUncommittedExceededCount();
@MBeanInfo("Dedicated Task Runner enabled.")
boolean isDedicatedTaskRunner();
@MBeanInfo("Virtual Thread Task Runner enabled.")
boolean isVirtualThreadTaskRunner();
}

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>6.2.0-SNAPSHOT</version>
</parent>
<artifactId>activemq-client-jdk21-test</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: Client JDK 21 Test</name>
<description>Test module for activemq-client-jdk21 with tech preview support for Virtual Threads</description>
<properties>
<source-version>21</source-version>
<target-version>21</target-version>
<maven.javadoc.skip>true</maven.javadoc.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client-jdk21</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-unit-tests</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import junit.framework.Test;
public class VirtualThreadTaskRunnerBrokerTest extends BrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.setVirtualThreadTaskRunner(true);
return broker;
}
public static Test suite() {
return suite(VirtualThreadTaskRunnerBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -0,0 +1,209 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>6.2.0-SNAPSHOT</version>
</parent>
<artifactId>activemq-client-jdk21</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ :: Client JDK 21</name>
<description>ActiveMQ Client implementation compiled with JDK 21 and tech preview support for Virtual Threads</description>
<properties>
<source-version>21</source-version>
<target-version>21</target-version>
<maven.javadoc.skip>true</maven.javadoc.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<version>${jakarta-jms-api-version}</version>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
<version>${hawtbuf-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.jmdns</groupId>
<artifactId>jmdns</artifactId>
<optional>true</optional>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-unit-tests</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-source</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<classifier>sources</classifier>
<type>jar</type>
<outputDirectory>${project.build.directory}/copied-sources/activemq-client</outputDirectory>
<excludes>**/META-INF/*,**/META-INF/maven/**,**/zeroconf/**</excludes>
<includes>**/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-java-source</id>
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/copied-sources/activemq-client</directory>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-resources</id>
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/generated-resources/META-INF</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/copied-sources/activemq-client/META-INF</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-resource</id>
<phase>generate-resources</phase>
<goals>
<goal>add-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>${project.build.directory}/generated-resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<inherited>true</inherited>
<configuration>
<instructions>
<Import-Package>
!java.*,
!com.google.errorprone.annotations,
!com.google.errorprone.annotations.concurrent,
com.thoughtworks.xstream.*;resolution:="optional",
*
</Import-Package>
<Private-Package>
com.google.errorprone.annotations,
com.google.errorprone.annotations.concurrent
</Private-Package>
<_noee>true</_noee>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.annotation.Experimental;
import org.slf4j.Logger;
@Experimental("Tech Preview for Virtual Thread support")
public class VirtualThreadExecutor {
private VirtualThreadExecutor() {}
public static ExecutorService createVirtualThreadExecutorService(final String name, final AtomicLong id, final Logger LOG) {
// [AMQ-9394] NOTE: Submitted JDK feature enhancement id: 9076243 to allow AtomicLong thread id param
// https://bugs.java.com/bugdatabase/view_bug?bug_id=JDK-8320377
Thread.Builder.OfVirtual threadBuilderOfVirtual = Thread.ofVirtual()
.name(name)
.uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Error in thread '{}'", t.getName(), e);
}
});
// [AMQ-9394] Work around to have global thread id increment across ThreadFactories
ThreadFactory virtualThreadFactory = threadBuilderOfVirtual.factory();
ThreadFactory atomicThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread tmpThread = virtualThreadFactory.newThread(r);
tmpThread.setName(tmpThread.getName() + id.incrementAndGet());
return tmpThread;
}
};
return Executors.newThreadPerTaskExecutor(atomicThreadFactory); // [AMQ-9394] Same as newVirtualThreadPerTaskExecutor
}
}

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class VirtualThreadTaskRunner implements TaskRunner {
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadTaskRunner.class);
private final int maxIterationsPerRun;
private final Executor executor;
private final Task task;
private final Lock taskRunnerLock = new ReentrantLock();
private final Runnable runable;
private final Condition runnableCondition;
private boolean queued;
private boolean shutdown;
private boolean iterating;
// [AMQ-9394] TODO: Remove references to the running thread where possible
private volatile Thread runningThread;
public VirtualThreadTaskRunner(Executor executor, final Task task, int maxIterationsPerRun) {
this.executor = executor;
this.maxIterationsPerRun = maxIterationsPerRun;
this.task = task;
runable = new Runnable() {
@Override
public void run() {
runningThread = Thread.currentThread();
try {
runTask();
} finally {
LOG.trace("Run task done: {}", task);
runningThread = null;
}
}
};
this.runnableCondition = taskRunnerLock.newCondition();
}
/**
* We Expect MANY wakeup calls on the same TaskRunner.
*/
@Override
public void wakeup() throws InterruptedException {
taskRunnerLock.lock();
try {
// When we get in here, we make some assumptions of state:
// queued=false, iterating=false: wakeup() has not be called and
// therefore task is not executing.
// queued=true, iterating=false: wakeup() was called but, task
// execution has not started yet
// queued=false, iterating=true : wakeup() was called, which caused
// task execution to start.
// queued=true, iterating=true : wakeup() called after task
// execution was started.
if (queued || shutdown) {
return;
}
queued = true;
// The runTask() method will do this for me once we are done
// iterating.
if (!iterating) {
executor.execute(runable);
}
} finally {
taskRunnerLock.unlock();
}
}
/**
* shut down the task
*
* @throws InterruptedException
*/
@Override
public void shutdown(long timeout) throws InterruptedException {
LOG.trace("Shutdown timeout: {} task: {}", timeout, task);
taskRunnerLock.lock();
try {
shutdown = true;
// the check on the thread is done
// because a call to iterate can result in
// shutDown() being called, which would wait forever
// waiting for iterating to finish
if (runningThread != Thread.currentThread()) {
if (iterating) {
runnableCondition.await(timeout, TimeUnit.MILLISECONDS);
}
}
} finally {
taskRunnerLock.unlock();
}
}
@Override
public void shutdown() throws InterruptedException {
shutdown(0);
}
final void runTask() {
taskRunnerLock.lock();
try {
queued = false;
if (shutdown) {
iterating = false;
runnableCondition.signalAll();
return;
}
iterating = true;
} finally {
taskRunnerLock.unlock();
}
// Don't synchronize while we are iterating so that
// multiple wakeup() calls can be executed concurrently.
boolean done = false;
try {
for (int i = 0; i < maxIterationsPerRun; i++) {
LOG.trace("Running task iteration {} - {}", i, task);
if (!task.iterate()) {
done = true;
break;
}
}
} finally {
taskRunnerLock.lock();
try {
iterating = false;
runnableCondition.signalAll();
if (shutdown) {
queued = false;
runnableCondition.signalAll();
} else {
// If we could not iterate all the items
// then we need to re-queue.
if (!done) {
queued = true;
}
if (queued) {
executor.execute(runable);
}
}
} finally {
taskRunnerLock.unlock();
}
}
}
}

View File

@ -199,6 +199,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
private boolean useVirtualThreadTaskRunner;
protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
private volatile Scheduler scheduler;
@ -1068,10 +1069,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public boolean isUseVirtualThreadTaskRunner() {
return useVirtualThreadTaskRunner;
}
public void setUseVirtualThreadTaskRunner(boolean useVirtualThreadTaskRunner) {
this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner;
}
public TaskRunnerFactory getSessionTaskRunner() {
synchronized (this) {
if (sessionTaskRunner == null) {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
if(isUseVirtualThreadTaskRunner()) {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), isUseVirtualThreadTaskRunner());
} else {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
}
sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
}
}

View File

@ -148,6 +148,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
private boolean useVirtualThreadTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
@ -438,6 +439,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
connection.setUseVirtualThreadTaskRunner(isUseVirtualThreadTaskRunner());
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
connection.setCheckForDuplicates(isCheckForDuplicates());
connection.setMessagePrioritySupported(isMessagePrioritySupported());
@ -1148,6 +1150,14 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
return useDedicatedTaskRunner;
}
public void setUseVirtualThreadTaskRunner(boolean useVirtualThreadTaskRunner) {
this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner;
}
public boolean isUseVirtualThreadTaskRunner() {
return useVirtualThreadTaskRunner;
}
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The Experimental annotation documents in-progress
* ActiveMQ features and communicates preview status
* of new features that may change or be removed
* between any release,
*
* @author Matt Pavlovich <mattrpav@apache.org>
* @since 6.2.0
*/
@Documented
@Retention(value=RetentionPolicy.CLASS)
@Target(value={
ElementType.CONSTRUCTOR,
ElementType.FIELD,
ElementType.LOCAL_VARIABLE,
ElementType.METHOD,
ElementType.PACKAGE,
ElementType.PARAMETER,
ElementType.TYPE})
public @interface Experimental {
String value();
}

View File

@ -16,8 +16,12 @@
*/
package org.apache.activemq.thread;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -56,6 +60,9 @@ public class TaskRunnerFactory implements Executor {
private RejectedExecutionHandler rejectedTaskHandler = null;
private ClassLoader threadClassLoader;
// [AMQ-9394] Virtual Thread support
private boolean virtualThreadTaskRunner = false;
public TaskRunnerFactory() {
this("ActiveMQ Task");
}
@ -72,13 +79,22 @@ public class TaskRunnerFactory implements Executor {
this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize());
}
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, boolean virtualThreadTaskRunner) {
this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize(), virtualThreadTaskRunner);
}
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, maxThreadPoolSize, false);
}
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize, boolean virtualThreadTaskRunner) {
this.name = name;
this.priority = priority;
this.daemon = daemon;
this.maxIterationsPerRun = maxIterationsPerRun;
this.dedicatedTaskRunner = dedicatedTaskRunner;
this.maxThreadPoolSize = maxThreadPoolSize;
this.virtualThreadTaskRunner = virtualThreadTaskRunner;
}
public void init() {
@ -90,7 +106,9 @@ public class TaskRunnerFactory implements Executor {
synchronized(this) {
//need to recheck if initDone is true under the lock
if (!initDone.get()) {
if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
if (virtualThreadTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseVirtualThreadTaskRunner")) ) {
executorRef.compareAndSet(null, createVirtualThreadExecutor());
} else if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
executorRef.set(null);
} else {
executorRef.compareAndSet(null, createDefaultExecutor());
@ -154,7 +172,11 @@ public class TaskRunnerFactory implements Executor {
init();
ExecutorService executor = executorRef.get();
if (executor != null) {
return new PooledTaskRunner(executor, task, maxIterationsPerRun);
if(isVirtualThreadTaskRunner()) {
return createVirtualThreadTaskRunner(executor, task, maxIterationsPerRun);
} else {
return new PooledTaskRunner(executor, task, maxIterationsPerRun);
}
} else {
return new DedicatedTaskRunner(task, name, priority, daemon);
}
@ -217,6 +239,48 @@ public class TaskRunnerFactory implements Executor {
return rc;
}
protected ExecutorService createVirtualThreadExecutor() {
if(!(Runtime.version().feature() >= 21)) {
LOG.error("Virtual Thread support requires JDK 21 or higher");
throw new IllegalStateException("Virtual Thread support requires JDK 21 or higher");
}
try {
Class<?> virtualThreadExecutorClass = Class.forName("org.apache.activemq.thread.VirtualThreadExecutor", false, threadClassLoader);
Method method = virtualThreadExecutorClass.getMethod("createVirtualThreadExecutorService", String.class, AtomicLong.class, Logger.class);
Object result = method.invoke(null, name, id, LOG);
if(!ExecutorService.class.isAssignableFrom(result.getClass())) {
throw new IllegalStateException("VirtualThreadExecutor not returned");
}
LOG.info("VirtualThreadExecutor initialized name:{}", name);
return ExecutorService.class.cast(result);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) {
LOG.error("VirtualThreadExecutor class failed to load", e);
throw new IllegalStateException(e);
}
}
protected TaskRunner createVirtualThreadTaskRunner(Executor executor, Task task, int maxIterations) {
if(!(Runtime.version().feature() >= 21)) {
LOG.error("Virtual Thread support requires JDK 21 or higher");
throw new IllegalStateException("Virtual Thread support requires JDK 21 or higher");
}
try {
Class<?> virtualThreadTaskRunnerClass = Class.forName("org.apache.activemq.thread.VirtualThreadTaskRunner", false, threadClassLoader);
Constructor<?> constructor = virtualThreadTaskRunnerClass.getConstructor(Executor.class, Task.class, Integer.TYPE);
Object result = constructor.newInstance(executor, task, maxIterations);
if(!TaskRunner.class.isAssignableFrom(result.getClass())) {
throw new IllegalStateException("VirtualThreadTaskRunner not returned");
}
return TaskRunner.class.cast(result);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException | InstantiationException | IllegalArgumentException e) {
LOG.error("VirtualThreadTaskRunner class failed to load", e);
throw new IllegalStateException(e);
}
}
public ExecutorService getExecutor() {
return executorRef.get();
}
@ -265,6 +329,14 @@ public class TaskRunnerFactory implements Executor {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}
public boolean isVirtualThreadTaskRunner() {
return virtualThreadTaskRunner;
}
public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
this.virtualThreadTaskRunner = virtualThreadTaskRunner;
}
public int getMaxThreadPoolSize() {
return maxThreadPoolSize;
}

View File

@ -91,7 +91,7 @@ public class TaskRunnerTest {
for (int i = 0; i < enqueueCount / workerCount; i++) {
queue.incrementAndGet();
runner.wakeup();
yield();
this.yield();
}
} catch (BrokenBarrierException e) {
} catch (InterruptedException e) {

View File

@ -41,6 +41,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client-jdk21</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-legacy</artifactId>

View File

@ -158,6 +158,16 @@
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</dependencySet>
<dependencySet>
<outputDirectory>lib/jdk21</outputDirectory>
<unpack>false</unpack>
<scope>runtime</scope>
<includes>
<include>${pom.groupId}:activemq-client-jdk21</include>
</includes>
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</dependencySet>
<dependencySet>
<outputDirectory>lib/camel</outputDirectory>
<unpack>false</unpack>

48
pom.xml
View File

@ -301,6 +301,11 @@
<artifactId>activemq-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client-jdk21</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
@ -1597,5 +1602,48 @@
<javadoc.options>-Xdoclint:none</javadoc.options>
</properties>
</profile>
<profile>
<id>jdk-21</id>
<activation>
<jdk>[21,)</jdk>
</activation>
<modules>
<module>activemq-openwire-generator</module>
<module>activemq-client</module>
<module>activemq-client-jdk21</module>
<module>activemq-client-jdk21-test</module>
<module>activemq-openwire-legacy</module>
<module>activemq-broker</module>
<module>activemq-stomp</module>
<module>activemq-mqtt</module>
<module>activemq-amqp</module>
<module>activemq-kahadb-store</module>
<module>activemq-jdbc-store</module>
<module>activemq-unit-tests</module>
<module>activemq-all</module>
<module>activemq-console</module>
<module>activemq-jaas</module>
<module>activemq-jms-pool</module>
<module>activemq-pool</module>
<module>activemq-cf</module>
<module>activemq-ra</module>
<module>activemq-rar</module>
<module>activemq-run</module>
<module>activemq-shiro</module>
<module>activemq-spring</module>
<module>activemq-runtime-config</module>
<module>activemq-tooling</module>
<module>activemq-web</module>
<module>activemq-web-demo</module>
<module>activemq-web-console</module>
<module>activemq-karaf</module>
<module>activemq-osgi</module>
<module>activemq-blueprint</module>
<module>activemq-karaf-itest</module>
<module>assembly</module>
<module>activemq-log4j-appender</module>
<module>activemq-http</module>
</modules>
</profile>
</profiles>
</project>