mirror of https://github.com/apache/activemq.git
Added a cluster field to each message so that each message can be persisted to a unique set of brokers working together as a master slave cluster.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@546196 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b852dc87fc
commit
8ec100f564
|
@ -82,6 +82,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
|
|
||||||
private BrokerId [] brokerPath;
|
private BrokerId [] brokerPath;
|
||||||
protected boolean droppable = false;
|
protected boolean droppable = false;
|
||||||
|
private BrokerId [] cluster;
|
||||||
|
|
||||||
abstract public Message copy();
|
abstract public Message copy();
|
||||||
|
|
||||||
|
@ -611,4 +612,18 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
public void setDroppable(boolean droppable) {
|
public void setDroppable(boolean droppable) {
|
||||||
this.droppable = droppable;
|
this.droppable = droppable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If a message is stored in multiple nodes on a cluster,
|
||||||
|
* all the cluster members will be listed here.
|
||||||
|
* Otherwise, it will be null.
|
||||||
|
*
|
||||||
|
* @openwire:property version=3 cache=true
|
||||||
|
*/
|
||||||
|
public BrokerId[] getCluster() {
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
public void setCluster(BrokerId[] cluster) {
|
||||||
|
this.cluster = cluster;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,6 +92,18 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
|
||||||
info.setRecievedByDFBridge(bs.readBoolean());
|
info.setRecievedByDFBridge(bs.readBoolean());
|
||||||
info.setDroppable(bs.readBoolean());
|
info.setDroppable(bs.readBoolean());
|
||||||
|
|
||||||
|
if (bs.readBoolean()) {
|
||||||
|
short size = dataIn.readShort();
|
||||||
|
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
|
||||||
|
for( int i=0; i < size; i++ ) {
|
||||||
|
value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
|
||||||
|
}
|
||||||
|
info.setCluster(value);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
info.setCluster(null);
|
||||||
|
}
|
||||||
|
|
||||||
info.afterUnmarshall(wireFormat);
|
info.afterUnmarshall(wireFormat);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -130,6 +142,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
|
||||||
rc += tightMarshalString1(info.getUserID(), bs);
|
rc += tightMarshalString1(info.getUserID(), bs);
|
||||||
bs.writeBoolean(info.isRecievedByDFBridge());
|
bs.writeBoolean(info.isRecievedByDFBridge());
|
||||||
bs.writeBoolean(info.isDroppable());
|
bs.writeBoolean(info.isDroppable());
|
||||||
|
rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs);
|
||||||
|
|
||||||
return rc + 9;
|
return rc + 9;
|
||||||
}
|
}
|
||||||
|
@ -171,6 +184,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
|
||||||
tightMarshalString2(info.getUserID(), dataOut, bs);
|
tightMarshalString2(info.getUserID(), dataOut, bs);
|
||||||
bs.readBoolean();
|
bs.readBoolean();
|
||||||
bs.readBoolean();
|
bs.readBoolean();
|
||||||
|
tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs);
|
||||||
|
|
||||||
info.afterMarshall(wireFormat);
|
info.afterMarshall(wireFormat);
|
||||||
|
|
||||||
|
@ -228,6 +242,18 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
|
||||||
info.setRecievedByDFBridge(dataIn.readBoolean());
|
info.setRecievedByDFBridge(dataIn.readBoolean());
|
||||||
info.setDroppable(dataIn.readBoolean());
|
info.setDroppable(dataIn.readBoolean());
|
||||||
|
|
||||||
|
if (dataIn.readBoolean()) {
|
||||||
|
short size = dataIn.readShort();
|
||||||
|
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
|
||||||
|
for( int i=0; i < size; i++ ) {
|
||||||
|
value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
|
||||||
|
}
|
||||||
|
info.setCluster(value);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
info.setCluster(null);
|
||||||
|
}
|
||||||
|
|
||||||
info.afterUnmarshall(wireFormat);
|
info.afterUnmarshall(wireFormat);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -269,6 +295,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
|
||||||
looseMarshalString(info.getUserID(), dataOut);
|
looseMarshalString(info.getUserID(), dataOut);
|
||||||
dataOut.writeBoolean(info.isRecievedByDFBridge());
|
dataOut.writeBoolean(info.isRecievedByDFBridge());
|
||||||
dataOut.writeBoolean(info.isDroppable());
|
dataOut.writeBoolean(info.isDroppable());
|
||||||
|
looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,5 +81,12 @@ public abstract class MessageTestSupport extends BaseCommandTestSupport {
|
||||||
info.setUserID("UserID:16");
|
info.setUserID("UserID:16");
|
||||||
info.setRecievedByDFBridge(true);
|
info.setRecievedByDFBridge(true);
|
||||||
info.setDroppable(false);
|
info.setDroppable(false);
|
||||||
|
{
|
||||||
|
BrokerId value[] = new BrokerId[2];
|
||||||
|
for( int i=0; i < 2; i++ ) {
|
||||||
|
value[i] = createBrokerId("Cluster:17");
|
||||||
|
}
|
||||||
|
info.setCluster(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue