diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 478aa81ae3..b3005d0f56 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -164,8 +164,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic } } }else{ - //no message held - removeMessage = true; + + if (ackContainer.isEmpty() || isUnreferencedBySubscribers(subscriberMessages, messageId)) { + // no message reference held + removeMessage = true; + } } } }finally { @@ -174,6 +177,28 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic return removeMessage; } + // verify that no subscriber has a reference to this message. In the case where the subscribers + // references are persisted but more than the persisted consumers get the message, the ack from the non + // persisted consumer would remove the message in error + // + // see: https://issues.apache.org/activemq/browse/AMQ-2123 + private boolean isUnreferencedBySubscribers( + Map subscriberContainers, MessageId messageId) { + boolean isUnreferenced = true; + for (TopicSubContainer container: subscriberContainers.values()) { + if (!container.isEmpty()) { + for (Iterator i = container.iterator(); i.hasNext();) { + ConsumerMessageRef ref = (ConsumerMessageRef) i.next(); + if (messageId.equals(ref.getMessageId())) { + isUnreferenced = false; + break; + } + } + } + } + return isUnreferenced; + } + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { acknowledgeReference(context, clientId, subscriptionName, messageId); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index 2669c5e66a..0c64f9aaa6 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.bugs; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -28,6 +35,7 @@ import javax.jms.Topic; import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.commons.logging.Log; @@ -44,15 +52,92 @@ public class DurableConsumerTest extends TestCase { protected BrokerService broker; protected String bindAddress="tcp://localhost:61616"; - - - - - protected byte[] payload = new byte[1024*16]; + protected byte[] payload = new byte[1024*32]; protected ConnectionFactory factory; + protected Vector exceptions = new Vector(); + public void testConcurrentDurableConsumer() throws Exception { + factory = createConnectionFactory(); + final String topicName = getName(); + final int numMessages = 500; + int numConsumers = 20; + final CountDownLatch counsumerStarted = new CountDownLatch(0); + final AtomicInteger receivedCount = new AtomicInteger(); + Runnable consumer = new Runnable() { + public void run() { + final String consumerName = Thread.currentThread().getName(); + int acked = 0; + int received = 0; + + + try { + while (acked < numMessages/2) { + // take one message and close, ack on occasion + Connection consumerConnection = factory.createConnection(); + ((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false); + consumerConnection.setClientID(consumerName); + Session consumerSession = consumerConnection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(topicName); + consumerConnection.start(); + + MessageConsumer consumer = consumerSession + .createDurableSubscriber(topic, consumerName); + + counsumerStarted.countDown(); + Message msg = null; + do { + msg = consumer.receive(5000); + if (msg != null) { + receivedCount.incrementAndGet(); + if (received++ % 2 == 0) { + msg.acknowledge(); + acked++; + } + } + } while (msg == null); + + consumerConnection.close(); + } + assertTrue(received >= acked); + } catch (Exception e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }; + + ExecutorService executor = Executors.newCachedThreadPool(); + + for (int i=0; i numMessages); + assertTrue(exceptions.isEmpty()); + } public void testConsumer() throws Exception{ factory = createConnectionFactory(); @@ -107,8 +192,6 @@ public class DurableConsumerTest extends TestCase { if (broker == null) { broker = createBroker(true); } - - super.setUp(); } @@ -144,6 +227,8 @@ public class DurableConsumerTest extends TestCase { answer.setDeleteAllMessagesOnStartup(deleteStore); answer.addConnector(bindAddress); answer.setUseShutdownHook(false); + answer.setUseJmx(false); + answer.setAdvisorySupport(false); } protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {