From 1716655214f1605cf392f1be8862c7ff61785f64 Mon Sep 17 00:00:00 2001 From: brusdev Date: Wed, 15 Jan 2020 09:56:21 +0100 Subject: [PATCH] ARTEMIS-2597 Memory Leak when closing AMQP Consumers in the context Remove server senders on remote link close. --- .../amqp/proton/AMQPSessionContext.java | 15 ++++++- .../proton/ProtonServerSenderContext.java | 5 +++ .../integration/client/ConsumerTest.java | 44 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index e57acecce7..671834ef1d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -86,6 +86,8 @@ public class AMQPSessionContext extends ProtonInitializable { public void disconnect(Object consumer, String queueName) { ProtonServerSenderContext protonConsumer = senders.remove(consumer); if (protonConsumer != null) { + serverSenders.remove(protonConsumer.getBrokerConsumer()); + try { protonConsumer.close(false); } catch (ActiveMQAMQPException e) { @@ -132,6 +134,7 @@ public class AMQPSessionContext extends ProtonInitializable { } } senders.clear(); + serverSenders.clear(); try { if (sessionSPI != null) { sessionSPI.close(); @@ -178,6 +181,9 @@ public class AMQPSessionContext extends ProtonInitializable { protonSender.start(); } catch (ActiveMQAMQPException e) { senders.remove(sender); + if (protonSender.getBrokerConsumer() != null) { + serverSenders.remove(protonSender.getBrokerConsumer()); + } sender.setSource(null); sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); connection.runNow(() -> { @@ -188,7 +194,6 @@ public class AMQPSessionContext extends ProtonInitializable { } public void removeSender(Sender sender) throws ActiveMQAMQPException { - senders.remove(sender); ProtonServerSenderContext senderRemoved = senders.remove(sender); if (senderRemoved != null) { serverSenders.remove(senderRemoved.getBrokerConsumer()); @@ -217,4 +222,12 @@ public class AMQPSessionContext extends ProtonInitializable { }); } } + + public int getReceiverCount() { + return receivers.size(); + } + + public int getSenderCount() { + return senders.size(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 40d0412ab6..3bb9b4a995 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -558,6 +558,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { try { closed = true; + protonSession.removeSender(sender); sessionSPI.closeSender(brokerConsumer); // if this is a link close rather than a connection close or detach, we need to delete // any durable resources for say pub subs @@ -899,4 +900,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.drained(); connection.flush(); } + + public AMQPSessionContext getSessionContext() { + return protonSession; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 96c66dfe14..9cdf38a37c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -20,6 +20,8 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.JMSConsumer; +import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; @@ -59,7 +61,11 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; @@ -315,6 +321,44 @@ public class ConsumerTest extends ActiveMQTestBase { } } + @Test + public void testContextOnConsumerAMQP() throws Throwable { + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue"))); + + ConnectionFactory factory = createFactory(2); + JMSContext context = factory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE); + + try { + javax.jms.Queue queue = context.createQueue("queue"); + + JMSConsumer consumer = context.createConsumer(queue); + + ServerConsumer serverConsumer = null; + for (ServerSession session : server.getSessions()) { + for (ServerConsumer sessionConsumer : session.getServerConsumers()) { + serverConsumer = sessionConsumer; + } + } + + consumer.close(); + + Assert.assertTrue(serverConsumer.getProtocolContext() instanceof ProtonServerSenderContext); + + final AMQPSessionContext sessionContext = ((ProtonServerSenderContext) + serverConsumer.getProtocolContext()).getSessionContext(); + + Wait.assertEquals(0, () -> sessionContext.getSenderCount(), 1000, 10); + } finally { + context.stop(); + context.close(); + } + } + @Test public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable { if (!isNetty()) {