add queue priority with backlog test, disable cache and expiry processing

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429818 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-07 14:40:35 +00:00
parent 667237f72a
commit f79f7ad426
1 changed files with 32 additions and 1 deletions

View File

@ -59,6 +59,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public boolean prioritizeMessages = true;
public boolean immediatePriorityDispatch = true;
public int prefetchVal = 500;
public int expireMessagePeriod = 30000;
public int MSG_NUM = 600;
public int HIGH_PRI = 7;
@ -75,6 +76,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(prioritizeMessages);
policy.setUseCache(useCache);
policy.setExpireMessagesPeriod(expireMessagePeriod);
StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
new StorePendingDurableSubscriberMessageStoragePolicy();
durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
@ -206,7 +208,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}
protected Message createMessage(int priority) throws Exception {
final String text = "priority " + priority;
Message msg = sess.createTextMessage(text);
@ -550,4 +552,33 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
}
public void initCombosForTestQueueBacklog() {
// the cache limits the priority ordering to available memory
addCombinationValues("useCache", new Object[] {new Boolean(false)});
// expiry processing can fill the cursor with a snapshot of the producer
// priority, before producers are complete
addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
}
public void testQueueBacklog() throws Exception {
final int backlog = 180000;
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
ProducerThread highPri = new ProducerThread(queue, 10, HIGH_PRI);
lowPri.start();
lowPri.join();
highPri.start();
highPri.join();
LOG.info("Starting consumer...");
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 500; i++) {
Message msg = queueConsumer.receive(5000);
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}
}