diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index b423a86579..5428470aa2 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -506,6 +506,9 @@ **/BrokerNetworkWithStuckMessagesTest.* + + **/JDBCMessagePriorityTest.* + diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 075160cc9d..4d3c331e74 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -82,7 +82,7 @@ public class PrioritizedPendingList implements PendingList { protected int getPriority(MessageReference message) { int priority = javax.jms.Message.DEFAULT_PRIORITY; if (message.getMessageId() != null) { - Math.max(message.getMessage().getPriority(), 0); + priority = Math.max(message.getMessage().getPriority(), 0); priority = Math.min(priority, 9); } return priority; diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 735a47d047..c5fedd23e0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -18,19 +18,28 @@ package org.apache.activemq.store; import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; - -import junit.framework.TestCase; +import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; -abstract public class MessagePriorityTest extends TestCase { +abstract public class MessagePriorityTest extends CombinationTestSupport { + + private static final Log LOG = LogFactory.getLog(MessagePriorityTest.class); BrokerService broker; PersistenceAdapter adapter; @@ -39,6 +48,12 @@ abstract public class MessagePriorityTest extends TestCase { Connection conn; Session sess; + public boolean useCache; + + int MSG_NUM = 1000; + int HIGH_PRI = 7; + int LOW_PRI = 3; + abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; protected void setUp() throws Exception { @@ -48,6 +63,7 @@ abstract public class MessagePriorityTest extends TestCase { broker.setPersistenceAdapter(adapter); PolicyEntry policy = new PolicyEntry(); policy.setPrioritizedMessages(true); + policy.setUseCache(useCache); PolicyMap policyMap = new PolicyMap(); policyMap.setDefaultEntry(policy); broker.setDestinationPolicy(policyMap); @@ -56,6 +72,8 @@ abstract public class MessagePriorityTest extends TestCase { factory = new ActiveMQConnectionFactory("vm://priorityTest"); conn = factory.createConnection(); + conn.setClientID("priority"); + conn.start(); sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -75,7 +93,7 @@ abstract public class MessagePriorityTest extends TestCase { MessageProducer topicProducer = sess.createProducer(topic); - Thread.sleep(100); // get it all propagated + Thread.sleep(500); // get it all propagated assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages()); assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages()); @@ -85,4 +103,84 @@ abstract public class MessagePriorityTest extends TestCase { } + class ProducerThread extends Thread { + + int priority; + int messageCount; + ActiveMQDestination dest; + + public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) { + this.messageCount = messageCount; + this.priority = priority; + this.dest = dest; + } + + public void run() { + try { + MessageProducer producer = sess.createProducer(dest); + producer.setPriority(priority); + for (int i = 0; i < messageCount; i++) { + producer.send(sess.createTextMessage("message priority: " + priority)); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + } + + public void initCombosForTestQueues() { + addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)}); + } + + public void testQueues() throws Exception { + ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST"); + + ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI); + ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI); + + lowPri.start(); + highPri.start(); + + lowPri.join(); + highPri.join(); + + MessageConsumer queueConsumer = sess.createConsumer(queue); + for (int i = 0; i < MSG_NUM * 2; i++) { + Message msg = queueConsumer.receive(1000); + assertNotNull(msg); + assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); + } + } + + protected Message createMessage(int priority) throws Exception { + final String text = "Message with priority " + priority; + Message msg = sess.createTextMessage(text); + LOG.info("Sending " + text); + return msg; + } + + public void testDurableSubs() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST"); + TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority"); + sub.close(); + + ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); + ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); + + lowPri.start(); + highPri.start(); + + lowPri.join(); + highPri.join(); + + sub = sess.createDurableSubscriber(topic, "priority"); + for (int i = 0; i < MSG_NUM * 2; i++) { + Message msg = sub.receive(1000); + assertNotNull(msg); + assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); + } + + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 6dceb760f5..8f55505852 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.store.jdbc; +import junit.framework.Test; + import org.apache.activemq.store.MessagePriorityTest; import org.apache.activemq.store.PersistenceAdapter; import org.apache.derby.jdbc.EmbeddedDataSource; @@ -30,7 +32,12 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { dataSource.setDatabaseName("derbyDb"); dataSource.setCreateDatabase("create"); jdbc.setDataSource(dataSource); + jdbc.deleteAllMessages(); return jdbc; } + + public static Test suite() { + return suite(JDBCMessagePriorityTest.class); + } }