diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index e9432af146..af99d413a3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -32,8 +32,11 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; @@ -93,6 +96,7 @@ public abstract class BaseDestination implements Destination { private boolean gcWithNetworkConsumers; private long lastActiveTime=0l; private boolean reduceMemoryFootprint = false; + protected final Scheduler scheduler; /** * @param brokerService @@ -113,6 +117,7 @@ public abstract class BaseDestination implements Destination { this.memoryUsage = this.systemUsage.getMemoryUsage(); this.memoryUsage.setUsagePortion(1.0f); this.regionBroker = brokerService.getRegionBroker(); + this.scheduler = brokerService.getBroker().getScheduler(); } /** @@ -707,4 +712,12 @@ public abstract class BaseDestination implements Destination { } return hasRegularConsumers; } + + protected ConnectionContext createConnectionContext() { + ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); + answer.setBroker(this.broker); + answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); + answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + return answer; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 4fb89ef538..dd335252e3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -110,7 +110,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us try { this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); } catch (IOException e) { - JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); + JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e); jmsEx.setLinkedException(e); throw jmsEx; } @@ -228,6 +228,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us super.dispatchPending(); } } + + public void removePending(MessageReference node) throws IOException { + pending.remove(node); + } protected void doAddRecoveredMessage(MessageReference message) throws Exception { synchronized(pending) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 7444d51e96..978c79db3f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -126,7 +126,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private final Object iteratingMutex = new Object() { }; - private final Scheduler scheduler; + class TimeoutMessage implements Delayed { @@ -210,7 +210,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { super(brokerService, store, destination, parentStats); this.taskFactory = taskFactory; this.dispatchSelector = new QueueDispatchSelector(destination); - this.scheduler = brokerService.getBroker().getScheduler(); } public List getConsumers() { @@ -1615,14 +1614,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - protected ConnectionContext createConnectionContext() { - ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); - answer.setBroker(this.broker); - answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); - answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - return answer; - } - final void sendMessage(final Message msg) throws Exception { messagesLock.writeLock().lock(); try{ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 630015aff3..5198fae01d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -510,6 +510,10 @@ public class Topic extends BaseDestination implements Task { memoryUsage.start(); } + if (getExpireMessagesPeriod() > 0) { + scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); + } + } public void stop() throws Exception { @@ -523,14 +527,22 @@ public class Topic extends BaseDestination implements Task { if (this.topicStore != null) { this.topicStore.stop(); } + + scheduler.cancel(expireMessagesTask); } public Message[] browse() { + final ConnectionContext connectionContext = createConnectionContext(); final Set result = new CopyOnWriteArraySet(); try { if (topicStore != null) { topicStore.recover(new MessageRecoveryListener() { public boolean recoverMessage(Message message) throws Exception { + if (message.isExpired()) { + for (Subscription sub : durableSubcribers.values()) { + messageExpired(connectionContext, sub, message); + } + } result.add(message); return true; } @@ -640,6 +652,12 @@ public class Topic extends BaseDestination implements Task { } } + private final Runnable expireMessagesTask = new Runnable() { + public void run() { + browse(); + } + }; + public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { broker.messageExpired(context, reference, subs); // AMQ-2586: Better to leave this stat at zero than to give the user @@ -652,8 +670,11 @@ public class Topic extends BaseDestination implements Task { ack.setDestination(destination); ack.setMessageID(reference.getMessageId()); try { + if (subs instanceof DurableTopicSubscription) { + ((DurableTopicSubscription)subs).removePending(reference); + } acknowledge(context, subs, ack, reference); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed to remove expired Message from the store ", e); } } @@ -663,4 +684,5 @@ public class Topic extends BaseDestination implements Task { return LOG; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 541cbe05dc..842dbd2988 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -276,8 +276,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { @Override public synchronized void remove(MessageReference node) { - if (currentCursor != null) { - currentCursor.remove(node); + for (PendingMessageCursor tsp : storePrefetches) { + tsp.remove(node); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 23e493a6e4..70e256e4a0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -20,12 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; +import javax.jms.*; import javax.management.ObjectName; import junit.framework.Test; @@ -41,6 +36,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -401,6 +397,71 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { } + public void testExpireMessagesForDurableSubscriber() throws Exception { + createBroker(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connection = factory.createConnection(); + connection.setClientID("myConnection"); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + Topic destination = session.createTopic("test"); + producer = session.createProducer(destination); + final int ttl = 300; + producer.setTimeToLive(ttl); + + final long sendCount = 10; + + TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub"); + sub.close(); + + for (int i=0; i < sendCount; i++) { + producer.send(session.createTextMessage("test")); + } + + DestinationViewMBean view = createView((ActiveMQTopic)destination); + + + LOG.info("messages sent"); + LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); + assertEquals(0, view.getExpiredCount()); + assertEquals(10, view.getEnqueueCount()); + + + Thread.sleep(4000); + + LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); + assertEquals(10, view.getExpiredCount()); + assertEquals(0, view.getEnqueueCount()); + + + final AtomicLong received = new AtomicLong(); + sub = session.createDurableSubscriber(destination, "mySub"); + sub.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.incrementAndGet(); + } + }); + + LOG.info("Waiting for messages to arrive"); + + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return received.get() >= sendCount; + } + }, 1000); + + LOG.info("received=" + received.get()); + LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount()); + + assertEquals(0, received.get()); + assertEquals(10, view.getExpiredCount()); + assertEquals(0, view.getEnqueueCount()); + + } + + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq";