diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index a437d4de56..314951b289 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -34,7 +34,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected final Destination regionDestination; private final PendingList batchList; private Iterator iterator = null; - private boolean cacheEnabled=false; + protected boolean cacheEnabled=false; protected boolean batchResetNeeded = true; protected boolean storeHasMessages = false; protected int size; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 893b3fe1b2..d71bb29f69 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -49,6 +49,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private final PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; private final Subscription subscription; + private int lastAddPriority = 0; + private boolean immediatePriorityDispatch = true; /** * @param broker Broker for this cursor * @param clientId clientId for this cursor @@ -75,6 +77,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { @Override public synchronized void start() throws Exception { if (!isStarted()) { + lastAddPriority = 0; super.start(); for (PendingMessageCursor tsp : storePrefetches) { tsp.setMessageAudit(getMessageAudit()); @@ -182,8 +185,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { TopicStorePrefetch tsp = topics.get(dest); if (tsp != null) { tsp.addMessageLast(node); + if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) { + final int priority = msg.getPriority(); + if (priority > lastAddPriority) { + // go get the latest priority message + LOG.debug("Clearing cursor on high priority message " + priority); + tsp.clear(); + } + lastAddPriority = priority; + } } } + } } @@ -344,4 +357,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public String toString() { return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; } + + public boolean isImmediatePriorityDispatch() { + return immediatePriorityDispatch; + } + + public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) { + this.immediatePriorityDispatch = immediatePriorityDispatch; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java index 95eff5b94c..3293ccf45b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java @@ -32,6 +32,21 @@ import org.apache.activemq.kaha.Store; * @version $Revision$ */ public class StorePendingDurableSubscriberMessageStoragePolicy implements PendingDurableSubscriberMessageStoragePolicy { + boolean immediatePriorityDispatch = true; + + public boolean isImmediatePriorityDispatch() { + return immediatePriorityDispatch; + } + + /** + * Ensure that new higher priority messages will get an immediate dispatch + * rather than wait for the end of the current cursor batch. + * + * @param immediatePriorityDispatch + */ + public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) { + this.immediatePriorityDispatch = immediatePriorityDispatch; + } /** * Retrieve the configured pending message storage cursor; @@ -44,6 +59,8 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin * @return the Pending Message cursor */ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { - return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub); + StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub); + cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch()); + return cursor; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 87a66e3a28..b6708c303f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -30,8 +30,10 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -69,6 +71,10 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { PolicyEntry policy = new PolicyEntry(); policy.setPrioritizedMessages(prioritizeMessages); policy.setUseCache(useCache); + StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = + new StorePendingDurableSubscriberMessageStoragePolicy(); + durableSubPending.setImmediatePriorityDispatch(true); + policy.setPendingDurableSubscriberPolicy(durableSubPending); PolicyMap policyMap = new PolicyMap(); policyMap.put(new ActiveMQQueue("TEST"), policy); policyMap.put(new ActiveMQTopic("TEST"), policy); @@ -140,6 +146,14 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { e.printStackTrace(); } } + + public void setMessagePriority(int priority) { + this.priority = priority; + } + + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } } @@ -261,5 +275,46 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { } } } + + public void testHighPriorityDelivery() throws Exception { + + // get zero prefetch + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setAll(0); + factory.setPrefetchPolicy(prefetch); + conn.close(); + conn = factory.createConnection(); + conn.setClientID("priority"); + conn.start(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST"); + final String subName = "priorityDisconnect"; + TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); + sub.close(); + + ProducerThread producerThread = new ProducerThread(topic, 5000, LOW_PRI); + producerThread.run(); + LOG.info("Low priority messages sent"); + + sub = sess.createDurableSubscriber(topic, subName); + for (int i=0; i<200;i++) { + Message msg = sub.receive(15000); + LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null)); + assertNotNull("Message " + i + " was null", msg); + assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority()); + } + + producerThread.setMessagePriority(HIGH_PRI); + producerThread.setMessageCount(1); + producerThread.run(); + LOG.info("High priority message sent"); + + // try and get the high priority message + Message msg = sub.receive(15000); + assertNotNull("Message was null", msg); + LOG.info("received: " + msg); + assertEquals("high priority", HIGH_PRI, msg.getJMSPriority()); + } }