ARTEMIS-2002 Proton transport objects leaked
Remove scheduled tasks when a client disconnects to allow garbage collector to delete the unused proton objects. Add a the unity test AMQPConnectionContextTest to check leaks after close.
This commit is contained in:
parent
55e59febf3
commit
74c79625b8
|
@ -23,6 +23,8 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -85,6 +87,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
private final boolean useCoreSubscriptionNaming;
|
||||
|
||||
private boolean isSchedulingCancelled;
|
||||
private ScheduledFuture scheduledFuture;
|
||||
private final Object schedulingLock = new Object();
|
||||
|
||||
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
|
||||
AMQPConnectionCallback connectionSP,
|
||||
String containerId,
|
||||
|
@ -111,6 +117,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
this.connectionProperties.putAll(connectionProperties);
|
||||
}
|
||||
|
||||
this.scheduledFuture = null;
|
||||
this.isSchedulingCancelled = false;
|
||||
this.scheduledPool = scheduledPool;
|
||||
connectionCallback.setConnection(this);
|
||||
EventLoop nettyExecutor;
|
||||
|
@ -183,6 +191,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
}
|
||||
|
||||
public void close(ErrorCondition errorCondition) {
|
||||
synchronized (schedulingLock) {
|
||||
isSchedulingCancelled = true;
|
||||
|
||||
if (scheduledPool != null && scheduledPool instanceof ThreadPoolExecutor &&
|
||||
scheduledFuture != null && scheduledFuture instanceof Runnable) {
|
||||
if (!((ThreadPoolExecutor) scheduledPool).remove((Runnable) scheduledFuture)) {
|
||||
log.warn("Scheduled task can't be removed from scheduledPool.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handler.close(errorCondition, this);
|
||||
}
|
||||
|
||||
|
@ -389,13 +408,15 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
initialise();
|
||||
|
||||
/*
|
||||
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
|
||||
* but its here in case we add support for outbound connections.
|
||||
* */
|
||||
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
|
||||
* but its here in case we add support for outbound connections.
|
||||
* */
|
||||
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
long nextKeepAliveTime = handler.tick(true);
|
||||
if (nextKeepAliveTime != 0 && scheduledPool != null) {
|
||||
scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
|
||||
synchronized (schedulingLock) {
|
||||
if (nextKeepAliveTime != 0 && scheduledPool != null && !isSchedulingCancelled) {
|
||||
scheduledFuture = scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -411,11 +432,16 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
@Override
|
||||
public void run() {
|
||||
Long rescheduleAt = handler.tick(false);
|
||||
if (rescheduleAt == null) {
|
||||
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
|
||||
scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
|
||||
} else if (rescheduleAt != 0) {
|
||||
scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
|
||||
|
||||
synchronized (schedulingLock) {
|
||||
if (!isSchedulingCancelled) {
|
||||
if (rescheduleAt == null) {
|
||||
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
|
||||
scheduledFuture = scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
|
||||
} else if (rescheduleAt != 0) {
|
||||
scheduledFuture = scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
public class AMQPConnectionContextTest {
|
||||
|
||||
@Test
|
||||
public void testLeakAfterClose() throws Exception {
|
||||
ArtemisExecutor executor = Mockito.mock(ArtemisExecutor.class);
|
||||
ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class);
|
||||
Mockito.when(executorFactory.getExecutor()).thenReturn(executor);
|
||||
|
||||
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
|
||||
Mockito.when(server.getExecutorFactory()).thenReturn(executorFactory);
|
||||
|
||||
ProtonProtocolManager manager = Mockito.mock(ProtonProtocolManager.class);
|
||||
Mockito.when(manager.getServer()).thenReturn(server);
|
||||
|
||||
EventLoop eventLoop = Mockito.mock(EventLoop.class);
|
||||
Channel transportChannel = Mockito.mock(Channel.class);
|
||||
Mockito.when(transportChannel.config()).thenReturn(Mockito.mock(ChannelConfig.class));
|
||||
Mockito.when(transportChannel.eventLoop()).thenReturn(eventLoop);
|
||||
Mockito.when(eventLoop.inEventLoop()).thenReturn(true);
|
||||
NettyConnection transportConnection = new NettyConnection(new HashMap<>(), transportChannel, null, false, false);
|
||||
|
||||
Connection connection = Mockito.mock(Connection.class);
|
||||
AMQPConnectionCallback protonSPI = Mockito.mock(AMQPConnectionCallback.class);
|
||||
Mockito.when(protonSPI.getTransportConnection()).thenReturn(transportConnection);
|
||||
Mockito.when(protonSPI.validateConnection(connection, null)).thenReturn(true);
|
||||
|
||||
ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(
|
||||
ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize());
|
||||
|
||||
AMQPConnectionContext connectionContext = new AMQPConnectionContext(
|
||||
manager,
|
||||
protonSPI,
|
||||
null,
|
||||
(int) ActiveMQClient.DEFAULT_CONNECTION_TTL,
|
||||
manager.getMaxFrameSize(),
|
||||
AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
|
||||
false,
|
||||
scheduledPool,
|
||||
false,
|
||||
null,
|
||||
null);
|
||||
|
||||
connectionContext.onRemoteOpen(connection);
|
||||
|
||||
connectionContext.close(null);
|
||||
|
||||
Assert.assertEquals(0, scheduledPool.getTaskCount());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue