diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java index 94f9f868b0..656abd3488 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -50,6 +50,7 @@ public class ConsumerInfo extends BaseCommand { protected BrokerId[] brokerPath; protected boolean optimizedAcknowledge; protected transient int currentPrefetchSize;//used by the broker + protected boolean noRangeAcks; // if true, the consumer will not send range acks. protected BooleanExpression additionalPredicate; protected transient boolean networkSubscription; //this subscription originated from a network connection @@ -338,4 +339,19 @@ public class ConsumerInfo extends BaseCommand { this.currentPrefetchSize=currentPrefetchSize; } + /** + * The broker may be able to optimize it's processing or provides better + * QOS if it knows the consumer will not be sending ranged acks. + * + * @return true if the consumer will not send range acks. + * @openwire:property version=1 + */ + public boolean isNoRangeAcks() { + return noRangeAcks; + } + + public void setNoRangeAcks(boolean noRangeAcks) { + this.noRangeAcks = noRangeAcks; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java index 2e1074db38..8abe271dda 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ConsumerInfoMarshaller.java @@ -92,6 +92,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) tightUnmarsalNestedObject(wireFormat, dataIn, bs)); info.setNetworkSubscription(bs.readBoolean()); info.setOptimizedAcknowledge(bs.readBoolean()); + info.setNoRangeAcks(bs.readBoolean()); } @@ -117,6 +118,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getAdditionalPredicate(), bs); bs.writeBoolean(info.isNetworkSubscription()); bs.writeBoolean(info.isOptimizedAcknowledge()); + bs.writeBoolean(info.isNoRangeAcks()); return rc + 9; } @@ -148,6 +150,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { tightMarshalNestedObject2(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut, bs); bs.readBoolean(); bs.readBoolean(); + bs.readBoolean(); } @@ -189,6 +192,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) looseUnmarsalNestedObject(wireFormat, dataIn)); info.setNetworkSubscription(dataIn.readBoolean()); info.setOptimizedAcknowledge(dataIn.readBoolean()); + info.setNoRangeAcks(dataIn.readBoolean()); } @@ -217,6 +221,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { looseMarshalNestedObject(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut); dataOut.writeBoolean(info.isNetworkSubscription()); dataOut.writeBoolean(info.isOptimizedAcknowledge()); + dataOut.writeBoolean(info.isNoRangeAcks()); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java index 135e2f739c..5ab51a978d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java @@ -73,6 +73,7 @@ public class ConsumerInfoTest extends BaseCommandTestSupport { info.setAdditionalPredicate(createBooleanExpression("AdditionalPredicate:6")); info.setNetworkSubscription(false); info.setOptimizedAcknowledge(true); + info.setNoRangeAcks(false); } } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/BrokerInfo.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/BrokerInfo.cs index d9ba67b425..73d43edead 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/BrokerInfo.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/BrokerInfo.cs @@ -41,6 +41,8 @@ namespace ActiveMQ.Commands BrokerInfo[] peerBrokerInfos; string brokerName; bool slaveBroker; + bool masterBroker; + bool faultTolerantConfiguration; public override string ToString() { return GetType().Name + "[" @@ -49,6 +51,8 @@ namespace ActiveMQ.Commands + " PeerBrokerInfos=" + PeerBrokerInfos + " BrokerName=" + BrokerName + " SlaveBroker=" + SlaveBroker + + " MasterBroker=" + MasterBroker + + " FaultTolerantConfiguration=" + FaultTolerantConfiguration + " ]"; } @@ -92,5 +96,17 @@ namespace ActiveMQ.Commands set { this.slaveBroker = value; } } + public bool MasterBroker + { + get { return masterBroker; } + set { this.masterBroker = value; } + } + + public bool FaultTolerantConfiguration + { + get { return faultTolerantConfiguration; } + set { this.faultTolerantConfiguration = value; } + } + } } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConnectionInfo.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConnectionInfo.cs index 9c2a3a4c3b..390cfc0a00 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConnectionInfo.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConnectionInfo.cs @@ -41,6 +41,8 @@ namespace ActiveMQ.Commands string password; string userName; BrokerId[] brokerPath; + bool brokerMasterConnector; + bool manageable; public override string ToString() { return GetType().Name + "[" @@ -49,6 +51,8 @@ namespace ActiveMQ.Commands + " Password=" + Password + " UserName=" + UserName + " BrokerPath=" + BrokerPath + + " BrokerMasterConnector=" + BrokerMasterConnector + + " Manageable=" + Manageable + " ]"; } @@ -92,5 +96,17 @@ namespace ActiveMQ.Commands set { this.brokerPath = value; } } + public bool BrokerMasterConnector + { + get { return brokerMasterConnector; } + set { this.brokerMasterConnector = value; } + } + + public bool Manageable + { + get { return manageable; } + set { this.manageable = value; } + } + } } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConsumerInfo.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConsumerInfo.cs index c1a06a977c..739cc4c836 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConsumerInfo.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/Commands/ConsumerInfo.cs @@ -51,6 +51,8 @@ namespace ActiveMQ.Commands BrokerId[] brokerPath; BooleanExpression additionalPredicate; bool networkSubscription; + bool optimizedAcknowledge; + bool noRangeAcks; public override string ToString() { return GetType().Name + "[" @@ -69,6 +71,8 @@ namespace ActiveMQ.Commands + " BrokerPath=" + BrokerPath + " AdditionalPredicate=" + AdditionalPredicate + " NetworkSubscription=" + NetworkSubscription + + " OptimizedAcknowledge=" + OptimizedAcknowledge + + " NoRangeAcks=" + NoRangeAcks + " ]"; } @@ -172,5 +176,17 @@ namespace ActiveMQ.Commands set { this.networkSubscription = value; } } + public bool OptimizedAcknowledge + { + get { return optimizedAcknowledge; } + set { this.optimizedAcknowledge = value; } + } + + public bool NoRangeAcks + { + get { return noRangeAcks; } + set { this.noRangeAcks = value; } + } + } } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/BrokerInfoMarshaller.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/BrokerInfoMarshaller.cs index 851bb70c26..10c32a1106 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/BrokerInfoMarshaller.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/BrokerInfoMarshaller.cs @@ -72,6 +72,8 @@ namespace ActiveMQ.OpenWire.V1 } info.BrokerName = TightUnmarshalString(dataIn, bs); info.SlaveBroker = bs.ReadBoolean(); + info.MasterBroker = bs.ReadBoolean(); + info.FaultTolerantConfiguration = bs.ReadBoolean(); } @@ -87,6 +89,8 @@ namespace ActiveMQ.OpenWire.V1 rc += TightMarshalObjectArray1(wireFormat, info.PeerBrokerInfos, bs); rc += TightMarshalString1(info.BrokerName, bs); bs.WriteBoolean(info.SlaveBroker); + bs.WriteBoolean(info.MasterBroker); + bs.WriteBoolean(info.FaultTolerantConfiguration); return rc + 0; } @@ -103,6 +107,8 @@ namespace ActiveMQ.OpenWire.V1 TightMarshalObjectArray2(wireFormat, info.PeerBrokerInfos, dataOut, bs); TightMarshalString2(info.BrokerName, dataOut, bs); bs.ReadBoolean(); + bs.ReadBoolean(); + bs.ReadBoolean(); } @@ -130,6 +136,8 @@ namespace ActiveMQ.OpenWire.V1 } info.BrokerName = LooseUnmarshalString(dataIn); info.SlaveBroker = dataIn.ReadBoolean(); + info.MasterBroker = dataIn.ReadBoolean(); + info.FaultTolerantConfiguration = dataIn.ReadBoolean(); } @@ -146,6 +154,8 @@ namespace ActiveMQ.OpenWire.V1 LooseMarshalObjectArray(wireFormat, info.PeerBrokerInfos, dataOut); LooseMarshalString(info.BrokerName, dataOut); dataOut.Write(info.SlaveBroker); + dataOut.Write(info.MasterBroker); + dataOut.Write(info.FaultTolerantConfiguration); } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConnectionInfoMarshaller.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConnectionInfoMarshaller.cs index 53d67c0d58..ec6dd2d5db 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConnectionInfoMarshaller.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConnectionInfoMarshaller.cs @@ -72,6 +72,8 @@ namespace ActiveMQ.OpenWire.V1 else { info.BrokerPath = null; } + info.BrokerMasterConnector = bs.ReadBoolean(); + info.Manageable = bs.ReadBoolean(); } @@ -87,6 +89,8 @@ namespace ActiveMQ.OpenWire.V1 rc += TightMarshalString1(info.Password, bs); rc += TightMarshalString1(info.UserName, bs); rc += TightMarshalObjectArray1(wireFormat, info.BrokerPath, bs); + bs.WriteBoolean(info.BrokerMasterConnector); + bs.WriteBoolean(info.Manageable); return rc + 0; } @@ -103,6 +107,8 @@ namespace ActiveMQ.OpenWire.V1 TightMarshalString2(info.Password, dataOut, bs); TightMarshalString2(info.UserName, dataOut, bs); TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs); + bs.ReadBoolean(); + bs.ReadBoolean(); } @@ -130,6 +136,8 @@ namespace ActiveMQ.OpenWire.V1 else { info.BrokerPath = null; } + info.BrokerMasterConnector = dataIn.ReadBoolean(); + info.Manageable = dataIn.ReadBoolean(); } @@ -146,6 +154,8 @@ namespace ActiveMQ.OpenWire.V1 LooseMarshalString(info.Password, dataOut); LooseMarshalString(info.UserName, dataOut); LooseMarshalObjectArray(wireFormat, info.BrokerPath, dataOut); + dataOut.Write(info.BrokerMasterConnector); + dataOut.Write(info.Manageable); } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConsumerInfoMarshaller.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConsumerInfoMarshaller.cs index c9f036c1e0..ca1ed00f07 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConsumerInfoMarshaller.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/ConsumerInfoMarshaller.cs @@ -82,6 +82,8 @@ namespace ActiveMQ.OpenWire.V1 } info.AdditionalPredicate = (BooleanExpression) TightUnmarshalNestedObject(wireFormat, dataIn, bs); info.NetworkSubscription = bs.ReadBoolean(); + info.OptimizedAcknowledge = bs.ReadBoolean(); + info.NoRangeAcks = bs.ReadBoolean(); } @@ -104,6 +106,8 @@ namespace ActiveMQ.OpenWire.V1 rc += TightMarshalObjectArray1(wireFormat, info.BrokerPath, bs); rc += TightMarshalNestedObject1(wireFormat, (DataStructure)info.AdditionalPredicate, bs); bs.WriteBoolean(info.NetworkSubscription); + bs.WriteBoolean(info.OptimizedAcknowledge); + bs.WriteBoolean(info.NoRangeAcks); return rc + 9; } @@ -130,6 +134,8 @@ namespace ActiveMQ.OpenWire.V1 TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs); TightMarshalNestedObject2(wireFormat, (DataStructure)info.AdditionalPredicate, dataOut, bs); bs.ReadBoolean(); + bs.ReadBoolean(); + bs.ReadBoolean(); } @@ -167,6 +173,8 @@ namespace ActiveMQ.OpenWire.V1 } info.AdditionalPredicate = (BooleanExpression) LooseUnmarshalNestedObject(wireFormat, dataIn); info.NetworkSubscription = dataIn.ReadBoolean(); + info.OptimizedAcknowledge = dataIn.ReadBoolean(); + info.NoRangeAcks = dataIn.ReadBoolean(); } @@ -193,6 +201,8 @@ namespace ActiveMQ.OpenWire.V1 LooseMarshalObjectArray(wireFormat, info.BrokerPath, dataOut); LooseMarshalNestedObject(wireFormat, (DataStructure)info.AdditionalPredicate, dataOut); dataOut.Write(info.NetworkSubscription); + dataOut.Write(info.OptimizedAcknowledge); + dataOut.Write(info.NoRangeAcks); } diff --git a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs index 7c0752067b..e95426573d 100644 --- a/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs +++ b/activemq-dotnet/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs @@ -81,8 +81,10 @@ namespace ActiveMQ.OpenWire.V1 format.addMarshaller(new DestinationInfoMarshaller()); format.addMarshaller(new ShutdownInfoMarshaller()); format.addMarshaller(new DataResponseMarshaller()); + format.addMarshaller(new ConnectionControlMarshaller()); format.addMarshaller(new KeepAliveInfoMarshaller()); format.addMarshaller(new FlushCommandMarshaller()); + format.addMarshaller(new ConsumerControlMarshaller()); format.addMarshaller(new JournalTopicAckMarshaller()); format.addMarshaller(new BrokerIdMarshaller()); format.addMarshaller(new MessageDispatchMarshaller()); diff --git a/openwire-c/src/libopenwire/ow_commands_v1.c b/openwire-c/src/libopenwire/ow_commands_v1.c index 06161ab7ed..7cfd279ab0 100644 --- a/openwire-c/src/libopenwire/ow_commands_v1.c +++ b/openwire-c/src/libopenwire/ow_commands_v1.c @@ -1179,6 +1179,7 @@ apr_status_t ow_marshal1_ConsumerInfo(ow_bit_buffer *buffer, ow_ConsumerInfo *ob SUCCESS_CHECK(ow_marshal1_nested_object(buffer, (ow_DataStructure*)object->additionalPredicate)); ow_bit_buffer_append(buffer, object->networkSubscription); ow_bit_buffer_append(buffer, object->optimizedAcknowledge); + ow_bit_buffer_append(buffer, object->noRangeAcks); return APR_SUCCESS; } @@ -1200,6 +1201,7 @@ apr_status_t ow_marshal2_ConsumerInfo(ow_byte_buffer *buffer, ow_bit_buffer *bit SUCCESS_CHECK(ow_marshal2_DataStructure_array(buffer, bitbuffer, object->brokerPath)); SUCCESS_CHECK(ow_marshal2_nested_object(buffer, bitbuffer, (ow_DataStructure*)object->additionalPredicate)); ow_bit_buffer_read(bitbuffer); + ow_bit_buffer_read(bitbuffer); ow_bit_buffer_read(bitbuffer); return APR_SUCCESS; @@ -1224,6 +1226,7 @@ apr_status_t ow_unmarshal_ConsumerInfo(ow_byte_array *buffer, ow_bit_buffer *bit SUCCESS_CHECK(ow_unmarshal_nested_object(buffer, bitbuffer, (ow_DataStructure**)&object->additionalPredicate, pool)); object->networkSubscription = ow_bit_buffer_read(bitbuffer); object->optimizedAcknowledge = ow_bit_buffer_read(bitbuffer); + object->noRangeAcks = ow_bit_buffer_read(bitbuffer); return APR_SUCCESS; } diff --git a/openwire-c/src/libopenwire/ow_commands_v1.h b/openwire-c/src/libopenwire/ow_commands_v1.h index d7c813e3a4..502ddbaa14 100644 --- a/openwire-c/src/libopenwire/ow_commands_v1.h +++ b/openwire-c/src/libopenwire/ow_commands_v1.h @@ -372,6 +372,7 @@ typedef struct ow_ConsumerInfo { struct ow_BooleanExpression *additionalPredicate; ow_boolean networkSubscription; ow_boolean optimizedAcknowledge; + ow_boolean noRangeAcks; } ow_ConsumerInfo; ow_ConsumerInfo *ow_ConsumerInfo_create(apr_pool_t *pool);