Fix for AMQ-1095:

- Added contributed test cases
 - We now filter out non-matching messages as they are loaded into the TopicStorePrefetch
 - Changed the TopicStorePrefetch and StoreDurableSubscriberCursor so that they don't depend
   on the pending message counter since some stores cannot give an accurate count for it.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@581053 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-10-01 20:02:18 +00:00
parent 19f9722445
commit 53c4e125f6
16 changed files with 526 additions and 97 deletions

View File

@ -48,7 +48,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException { throws InvalidSelectorException {
super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize())); super(broker, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
this.usageManager = usageManager; this.usageManager = usageManager;
this.keepDurableSubsActive = keepDurableSubsActive; this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());

View File

@ -410,6 +410,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (message == null) { if (message == null) {
return false; return false;
} }
// Make sure we can dispatch a message. // Make sure we can dispatch a message.
if (canDispatch(node) && !isSlave()) { if (canDispatch(node) && !isSlave()) {
MessageDispatch md = createMessageDispatch(node, message); MessageDispatch md = createMessageDispatch(node, message);

View File

@ -26,6 +26,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -42,7 +43,6 @@ import org.apache.commons.logging.LogFactory;
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
private int pendingCount;
private String clientId; private String clientId;
private String subscriberName; private String subscriberName;
private Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>(); private Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
@ -50,6 +50,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private boolean started; private boolean started;
private PendingMessageCursor nonPersistent; private PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor; private PendingMessageCursor currentCursor;
private final Subscription subscription;
/** /**
* @param topic * @param topic
@ -57,9 +58,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param subscriberName * @param subscriberName
* @throws IOException * @throws IOException
*/ */
public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize) { public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) {
this.clientId = clientId; this.clientId = clientId;
this.subscriberName = subscriberName; this.subscriberName = subscriberName;
this.subscription = subscription;
this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store); this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
storePrefetches.add(nonPersistent); storePrefetches.add(nonPersistent);
} }
@ -69,7 +71,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
started = true; started = true;
for (PendingMessageCursor tsp : storePrefetches) { for (PendingMessageCursor tsp : storePrefetches) {
tsp.start(); tsp.start();
pendingCount += tsp.size();
} }
} }
} }
@ -80,8 +81,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
for (PendingMessageCursor tsp : storePrefetches) { for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop(); tsp.stop();
} }
pendingCount = 0;
} }
} }
@ -94,14 +93,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
*/ */
public synchronized void add(ConnectionContext context, Destination destination) throws Exception { public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName); TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
tsp.setMaxBatchSize(getMaxBatchSize()); tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setSystemUsage(systemUsage); tsp.setSystemUsage(systemUsage);
topics.put(destination, tsp); topics.put(destination, tsp);
storePrefetches.add(tsp); storePrefetches.add(tsp);
if (started) { if (started) {
tsp.start(); tsp.start();
pendingCount += tsp.size();
} }
} }
} }
@ -124,14 +122,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public synchronized boolean isEmpty() { public synchronized boolean isEmpty() {
return pendingCount <= 0; for (PendingMessageCursor tsp : storePrefetches) {
if( !tsp.isEmpty() )
return false;
}
return true;
} }
public boolean isEmpty(Destination destination) { public boolean isEmpty(Destination destination) {
boolean result = true; boolean result = true;
TopicStorePrefetch tsp = topics.get(destination); TopicStorePrefetch tsp = topics.get(destination);
if (tsp != null) { if (tsp != null) {
result = tsp.size() <= 0; result = tsp.isEmpty();
} }
return result; return result;
} }
@ -151,7 +153,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
if (node != null) { if (node != null) {
Message msg = node.getMessage(); Message msg = node.getMessage();
if (started) { if (started) {
pendingCount++;
if (!msg.isPersistent()) { if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node); nonPersistent.addMessageLast(node);
} }
@ -171,7 +172,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public synchronized void clear() { public synchronized void clear() {
pendingCount = 0;
nonPersistent.clear(); nonPersistent.clear();
for (PendingMessageCursor tsp : storePrefetches) { for (PendingMessageCursor tsp : storePrefetches) {
tsp.clear(); tsp.clear();
@ -179,7 +179,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public synchronized boolean hasNext() { public synchronized boolean hasNext() {
boolean result = pendingCount > 0; boolean result = true;
if (result) { if (result) {
try { try {
currentCursor = getNextCursor(); currentCursor = getNextCursor();
@ -201,14 +201,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
if (currentCursor != null) { if (currentCursor != null) {
currentCursor.remove(); currentCursor.remove();
} }
pendingCount--;
} }
public synchronized void remove(MessageReference node) { public synchronized void remove(MessageReference node) {
if (currentCursor != null) { if (currentCursor != null) {
currentCursor.remove(node); currentCursor.remove(node);
} }
pendingCount--;
} }
public synchronized void reset() { public synchronized void reset() {
@ -226,6 +224,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public int size() { public int size() {
int pendingCount=0;
for (PendingMessageCursor tsp : storePrefetches) {
pendingCount += tsp.size();
}
return pendingCount; return pendingCount;
} }

View File

@ -17,12 +17,15 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -44,16 +47,19 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
private Destination regionDestination; private Destination regionDestination;
private MessageId firstMessageId; private MessageId firstMessageId;
private MessageId lastMessageId; private MessageId lastMessageId;
private int pendingCount; private boolean batchResetNeeded = true;
private boolean storeMayHaveMoreMessages = true;
private boolean started; private boolean started;
private final Subscription subscription;
/** /**
* @param topic * @param topic
* @param clientId * @param clientId
* @param subscriberName * @param subscriberName
*/ */
public TopicStorePrefetch(Topic topic, String clientId, String subscriberName) { public TopicStorePrefetch(Topic topic, String clientId, String subscriberName, Subscription subscription) {
this.regionDestination = topic; this.regionDestination = topic;
this.subscription = subscription;
this.store = (TopicMessageStore)topic.getMessageStore(); this.store = (TopicMessageStore)topic.getMessageStore();
this.clientId = clientId; this.clientId = clientId;
this.subscriberName = subscriberName; this.subscriberName = subscriberName;
@ -62,13 +68,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void start() { public synchronized void start() {
if (!started) { if (!started) {
started = true; started = true;
pendingCount = getStoreSize(); safeFillBatch();
try {
fillBatch();
} catch (Exception e) {
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
} }
} }
@ -84,11 +84,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
* @return true if there are no pendingCount messages * @return true if there are no pendingCount messages
*/ */
public synchronized boolean isEmpty() { public synchronized boolean isEmpty() {
return pendingCount <= 0; safeFillBatch();
return batchList.isEmpty();
} }
public synchronized int size() { public synchronized int size() {
return getPendingCount(); safeFillBatch();
return batchList.size();
} }
public synchronized void addMessageLast(MessageReference node) throws Exception { public synchronized void addMessageLast(MessageReference node) throws Exception {
@ -98,7 +100,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
lastMessageId = node.getMessageId(); lastMessageId = node.getMessageId();
node.decrementReferenceCount(); node.decrementReferenceCount();
pendingCount++; storeMayHaveMoreMessages=true;
} }
} }
@ -108,20 +110,18 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
firstMessageId = node.getMessageId(); firstMessageId = node.getMessageId();
} }
node.decrementReferenceCount(); node.decrementReferenceCount();
pendingCount++; storeMayHaveMoreMessages=true;
} }
} }
public synchronized void remove() { public synchronized void remove() {
pendingCount--;
} }
public synchronized void remove(MessageReference node) { public synchronized void remove(MessageReference node) {
pendingCount--;
} }
public synchronized void clear() { public synchronized void clear() {
pendingCount = 0; gc();
} }
public synchronized boolean hasNext() { public synchronized boolean hasNext() {
@ -130,19 +130,10 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized MessageReference next() { public synchronized MessageReference next() {
Message result = null; Message result = null;
if (!isEmpty()) { safeFillBatch();
if (batchList.isEmpty()) {
try {
fillBatch();
} catch (final Exception e) {
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
if (batchList.isEmpty()) { if (batchList.isEmpty()) {
return null; return null;
} } else {
}
if (!batchList.isEmpty()) {
result = batchList.removeFirst(); result = batchList.removeFirst();
if (lastMessageId != null) { if (lastMessageId != null) {
if (result.getMessageId().equals(lastMessageId)) { if (result.getMessageId().equals(lastMessageId)) {
@ -151,7 +142,6 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
result.setRegionDestination(regionDestination); result.setRegionDestination(regionDestination);
} }
}
return result; return result;
} }
@ -163,12 +153,16 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
public synchronized boolean recoverMessage(Message message) throws Exception { public synchronized boolean recoverMessage(Message message) throws Exception {
MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
if( subscription.matches(message, messageEvaluationContext) ) {
message.setRegionDestination(regionDestination); message.setRegionDestination(regionDestination);
// only increment if count is zero (could have been cached) // only increment if count is zero (could have been cached)
if (message.getReferenceCount() == 0) { if (message.getReferenceCount() == 0) {
message.incrementReferenceCount(); message.incrementReferenceCount();
} }
batchList.addLast(message); batchList.addLast(message);
}
return true; return true;
} }
@ -178,36 +172,41 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
} }
// implementation // implementation
protected synchronized void fillBatch() throws Exception { protected void safeFillBatch() {
if (!isEmpty()) { try {
store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
if (firstMessageId != null) {
int pos = 0;
for (Message msg : batchList) {
if (msg.getMessageId().equals(firstMessageId)) {
firstMessageId = null;
break;
}
pos++;
}
if (pos > 0) {
for (int i = 0; i < pos && !batchList.isEmpty(); i++) {
batchList.removeFirst();
}
if (batchList.isEmpty()) {
LOG.debug("Refilling batch - haven't got past first message = " + firstMessageId);
fillBatch(); fillBatch();
} } catch (Exception e) {
} LOG.error("Failed to fill batch", e);
} throw new RuntimeException(e);
} }
} }
protected synchronized int getPendingCount() { protected synchronized void fillBatch() throws Exception {
if (pendingCount <= 0) { if( batchResetNeeded ) {
pendingCount = getStoreSize(); store.resetBatching(clientId, subscriberName);
batchResetNeeded=false;
storeMayHaveMoreMessages=true;
}
while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
if( batchList.isEmpty() ) {
storeMayHaveMoreMessages = false;
} else {
if (firstMessageId != null) {
int pos = 0;
for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
Message msg = iter.next();
if (msg.getMessageId().equals(firstMessageId)) {
firstMessageId = null;
break;
} else {
iter.remove();
}
}
}
}
} }
return pendingCount;
} }
protected synchronized int getStoreSize() { protected synchronized int getStoreSize() {
@ -224,6 +223,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
msg.decrementReferenceCount(); msg.decrementReferenceCount();
} }
batchList.clear(); batchList.clear();
batchResetNeeded = true;
} }
public String toString() { public String toString() {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -39,7 +40,7 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending
* @param maxBatchSize * @param maxBatchSize
* @return the Pending Message cursor * @return the Pending Message cursor
*/ */
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new FilePendingMessageCursor(name, tmpStorage); return new FilePendingMessageCursor(name, tmpStorage);
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -36,5 +37,5 @@ public interface PendingDurableSubscriberMessageStoragePolicy {
* @param maxBatchSize * @param maxBatchSize
* @return the Pending Message cursor * @return the Pending Message cursor
*/ */
PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize); PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub);
} }

View File

@ -116,7 +116,7 @@ public class PolicyEntry extends DestinationMapEntry {
String subName = sub.getSubscriptionName(); String subName = sub.getSubscriptionName();
int prefetch = sub.getPrefetchSize(); int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) { if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch); PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub);
cursor.setSystemUsage(memoryManager); cursor.setSystemUsage(memoryManager);
sub.setPending(cursor); sub.setPending(cursor);
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -40,7 +41,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
* @param maxBatchSize * @param maxBatchSize
* @return the Pending Message cursor * @return the Pending Message cursor
*/ */
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize); return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub);
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -38,7 +39,7 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu
* @param maxBatchSize * @param maxBatchSize
* @return the Pending Message cursor * @return the Pending Message cursor
*/ */
public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new VMPendingMessageCursor(); return new VMPendingMessageCursor();
} }
} }

View File

@ -73,11 +73,8 @@ public class KahaMessageStore implements MessageStore {
} }
protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception { protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
if (listener.hasSpace()) {
listener.recoverMessage(msg); listener.recoverMessage(msg);
return true; return listener.hasSpace();
}
return false;
} }
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {

View File

@ -64,11 +64,8 @@ public class KahaReferenceStore implements ReferenceStore {
protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record) protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
throws Exception { throws Exception {
if (listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(record.getMessageId())); listener.recoverMessageReference(new MessageId(record.getMessageId()));
return true; return listener.hasSpace();
}
return false;
} }
public synchronized void recover(MessageRecoveryListener listener) throws Exception { public synchronized void recover(MessageRecoveryListener listener) throws Exception {

View File

@ -284,7 +284,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
// The we should get the messages. // The we should get the messages.
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
Message m2 = receiveMessage(connection2); Message m2 = receiveMessage(connection2);
assertNotNull(m2); assertNotNull("Did not get message "+i, m2);
} }
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
} }

View File

@ -77,7 +77,8 @@ public abstract class CursorSupport extends TestCase {
consumer = getConsumer(consumerConnection); consumer = getConsumer(consumerConnection);
List<Message> consumerList = new ArrayList<Message>(); List<Message> consumerList = new ArrayList<Message>();
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
Message msg = consumer.receive(); Message msg = consumer.receive(1000*5);
assertNotNull("Message "+i+" was missing.", msg);
consumerList.add(msg); consumerList.add(msg);
} }
assertEquals(senderList, consumerList); assertEquals(senderList, consumerList);

View File

@ -0,0 +1,163 @@
/* ====================================================================
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================== */
package org.apache.activemq.bugs.amq1095;
import java.io.File;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
/**
* <p>
* Common functionality for ActiveMQ test cases.
* </p>
*
* @author Rainer Klute <a
* href="mailto:rainer.klute@dp-itsolutions.de">&lt;rainer.klute@dp-itsolutions.de&gt;</a>
* @since 2007-08-10
* @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $
*/
public class ActiveMQTestCase extends TestCase
{
private Context context;
private BrokerService broker;
protected Connection connection;
protected Destination destination;
private List<MessageConsumer> consumersToEmpty = new LinkedList<MessageConsumer>();
protected final long RECEIVE_TIMEOUT = 500;
/** <p>Constructor</p> */
public ActiveMQTestCase()
{}
/** <p>Constructor</p>
* @param name the test case's name
*/
public ActiveMQTestCase(final String name)
{
super(name);
}
/**
* <p>Sets up the JUnit testing environment.
*/
protected void setUp()
{
URI uri;
try
{
/* Copy all system properties starting with "java.naming." to the initial context. */
final Properties systemProperties = System.getProperties();
final Properties jndiProperties = new Properties();
for (final Iterator i = systemProperties.keySet().iterator(); i.hasNext();)
{
final String key = (String) i.next();
if (key.startsWith("java.naming.") || key.startsWith("topic.") ||
key.startsWith("queue."))
{
final String value = (String) systemProperties.get(key);
jndiProperties.put(key, value);
}
}
context = new InitialContext(jndiProperties);
uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml");
broker = BrokerFactory.createBroker(uri);
broker.start();
}
catch (Exception ex)
{
throw new RuntimeException(ex);
}
final ConnectionFactory connectionFactory;
try
{
/* Lookup the connection factory. */
connectionFactory = (ConnectionFactory) context.lookup("TopicConnectionFactory");
destination = new ActiveMQTopic("TestTopic");
/* Create a connection: */
connection = connectionFactory.createConnection();
connection.setClientID("sampleClientID");
}
catch (JMSException ex1)
{
ex1.printStackTrace();
Assert.fail(ex1.toString());
}
catch (NamingException ex2) {
ex2.printStackTrace();
Assert.fail(ex2.toString());
}
catch (Throwable ex3) {
ex3.printStackTrace();
Assert.fail(ex3.toString());
}
}
/**
* <p>
* Tear down the testing environment by receiving any messages that might be
* left in the topic after a failure and shutting down the broker properly.
* This is quite important for subsequent test cases that assume the topic
* to be empty.
* </p>
*/
protected void tearDown() throws Exception {
TextMessage msg;
for (final Iterator i = consumersToEmpty.iterator(); i.hasNext();)
{
final MessageConsumer consumer = (MessageConsumer) i.next();
if (consumer != null)
do
msg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
while (msg != null);
}
if (connection != null) {
connection.stop();
}
broker.stop();
}
protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer)
{
consumersToEmpty.add(consumer);
}
}

View File

@ -0,0 +1,230 @@
/* ====================================================================
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================== */
package org.apache.activemq.bugs.amq1095;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.Assert;
/**
* <p>
* Test cases for various ActiveMQ functionalities.
* </p>
*
* <ul>
* <li>
* <p>
* Durable subscriptions are used.
* </p>
* </li>
* <li>
* <p>
* The Kaha persistence manager is used.
* </p>
* </li>
* <li>
* <p>
* An already existing Kaha directory is used. Everything runs fine if the
* ActiveMQ broker creates a new Kaha directory.
* </p>
* </li>
* </ul>
*
* @author Rainer Klute <a
* href="mailto:rainer.klute@dp-itsolutions.de">&lt;rainer.klute@dp-itsolutions.de&gt;</a>
* @since 2007-08-09
* @version $Id: MessageSelectorTest.java 12 2007-08-14 12:02:02Z rke $
*/
public class MessageSelectorTest extends ActiveMQTestCase {
private MessageConsumer consumer1;
private MessageConsumer consumer2;
/** <p>Constructor</p> */
public MessageSelectorTest()
{}
/** <p>Constructor</p>
* @param name the test case's name
*/
public MessageSelectorTest(final String name)
{
super(name);
}
/**
* <p>
* Tests whether message selectors work for durable subscribers.
* </p>
*/
public void testMessageSelectorForDurableSubscribersRunA()
{
runMessageSelectorTest(true);
}
/**
* <p>
* Tests whether message selectors work for durable subscribers.
* </p>
*/
public void testMessageSelectorForDurableSubscribersRunB()
{
runMessageSelectorTest(true);
}
/**
* <p>
* Tests whether message selectors work for non-durable subscribers.
* </p>
*/
public void testMessageSelectorForNonDurableSubscribers()
{
runMessageSelectorTest(false);
}
/**
* <p>
* Tests whether message selectors work. This is done by sending two
* messages to a topic. Both have an int property with different values. Two
* subscribers use message selectors to receive the messages. Each one
* should receive exactly one of the messages.
* </p>
*/
private void runMessageSelectorTest(final boolean isDurableSubscriber)
{
try
{
final String PROPERTY_CONSUMER = "consumer";
final String CONSUMER_1 = "Consumer 1";
final String CONSUMER_2 = "Consumer 2";
final String MESSAGE_1 = "Message to " + CONSUMER_1;
final String MESSAGE_2 = "Message to " + CONSUMER_2;
assertNotNull(connection);
assertNotNull(destination);
final Session producingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producingSession.createProducer(destination);
final Session consumingSession1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session consumingSession2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (isDurableSubscriber)
{
consumer1 = consumingSession1.createDurableSubscriber
((Topic) destination, CONSUMER_1, PROPERTY_CONSUMER + " = 1", false);
consumer2 = consumingSession2.createDurableSubscriber
((Topic) destination, CONSUMER_2, PROPERTY_CONSUMER + " = 2", false);
}
else
{
consumer1 = consumingSession1.createConsumer(destination, PROPERTY_CONSUMER + " = 1");
consumer2 = consumingSession2.createConsumer(destination, PROPERTY_CONSUMER + " = 2");
}
registerToBeEmptiedOnShutdown(consumer1);
registerToBeEmptiedOnShutdown(consumer2);
connection.start();
TextMessage msg1;
TextMessage msg2;
int propertyValue;
String contents;
/* Try to receive any messages from the consumers. There shouldn't be any yet. */
msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
if (msg1 != null)
{
final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run.");
propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
contents = msg1.getText();
if (propertyValue != 1) // Is the property value as expected?
{
msg.append(" That message does not match the consumer's message selector.");
fail(msg.toString());
}
assertEquals(1, propertyValue);
assertEquals(MESSAGE_1, contents);
}
msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
if (msg2 != null)
{
final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run.");
propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
contents = msg2.getText();
if (propertyValue != 2) // Is the property value as expected?
{
msg.append(" That message does not match the consumer's message selector.");
fail(msg.toString());
}
assertEquals(2, propertyValue);
assertEquals(MESSAGE_2, contents);
}
/* Send two messages. Each is targeted at one of the consumers. */
TextMessage msg;
msg = producingSession.createTextMessage();
msg.setText(MESSAGE_1);
msg.setIntProperty(PROPERTY_CONSUMER, 1);
producer.send(msg);
msg = producingSession.createTextMessage();
msg.setText(MESSAGE_2);
msg.setIntProperty(PROPERTY_CONSUMER, 2);
producer.send(msg);
/* Receive the messages that have just been sent. */
/* Use consumer 1 to receive one of the messages. The receive()
* method is called twice to make sure there is nothing else in
* stock for this consumer. */
msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
assertNotNull(msg1);
propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
contents = msg1.getText();
assertEquals(1, propertyValue);
assertEquals(MESSAGE_1, contents);
msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
assertNull(msg1);
/* Use consumer 2 to receive the other message. The receive()
* method is called twice to make sure there is nothing else in
* stock for this consumer. */
msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
assertNotNull(msg2);
propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
contents = msg2.getText();
assertEquals(2, propertyValue);
assertEquals(MESSAGE_2, contents);
msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
assertNull(msg2);
}
catch (JMSException ex)
{
ex.printStackTrace();
Assert.fail();
}
}
}

View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans>
<broker brokerName="localhost" xmlns="http://activemq.org/config/1.0" persistent="true">
<persistenceAdapter>
<kahaPersistenceAdapter directory="file:kahadir" maxDataFileLength="200000"/>
</persistenceAdapter>
<destinations>
<queue physicalName="unused"/>
<topic physicalName="activemq.TestTopic"/>
</destinations>
</broker>
</beans>