Some modifications needed for the patch to work correctly.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1222275 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-12-22 15:15:48 +00:00
parent d97e8fe354
commit cb1b92bf58
5 changed files with 402 additions and 36 deletions

View File

@ -40,13 +40,18 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.ResourceAllocationException; import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.OrderedPendingList;
import org.apache.activemq.broker.region.cursors.PendingList;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
@ -92,8 +97,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Messages that are paged in but have not yet been targeted at a // Messages that are paged in but have not yet been targeted at a
// subscription // subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100); protected PendingList pagedInPendingDispatch = new OrderedPendingList();
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>(); protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@ -123,9 +128,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
}; };
private final Object iteratingMutex = new Object() { private final Object iteratingMutex = new Object();
};
class TimeoutMessage implements Delayed { class TimeoutMessage implements Delayed {
@ -304,8 +307,22 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
pagedInPendingDispatch = new PrioritizedPendingList();
redeliveredWaitingDispatch = new PrioritizedPendingList();
} else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
pagedInPendingDispatch = new OrderedPendingList();
redeliveredWaitingDispatch = new OrderedPendingList();
}
}
@Override @Override
public void initialize() throws Exception { public void initialize() throws Exception {
if (this.messages == null) { if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) { if (destination.isTemporary() || broker == null || store == null) {
this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
@ -313,6 +330,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
this.messages = new StoreQueueCursor(broker, this); this.messages = new StoreQueueCursor(broker, this);
} }
} }
// If a VMPendingMessageCursor don't use the default Producer System // If a VMPendingMessageCursor don't use the default Producer System
// Usage // Usage
// since it turns into a shared blocking queue which can lead to a // since it turns into a shared blocking queue which can lead to a
@ -529,10 +547,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
} }
redeliveredWaitingDispatch.add(qmr); redeliveredWaitingDispatch.addMessageLast(qmr);
} }
if (!redeliveredWaitingDispatch.isEmpty()) { if (!redeliveredWaitingDispatch.isEmpty()) {
doDispatch(new ArrayList<QueueMessageReference>()); doDispatch(new OrderedPendingList());
} }
}finally { }finally {
consumersLock.writeLock().unlock(); consumersLock.writeLock().unlock();
@ -994,7 +1012,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
try { try {
addAll(pagedInPendingDispatch, browseList, max, toExpire); addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
for (MessageReference ref : toExpire) { for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref); pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) { if (broker.isExpired(ref)) {
@ -1066,10 +1084,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
List<MessageReference> toExpire) throws Exception { List<MessageReference> toExpire) throws Exception {
for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
QueueMessageReference ref = i.next(); QueueMessageReference ref = (QueueMessageReference) i.next();
if (ref.isExpired()) { if (ref.isExpired()) {
toExpire.add(ref); toExpire.add(ref);
} else if (l.contains(ref.getMessage()) == false) { } else if (l.contains(ref.getMessage()) == false) {
@ -1675,15 +1693,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
private void doPageIn(boolean force) throws Exception { private void doPageIn(boolean force) throws Exception {
List<QueueMessageReference> newlyPaged = doPageInForDispatch(force); PendingList newlyPaged = doPageInForDispatch(force);
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
try { try {
if (pagedInPendingDispatch.isEmpty()) { if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(newlyPaged); pagedInPendingDispatch.addAll(newlyPaged);
} else { } else {
for (QueueMessageReference qmr : newlyPaged) { for (MessageReference qmr : newlyPaged) {
if (!pagedInPendingDispatch.contains(qmr)) { if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.add(qmr); pagedInPendingDispatch.addMessageLast(qmr);
} }
} }
} }
@ -1692,9 +1711,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception { private PendingList doPageInForDispatch(boolean force) throws Exception {
List<QueueMessageReference> result = null; List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null; PendingList resultList = null;
int toPageIn = Math.min(getMaxPageSize(), messages.size()); int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -1750,11 +1769,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// dispatch attempts // dispatch attempts
pagedInMessagesLock.writeLock().lock(); pagedInMessagesLock.writeLock().lock();
try { try {
resultList = new ArrayList<QueueMessageReference>(result.size()); if(isPrioritizedMessages()) {
resultList = new PrioritizedPendingList();
} else {
resultList = new OrderedPendingList();
}
for (QueueMessageReference ref : result) { for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) { if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref); pagedInMessages.put(ref.getMessageId(), ref);
resultList.add(ref); resultList.addMessageLast(ref);
} else { } else {
ref.decrementReferenceCount(); ref.decrementReferenceCount();
} }
@ -1764,13 +1787,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} else { } else {
// Avoid return null list, if condition is not validated // Avoid return null list, if condition is not validated
resultList = new ArrayList<QueueMessageReference>(); resultList = new OrderedPendingList();
} }
return resultList; return resultList;
} }
private void doDispatch(List<QueueMessageReference> list) throws Exception { private void doDispatch(PendingList list) throws Exception {
boolean doWakeUp = false; boolean doWakeUp = false;
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
@ -1792,9 +1815,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (pagedInPendingDispatch.isEmpty()) { if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list)); pagedInPendingDispatch.addAll(doActualDispatch(list));
} else { } else {
for (QueueMessageReference qmr : list) { for (MessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) { if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.add(qmr); pagedInPendingDispatch.addMessageLast(qmr);
} }
} }
doWakeUp = true; doWakeUp = true;
@ -1814,9 +1837,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* @return list of messages that could get dispatched to consumers if they * @return list of messages that could get dispatched to consumers if they
* were not full. * were not full.
*/ */
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception { private PendingList doActualDispatch(PendingList list) throws Exception {
List<Subscription> consumers; List<Subscription> consumers;
consumersLock.writeLock().lock(); consumersLock.writeLock().lock();
try { try {
if (this.consumers.isEmpty() || isSlave()) { if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification // slave dispatch happens in processDispatchNotification
@ -1827,10 +1851,18 @@ public class Queue extends BaseDestination implements Task, UsageListener {
consumersLock.writeLock().unlock(); consumersLock.writeLock().unlock();
} }
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size()); PendingList rc;
if(isPrioritizedMessages()) {
rc = new PrioritizedPendingList();
} else {
rc = new OrderedPendingList();
}
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
for (MessageReference node : list) { for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
MessageReference node = (MessageReference) iterator.next();
Subscription target = null; Subscription target = null;
int interestCount = 0; int interestCount = 0;
for (Subscription s : consumers) { for (Subscription s : consumers) {
@ -1863,7 +1895,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if ((target == null && interestCount > 0) || consumers.size() == 0) { if ((target == null && interestCount > 0) || consumers.size() == 0) {
// This means all subs were full or that there are no // This means all subs were full or that there are no
// consumers... // consumers...
rc.add((QueueMessageReference) node); rc.addMessageLast((QueueMessageReference) node);
} }
// If it got dispatched, rotate the consumer list to get round robin // If it got dispatched, rotate the consumer list to get round robin
@ -1886,7 +1918,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
//QueueMessageReference node = (QueueMessageReference) m;
boolean result = true; boolean result = true;
// Keep message groups together. // Keep message groups together.
String groupId = node.getGroupID(); String groupId = node.getGroupID();
@ -2002,9 +2033,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pagedInPendingDispatchLock.writeLock().lock(); pagedInPendingDispatchLock.writeLock().lock();
try { try {
for (QueueMessageReference ref : pagedInPendingDispatch) { for (MessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) { if (messageId.equals(ref.getMessageId())) {
message = ref; message = (QueueMessageReference)ref;
pagedInPendingDispatch.remove(ref); pagedInPendingDispatch.remove(ref);
break; break;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -25,9 +26,10 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
public class OrderedPendingList implements PendingList { public class OrderedPendingList implements PendingList {
PendingNode root = null;
PendingNode tail = null; private PendingNode root = null;
final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); private PendingNode tail = null;
private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
public PendingNode addMessageFirst(MessageReference message) { public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = new PendingNode(this, message); PendingNode node = new PendingNode(this, message);
@ -130,4 +132,28 @@ public class OrderedPendingList implements PendingList {
return "OrderedPendingList(" + System.identityHashCode(this) + ")"; return "OrderedPendingList(" + System.identityHashCode(this) + ")";
} }
@Override
public boolean contains(MessageReference message) {
if(map.values().contains(message)) {
return true;
} else {
return false;
}
}
@Override
public Collection<MessageReference> values() {
List<MessageReference> messageReferences = new ArrayList<MessageReference>();
for(PendingNode pendingNode : map.values()) {
messageReferences.add(pendingNode.getMessage());
}
return messageReferences;
}
@Override
public void addAll(PendingList pendingList) {
for(MessageReference messageReference : pendingList) {
addMessageLast(messageReference);
}
}
} }

View File

@ -16,16 +16,96 @@
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
public interface PendingList { public interface PendingList extends Iterable<MessageReference> {
/**
* Returns true if there are no Messages in the PendingList currently.
* @return true if the PendingList is currently empty.
*/
public boolean isEmpty(); public boolean isEmpty();
/**
* Discards all Messages currently held in the PendingList.
*/
public void clear(); public void clear();
/**
* Adds the given message to the head of the list.
*
* @param message
* The MessageReference that is to be added to this list.
*
* @return the PendingNode that contains the newly added message.
*/
public PendingNode addMessageFirst(MessageReference message); public PendingNode addMessageFirst(MessageReference message);
/**
* Adds the given message to the tail of the list.
*
* @param message
* The MessageReference that is to be added to this list.
*
* @return the PendingNode that contains the newly added message.
*/
public PendingNode addMessageLast(MessageReference message); public PendingNode addMessageLast(MessageReference message);
/**
* Removes the given MessageReference from the PendingList if it is
* contained within.
*
* @param message
* The MessageReference that is to be removed to this list.
*
* @return the PendingNode that contains the removed message or null if the
* message was not present in this list.
*/
public PendingNode remove(MessageReference message); public PendingNode remove(MessageReference message);
/**
* Returns the number of MessageReferences that are awaiting dispatch.
* @return current count of the pending messages.
*/
public int size(); public int size();
/**
* Returns an iterator over the pending Messages. The subclass controls how
* the returned iterator actually traverses the list of pending messages allowing
* for the order to vary based on factors like Message priority or some other
* mechanism.
*
* @return an Iterator that returns MessageReferences contained in this list.
*/
public Iterator<MessageReference> iterator(); public Iterator<MessageReference> iterator();
/**
* Query the PendingList to determine if the given message is contained within.
*
* @param message
* The Message that is the target of this query.
*
* @return true if the MessageReference is contained in this list.
*/
public boolean contains(MessageReference message);
/**
* Returns a new Collection that contains all the MessageReferences currently
* held in this PendingList. The elements of the list are ordered using the
* same rules as the subclass uses for iteration.
*
* @return a new Collection containing this lists MessageReferences.
*/
public Collection<MessageReference> values();
/**
* Adds all the elements of the given PendingList to this PendingList.
*
* @param pendingList
* The PendingList that is to be added to this collection.
*/
public void addAll(PendingList pendingList);
} }

View File

@ -17,23 +17,27 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
public class PrioritizedPendingList implements PendingList { public class PrioritizedPendingList implements PendingList {
static final Integer MAX_PRIORITY = 10;
private static final Integer MAX_PRIORITY = 10;
private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY]; private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
public PrioritizedPendingList() { public PrioritizedPendingList() {
for (int i = 0; i < MAX_PRIORITY; i++) { for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i] = new OrderedPendingList(); this.lists[i] = new OrderedPendingList();
} }
} }
public PendingNode addMessageFirst(MessageReference message) { public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = getList(message).addMessageFirst(message); PendingNode node = getList(message).addMessageFirst(message);
this.map.put(message.getMessageId(), node); this.map.put(message.getMessageId(), node);
@ -124,9 +128,32 @@ public class PrioritizedPendingList implements PendingList {
map.remove(node.getMessage().getMessageId()); map.remove(node.getMessage().getMessageId());
node.getList().removeNode(node); node.getList().removeNode(node);
} }
}
}
@Override
public boolean contains(MessageReference message) {
if (map.values().contains(message)) {
return true;
} }
return false;
}
@Override
public Collection<MessageReference> values() {
List<MessageReference> messageReferences = new ArrayList<MessageReference>();
for (PendingNode pendingNode : map.values()) {
messageReferences.add(pendingNode.getMessage());
}
return messageReferences;
}
@Override
public void addAll(PendingList pendingList) {
for(MessageReference messageReference : pendingList) {
addMessageLast(messageReference);
}
} }
} }

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ3436Test {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
private BrokerService broker;
private PersistenceAdapter adapter;
private boolean useCache = true;
private boolean prioritizeMessages = true;
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setConcurrentStoreAndDispatchQueues(false);
adapter.setConcurrentStoreAndDispatchTopics(false);
adapter.deleteAllMessages();
return adapter;
}
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setBrokerName("priorityTest");
broker.setAdvisorySupport(false);
broker.setUseJmx(false);
adapter = createPersistenceAdapter(true);
broker.setPersistenceAdapter(adapter);
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(prioritizeMessages);
policy.setUseCache(useCache);
policy.setProducerFlowControl(false);
PolicyMap policyMap = new PolicyMap();
policyMap.put(new ActiveMQQueue("TEST"), policy);
// do not process expired for one test
PolicyEntry ignoreExpired = new PolicyEntry();
SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
ignoreExpiredStrategy.setProcessExpired(false);
ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
}
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Test
public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
int messageCount = 200;
URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
cf.setDispatchAsync(false);
// Create producer
ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
producerConnection.setMessagePrioritySupported(true);
producerConnection.start();
final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(dest);
ActiveMQMessageConsumer consumer;
// Create consumer on separate connection
ActiveMQConnection consumerConnection = (ActiveMQConnection) cf.createConnection();
consumerConnection.setMessagePrioritySupported(true);
consumerConnection.start();
final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true,
Session.SESSION_TRANSACTED);
consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
// Produce X number of messages with a session commit after each message
Random random = new Random();
for (int i = 0; i < messageCount; ++i) {
Message message = producerSession.createTextMessage("Test message #" + i);
producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45*1000);
producerSession.commit();
}
producer.close();
// ***************************************************
// If we create the consumer here instead of above, the
// the messages will be consumed in priority order
// ***************************************************
//consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
// Consume all of the messages we produce using a listener.
// Don't exit until we get all the messages.
final CountDownLatch latch = new CountDownLatch(messageCount);
final StringBuffer failureMessage = new StringBuffer();
consumer.setMessageListener(new MessageListener() {
int lowestPrioritySeen = 10;
boolean firstMessage = true;
public void onMessage(Message msg) {
try {
int currentPriority = msg.getJMSPriority();
LOG.debug(currentPriority + "<=" + lowestPrioritySeen);
// Ignore the first message priority since it is prefetched
// and is out of order by design
if (firstMessage == true) {
firstMessage = false;
LOG.debug("Ignoring first message since it was prefetched");
} else {
// Verify that we never see a priority higher than the
// lowest
// priority seen
if (lowestPrioritySeen > currentPriority) {
lowestPrioritySeen = currentPriority;
}
if (lowestPrioritySeen < currentPriority) {
failureMessage.append("Incorrect priority seen (Lowest Priority = " + lowestPrioritySeen
+ " Current Priority = " + currentPriority + ")"
+ System.getProperty("line.separator"));
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
latch.countDown();
LOG.debug("Messages remaining = " + latch.getCount());
}
}
});
latch.await();
consumer.close();
// Cleanup producer resources
producerSession.close();
producerConnection.stop();
producerConnection.close();
// Cleanup consumer resources
consumerSession.close();
consumerConnection.stop();
consumerConnection.close();
// Report the failure if found
if (failureMessage.length() > 0) {
Assert.fail(failureMessage.toString());
}
}
}