From 9d9fb74d4c6147919967c7d45eea195f1fdaf3f0 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Tue, 5 Mar 2024 14:51:57 -0600 Subject: [PATCH] [AMQ-9394] Tech Preview: Virtual Thread support --- .../apache/activemq/broker/BrokerService.java | 16 +- .../activemq/broker/jmx/BrokerView.java | 10 + .../activemq/broker/jmx/BrokerViewMBean.java | 7 + activemq-client-jdk21-test/pom.xml | 78 +++++++ .../VirtualThreadTaskRunnerBrokerTest.java | 37 ++++ activemq-client-jdk21/pom.xml | 209 ++++++++++++++++++ .../thread/VirtualThreadExecutor.java | 58 +++++ .../thread/VirtualThreadTaskRunner.java | 176 +++++++++++++++ .../apache/activemq/ActiveMQConnection.java | 15 +- .../activemq/ActiveMQConnectionFactory.java | 10 + .../activemq/annotation/Experimental.java | 46 ++++ .../activemq/thread/TaskRunnerFactory.java | 76 ++++++- .../activemq/thread/TaskRunnerTest.java | 2 +- assembly/pom.xml | 4 + assembly/src/main/descriptors/common-bin.xml | 10 + pom.xml | 48 ++++ 16 files changed, 796 insertions(+), 6 deletions(-) create mode 100644 activemq-client-jdk21-test/pom.xml create mode 100644 activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java create mode 100644 activemq-client-jdk21/pom.xml create mode 100644 activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java create mode 100644 activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java create mode 100644 activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index b05c3ec66d..fe4125812e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -50,6 +50,7 @@ import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.ConfigurationException; import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.annotation.Experimental; import org.apache.activemq.broker.cluster.ConnectionSplitBroker; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; @@ -223,6 +224,7 @@ public class BrokerService implements Service { private boolean monitorConnectionSplits = false; private int taskRunnerPriority = Thread.NORM_PRIORITY; private boolean dedicatedTaskRunner; + private boolean virtualThreadTaskRunner; private boolean cacheTempDestinations = false;// useful for failover private int timeBeforePurgeTempDestinations = 5000; private final List shutdownHooks = new ArrayList<>(); @@ -1269,7 +1271,7 @@ public class BrokerService implements Service { public TaskRunnerFactory getTaskRunnerFactory() { if (this.taskRunnerFactory == null) { this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, - isDedicatedTaskRunner()); + isDedicatedTaskRunner(), isVirtualThreadTaskRunner()); this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); } return this.taskRunnerFactory; @@ -1280,9 +1282,10 @@ public class BrokerService implements Service { } public TaskRunnerFactory getPersistenceTaskRunnerFactory() { + // [AMQ-9394] TODO: Should we have a separate config flag for virtualThread for persistence task runner? if (taskRunnerFactory == null) { persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, - true, 1000, isDedicatedTaskRunner()); + true, 1000, isDedicatedTaskRunner(), isVirtualThreadTaskRunner()); } return persistenceTaskRunnerFactory; } @@ -1891,6 +1894,15 @@ public class BrokerService implements Service { this.dedicatedTaskRunner = dedicatedTaskRunner; } + public boolean isVirtualThreadTaskRunner() { + return virtualThreadTaskRunner; + } + + @Experimental("Tech Preview for Virtaul Thread support") + public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) { + this.virtualThreadTaskRunner = virtualThreadTaskRunner; + } + public boolean isCacheTempDestinations() { return cacheTempDestinations; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index e8ec158dae..c624fc089c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -517,6 +517,16 @@ public class BrokerView implements BrokerViewMBean { return brokerService.isSlave(); } + @Override + public boolean isDedicatedTaskRunner() { + return brokerService.isDedicatedTaskRunner(); + } + + @Override + public boolean isVirtualThreadTaskRunner() { + return brokerService.isVirtualThreadTaskRunner(); + } + private ManagedRegionBroker safeGetBroker() { if (broker == null) { throw new IllegalStateException("Broker is not yet started."); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 7584a71734..29a731d82f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -335,4 +335,11 @@ public interface BrokerViewMBean extends Service { @MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations") long getTotalMaxUncommittedExceededCount(); + + @MBeanInfo("Dedicated Task Runner enabled.") + boolean isDedicatedTaskRunner(); + + @MBeanInfo("Virtual Thread Task Runner enabled.") + boolean isVirtualThreadTaskRunner(); + } diff --git a/activemq-client-jdk21-test/pom.xml b/activemq-client-jdk21-test/pom.xml new file mode 100644 index 0000000000..c433d2edd3 --- /dev/null +++ b/activemq-client-jdk21-test/pom.xml @@ -0,0 +1,78 @@ + + + + 4.0.0 + + org.apache.activemq + activemq-parent + 6.2.0-SNAPSHOT + + activemq-client-jdk21-test + jar + ActiveMQ :: Client JDK 21 Test + Test module for activemq-client-jdk21 with tech preview support for Virtual Threads + + 21 + 21 + true + + + + org.apache.activemq + activemq-client-jdk21 + ${project.version} + + + org.apache.activemq + activemq-client + + + + + junit + junit + test + + + org.apache.activemq + activemq-broker + test + tests + ${project.version} + + + org.apache.activemq + activemq-client + + + + + org.apache.activemq + activemq-unit-tests + test + tests + ${project.version} + + + org.apache.activemq + activemq-client + + + + + diff --git a/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java b/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java new file mode 100644 index 0000000000..88d67158ae --- /dev/null +++ b/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +public class VirtualThreadTaskRunnerBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setVirtualThreadTaskRunner(true); + return broker; + } + + public static Test suite() { + return suite(VirtualThreadTaskRunnerBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} diff --git a/activemq-client-jdk21/pom.xml b/activemq-client-jdk21/pom.xml new file mode 100644 index 0000000000..06b76930e7 --- /dev/null +++ b/activemq-client-jdk21/pom.xml @@ -0,0 +1,209 @@ + + + + 4.0.0 + + org.apache.activemq + activemq-parent + 6.2.0-SNAPSHOT + + activemq-client-jdk21 + bundle + ActiveMQ :: Client JDK 21 + ActiveMQ Client implementation compiled with JDK 21 and tech preview support for Virtual Threads + + 21 + 21 + true + + + + org.apache.activemq + activemq-client + provided + + + jakarta.jms + jakarta.jms-api + + + + + jakarta.jms + jakarta.jms-api + ${jakarta-jms-api-version} + + + org.fusesource.hawtbuf + hawtbuf + ${hawtbuf-version} + + + org.slf4j + slf4j-api + + + org.jmdns + jmdns + true + provided + + + com.thoughtworks.xstream + xstream + provided + + + junit + junit + test + + + org.apache.activemq + activemq-broker + test + tests + ${project.version} + + + org.apache.activemq + activemq-unit-tests + test + tests + ${project.version} + + + + + + maven-dependency-plugin + + + unpack-source + initialize + + unpack + + + + + org.apache.activemq + activemq-client + sources + jar + ${project.build.directory}/copied-sources/activemq-client + **/META-INF/*,**/META-INF/maven/**,**/zeroconf/** + **/** + + + + + + + + maven-resources-plugin + + + copy-java-source + generate-sources + + copy-resources + + + ${project.build.directory}/generated-sources + + + ${project.build.directory}/copied-sources/activemq-client + + + + + + copy-resources + generate-sources + + copy-resources + + + ${project.build.directory}/generated-resources/META-INF + + + ${project.build.directory}/copied-sources/activemq-client/META-INF + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources + + + + + add-resource + generate-resources + + add-resource + + + + + ${project.build.directory}/generated-resources + + + + + + + + org.apache.felix + maven-bundle-plugin + true + true + + + + !java.*, + !com.google.errorprone.annotations, + !com.google.errorprone.annotations.concurrent, + com.thoughtworks.xstream.*;resolution:="optional", + * + + + com.google.errorprone.annotations, + com.google.errorprone.annotations.concurrent + + <_noee>true + + + + + + diff --git a/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java new file mode 100644 index 0000000000..ad08897eeb --- /dev/null +++ b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.annotation.Experimental; +import org.slf4j.Logger; + +@Experimental("Tech Preview for Virtual Thread support") +public class VirtualThreadExecutor { + + private VirtualThreadExecutor() {} + + public static ExecutorService createVirtualThreadExecutorService(final String name, final AtomicLong id, final Logger LOG) { + + // [AMQ-9394] NOTE: Submitted JDK feature enhancement id: 9076243 to allow AtomicLong thread id param + // https://bugs.java.com/bugdatabase/view_bug?bug_id=JDK-8320377 + Thread.Builder.OfVirtual threadBuilderOfVirtual = Thread.ofVirtual() + .name(name) + .uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Error in thread '{}'", t.getName(), e); + } + }); + + // [AMQ-9394] Work around to have global thread id increment across ThreadFactories + ThreadFactory virtualThreadFactory = threadBuilderOfVirtual.factory(); + ThreadFactory atomicThreadFactory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread tmpThread = virtualThreadFactory.newThread(r); + tmpThread.setName(tmpThread.getName() + id.incrementAndGet()); + return tmpThread; + } + }; + + return Executors.newThreadPerTaskExecutor(atomicThreadFactory); // [AMQ-9394] Same as newVirtualThreadPerTaskExecutor + } +} diff --git a/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java new file mode 100644 index 0000000000..0b58657e7e --- /dev/null +++ b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class VirtualThreadTaskRunner implements TaskRunner { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadTaskRunner.class); + private final int maxIterationsPerRun; + private final Executor executor; + private final Task task; + private final Lock taskRunnerLock = new ReentrantLock(); + private final Runnable runable; + private final Condition runnableCondition; + private boolean queued; + private boolean shutdown; + private boolean iterating; + // [AMQ-9394] TODO: Remove references to the running thread where possible + private volatile Thread runningThread; + + public VirtualThreadTaskRunner(Executor executor, final Task task, int maxIterationsPerRun) { + this.executor = executor; + this.maxIterationsPerRun = maxIterationsPerRun; + this.task = task; + runable = new Runnable() { + @Override + public void run() { + runningThread = Thread.currentThread(); + try { + runTask(); + } finally { + LOG.trace("Run task done: {}", task); + runningThread = null; + } + } + }; + this.runnableCondition = taskRunnerLock.newCondition(); + } + + /** + * We Expect MANY wakeup calls on the same TaskRunner. + */ + @Override + public void wakeup() throws InterruptedException { + taskRunnerLock.lock(); + try { + // When we get in here, we make some assumptions of state: + // queued=false, iterating=false: wakeup() has not be called and + // therefore task is not executing. + // queued=true, iterating=false: wakeup() was called but, task + // execution has not started yet + // queued=false, iterating=true : wakeup() was called, which caused + // task execution to start. + // queued=true, iterating=true : wakeup() called after task + // execution was started. + + if (queued || shutdown) { + return; + } + + queued = true; + + // The runTask() method will do this for me once we are done + // iterating. + if (!iterating) { + executor.execute(runable); + } + } finally { + taskRunnerLock.unlock(); + } + } + + /** + * shut down the task + * + * @throws InterruptedException + */ + @Override + public void shutdown(long timeout) throws InterruptedException { + LOG.trace("Shutdown timeout: {} task: {}", timeout, task); + taskRunnerLock.lock(); + try { + shutdown = true; + // the check on the thread is done + // because a call to iterate can result in + // shutDown() being called, which would wait forever + // waiting for iterating to finish + if (runningThread != Thread.currentThread()) { + if (iterating) { + runnableCondition.await(timeout, TimeUnit.MILLISECONDS); + } + } + } finally { + taskRunnerLock.unlock(); + } + } + + @Override + public void shutdown() throws InterruptedException { + shutdown(0); + } + + final void runTask() { + + taskRunnerLock.lock(); + try { + queued = false; + if (shutdown) { + iterating = false; + runnableCondition.signalAll(); + return; + } + iterating = true; + } finally { + taskRunnerLock.unlock(); + } + + // Don't synchronize while we are iterating so that + // multiple wakeup() calls can be executed concurrently. + boolean done = false; + try { + for (int i = 0; i < maxIterationsPerRun; i++) { + LOG.trace("Running task iteration {} - {}", i, task); + if (!task.iterate()) { + done = true; + break; + } + } + } finally { + taskRunnerLock.lock(); + try { + iterating = false; + runnableCondition.signalAll(); + if (shutdown) { + queued = false; + runnableCondition.signalAll(); + } else { + // If we could not iterate all the items + // then we need to re-queue. + if (!done) { + queued = true; + } + + if (queued) { + executor.execute(runable); + } + } + + } finally { + taskRunnerLock.unlock(); + } + } + } +} diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a91349b28b..74a25b728a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -199,6 +199,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private DestinationSource destinationSource; private final Object ensureConnectionInfoSentMutex = new Object(); private boolean useDedicatedTaskRunner; + private boolean useVirtualThreadTaskRunner; protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0); private long consumerFailoverRedeliveryWaitPeriod; private volatile Scheduler scheduler; @@ -1068,10 +1069,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.useDedicatedTaskRunner = useDedicatedTaskRunner; } + public boolean isUseVirtualThreadTaskRunner() { + return useVirtualThreadTaskRunner; + } + + public void setUseVirtualThreadTaskRunner(boolean useVirtualThreadTaskRunner) { + this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner; + } + public TaskRunnerFactory getSessionTaskRunner() { synchronized (this) { if (sessionTaskRunner == null) { - sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); + if(isUseVirtualThreadTaskRunner()) { + sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), isUseVirtualThreadTaskRunner()); + } else { + sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); + } sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 54969f39ce..ae57d2624b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -148,6 +148,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; private boolean useDedicatedTaskRunner; + private boolean useVirtualThreadTaskRunner; private long consumerFailoverRedeliveryWaitPeriod = 0; private boolean checkForDuplicates = true; private ClientInternalExceptionListener clientInternalExceptionListener; @@ -438,6 +439,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setAuditDepth(getAuditDepth()); connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); + connection.setUseVirtualThreadTaskRunner(isUseVirtualThreadTaskRunner()); connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); connection.setCheckForDuplicates(isCheckForDuplicates()); connection.setMessagePrioritySupported(isMessagePrioritySupported()); @@ -1148,6 +1150,14 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne return useDedicatedTaskRunner; } + public void setUseVirtualThreadTaskRunner(boolean useVirtualThreadTaskRunner) { + this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner; + } + + public boolean isUseVirtualThreadTaskRunner() { + return useVirtualThreadTaskRunner; + } + public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; } diff --git a/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java b/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java new file mode 100644 index 0000000000..62b6bd429c --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * The Experimental annotation documents in-progress + * ActiveMQ features and communicates preview status + * of new features that may change or be removed + * between any release, + * + * @author Matt Pavlovich + * @since 6.2.0 + */ +@Documented +@Retention(value=RetentionPolicy.CLASS) +@Target(value={ + ElementType.CONSTRUCTOR, + ElementType.FIELD, + ElementType.LOCAL_VARIABLE, + ElementType.METHOD, + ElementType.PACKAGE, + ElementType.PARAMETER, + ElementType.TYPE}) +public @interface Experimental { + String value(); +} diff --git a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 002aec79d4..3c872bef3c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -16,8 +16,12 @@ */ package org.apache.activemq.thread; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -56,6 +60,9 @@ public class TaskRunnerFactory implements Executor { private RejectedExecutionHandler rejectedTaskHandler = null; private ClassLoader threadClassLoader; + // [AMQ-9394] Virtual Thread support + private boolean virtualThreadTaskRunner = false; + public TaskRunnerFactory() { this("ActiveMQ Task"); } @@ -72,13 +79,22 @@ public class TaskRunnerFactory implements Executor { this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize()); } + public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, boolean virtualThreadTaskRunner) { + this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize(), virtualThreadTaskRunner); + } + public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { + this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, maxThreadPoolSize, false); + } + + public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize, boolean virtualThreadTaskRunner) { this.name = name; this.priority = priority; this.daemon = daemon; this.maxIterationsPerRun = maxIterationsPerRun; this.dedicatedTaskRunner = dedicatedTaskRunner; this.maxThreadPoolSize = maxThreadPoolSize; + this.virtualThreadTaskRunner = virtualThreadTaskRunner; } public void init() { @@ -90,7 +106,9 @@ public class TaskRunnerFactory implements Executor { synchronized(this) { //need to recheck if initDone is true under the lock if (!initDone.get()) { - if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { + if (virtualThreadTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseVirtualThreadTaskRunner")) ) { + executorRef.compareAndSet(null, createVirtualThreadExecutor()); + } else if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { executorRef.set(null); } else { executorRef.compareAndSet(null, createDefaultExecutor()); @@ -154,7 +172,11 @@ public class TaskRunnerFactory implements Executor { init(); ExecutorService executor = executorRef.get(); if (executor != null) { - return new PooledTaskRunner(executor, task, maxIterationsPerRun); + if(isVirtualThreadTaskRunner()) { + return createVirtualThreadTaskRunner(executor, task, maxIterationsPerRun); + } else { + return new PooledTaskRunner(executor, task, maxIterationsPerRun); + } } else { return new DedicatedTaskRunner(task, name, priority, daemon); } @@ -217,6 +239,48 @@ public class TaskRunnerFactory implements Executor { return rc; } + protected ExecutorService createVirtualThreadExecutor() { + if(!(Runtime.version().feature() >= 21)) { + LOG.error("Virtual Thread support requires JDK 21 or higher"); + throw new IllegalStateException("Virtual Thread support requires JDK 21 or higher"); + } + + try { + Class virtualThreadExecutorClass = Class.forName("org.apache.activemq.thread.VirtualThreadExecutor", false, threadClassLoader); + Method method = virtualThreadExecutorClass.getMethod("createVirtualThreadExecutorService", String.class, AtomicLong.class, Logger.class); + Object result = method.invoke(null, name, id, LOG); + if(!ExecutorService.class.isAssignableFrom(result.getClass())) { + throw new IllegalStateException("VirtualThreadExecutor not returned"); + } + LOG.info("VirtualThreadExecutor initialized name:{}", name); + return ExecutorService.class.cast(result); + } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) { + LOG.error("VirtualThreadExecutor class failed to load", e); + throw new IllegalStateException(e); + } + } + + protected TaskRunner createVirtualThreadTaskRunner(Executor executor, Task task, int maxIterations) { + if(!(Runtime.version().feature() >= 21)) { + LOG.error("Virtual Thread support requires JDK 21 or higher"); + throw new IllegalStateException("Virtual Thread support requires JDK 21 or higher"); + } + + try { + Class virtualThreadTaskRunnerClass = Class.forName("org.apache.activemq.thread.VirtualThreadTaskRunner", false, threadClassLoader); + Constructor constructor = virtualThreadTaskRunnerClass.getConstructor(Executor.class, Task.class, Integer.TYPE); + Object result = constructor.newInstance(executor, task, maxIterations); + if(!TaskRunner.class.isAssignableFrom(result.getClass())) { + throw new IllegalStateException("VirtualThreadTaskRunner not returned"); + } + return TaskRunner.class.cast(result); + } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException | InstantiationException | IllegalArgumentException e) { + LOG.error("VirtualThreadTaskRunner class failed to load", e); + throw new IllegalStateException(e); + } + } + + public ExecutorService getExecutor() { return executorRef.get(); } @@ -265,6 +329,14 @@ public class TaskRunnerFactory implements Executor { this.dedicatedTaskRunner = dedicatedTaskRunner; } + public boolean isVirtualThreadTaskRunner() { + return virtualThreadTaskRunner; + } + + public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) { + this.virtualThreadTaskRunner = virtualThreadTaskRunner; + } + public int getMaxThreadPoolSize() { return maxThreadPoolSize; } diff --git a/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java index 1a721f344a..81879a8f92 100644 --- a/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java +++ b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java @@ -91,7 +91,7 @@ public class TaskRunnerTest { for (int i = 0; i < enqueueCount / workerCount; i++) { queue.incrementAndGet(); runner.wakeup(); - yield(); + this.yield(); } } catch (BrokenBarrierException e) { } catch (InterruptedException e) { diff --git a/assembly/pom.xml b/assembly/pom.xml index 5d5e5a721e..8442a815b0 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -41,6 +41,10 @@ ${project.groupId} activemq-client + + ${project.groupId} + activemq-client-jdk21 + org.apache.activemq activemq-openwire-legacy diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml index e128769961..47b895ffa1 100644 --- a/assembly/src/main/descriptors/common-bin.xml +++ b/assembly/src/main/descriptors/common-bin.xml @@ -158,6 +158,16 @@ 0644 0755 + + lib/jdk21 + false + runtime + + ${pom.groupId}:activemq-client-jdk21 + + 0644 + 0755 + lib/camel false diff --git a/pom.xml b/pom.xml index 79c8aefdf5..eae69738ef 100644 --- a/pom.xml +++ b/pom.xml @@ -301,6 +301,11 @@ activemq-client ${project.version} + + org.apache.activemq + activemq-client-jdk21 + ${project.version} + org.apache.activemq activemq-client @@ -1597,5 +1602,48 @@ -Xdoclint:none + + jdk-21 + + [21,) + + + activemq-openwire-generator + activemq-client + activemq-client-jdk21 + activemq-client-jdk21-test + activemq-openwire-legacy + activemq-broker + activemq-stomp + activemq-mqtt + activemq-amqp + activemq-kahadb-store + activemq-jdbc-store + activemq-unit-tests + activemq-all + activemq-console + activemq-jaas + activemq-jms-pool + activemq-pool + activemq-cf + activemq-ra + activemq-rar + activemq-run + activemq-shiro + activemq-spring + activemq-runtime-config + activemq-tooling + activemq-web + activemq-web-demo + activemq-web-console + activemq-karaf + activemq-osgi + activemq-blueprint + activemq-karaf-itest + assembly + activemq-log4j-appender + activemq-http + +