diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 27ea50d3c5..53f686f714 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -2109,14 +2109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); Message message = n.getMessage(); - if (message instanceof ActiveMQMessage) { - ActiveMQMessage activeMessage = (ActiveMQMessage) message; - try { - activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); - } catch (JMSException e) { - LOG.warn("Failed to set boolean header", e); - } - } + message.setJMSXGroupFirstForConsumer(true); subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index e77714fbea..7c7027fedf 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -74,23 +74,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner return result; } - /** - * Assigns the message group to this subscription and set the flag on the - * message that it is the first message to be dispatched. - */ - protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { - messageGroupOwners.put(groupId, info.getConsumerId()); - Message message = n.getMessage(); - if (message instanceof ActiveMQMessage) { - ActiveMQMessage activeMessage = (ActiveMQMessage)message; - try { - activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); - } catch (JMSException e) { - LOG.warn("Failed to set boolean header: " + e, e); - } - } - } - @Override public synchronized String toString() { return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered=" diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index b3df690ecc..e0f0b21f1c 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -87,6 +87,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess protected boolean readOnlyBody; protected transient boolean recievedByDFBridge; protected boolean droppable; + protected boolean jmsXGroupFirstForConsumer; private transient short referenceCount; private transient ActiveMQConnection connection; @@ -156,6 +157,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess copy.brokerOutTime = brokerOutTime; copy.memoryUsage=this.memoryUsage; copy.brokerPath = brokerPath; + copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; // lets not copy the following fields // copy.targetConsumerId = targetConsumerId; @@ -781,6 +783,17 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess return false; } + /** + * @openwire:property version=10 + */ + public boolean isJMSXGroupFirstForConsumer() { + return jmsXGroupFirstForConsumer; + } + + public void setJMSXGroupFirstForConsumer(boolean val) { + jmsXGroupFirstForConsumer = val; + } + public void compress() throws IOException { if (!isCompressed()) { storeContent(); diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java b/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java index c7973198d7..964a81d695 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java @@ -195,6 +195,13 @@ public class PropertyExpression implements Expression { return Arrays.toString(message.getBrokerPath()); } }); + JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupFirstForConsumer", new SubExpression() { + + @Override + public Object evaluate(Message message) { + return Boolean.valueOf(message.isJMSXGroupFirstForConsumer()); + } + }); } private final String name; diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java b/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java index 95fda28151..262faffd78 100644 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java @@ -105,6 +105,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { } info.setBrokerInTime(tightUnmarshalLong(wireFormat, dataIn, bs)); info.setBrokerOutTime(tightUnmarshalLong(wireFormat, dataIn, bs)); + info.setJMSXGroupFirstForConsumer(bs.readBoolean()); info.afterUnmarshall(wireFormat); @@ -147,6 +148,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs); rc+=tightMarshalLong1(wireFormat, info.getBrokerInTime(), bs); rc+=tightMarshalLong1(wireFormat, info.getBrokerOutTime(), bs); + bs.writeBoolean(info.isJMSXGroupFirstForConsumer()); return rc + 9; } @@ -191,6 +193,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs); tightMarshalLong2(wireFormat, info.getBrokerInTime(), dataOut, bs); tightMarshalLong2(wireFormat, info.getBrokerOutTime(), dataOut, bs); + bs.readBoolean(); info.afterMarshall(wireFormat); @@ -261,6 +264,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { } info.setBrokerInTime(looseUnmarshalLong(wireFormat, dataIn)); info.setBrokerOutTime(looseUnmarshalLong(wireFormat, dataIn)); + info.setJMSXGroupFirstForConsumer(dataIn.readBoolean()); info.afterUnmarshall(wireFormat); @@ -306,6 +310,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut); looseMarshalLong(wireFormat, info.getBrokerInTime(), dataOut); looseMarshalLong(wireFormat, info.getBrokerOutTime(), dataOut); + dataOut.writeBoolean(info.isJMSXGroupFirstForConsumer()); } }