diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 07b2875c92..3790f365ac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; @@ -85,6 +87,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private final boolean useCoreSubscriptionNaming; + private boolean isSchedulingCancelled; + private ScheduledFuture scheduledFuture; + private final Object schedulingLock = new Object(); + public AMQPConnectionContext(ProtonProtocolManager protocolManager, AMQPConnectionCallback connectionSP, String containerId, @@ -111,6 +117,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH this.connectionProperties.putAll(connectionProperties); } + this.scheduledFuture = null; + this.isSchedulingCancelled = false; this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); EventLoop nettyExecutor; @@ -183,6 +191,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } public void close(ErrorCondition errorCondition) { + synchronized (schedulingLock) { + isSchedulingCancelled = true; + + if (scheduledPool != null && scheduledPool instanceof ThreadPoolExecutor && + scheduledFuture != null && scheduledFuture instanceof Runnable) { + if (!((ThreadPoolExecutor) scheduledPool).remove((Runnable) scheduledFuture)) { + log.warn("Scheduled task can't be removed from scheduledPool."); + } + } + } + handler.close(errorCondition, this); } @@ -389,13 +408,15 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH initialise(); /* - * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections - * but its here in case we add support for outbound connections. - * */ + * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections + * but its here in case we add support for outbound connections. + * */ if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); - if (nextKeepAliveTime != 0 && scheduledPool != null) { - scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); + synchronized (schedulingLock) { + if (nextKeepAliveTime != 0 && scheduledPool != null && !isSchedulingCancelled) { + scheduledFuture = scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); + } } } } @@ -411,11 +432,16 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void run() { Long rescheduleAt = handler.tick(false); - if (rescheduleAt == null) { - // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. - scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); - } else if (rescheduleAt != 0) { - scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); + + synchronized (schedulingLock) { + if (!isSchedulingCancelled) { + if (rescheduleAt == null) { + // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. + scheduledFuture = scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); + } else if (rescheduleAt != 0) { + scheduledFuture = scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); + } + } } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java new file mode 100644 index 0000000000..8bdda7ec75 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContextTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.EventLoop; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +public class AMQPConnectionContextTest { + + @Test + public void testLeakAfterClose() throws Exception { + ArtemisExecutor executor = Mockito.mock(ArtemisExecutor.class); + ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class); + Mockito.when(executorFactory.getExecutor()).thenReturn(executor); + + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + Mockito.when(server.getExecutorFactory()).thenReturn(executorFactory); + + ProtonProtocolManager manager = Mockito.mock(ProtonProtocolManager.class); + Mockito.when(manager.getServer()).thenReturn(server); + + EventLoop eventLoop = Mockito.mock(EventLoop.class); + Channel transportChannel = Mockito.mock(Channel.class); + Mockito.when(transportChannel.config()).thenReturn(Mockito.mock(ChannelConfig.class)); + Mockito.when(transportChannel.eventLoop()).thenReturn(eventLoop); + Mockito.when(eventLoop.inEventLoop()).thenReturn(true); + NettyConnection transportConnection = new NettyConnection(new HashMap<>(), transportChannel, null, false, false); + + Connection connection = Mockito.mock(Connection.class); + AMQPConnectionCallback protonSPI = Mockito.mock(AMQPConnectionCallback.class); + Mockito.when(protonSPI.getTransportConnection()).thenReturn(transportConnection); + Mockito.when(protonSPI.validateConnection(connection, null)).thenReturn(true); + + ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor( + ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize()); + + AMQPConnectionContext connectionContext = new AMQPConnectionContext( + manager, + protonSPI, + null, + (int) ActiveMQClient.DEFAULT_CONNECTION_TTL, + manager.getMaxFrameSize(), + AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, + false, + scheduledPool, + false, + null, + null); + + connectionContext.onRemoteOpen(connection); + + connectionContext.close(null); + + Assert.assertEquals(0, scheduledPool.getTaskCount()); + } +}