ensure new high priority messages get dispatched immediately rather than at the end of the next batch, configurable via PendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch default true, most relevant with prefetch=1

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1040151 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-29 15:33:03 +00:00
parent 81f3935cf3
commit 7d5494258b
4 changed files with 95 additions and 2 deletions

View File

@ -34,7 +34,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected final Destination regionDestination;
private final PendingList batchList;
private Iterator<MessageReference> iterator = null;
private boolean cacheEnabled=false;
protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
protected int size;

View File

@ -49,6 +49,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
private final Subscription subscription;
private int lastAddPriority = 0;
private boolean immediatePriorityDispatch = true;
/**
* @param broker Broker for this cursor
* @param clientId clientId for this cursor
@ -75,6 +77,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void start() throws Exception {
if (!isStarted()) {
lastAddPriority = 0;
super.start();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.setMessageAudit(getMessageAudit());
@ -182,8 +185,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
tsp.addMessageLast(node);
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
final int priority = msg.getPriority();
if (priority > lastAddPriority) {
// go get the latest priority message
LOG.debug("Clearing cursor on high priority message " + priority);
tsp.clear();
}
lastAddPriority = priority;
}
}
}
}
}
@ -344,4 +357,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
public String toString() {
return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
}
public boolean isImmediatePriorityDispatch() {
return immediatePriorityDispatch;
}
public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
this.immediatePriorityDispatch = immediatePriorityDispatch;
}
}

View File

@ -32,6 +32,21 @@ import org.apache.activemq.kaha.Store;
* @version $Revision$
*/
public class StorePendingDurableSubscriberMessageStoragePolicy implements PendingDurableSubscriberMessageStoragePolicy {
boolean immediatePriorityDispatch = true;
public boolean isImmediatePriorityDispatch() {
return immediatePriorityDispatch;
}
/**
* Ensure that new higher priority messages will get an immediate dispatch
* rather than wait for the end of the current cursor batch.
*
* @param immediatePriorityDispatch
*/
public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
this.immediatePriorityDispatch = immediatePriorityDispatch;
}
/**
* Retrieve the configured pending message storage cursor;
@ -44,6 +59,8 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
return cursor;
}
}

View File

@ -30,8 +30,10 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -69,6 +71,10 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(prioritizeMessages);
policy.setUseCache(useCache);
StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
new StorePendingDurableSubscriberMessageStoragePolicy();
durableSubPending.setImmediatePriorityDispatch(true);
policy.setPendingDurableSubscriberPolicy(durableSubPending);
PolicyMap policyMap = new PolicyMap();
policyMap.put(new ActiveMQQueue("TEST"), policy);
policyMap.put(new ActiveMQTopic("TEST"), policy);
@ -140,6 +146,14 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
e.printStackTrace();
}
}
public void setMessagePriority(int priority) {
this.priority = priority;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
}
@ -261,5 +275,46 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
}
}
public void testHighPriorityDelivery() throws Exception {
// get zero prefetch
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setAll(0);
factory.setPrefetchPolicy(prefetch);
conn.close();
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
final String subName = "priorityDisconnect";
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
sub.close();
ProducerThread producerThread = new ProducerThread(topic, 5000, LOW_PRI);
producerThread.run();
LOG.info("Low priority messages sent");
sub = sess.createDurableSubscriber(topic, subName);
for (int i=0; i<200;i++) {
Message msg = sub.receive(15000);
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
}
producerThread.setMessagePriority(HIGH_PRI);
producerThread.setMessageCount(1);
producerThread.run();
LOG.info("High priority message sent");
// try and get the high priority message
Message msg = sub.receive(15000);
assertNotNull("Message was null", msg);
LOG.info("received: " + msg);
assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
}
}