diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index e4f40142e6..7d6c9cf322 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -82,6 +82,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess private BrokerId [] brokerPath; protected boolean droppable = false; + private BrokerId [] cluster; abstract public Message copy(); @@ -611,4 +612,18 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess public void setDroppable(boolean droppable) { this.droppable = droppable; } + + /** + * If a message is stored in multiple nodes on a cluster, + * all the cluster members will be listed here. + * Otherwise, it will be null. + * + * @openwire:property version=3 cache=true + */ + public BrokerId[] getCluster() { + return cluster; + } + public void setCluster(BrokerId[] cluster) { + this.cluster = cluster; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java index 2531d6cb9c..b72d74f5c4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MessageMarshaller.java @@ -92,6 +92,18 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { info.setRecievedByDFBridge(bs.readBoolean()); info.setDroppable(bs.readBoolean()); + if (bs.readBoolean()) { + short size = dataIn.readShort(); + org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; + for( int i=0; i < size; i++ ) { + value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs); + } + info.setCluster(value); + } + else { + info.setCluster(null); + } + info.afterUnmarshall(wireFormat); } @@ -130,6 +142,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { rc += tightMarshalString1(info.getUserID(), bs); bs.writeBoolean(info.isRecievedByDFBridge()); bs.writeBoolean(info.isDroppable()); + rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs); return rc + 9; } @@ -171,6 +184,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { tightMarshalString2(info.getUserID(), dataOut, bs); bs.readBoolean(); bs.readBoolean(); + tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs); info.afterMarshall(wireFormat); @@ -228,6 +242,18 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { info.setRecievedByDFBridge(dataIn.readBoolean()); info.setDroppable(dataIn.readBoolean()); + if (dataIn.readBoolean()) { + short size = dataIn.readShort(); + org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; + for( int i=0; i < size; i++ ) { + value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn); + } + info.setCluster(value); + } + else { + info.setCluster(null); + } + info.afterUnmarshall(wireFormat); } @@ -269,6 +295,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { looseMarshalString(info.getUserID(), dataOut); dataOut.writeBoolean(info.isRecievedByDFBridge()); dataOut.writeBoolean(info.isDroppable()); + looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java index 92d759500b..ef7ca2d1dd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/MessageTestSupport.java @@ -81,5 +81,12 @@ public abstract class MessageTestSupport extends BaseCommandTestSupport { info.setUserID("UserID:16"); info.setRecievedByDFBridge(true); info.setDroppable(false); + { + BrokerId value[] = new BrokerId[2]; + for( int i=0; i < 2; i++ ) { + value[i] = createBrokerId("Cluster:17"); + } + info.setCluster(value); + } } }