From ee31a92d23b3d8bf07fafcf76a54a9b55fc02548 Mon Sep 17 00:00:00 2001 From: brusdev Date: Fri, 26 Jul 2019 16:22:10 +0200 Subject: [PATCH] ARTEMIS-2392 Enable remove on cancel policy for scheduled pool By default, such a cancelled task is not automatically removed from the work queue until its delay elapses. It may cause unbounded retention of cancelled tasks. To avoid this, set remove on cancel policy to true. --- .../core/server/ActiveMQServerLogger.java | 4 ++ .../core/server/impl/ActiveMQServerImpl.java | 10 ++- .../server/impl/ActiveMQServerImplTest.java | 63 +++++++++++++++++++ 3 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index e37e2bf5b1..8954839a07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2021,4 +2021,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224100, value = "Timed out waiting for large messages deletion with IDs {0}, might not be deleted if broker crashes atm", format = Message.Format.MESSAGE_FORMAT) void timedOutWaitingForLargeMessagesDeletion(List largeMessageIds); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT) + void scheduledPoolWithNoRemoveOnCancelPolicy(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index b5c7054380..9b5c3cb188 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2707,10 +2707,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader()); } }); - scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + + ScheduledThreadPoolExecutor scheduledPoolExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + scheduledPoolExecutor.setRemoveOnCancelPolicy(true); + scheduledPool = scheduledPoolExecutor; } else { this.scheduledPoolSupplied = true; this.scheduledPool = serviceRegistry.getScheduledExecutorService(); + + if (!(scheduledPool instanceof ScheduledThreadPoolExecutor) || + !((ScheduledThreadPoolExecutor)scheduledPool).getRemoveOnCancelPolicy()) { + ActiveMQServerLogger.LOGGER.scheduledPoolWithNoRemoveOnCancelPolicy(); + } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java new file mode 100644 index 0000000000..e443478ab0 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.lang.ref.WeakReference; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class ActiveMQServerImplTest extends ActiveMQTestBase { + + @Test + public void testScheduledPoolGC() throws Exception { + ActiveMQServer server = createServer(false); + + server.start(); + + Runnable scheduledRunnable = new Runnable() { + @Override + public void run() { + Assert.fail(); + } + }; + WeakReference scheduledRunnableRef = new WeakReference<>(scheduledRunnable); + + ScheduledExecutorService scheduledPool = server.getScheduledPool(); + ScheduledFuture scheduledFuture = scheduledPool.schedule(scheduledRunnable, 5000, TimeUnit.MILLISECONDS); + + Assert.assertFalse(scheduledFuture.isCancelled()); + Assert.assertTrue(scheduledFuture.cancel(true)); + Assert.assertTrue(scheduledFuture.isCancelled()); + + Assert.assertNotEquals(null, scheduledRunnableRef.get()); + + scheduledRunnable = null; + + forceGC(); + + Assert.assertEquals(null, scheduledRunnableRef.get()); + + server.stop(); + } + +}