diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 0c7044f3ca..1c1787535a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -203,6 +203,7 @@ public class BrokerService implements Service { private final AtomicBoolean preShutdownHooksInvoked = new AtomicBoolean(false); private BrokerPlugin[] plugins; private boolean keepDurableSubsActive = true; + private boolean enableMessageExpirationOnActiveDurableSubs = false; private boolean useVirtualTopics = true; private boolean useMirroredQueues = false; private boolean useTempMirroredQueues = true; @@ -1729,6 +1730,14 @@ public class BrokerService implements Service { public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { this.keepDurableSubsActive = keepDurableSubsActive; } + + public boolean isEnableMessageExpirationOnActiveDurableSubs() { + return enableMessageExpirationOnActiveDurableSubs; + } + + public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) { + this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs; + } public boolean isUseVirtualTopics() { return useVirtualTopics; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 6a5c599023..e58da626a9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -58,6 +58,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us private final ConcurrentMap durableDestinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; private boolean keepDurableSubsActive; + private boolean enableMessageExpirationOnActiveDurableSubs; private final AtomicBoolean active = new AtomicBoolean(); private final AtomicLong offlineTimestamp = new AtomicLong(-1); private final HashSet ackedAndPrepared = new HashSet(); @@ -69,6 +70,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us this.pending.setSystemUsage(usageManager); this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.keepDurableSubsActive = keepDurableSubsActive; + this.enableMessageExpirationOnActiveDurableSubs = broker.getBrokerService().isEnableMessageExpirationOnActiveDurableSubs(); subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); } @@ -429,4 +431,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public boolean isKeepDurableSubsActive() { return keepDurableSubsActive; } + + public boolean isEnableMessageExpirationOnActiveDurableSubs() { + return enableMessageExpirationOnActiveDurableSubs; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 7df0138d8c..7210fa8b20 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -686,7 +686,7 @@ public class Topic extends BaseDestination implements Task { final ConnectionContext connectionContext = createConnectionContext(); for (Message message : toExpire) { for (DurableTopicSubscription sub : durableSubscribers.values()) { - if (!sub.isActive()) { + if (!sub.isActive() || sub.isEnableMessageExpirationOnActiveDurableSubs()) { message.setRegionDestination(this); messageExpired(connectionContext, sub, message); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java new file mode 100644 index 0000000000..69c734e270 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport.PersistenceAdapterChoice; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.MessageId; +import org.junit.Test; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase { + //private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class); + private boolean enableExpiration = true; + + public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) { + keepDurableSubsActive = true; + this.enableExpiration = enableExpiration; + } + + @Parameterized.Parameters(name = "enableExpiration_{0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { false }, + { true } + }); + } + + @Override + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + return super.setPersistenceAdapter(broker, PersistenceAdapterChoice.MEM); + } + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); + connectionFactory.setWatchTopicAdvisories(false); + return connectionFactory; + } + + @Test(timeout = 60 * 1000) + public void testBrowseExpireActiveSub() throws Exception { + final int numberOfMessages = 10; + + broker.setEnableMessageExpirationOnActiveDurableSubs(enableExpiration); + + // create durable subscription + Connection con = createConnection("consumer"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId"); + + long timeStamp = System.currentTimeMillis(); + sendMessages(numberOfMessages, timeStamp); + + ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); + assertEquals(1, subs.length); + + ObjectName subName = subs[0]; + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + assertEquals(true, sub.isActive()); + + // browse the durable sub + CompositeData[] data = sub.browse(); + assertNotNull(data); + assertEquals(numberOfMessages, data.length); + + Destination dest = broker.getDestination(topic); + assertEquals(0, dest.getDestinationStatistics().getExpired().getCount()); + + // add every 3rd message to the expiration list + TopicMessageStore topicStore = (TopicMessageStore)dest.getMessageStore(); + LinkedList messagesToExpire = new LinkedList<>(); + topicStore.recover(new MessageRecoveryListener() { + @Override + public boolean recoverMessage(org.apache.activemq.command.Message message) throws Exception { + int index = (int)message.getProperty("index"); + if(index % 3 == 0) + messagesToExpire.add(message); + return true; + } + + @Override + public boolean recoverMessageReference(MessageId messageReference) throws Exception { + return true; + } + + @Override + public boolean hasSpace() { + + return true; + } + + @Override + public boolean isDuplicate(MessageId id) { + return false; + } + }); + + // expire messages in the topic store + for(org.apache.activemq.command.Message message: messagesToExpire) { + message.setExpiration(timeStamp - 1); + topicStore.updateMessage(message); + } + + // browse (should | should not) expire the messages on the destination if expiration is (enabled | not enabled) + data = sub.browse(); + assertNotNull(data); + assertEquals(enableExpiration ? messagesToExpire.size() : 0, dest.getDestinationStatistics().getExpired().getCount()); + + session.close(); + con.close(); + } + + private void sendMessages(int numberOfMessages, long timeStamp) throws Exception { + Connection con = createConnection("producer"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + for (int i = 0; i < numberOfMessages; i++) { + Message message = session.createMessage(); + message.setIntProperty("index", i); + message.setJMSTimestamp(timeStamp); + producer.send(topic, message); + } + + session.close(); + con.close(); + } +}