From 6df6d58efea43b369cd088d609d077aa901482e4 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 7 Jul 2006 17:25:03 +0000 Subject: [PATCH] Added support for AMQ-798 to enable a new boolean header called JMSXGroupFirstForConsumer git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@419930 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/QueueSubscription.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index b7f9cb8cd5..0253d3260c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -19,19 +19,25 @@ package org.apache.activemq.broker.region; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.group.MessageGroupMap; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.transaction.Synchronization; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; import java.io.IOException; import java.util.Iterator; public class QueueSubscription extends PrefetchSubscription implements LockOwner { + private static final Log log = LogFactory.getLog(QueueSubscription.class); + public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { super(broker,context, info); } @@ -80,7 +86,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner // If we can own the first, then no-one else should own the rest. if( sequence == 1 ) { if( node.lock(this) ) { - messageGroupOwners.put(groupId, info.getConsumerId()); + assignGroupToMe(messageGroupOwners, n, groupId); return true; } else { return false; @@ -94,7 +100,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner groupOwner = messageGroupOwners.get(groupId); if( groupOwner==null ) { if( node.lock(this) ) { - messageGroupOwners.put(groupId, info.getConsumerId()); + assignGroupToMe(messageGroupOwners, n, groupId); return true; } else { return false; @@ -117,6 +123,24 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner } } + + /** + * Assigns the message group to this subscription and set the flag on the message that it is the first message + * to be dispatched. + */ + protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { + messageGroupOwners.put(groupId, info.getConsumerId()); + Message message = n.getMessage(); + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + try { + activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true); + } + catch (JMSException e) { + log.warn("Failed to set boolean header: " + e, e); + } + } + } public String toString() { return