diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 61d29333bc..010a7aa394 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.reader.MessageUtil; @@ -82,10 +83,13 @@ public class AMQConsumer { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); + boolean preAck = false; if (info.isNoLocal()) { if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { //tell the connection to add the property this.session.getConnection().setNoLocal(true); + } else { + preAck = true; } String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.getId().getConnectionId() + "'"; if (selector == null) { @@ -110,6 +114,8 @@ public class AMQConsumer { serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); + //only advisory topic consumers need this. + ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck); } else { SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName())); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index a68516368a..98015e19d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -137,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final SessionCallback callback; - private final boolean preAcknowledge; + private boolean preAcknowledge; private final ManagementService managementService; @@ -1139,6 +1139,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } }; + public void setPreAcknowledge(boolean preAcknowledge) { + this.preAcknowledge = preAcknowledge; + } + /** * Internal encapsulation of the logic on sending LargeMessages. * This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index f0fc8a6b23..dac67fccb6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -60,6 +60,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; @@ -1585,6 +1586,47 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { assertNull(transaction); } + @Test + public void testTempQueueLeak() throws Exception { + final Connection[] connections = new Connection[20]; + + try { + for (int i = 0; i < connections.length; i++) { + connections[i] = factory.createConnection(); + connections[i].start(); + } + + Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < connections.length; i++) { + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + temporaryQueue.delete(); + } + + Object[] addressResources = server.getManagementService().getResources(AddressControl.class); + AddressControl addressControl = null; + + for (Object addressResource : addressResources) { + + if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) { + addressControl = (AddressControl) addressResource; + } + } + + assertNotNull("addressControl for temp advisory", addressControl); + + //sleep a bit to allow message count to go down. + Thread.sleep(50); + assertEquals(0, addressControl.getMessageCount()); + } finally { + for (Connection conn : connections) { + if (conn != null) { + conn.close(); + } + } + } + } + private void checkQueueEmpty(String qName) { PostOffice po = server.getPostOffice(); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));