From 9c9b85659ccdc4593ec8b02c716fa28400974f1c Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 10 Oct 2011 18:21:46 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3532 - expiry of offline durable subscription on activation can lead do duplicate expiry processing and negative pending cursor size, resolve duplicate cursor remove and contention with dispatch, additional test git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1181112 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Topic.java | 20 +-- .../region/cursors/AbstractStoreCursor.java | 7 +- .../region/cursors/OrderedPendingList.java | 6 +- .../broker/region/cursors/PendingList.java | 2 +- .../cursors/PrioritizedPendingList.java | 6 +- .../store/kahadb/MessageDatabase.java | 4 +- .../DurableSubscriptionHangTestCase.java | 135 ++++++++++++++++++ 7 files changed, 162 insertions(+), 18 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index dd5ff07fc3..0704030088 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -20,11 +20,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Future; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -66,7 +64,7 @@ public class Topic extends BaseDestination implements Task { protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); private final TopicMessageStore topicStore; protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); - protected final Valve dispatchValve = new Valve(true); + private final Valve dispatchValve = new Valve(true); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap(); @@ -541,15 +539,11 @@ public class Topic extends BaseDestination implements Task { private void doBrowse(final List browseList, final int max) { try { if (topicStore != null) { - final ConnectionContext connectionContext = createConnectionContext(); + final List toExpire = new ArrayList(); topicStore.recover(new MessageRecoveryListener() { public boolean recoverMessage(Message message) throws Exception { if (message.isExpired()) { - for (DurableTopicSubscription sub : durableSubcribers.values()) { - if (!sub.isActive()) { - messageExpired(connectionContext, sub, message); - } - } + toExpire.add(message); } browseList.add(message); return true; @@ -567,6 +561,14 @@ public class Topic extends BaseDestination implements Task { return false; } }); + final ConnectionContext connectionContext = createConnectionContext(); + for (Message message : toExpire) { + for (DurableTopicSubscription sub : durableSubcribers.values()) { + if (!sub.isActive()) { + messageExpired(connectionContext, sub, message); + } + } + } Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); if (msgs != null) { for (int i = 0; i < msgs.length && browseList.size() < max; i++) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index d8c081eca6..359b0d36a9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -222,9 +222,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final synchronized void remove(MessageReference node) { - size--; - setCacheEnabled(false); - batchList.remove(node); + if (batchList.remove(node) != null) { + size--; + setCacheEnabled(false); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java index 4a9c6e482b..fe8184fc77 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java @@ -89,11 +89,13 @@ public class OrderedPendingList implements PendingList { }; } - public void remove(MessageReference message) { + public PendingNode remove(MessageReference message) { + PendingNode node = null; if (message != null) { - PendingNode node = this.map.remove(message.getMessageId()); + node = this.map.remove(message.getMessageId()); removeNode(node); } + return node; } public int size() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java index 7b9844c1db..146872ea66 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java @@ -25,7 +25,7 @@ public interface PendingList { public void clear(); public PendingNode addMessageFirst(MessageReference message); public PendingNode addMessageLast(MessageReference message); - public void remove(MessageReference message); + public PendingNode remove(MessageReference message); public int size(); public Iterator iterator(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 4d3c331e74..f2dc08c785 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -61,13 +61,15 @@ public class PrioritizedPendingList implements PendingList { return new PrioritizedPendingListIterator(); } - public void remove(MessageReference message) { + public PendingNode remove(MessageReference message) { + PendingNode node = null; if (message != null) { - PendingNode node = this.map.remove(message.getMessageId()); + node = this.map.remove(message.getMessageId()); if (node != null) { node.getList().removeNode(node); } } + return node; } public int size() { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 80738ea094..33048c6516 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1197,6 +1197,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // The following method handles deleting un-referenced messages. removeAckLocation(tx, sd, subscriptionKey, sequence); + } else if (LOG.isDebugEnabled()) { + LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); } } @@ -1900,7 +1902,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param tx * @param sd * @param subscriptionKey - * @param sequenceId + * @param messageSequence * @throws IOException */ private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java new file mode 100644 index 0000000000..bd4f64a9da --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.commons.lang.RandomStringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertNotNull; + +public class DurableSubscriptionHangTestCase { + private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class); + final static String brokerName = "DurableSubscriptionHangTestCase"; + final static String clientID = "myId"; + private static final String topicName = "myTopic"; + private static final String durableSubName = "mySub"; + BrokerService brokerService; + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setBrokerName(brokerName); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(5000); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + brokerService.start(); + } + + @After + public void brokerStop() throws Exception { + brokerService.stop(); + } + + @Test + public void testHanging() throws Exception + { + registerDurableSubscription(); + produceExpiredAndOneNonExpiredMessages(); + TimeUnit.SECONDS.sleep(10); // make sure messages are expired + Message message = collectMessagesFromDurableSubscriptionForOneMinute(); + LOG.info("got message:" + message); + assertNotNull("Unable to read unexpired message", message); + } + + private void produceExpiredAndOneNonExpiredMessages() throws JMSException { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName); + TopicConnection connection = connectionFactory.createTopicConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1)); + for(int i=0; i<40000; i++) + { + sendRandomMessage(session, producer); + } + producer.setTimeToLive(TimeUnit.DAYS.toMillis(1)); + sendRandomMessage(session, producer); + connection.close(); + LOG.info("produceExpiredAndOneNonExpiredMessages done"); + } + + private void registerDurableSubscription() throws JMSException + { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName); + TopicConnection connection = connectionFactory.createTopicConnection(); + connection.setClientID(clientID); + TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = topicSession.createTopic(topicName); + TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName); + connection.start(); + durableSubscriber.close(); + connection.close(); + LOG.info("Durable Sub Registered"); + } + + private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception + { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName); + TopicConnection connection = connectionFactory.createTopicConnection(); + + connection.setClientID(clientID); + TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = topicSession.createTopic(topicName); + connection.start(); + TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName); + LOG.info("About to receive messages"); + Message message = subscriber.receive(120000); + subscriber.close(); + connection.close(); + LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done"); + + return message; + } + + private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException { + TextMessage textMessage = session.createTextMessage(); + textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz")); + producer.send(textMessage); + } +}