From 53c4e125f6790a9283687118e03567514f37d845 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 1 Oct 2007 20:02:18 +0000 Subject: [PATCH] Fix for AMQ-1095: - Added contributed test cases - We now filter out non-matching messages as they are loaded into the TopicStorePrefetch - Changed the TopicStorePrefetch and StoreDurableSubscriberCursor so that they don't depend on the pending message counter since some stores cannot give an accurate count for it. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@581053 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/DurableTopicSubscription.java | 3 +- .../broker/region/PrefetchSubscription.java | 1 + .../cursors/StoreDurableSubscriberCursor.java | 30 +-- .../region/cursors/TopicStorePrefetch.java | 128 +++++----- ...DurableSubscriberMessageStoragePolicy.java | 3 +- ...DurableSubscriberMessageStoragePolicy.java | 3 +- .../broker/region/policy/PolicyEntry.java | 2 +- ...DurableSubscriberMessageStoragePolicy.java | 5 +- ...DurableSubscriberMessageStoragePolicy.java | 3 +- .../store/kahadaptor/KahaMessageStore.java | 7 +- .../store/kahadaptor/KahaReferenceStore.java | 7 +- .../activemq/broker/RecoveryBrokerTest.java | 2 +- .../broker/region/cursors/CursorSupport.java | 3 +- .../bugs/amq1095/ActiveMQTestCase.java | 163 +++++++++++++ .../bugs/amq1095/MessageSelectorTest.java | 230 ++++++++++++++++++ .../apache/activemq/bugs/amq1095/activemq.xml | 33 +++ 16 files changed, 526 insertions(+), 97 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 6f9aa6ca41..4fa5886990 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -48,7 +48,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { - super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize())); + super(broker, context, info); + this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this); this.usageManager = usageManager; this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index b6a852ed50..e9e4e32981 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -410,6 +410,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (message == null) { return false; } + // Make sure we can dispatch a message. if (canDispatch(node) && !isSlave()) { MessageDispatch md = createMessageDispatch(node, message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 6460ea65b2..7f47c9e7eb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -26,6 +26,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.kaha.Store; @@ -42,7 +43,6 @@ import org.apache.commons.logging.LogFactory; public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); - private int pendingCount; private String clientId; private String subscriberName; private Map topics = new HashMap(); @@ -50,6 +50,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private boolean started; private PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; + private final Subscription subscription; /** * @param topic @@ -57,9 +58,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param subscriberName * @throws IOException */ - public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize) { + public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) { this.clientId = clientId; this.subscriberName = subscriberName; + this.subscription = subscription; this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store); storePrefetches.add(nonPersistent); } @@ -69,7 +71,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { started = true; for (PendingMessageCursor tsp : storePrefetches) { tsp.start(); - pendingCount += tsp.size(); } } } @@ -80,8 +81,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { for (PendingMessageCursor tsp : storePrefetches) { tsp.stop(); } - - pendingCount = 0; } } @@ -94,14 +93,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { */ public synchronized void add(ConnectionContext context, Destination destination) throws Exception { if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { - TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName); + TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription); tsp.setMaxBatchSize(getMaxBatchSize()); tsp.setSystemUsage(systemUsage); topics.put(destination, tsp); storePrefetches.add(tsp); if (started) { tsp.start(); - pendingCount += tsp.size(); } } } @@ -124,14 +122,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @return true if there are no pending messages */ public synchronized boolean isEmpty() { - return pendingCount <= 0; + for (PendingMessageCursor tsp : storePrefetches) { + if( !tsp.isEmpty() ) + return false; + } + return true; } public boolean isEmpty(Destination destination) { boolean result = true; TopicStorePrefetch tsp = topics.get(destination); if (tsp != null) { - result = tsp.size() <= 0; + result = tsp.isEmpty(); } return result; } @@ -151,7 +153,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { if (node != null) { Message msg = node.getMessage(); if (started) { - pendingCount++; if (!msg.isPersistent()) { nonPersistent.addMessageLast(node); } @@ -171,7 +172,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public synchronized void clear() { - pendingCount = 0; nonPersistent.clear(); for (PendingMessageCursor tsp : storePrefetches) { tsp.clear(); @@ -179,7 +179,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public synchronized boolean hasNext() { - boolean result = pendingCount > 0; + boolean result = true; if (result) { try { currentCursor = getNextCursor(); @@ -201,14 +201,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { if (currentCursor != null) { currentCursor.remove(); } - pendingCount--; } public synchronized void remove(MessageReference node) { if (currentCursor != null) { currentCursor.remove(node); } - pendingCount--; } public synchronized void reset() { @@ -226,6 +224,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public int size() { + int pendingCount=0; + for (PendingMessageCursor tsp : storePrefetches) { + pendingCount += tsp.size(); + } return pendingCount; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index b067f52eca..81eb21b0b5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -17,12 +17,15 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedList; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; import org.apache.commons.logging.Log; @@ -44,16 +47,19 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message private Destination regionDestination; private MessageId firstMessageId; private MessageId lastMessageId; - private int pendingCount; + private boolean batchResetNeeded = true; + private boolean storeMayHaveMoreMessages = true; private boolean started; + private final Subscription subscription; /** * @param topic * @param clientId * @param subscriberName */ - public TopicStorePrefetch(Topic topic, String clientId, String subscriberName) { + public TopicStorePrefetch(Topic topic, String clientId, String subscriberName, Subscription subscription) { this.regionDestination = topic; + this.subscription = subscription; this.store = (TopicMessageStore)topic.getMessageStore(); this.clientId = clientId; this.subscriberName = subscriberName; @@ -62,13 +68,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message public synchronized void start() { if (!started) { started = true; - pendingCount = getStoreSize(); - try { - fillBatch(); - } catch (Exception e) { - LOG.error("Failed to fill batch", e); - throw new RuntimeException(e); - } + safeFillBatch(); } } @@ -84,11 +84,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message * @return true if there are no pendingCount messages */ public synchronized boolean isEmpty() { - return pendingCount <= 0; + safeFillBatch(); + return batchList.isEmpty(); } public synchronized int size() { - return getPendingCount(); + safeFillBatch(); + return batchList.size(); } public synchronized void addMessageLast(MessageReference node) throws Exception { @@ -98,7 +100,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message } lastMessageId = node.getMessageId(); node.decrementReferenceCount(); - pendingCount++; + storeMayHaveMoreMessages=true; } } @@ -108,20 +110,18 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message firstMessageId = node.getMessageId(); } node.decrementReferenceCount(); - pendingCount++; + storeMayHaveMoreMessages=true; } } public synchronized void remove() { - pendingCount--; } public synchronized void remove(MessageReference node) { - pendingCount--; } public synchronized void clear() { - pendingCount = 0; + gc(); } public synchronized boolean hasNext() { @@ -130,27 +130,17 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message public synchronized MessageReference next() { Message result = null; - if (!isEmpty()) { - if (batchList.isEmpty()) { - try { - fillBatch(); - } catch (final Exception e) { - LOG.error("Failed to fill batch", e); - throw new RuntimeException(e); - } - if (batchList.isEmpty()) { - return null; + safeFillBatch(); + if (batchList.isEmpty()) { + return null; + } else { + result = batchList.removeFirst(); + if (lastMessageId != null) { + if (result.getMessageId().equals(lastMessageId)) { + // pendingCount=0; } } - if (!batchList.isEmpty()) { - result = batchList.removeFirst(); - if (lastMessageId != null) { - if (result.getMessageId().equals(lastMessageId)) { - // pendingCount=0; - } - } - result.setRegionDestination(regionDestination); - } + result.setRegionDestination(regionDestination); } return result; } @@ -163,12 +153,16 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message } public synchronized boolean recoverMessage(Message message) throws Exception { - message.setRegionDestination(regionDestination); - // only increment if count is zero (could have been cached) - if (message.getReferenceCount() == 0) { - message.incrementReferenceCount(); + MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); + messageEvaluationContext.setMessageReference(message); + if( subscription.matches(message, messageEvaluationContext) ) { + message.setRegionDestination(regionDestination); + // only increment if count is zero (could have been cached) + if (message.getReferenceCount() == 0) { + message.incrementReferenceCount(); + } + batchList.addLast(message); } - batchList.addLast(message); return true; } @@ -178,38 +172,43 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message } // implementation + protected void safeFillBatch() { + try { + fillBatch(); + } catch (Exception e) { + LOG.error("Failed to fill batch", e); + throw new RuntimeException(e); + } + } + protected synchronized void fillBatch() throws Exception { - if (!isEmpty()) { + if( batchResetNeeded ) { + store.resetBatching(clientId, subscriberName); + batchResetNeeded=false; + storeMayHaveMoreMessages=true; + } + + while( batchList.isEmpty() && storeMayHaveMoreMessages ) { store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); - if (firstMessageId != null) { - int pos = 0; - for (Message msg : batchList) { - if (msg.getMessageId().equals(firstMessageId)) { - firstMessageId = null; - break; - } - pos++; - } - if (pos > 0) { - for (int i = 0; i < pos && !batchList.isEmpty(); i++) { - batchList.removeFirst(); - } - if (batchList.isEmpty()) { - LOG.debug("Refilling batch - haven't got past first message = " + firstMessageId); - fillBatch(); + if( batchList.isEmpty() ) { + storeMayHaveMoreMessages = false; + } else { + if (firstMessageId != null) { + int pos = 0; + for (Iterator iter = batchList.iterator(); iter.hasNext();) { + Message msg = iter.next(); + if (msg.getMessageId().equals(firstMessageId)) { + firstMessageId = null; + break; + } else { + iter.remove(); + } } } } } } - protected synchronized int getPendingCount() { - if (pendingCount <= 0) { - pendingCount = getStoreSize(); - } - return pendingCount; - } - protected synchronized int getStoreSize() { try { return store.getMessageCount(clientId, subscriberName); @@ -224,6 +223,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message msg.decrementReferenceCount(); } batchList.clear(); + batchResetNeeded = true; } public String toString() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java index 419abe710f..00a65febf5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -39,7 +40,7 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { return new FilePendingMessageCursor(name, tmpStorage); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java index 629aec6eb4..1c37bafbde 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -36,5 +37,5 @@ public interface PendingDurableSubscriberMessageStoragePolicy { * @param maxBatchSize * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize); + PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 40d636eba2..aef27d9487 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -116,7 +116,7 @@ public class PolicyEntry extends DestinationMapEntry { String subName = sub.getSubscriptionName(); int prefetch = sub.getPrefetchSize(); if (pendingDurableSubscriberPolicy != null) { - PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch); + PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub); cursor.setSystemUsage(memoryManager); sub.setPending(cursor); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java index b41551f878..de7d651c8b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.kaha.Store; @@ -40,7 +41,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { - return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize); + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { + return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java index bfd8d22489..c28306aa24 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -38,7 +39,7 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { return new VMPendingMessageCursor(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java index 40a300ca43..c7ee63c538 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java @@ -73,11 +73,8 @@ public class KahaMessageStore implements MessageStore { } protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception { - if (listener.hasSpace()) { - listener.recoverMessage(msg); - return true; - } - return false; + listener.recoverMessage(msg); + return listener.hasSpace(); } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index f4c5af7332..aaa07e0244 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -64,11 +64,8 @@ public class KahaReferenceStore implements ReferenceStore { protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record) throws Exception { - if (listener.hasSpace()) { - listener.recoverMessageReference(new MessageId(record.getMessageId())); - return true; - } - return false; + listener.recoverMessageReference(new MessageId(record.getMessageId())); + return listener.hasSpace(); } public synchronized void recover(MessageRecoveryListener listener) throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java index b976de8e08..ba8f133fc5 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java @@ -284,7 +284,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport { // The we should get the messages. for (int i = 0; i < 4; i++) { Message m2 = receiveMessage(connection2); - assertNotNull(m2); + assertNotNull("Did not get message "+i, m2); } assertNoMessagesLeft(connection2); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java index da9545e66e..0404fde233 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java @@ -77,7 +77,8 @@ public abstract class CursorSupport extends TestCase { consumer = getConsumer(consumerConnection); List consumerList = new ArrayList(); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(1000*5); + assertNotNull("Message "+i+" was missing.", msg); consumerList.add(msg); } assertEquals(senderList, consumerList); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java new file mode 100644 index 0000000000..34a5f11cee --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java @@ -0,0 +1,163 @@ +/* ==================================================================== + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +==================================================================== */ + +package org.apache.activemq.bugs.amq1095; + +import java.io.File; +import java.net.URI; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; + +/** + *

+ * Common functionality for ActiveMQ test cases. + *

+ * + * @author Rainer Klute <rainer.klute@dp-itsolutions.de> + * @since 2007-08-10 + * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $ + */ +public class ActiveMQTestCase extends TestCase +{ + private Context context; + private BrokerService broker; + protected Connection connection; + protected Destination destination; + private List consumersToEmpty = new LinkedList(); + protected final long RECEIVE_TIMEOUT = 500; + + + /**

Constructor

*/ + public ActiveMQTestCase() + {} + + /**

Constructor

+ * @param name the test case's name + */ + public ActiveMQTestCase(final String name) + { + super(name); + } + + /** + *

Sets up the JUnit testing environment. + */ + protected void setUp() + { + URI uri; + try + { + /* Copy all system properties starting with "java.naming." to the initial context. */ + final Properties systemProperties = System.getProperties(); + final Properties jndiProperties = new Properties(); + for (final Iterator i = systemProperties.keySet().iterator(); i.hasNext();) + { + final String key = (String) i.next(); + if (key.startsWith("java.naming.") || key.startsWith("topic.") || + key.startsWith("queue.")) + { + final String value = (String) systemProperties.get(key); + jndiProperties.put(key, value); + } + } + context = new InitialContext(jndiProperties); + uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml"); + broker = BrokerFactory.createBroker(uri); + broker.start(); + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + + final ConnectionFactory connectionFactory; + try + { + /* Lookup the connection factory. */ + connectionFactory = (ConnectionFactory) context.lookup("TopicConnectionFactory"); + + destination = new ActiveMQTopic("TestTopic"); + + /* Create a connection: */ + connection = connectionFactory.createConnection(); + connection.setClientID("sampleClientID"); + } + catch (JMSException ex1) + { + ex1.printStackTrace(); + Assert.fail(ex1.toString()); + } + catch (NamingException ex2) { + ex2.printStackTrace(); + Assert.fail(ex2.toString()); + } + catch (Throwable ex3) { + ex3.printStackTrace(); + Assert.fail(ex3.toString()); + } + } + + + /** + *

+ * Tear down the testing environment by receiving any messages that might be + * left in the topic after a failure and shutting down the broker properly. + * This is quite important for subsequent test cases that assume the topic + * to be empty. + *

+ */ + protected void tearDown() throws Exception { + TextMessage msg; + for (final Iterator i = consumersToEmpty.iterator(); i.hasNext();) + { + final MessageConsumer consumer = (MessageConsumer) i.next(); + if (consumer != null) + do + msg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT); + while (msg != null); + } + if (connection != null) { + connection.stop(); + } + broker.stop(); + } + + protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer) + { + consumersToEmpty.add(consumer); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java new file mode 100644 index 0000000000..a127422124 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java @@ -0,0 +1,230 @@ +/* ==================================================================== + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +==================================================================== */ + +package org.apache.activemq.bugs.amq1095; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.Assert; + + +/** + *

+ * Test cases for various ActiveMQ functionalities. + *

+ * + *
    + *
  • + *

    + * Durable subscriptions are used. + *

    + *
  • + *
  • + *

    + * The Kaha persistence manager is used. + *

    + *
  • + *
  • + *

    + * An already existing Kaha directory is used. Everything runs fine if the + * ActiveMQ broker creates a new Kaha directory. + *

    + *
  • + *
+ * + * @author Rainer Klute <rainer.klute@dp-itsolutions.de> + * @since 2007-08-09 + * @version $Id: MessageSelectorTest.java 12 2007-08-14 12:02:02Z rke $ + */ +public class MessageSelectorTest extends ActiveMQTestCase { + + private MessageConsumer consumer1; + private MessageConsumer consumer2; + + /**

Constructor

*/ + public MessageSelectorTest() + {} + + /**

Constructor

+ * @param name the test case's name + */ + public MessageSelectorTest(final String name) + { + super(name); + } + + /** + *

+ * Tests whether message selectors work for durable subscribers. + *

+ */ + public void testMessageSelectorForDurableSubscribersRunA() + { + runMessageSelectorTest(true); + } + + /** + *

+ * Tests whether message selectors work for durable subscribers. + *

+ */ + public void testMessageSelectorForDurableSubscribersRunB() + { + runMessageSelectorTest(true); + } + + /** + *

+ * Tests whether message selectors work for non-durable subscribers. + *

+ */ + public void testMessageSelectorForNonDurableSubscribers() + { + runMessageSelectorTest(false); + } + + /** + *

+ * Tests whether message selectors work. This is done by sending two + * messages to a topic. Both have an int property with different values. Two + * subscribers use message selectors to receive the messages. Each one + * should receive exactly one of the messages. + *

+ */ + private void runMessageSelectorTest(final boolean isDurableSubscriber) + { + try + { + final String PROPERTY_CONSUMER = "consumer"; + final String CONSUMER_1 = "Consumer 1"; + final String CONSUMER_2 = "Consumer 2"; + final String MESSAGE_1 = "Message to " + CONSUMER_1; + final String MESSAGE_2 = "Message to " + CONSUMER_2; + + assertNotNull(connection); + assertNotNull(destination); + + final Session producingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producingSession.createProducer(destination); + + final Session consumingSession1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session consumingSession2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (isDurableSubscriber) + { + consumer1 = consumingSession1.createDurableSubscriber + ((Topic) destination, CONSUMER_1, PROPERTY_CONSUMER + " = 1", false); + consumer2 = consumingSession2.createDurableSubscriber + ((Topic) destination, CONSUMER_2, PROPERTY_CONSUMER + " = 2", false); + } + else + { + consumer1 = consumingSession1.createConsumer(destination, PROPERTY_CONSUMER + " = 1"); + consumer2 = consumingSession2.createConsumer(destination, PROPERTY_CONSUMER + " = 2"); + } + registerToBeEmptiedOnShutdown(consumer1); + registerToBeEmptiedOnShutdown(consumer2); + + connection.start(); + + TextMessage msg1; + TextMessage msg2; + int propertyValue; + String contents; + + /* Try to receive any messages from the consumers. There shouldn't be any yet. */ + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + if (msg1 != null) + { + final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run."); + propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER); + contents = msg1.getText(); + if (propertyValue != 1) // Is the property value as expected? + { + msg.append(" That message does not match the consumer's message selector."); + fail(msg.toString()); + } + assertEquals(1, propertyValue); + assertEquals(MESSAGE_1, contents); + } + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + if (msg2 != null) + { + final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run."); + propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER); + contents = msg2.getText(); + if (propertyValue != 2) // Is the property value as expected? + { + msg.append(" That message does not match the consumer's message selector."); + fail(msg.toString()); + } + assertEquals(2, propertyValue); + assertEquals(MESSAGE_2, contents); + } + + /* Send two messages. Each is targeted at one of the consumers. */ + TextMessage msg; + msg = producingSession.createTextMessage(); + msg.setText(MESSAGE_1); + msg.setIntProperty(PROPERTY_CONSUMER, 1); + producer.send(msg); + + msg = producingSession.createTextMessage(); + msg.setText(MESSAGE_2); + msg.setIntProperty(PROPERTY_CONSUMER, 2); + producer.send(msg); + + /* Receive the messages that have just been sent. */ + + /* Use consumer 1 to receive one of the messages. The receive() + * method is called twice to make sure there is nothing else in + * stock for this consumer. */ + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + assertNotNull(msg1); + propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER); + contents = msg1.getText(); + assertEquals(1, propertyValue); + assertEquals(MESSAGE_1, contents); + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + assertNull(msg1); + + /* Use consumer 2 to receive the other message. The receive() + * method is called twice to make sure there is nothing else in + * stock for this consumer. */ + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + assertNotNull(msg2); + propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER); + contents = msg2.getText(); + assertEquals(2, propertyValue); + assertEquals(MESSAGE_2, contents); + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + assertNull(msg2); + } + catch (JMSException ex) + { + ex.printStackTrace(); + Assert.fail(); + } + } + +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml new file mode 100644 index 0000000000..2edc6582fa --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file