Added support for AMQ-798 to enable a new boolean header called JMSXGroupFirstForConsumer

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@419930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-07-07 17:25:03 +00:00
parent 288f8767c3
commit 6df6d58efe
1 changed files with 26 additions and 2 deletions

View File

@ -19,19 +19,25 @@ package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
public class QueueSubscription extends PrefetchSubscription implements LockOwner { public class QueueSubscription extends PrefetchSubscription implements LockOwner {
private static final Log log = LogFactory.getLog(QueueSubscription.class);
public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info); super(broker,context, info);
} }
@ -80,7 +86,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
// If we can own the first, then no-one else should own the rest. // If we can own the first, then no-one else should own the rest.
if( sequence == 1 ) { if( sequence == 1 ) {
if( node.lock(this) ) { if( node.lock(this) ) {
messageGroupOwners.put(groupId, info.getConsumerId()); assignGroupToMe(messageGroupOwners, n, groupId);
return true; return true;
} else { } else {
return false; return false;
@ -94,7 +100,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
groupOwner = messageGroupOwners.get(groupId); groupOwner = messageGroupOwners.get(groupId);
if( groupOwner==null ) { if( groupOwner==null ) {
if( node.lock(this) ) { if( node.lock(this) ) {
messageGroupOwners.put(groupId, info.getConsumerId()); assignGroupToMe(messageGroupOwners, n, groupId);
return true; return true;
} else { } else {
return false; return false;
@ -117,6 +123,24 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
} }
} }
/**
* Assigns the message group to this subscription and set the flag on the message that it is the first message
* to be dispatched.
*/
protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
messageGroupOwners.put(groupId, info.getConsumerId());
Message message = n.getMessage();
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
try {
activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true);
}
catch (JMSException e) {
log.warn("Failed to set boolean header: " + e, e);
}
}
}
public String toString() { public String toString() {
return return