ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs

When an openwire client closes the session, the broker doesn't
clean up its server consumer references even though the core
consumers are closed. This results a leak when sessions within
a connection are created and closed when the connection keeps open.
This commit is contained in:
Howard Gao 2019-09-17 09:44:25 +08:00
parent 0e876c8100
commit 05a93314cd
3 changed files with 51 additions and 10 deletions

View File

@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// Don't let new consumers or producers get added while we are closing // Don't let new consumers or producers get added while we are closing
// this down. // this down.
session.shutdown(); session.shutdown();
// Cascade the connection stop producers.
// we don't stop consumer because in core
// closing the session will do the job
for (ProducerId producerId : session.getProducerIds()) { for (ProducerId producerId : session.getProducerIds()) {
try {
processRemoveProducer(producerId); processRemoveProducer(producerId);
} catch (Throwable e) {
// LOG.warn("Failed to remove producer: {}", producerId, e);
} }
for (ConsumerId consumerId : session.getConsumerIds()) {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
} }
state.removeSession(id); state.removeSession(id);
propagateLastSequenceId(session, lastDeliveredSequenceId); propagateLastSequenceId(session, lastDeliveredSequenceId);
removeSession(context, session.getInfo()); removeSession(context, session.getInfo());

View File

@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
} }
return mappedDestination; return mappedDestination;
} }
public List<OpenWireConnection> getConnections() {
return connections;
}
} }

View File

@ -47,6 +47,7 @@ import javax.jms.XASession;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.state.SessionState;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
Collection<Session> sessions = new LinkedList<>();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName); Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue); System.out.println("Queue:" + queue);
@ -270,6 +274,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
} }
} }
@Test
public void testSessionCloseWithOpenConnection() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
session.createConsumer(queue);
session.createConsumer(queue);
connection.start();
Field infoField = ActiveMQSession.class.getDeclaredField("info");
infoField.setAccessible(true);
SessionInfo info = (SessionInfo) infoField.get(session);
NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty");
OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE");
List<OpenWireConnection> connections = protocolManager.getConnections();
assertEquals(1, connections.size());
OpenWireConnection conn = connections.get(0);
SessionState sessionState = conn.getState().getSessionState(info.getSessionId());
Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000);
session.close();
Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000);
}
}
@Test @Test
public void testRollback() throws Exception { public void testRollback() throws Exception {
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {