https://issues.apache.org/jira/browse/AMQ-4092 - resolve by removing dynamic property modification - JMSXGroupFirstForConsumer is now a message attribute with a property accessor

This commit is contained in:
gtully 2013-09-17 11:03:18 +01:00
parent d771ebb97e
commit dd91e8592e
5 changed files with 26 additions and 25 deletions

View File

@ -2109,14 +2109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
Message message = n.getMessage();
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
try {
activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
} catch (JMSException e) {
LOG.warn("Failed to set boolean header", e);
}
}
message.setJMSXGroupFirstForConsumer(true);
subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
}

View File

@ -74,23 +74,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
return result;
}
/**
* Assigns the message group to this subscription and set the flag on the
* message that it is the first message to be dispatched.
*/
protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
messageGroupOwners.put(groupId, info.getConsumerId());
Message message = n.getMessage();
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage)message;
try {
activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
} catch (JMSException e) {
LOG.warn("Failed to set boolean header: " + e, e);
}
}
}
@Override
public synchronized String toString() {
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="

View File

@ -87,6 +87,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
protected boolean readOnlyBody;
protected transient boolean recievedByDFBridge;
protected boolean droppable;
protected boolean jmsXGroupFirstForConsumer;
private transient short referenceCount;
private transient ActiveMQConnection connection;
@ -156,6 +157,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
copy.brokerOutTime = brokerOutTime;
copy.memoryUsage=this.memoryUsage;
copy.brokerPath = brokerPath;
copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
// lets not copy the following fields
// copy.targetConsumerId = targetConsumerId;
@ -781,6 +783,17 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
return false;
}
/**
* @openwire:property version=10
*/
public boolean isJMSXGroupFirstForConsumer() {
return jmsXGroupFirstForConsumer;
}
public void setJMSXGroupFirstForConsumer(boolean val) {
jmsXGroupFirstForConsumer = val;
}
public void compress() throws IOException {
if (!isCompressed()) {
storeContent();

View File

@ -195,6 +195,13 @@ public class PropertyExpression implements Expression {
return Arrays.toString(message.getBrokerPath());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupFirstForConsumer", new SubExpression() {
@Override
public Object evaluate(Message message) {
return Boolean.valueOf(message.isJMSXGroupFirstForConsumer());
}
});
}
private final String name;

View File

@ -105,6 +105,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
}
info.setBrokerInTime(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setBrokerOutTime(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setJMSXGroupFirstForConsumer(bs.readBoolean());
info.afterUnmarshall(wireFormat);
@ -147,6 +148,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs);
rc+=tightMarshalLong1(wireFormat, info.getBrokerInTime(), bs);
rc+=tightMarshalLong1(wireFormat, info.getBrokerOutTime(), bs);
bs.writeBoolean(info.isJMSXGroupFirstForConsumer());
return rc + 9;
}
@ -191,6 +193,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs);
tightMarshalLong2(wireFormat, info.getBrokerInTime(), dataOut, bs);
tightMarshalLong2(wireFormat, info.getBrokerOutTime(), dataOut, bs);
bs.readBoolean();
info.afterMarshall(wireFormat);
@ -261,6 +264,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
}
info.setBrokerInTime(looseUnmarshalLong(wireFormat, dataIn));
info.setBrokerOutTime(looseUnmarshalLong(wireFormat, dataIn));
info.setJMSXGroupFirstForConsumer(dataIn.readBoolean());
info.afterUnmarshall(wireFormat);
@ -306,6 +310,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut);
looseMarshalLong(wireFormat, info.getBrokerInTime(), dataOut);
looseMarshalLong(wireFormat, info.getBrokerOutTime(), dataOut);
dataOut.writeBoolean(info.isJMSXGroupFirstForConsumer());
}
}