diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 528c2ec3c2..5ae2d286af 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -523,6 +523,7 @@ public abstract class BaseDestination implements Destination { */ @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { + this.lastActiveTime = 0L; if (advisoryForDelivery) { broker.messageDelivered(context, messageReference); } @@ -777,7 +778,7 @@ public abstract class BaseDestination implements Destination { @Override public boolean canGC() { boolean result = false; - if (isGcIfInactive()&& this.lastActiveTime != 0l) { + if (isGcIfInactive() && this.lastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) { result = true; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java index d093693591..d75f8e3a04 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java @@ -17,12 +17,14 @@ package org.apache.activemq.broker.region; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; @@ -37,8 +39,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DestinationGCTest { + protected static final Logger logger = LoggerFactory.getLogger(DestinationGCTest.class); + private final ActiveMQQueue queue = new ActiveMQQueue("TEST"); private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER"); @@ -137,4 +144,34 @@ public class DestinationGCTest { } })); } + + @Test(timeout = 60000) + public void testDestinationGcAnonymousProducer() throws Exception { + + final ActiveMQQueue q = new ActiveMQQueue("Q.TEST.ANONYMOUS.PRODUCER"); + + brokerService.getAdminView().addQueue(q.getPhysicalName()); + assertEquals(2, brokerService.getAdminView().getQueues().length); + + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // wait for the queue to be marked for GC + logger.info("Waiting for '{}' to be marked for GC...", q); + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getDestination(q).canGC(); + } + }, Wait.MAX_WAIT_MILLIS, 500L); + + // create anonymous producer and send a message + logger.info("Sending PERSISTENT message to QUEUE '{}'", q.getPhysicalName()); + final MessageProducer producer = session.createProducer(null); + producer.send(q, session.createTextMessage()); + producer.close(); + + assertFalse(brokerService.getDestination(q).canGC()); + } }