ARTEMIS-2392 Enable remove on cancel policy for scheduled pool

By default, such a cancelled task is not automatically removed from the
work queue until its delay elapses. It may cause unbounded retention of
cancelled tasks. To avoid this, set remove on cancel policy to true.
This commit is contained in:
brusdev 2019-07-26 16:22:10 +02:00 committed by Clebert Suconic
parent a848572292
commit ee31a92d23
3 changed files with 76 additions and 1 deletions

View File

@ -2021,4 +2021,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224100, value = "Timed out waiting for large messages deletion with IDs {0}, might not be deleted if broker crashes atm", @Message(id = 224100, value = "Timed out waiting for large messages deletion with IDs {0}, might not be deleted if broker crashes atm",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void timedOutWaitingForLargeMessagesDeletion(List<Long> largeMessageIds); void timedOutWaitingForLargeMessagesDeletion(List<Long> largeMessageIds);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT)
void scheduledPoolWithNoRemoveOnCancelPolicy();
} }

View File

@ -2707,10 +2707,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader()); return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader());
} }
}); });
scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
ScheduledThreadPoolExecutor scheduledPoolExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
scheduledPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledPool = scheduledPoolExecutor;
} else { } else {
this.scheduledPoolSupplied = true; this.scheduledPoolSupplied = true;
this.scheduledPool = serviceRegistry.getScheduledExecutorService(); this.scheduledPool = serviceRegistry.getScheduledExecutorService();
if (!(scheduledPool instanceof ScheduledThreadPoolExecutor) ||
!((ScheduledThreadPoolExecutor)scheduledPool).getRemoveOnCancelPolicy()) {
ActiveMQServerLogger.LOGGER.scheduledPoolWithNoRemoveOnCancelPolicy();
}
} }
} }

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.lang.ref.WeakReference;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class ActiveMQServerImplTest extends ActiveMQTestBase {
@Test
public void testScheduledPoolGC() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
Runnable scheduledRunnable = new Runnable() {
@Override
public void run() {
Assert.fail();
}
};
WeakReference<Runnable> scheduledRunnableRef = new WeakReference<>(scheduledRunnable);
ScheduledExecutorService scheduledPool = server.getScheduledPool();
ScheduledFuture scheduledFuture = scheduledPool.schedule(scheduledRunnable, 5000, TimeUnit.MILLISECONDS);
Assert.assertFalse(scheduledFuture.isCancelled());
Assert.assertTrue(scheduledFuture.cancel(true));
Assert.assertTrue(scheduledFuture.isCancelled());
Assert.assertNotEquals(null, scheduledRunnableRef.get());
scheduledRunnable = null;
forceGC();
Assert.assertEquals(null, scheduledRunnableRef.get());
server.stop();
}
}