diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index e37e2bf5b1..8954839a07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2021,4 +2021,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224100, value = "Timed out waiting for large messages deletion with IDs {0}, might not be deleted if broker crashes atm", format = Message.Format.MESSAGE_FORMAT) void timedOutWaitingForLargeMessagesDeletion(List largeMessageIds); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT) + void scheduledPoolWithNoRemoveOnCancelPolicy(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index b5c7054380..9b5c3cb188 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2707,10 +2707,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader()); } }); - scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + + ScheduledThreadPoolExecutor scheduledPoolExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + scheduledPoolExecutor.setRemoveOnCancelPolicy(true); + scheduledPool = scheduledPoolExecutor; } else { this.scheduledPoolSupplied = true; this.scheduledPool = serviceRegistry.getScheduledExecutorService(); + + if (!(scheduledPool instanceof ScheduledThreadPoolExecutor) || + !((ScheduledThreadPoolExecutor)scheduledPool).getRemoveOnCancelPolicy()) { + ActiveMQServerLogger.LOGGER.scheduledPoolWithNoRemoveOnCancelPolicy(); + } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java new file mode 100644 index 0000000000..e443478ab0 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.lang.ref.WeakReference; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class ActiveMQServerImplTest extends ActiveMQTestBase { + + @Test + public void testScheduledPoolGC() throws Exception { + ActiveMQServer server = createServer(false); + + server.start(); + + Runnable scheduledRunnable = new Runnable() { + @Override + public void run() { + Assert.fail(); + } + }; + WeakReference scheduledRunnableRef = new WeakReference<>(scheduledRunnable); + + ScheduledExecutorService scheduledPool = server.getScheduledPool(); + ScheduledFuture scheduledFuture = scheduledPool.schedule(scheduledRunnable, 5000, TimeUnit.MILLISECONDS); + + Assert.assertFalse(scheduledFuture.isCancelled()); + Assert.assertTrue(scheduledFuture.cancel(true)); + Assert.assertTrue(scheduledFuture.isCancelled()); + + Assert.assertNotEquals(null, scheduledRunnableRef.get()); + + scheduledRunnable = null; + + forceGC(); + + Assert.assertEquals(null, scheduledRunnableRef.get()); + + server.stop(); + } + +}