https://issues.apache.org/jira/browse/AMQ-3532 - expiry of offline durable subscription on activation can lead do duplicate expiry processing and negative pending cursor size, resolve duplicate cursor remove and contention with dispatch, additional test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1181112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-10 18:21:46 +00:00
parent 3a806e98dc
commit 9c9b85659c
7 changed files with 162 additions and 18 deletions

View File

@ -20,11 +20,9 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -66,7 +64,7 @@ public class Topic extends BaseDestination implements Task {
protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
private final TopicMessageStore topicStore; private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
protected final Valve dispatchValve = new Valve(true); private final Valve dispatchValve = new Valve(true);
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@ -541,15 +539,11 @@ public class Topic extends BaseDestination implements Task {
private void doBrowse(final List<Message> browseList, final int max) { private void doBrowse(final List<Message> browseList, final int max) {
try { try {
if (topicStore != null) { if (topicStore != null) {
final ConnectionContext connectionContext = createConnectionContext(); final List<Message> toExpire = new ArrayList<Message>();
topicStore.recover(new MessageRecoveryListener() { topicStore.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception { public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) { if (message.isExpired()) {
for (DurableTopicSubscription sub : durableSubcribers.values()) { toExpire.add(message);
if (!sub.isActive()) {
messageExpired(connectionContext, sub, message);
}
}
} }
browseList.add(message); browseList.add(message);
return true; return true;
@ -567,6 +561,14 @@ public class Topic extends BaseDestination implements Task {
return false; return false;
} }
}); });
final ConnectionContext connectionContext = createConnectionContext();
for (Message message : toExpire) {
for (DurableTopicSubscription sub : durableSubcribers.values()) {
if (!sub.isActive()) {
messageExpired(connectionContext, sub, message);
}
}
}
Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
if (msgs != null) { if (msgs != null) {
for (int i = 0; i < msgs.length && browseList.size() < max; i++) { for (int i = 0; i < msgs.length && browseList.size() < max; i++) {

View File

@ -222,9 +222,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void remove(MessageReference node) { public final synchronized void remove(MessageReference node) {
if (batchList.remove(node) != null) {
size--; size--;
setCacheEnabled(false); setCacheEnabled(false);
batchList.remove(node); }
} }

View File

@ -89,11 +89,13 @@ public class OrderedPendingList implements PendingList {
}; };
} }
public void remove(MessageReference message) { public PendingNode remove(MessageReference message) {
PendingNode node = null;
if (message != null) { if (message != null) {
PendingNode node = this.map.remove(message.getMessageId()); node = this.map.remove(message.getMessageId());
removeNode(node); removeNode(node);
} }
return node;
} }
public int size() { public int size() {

View File

@ -25,7 +25,7 @@ public interface PendingList {
public void clear(); public void clear();
public PendingNode addMessageFirst(MessageReference message); public PendingNode addMessageFirst(MessageReference message);
public PendingNode addMessageLast(MessageReference message); public PendingNode addMessageLast(MessageReference message);
public void remove(MessageReference message); public PendingNode remove(MessageReference message);
public int size(); public int size();
public Iterator<MessageReference> iterator(); public Iterator<MessageReference> iterator();
} }

View File

@ -61,13 +61,15 @@ public class PrioritizedPendingList implements PendingList {
return new PrioritizedPendingListIterator(); return new PrioritizedPendingListIterator();
} }
public void remove(MessageReference message) { public PendingNode remove(MessageReference message) {
PendingNode node = null;
if (message != null) { if (message != null) {
PendingNode node = this.map.remove(message.getMessageId()); node = this.map.remove(message.getMessageId());
if (node != null) { if (node != null) {
node.getList().removeNode(node); node.getList().removeNode(node);
} }
} }
return node;
} }
public int size() { public int size() {

View File

@ -1197,6 +1197,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
// The following method handles deleting un-referenced messages. // The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, sequence); removeAckLocation(tx, sd, subscriptionKey, sequence);
} else if (LOG.isDebugEnabled()) {
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
} }
} }
@ -1900,7 +1902,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @param tx * @param tx
* @param sd * @param sd
* @param subscriptionKey * @param subscriptionKey
* @param sequenceId * @param messageSequence
* @throws IOException * @throws IOException
*/ */
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
public class DurableSubscriptionHangTestCase {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
final static String brokerName = "DurableSubscriptionHangTestCase";
final static String clientID = "myId";
private static final String topicName = "myTopic";
private static final String durableSubName = "mySub";
BrokerService brokerService;
@Before
public void startBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setBrokerName(brokerName);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(5000);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
}
@After
public void brokerStop() throws Exception {
brokerService.stop();
}
@Test
public void testHanging() throws Exception
{
registerDurableSubscription();
produceExpiredAndOneNonExpiredMessages();
TimeUnit.SECONDS.sleep(10); // make sure messages are expired
Message message = collectMessagesFromDurableSubscriptionForOneMinute();
LOG.info("got message:" + message);
assertNotNull("Unable to read unexpired message", message);
}
private void produceExpiredAndOneNonExpiredMessages() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
for(int i=0; i<40000; i++)
{
sendRandomMessage(session, producer);
}
producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
sendRandomMessage(session, producer);
connection.close();
LOG.info("produceExpiredAndOneNonExpiredMessages done");
}
private void registerDurableSubscription() throws JMSException
{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(clientID);
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(topicName);
TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName);
connection.start();
durableSubscriber.close();
connection.close();
LOG.info("Durable Sub Registered");
}
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception
{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(clientID);
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(topicName);
connection.start();
TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
LOG.info("About to receive messages");
Message message = subscriber.receive(120000);
subscriber.close();
connection.close();
LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
return message;
}
private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz"));
producer.send(textMessage);
}
}