From e4fb722ad8480dd12c037230706ef97d9a57f801 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 13 Sep 2017 20:50:56 +0800 Subject: [PATCH] ARTEMIS-1419 OpenWire advisory message never deleted By default, every openwire connection will create a queue under the multicast address ActiveMQ.Advisory.TempQueue. If a openwire client is create temporary queues these queues will fill up with messages for as long as the associated openwire connection is alive. It appears these messages do not get consumed from the queues. The reason behind is that advisory messages don't require acknowledgement so the messages stay at the queue. --- .../protocol/openwire/amq/AMQConsumer.java | 6 +++ .../core/server/impl/ServerConsumerImpl.java | 6 ++- .../openwire/SimpleOpenWireTest.java | 42 +++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 61d29333bc..010a7aa394 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.reader.MessageUtil; @@ -82,10 +83,13 @@ public class AMQConsumer { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); + boolean preAck = false; if (info.isNoLocal()) { if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { //tell the connection to add the property this.session.getConnection().setNoLocal(true); + } else { + preAck = true; } String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'"; if (selector == null) { @@ -110,6 +114,8 @@ public class AMQConsumer { serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); + //only advisory topic consumers need this. + ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck); } else { SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName())); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index a68516368a..98015e19d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -137,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final SessionCallback callback; - private final boolean preAcknowledge; + private boolean preAcknowledge; private final ManagementService managementService; @@ -1139,6 +1139,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } }; + public void setPreAcknowledge(boolean preAcknowledge) { + this.preAcknowledge = preAcknowledge; + } + /** * Internal encapsulation of the logic on sending LargeMessages. * This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index f0fc8a6b23..dac67fccb6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -60,6 +60,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; @@ -1585,6 +1586,47 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { assertNull(transaction); } + @Test + public void testTempQueueLeak() throws Exception { + final Connection[] connections = new Connection[20]; + + try { + for (int i = 0; i < connections.length; i++) { + connections[i] = factory.createConnection(); + connections[i].start(); + } + + Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < connections.length; i++) { + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + temporaryQueue.delete(); + } + + Object[] addressResources = server.getManagementService().getResources(AddressControl.class); + AddressControl addressControl = null; + + for (Object addressResource : addressResources) { + + if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) { + addressControl = (AddressControl) addressResource; + } + } + + assertNotNull("addressControl for temp advisory", addressControl); + + //sleep a bit to allow message count to go down. + Thread.sleep(50); + assertEquals(0, addressControl.getMessageCount()); + } finally { + for (Connection conn : connections) { + if (conn != null) { + conn.close(); + } + } + } + } + private void checkQueueEmpty(String qName) { PostOffice po = server.getPostOffice(); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));