mirror of https://github.com/apache/activemq.git
Added a 'noRangeAcks' flag to the ConsumerInfo command
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397960 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4009fda30c
commit
f1a0614066
|
@ -50,6 +50,7 @@ public class ConsumerInfo extends BaseCommand {
|
|||
protected BrokerId[] brokerPath;
|
||||
protected boolean optimizedAcknowledge;
|
||||
protected transient int currentPrefetchSize;//used by the broker
|
||||
protected boolean noRangeAcks; // if true, the consumer will not send range acks.
|
||||
|
||||
protected BooleanExpression additionalPredicate;
|
||||
protected transient boolean networkSubscription; //this subscription originated from a network connection
|
||||
|
@ -338,4 +339,19 @@ public class ConsumerInfo extends BaseCommand {
|
|||
this.currentPrefetchSize=currentPrefetchSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* The broker may be able to optimize it's processing or provides better
|
||||
* QOS if it knows the consumer will not be sending ranged acks.
|
||||
*
|
||||
* @return true if the consumer will not send range acks.
|
||||
* @openwire:property version=1
|
||||
*/
|
||||
public boolean isNoRangeAcks() {
|
||||
return noRangeAcks;
|
||||
}
|
||||
|
||||
public void setNoRangeAcks(boolean noRangeAcks) {
|
||||
this.noRangeAcks = noRangeAcks;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
|
|||
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
|
||||
info.setNetworkSubscription(bs.readBoolean());
|
||||
info.setOptimizedAcknowledge(bs.readBoolean());
|
||||
info.setNoRangeAcks(bs.readBoolean());
|
||||
|
||||
}
|
||||
|
||||
|
@ -117,6 +118,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
|
|||
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getAdditionalPredicate(), bs);
|
||||
bs.writeBoolean(info.isNetworkSubscription());
|
||||
bs.writeBoolean(info.isOptimizedAcknowledge());
|
||||
bs.writeBoolean(info.isNoRangeAcks());
|
||||
|
||||
return rc + 9;
|
||||
}
|
||||
|
@ -148,6 +150,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
|
|||
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut, bs);
|
||||
bs.readBoolean();
|
||||
bs.readBoolean();
|
||||
bs.readBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -189,6 +192,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
|
|||
info.setAdditionalPredicate((org.apache.activemq.filter.BooleanExpression) looseUnmarsalNestedObject(wireFormat, dataIn));
|
||||
info.setNetworkSubscription(dataIn.readBoolean());
|
||||
info.setOptimizedAcknowledge(dataIn.readBoolean());
|
||||
info.setNoRangeAcks(dataIn.readBoolean());
|
||||
|
||||
}
|
||||
|
||||
|
@ -217,6 +221,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
|
|||
looseMarshalNestedObject(wireFormat, (DataStructure)info.getAdditionalPredicate(), dataOut);
|
||||
dataOut.writeBoolean(info.isNetworkSubscription());
|
||||
dataOut.writeBoolean(info.isOptimizedAcknowledge());
|
||||
dataOut.writeBoolean(info.isNoRangeAcks());
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ public class ConsumerInfoTest extends BaseCommandTestSupport {
|
|||
info.setAdditionalPredicate(createBooleanExpression("AdditionalPredicate:6"));
|
||||
info.setNetworkSubscription(false);
|
||||
info.setOptimizedAcknowledge(true);
|
||||
info.setNoRangeAcks(false);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ namespace ActiveMQ.Commands
|
|||
BrokerInfo[] peerBrokerInfos;
|
||||
string brokerName;
|
||||
bool slaveBroker;
|
||||
bool masterBroker;
|
||||
bool faultTolerantConfiguration;
|
||||
|
||||
public override string ToString() {
|
||||
return GetType().Name + "["
|
||||
|
@ -49,6 +51,8 @@ namespace ActiveMQ.Commands
|
|||
+ " PeerBrokerInfos=" + PeerBrokerInfos
|
||||
+ " BrokerName=" + BrokerName
|
||||
+ " SlaveBroker=" + SlaveBroker
|
||||
+ " MasterBroker=" + MasterBroker
|
||||
+ " FaultTolerantConfiguration=" + FaultTolerantConfiguration
|
||||
+ " ]";
|
||||
|
||||
}
|
||||
|
@ -92,5 +96,17 @@ namespace ActiveMQ.Commands
|
|||
set { this.slaveBroker = value; }
|
||||
}
|
||||
|
||||
public bool MasterBroker
|
||||
{
|
||||
get { return masterBroker; }
|
||||
set { this.masterBroker = value; }
|
||||
}
|
||||
|
||||
public bool FaultTolerantConfiguration
|
||||
{
|
||||
get { return faultTolerantConfiguration; }
|
||||
set { this.faultTolerantConfiguration = value; }
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ namespace ActiveMQ.Commands
|
|||
string password;
|
||||
string userName;
|
||||
BrokerId[] brokerPath;
|
||||
bool brokerMasterConnector;
|
||||
bool manageable;
|
||||
|
||||
public override string ToString() {
|
||||
return GetType().Name + "["
|
||||
|
@ -49,6 +51,8 @@ namespace ActiveMQ.Commands
|
|||
+ " Password=" + Password
|
||||
+ " UserName=" + UserName
|
||||
+ " BrokerPath=" + BrokerPath
|
||||
+ " BrokerMasterConnector=" + BrokerMasterConnector
|
||||
+ " Manageable=" + Manageable
|
||||
+ " ]";
|
||||
|
||||
}
|
||||
|
@ -92,5 +96,17 @@ namespace ActiveMQ.Commands
|
|||
set { this.brokerPath = value; }
|
||||
}
|
||||
|
||||
public bool BrokerMasterConnector
|
||||
{
|
||||
get { return brokerMasterConnector; }
|
||||
set { this.brokerMasterConnector = value; }
|
||||
}
|
||||
|
||||
public bool Manageable
|
||||
{
|
||||
get { return manageable; }
|
||||
set { this.manageable = value; }
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,8 @@ namespace ActiveMQ.Commands
|
|||
BrokerId[] brokerPath;
|
||||
BooleanExpression additionalPredicate;
|
||||
bool networkSubscription;
|
||||
bool optimizedAcknowledge;
|
||||
bool noRangeAcks;
|
||||
|
||||
public override string ToString() {
|
||||
return GetType().Name + "["
|
||||
|
@ -69,6 +71,8 @@ namespace ActiveMQ.Commands
|
|||
+ " BrokerPath=" + BrokerPath
|
||||
+ " AdditionalPredicate=" + AdditionalPredicate
|
||||
+ " NetworkSubscription=" + NetworkSubscription
|
||||
+ " OptimizedAcknowledge=" + OptimizedAcknowledge
|
||||
+ " NoRangeAcks=" + NoRangeAcks
|
||||
+ " ]";
|
||||
|
||||
}
|
||||
|
@ -172,5 +176,17 @@ namespace ActiveMQ.Commands
|
|||
set { this.networkSubscription = value; }
|
||||
}
|
||||
|
||||
public bool OptimizedAcknowledge
|
||||
{
|
||||
get { return optimizedAcknowledge; }
|
||||
set { this.optimizedAcknowledge = value; }
|
||||
}
|
||||
|
||||
public bool NoRangeAcks
|
||||
{
|
||||
get { return noRangeAcks; }
|
||||
set { this.noRangeAcks = value; }
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
}
|
||||
info.BrokerName = TightUnmarshalString(dataIn, bs);
|
||||
info.SlaveBroker = bs.ReadBoolean();
|
||||
info.MasterBroker = bs.ReadBoolean();
|
||||
info.FaultTolerantConfiguration = bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -87,6 +89,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
rc += TightMarshalObjectArray1(wireFormat, info.PeerBrokerInfos, bs);
|
||||
rc += TightMarshalString1(info.BrokerName, bs);
|
||||
bs.WriteBoolean(info.SlaveBroker);
|
||||
bs.WriteBoolean(info.MasterBroker);
|
||||
bs.WriteBoolean(info.FaultTolerantConfiguration);
|
||||
|
||||
return rc + 0;
|
||||
}
|
||||
|
@ -103,6 +107,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
TightMarshalObjectArray2(wireFormat, info.PeerBrokerInfos, dataOut, bs);
|
||||
TightMarshalString2(info.BrokerName, dataOut, bs);
|
||||
bs.ReadBoolean();
|
||||
bs.ReadBoolean();
|
||||
bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -130,6 +136,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
}
|
||||
info.BrokerName = LooseUnmarshalString(dataIn);
|
||||
info.SlaveBroker = dataIn.ReadBoolean();
|
||||
info.MasterBroker = dataIn.ReadBoolean();
|
||||
info.FaultTolerantConfiguration = dataIn.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -146,6 +154,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
LooseMarshalObjectArray(wireFormat, info.PeerBrokerInfos, dataOut);
|
||||
LooseMarshalString(info.BrokerName, dataOut);
|
||||
dataOut.Write(info.SlaveBroker);
|
||||
dataOut.Write(info.MasterBroker);
|
||||
dataOut.Write(info.FaultTolerantConfiguration);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -72,6 +72,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
else {
|
||||
info.BrokerPath = null;
|
||||
}
|
||||
info.BrokerMasterConnector = bs.ReadBoolean();
|
||||
info.Manageable = bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -87,6 +89,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
rc += TightMarshalString1(info.Password, bs);
|
||||
rc += TightMarshalString1(info.UserName, bs);
|
||||
rc += TightMarshalObjectArray1(wireFormat, info.BrokerPath, bs);
|
||||
bs.WriteBoolean(info.BrokerMasterConnector);
|
||||
bs.WriteBoolean(info.Manageable);
|
||||
|
||||
return rc + 0;
|
||||
}
|
||||
|
@ -103,6 +107,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
TightMarshalString2(info.Password, dataOut, bs);
|
||||
TightMarshalString2(info.UserName, dataOut, bs);
|
||||
TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs);
|
||||
bs.ReadBoolean();
|
||||
bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -130,6 +136,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
else {
|
||||
info.BrokerPath = null;
|
||||
}
|
||||
info.BrokerMasterConnector = dataIn.ReadBoolean();
|
||||
info.Manageable = dataIn.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -146,6 +154,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
LooseMarshalString(info.Password, dataOut);
|
||||
LooseMarshalString(info.UserName, dataOut);
|
||||
LooseMarshalObjectArray(wireFormat, info.BrokerPath, dataOut);
|
||||
dataOut.Write(info.BrokerMasterConnector);
|
||||
dataOut.Write(info.Manageable);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -82,6 +82,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
}
|
||||
info.AdditionalPredicate = (BooleanExpression) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
|
||||
info.NetworkSubscription = bs.ReadBoolean();
|
||||
info.OptimizedAcknowledge = bs.ReadBoolean();
|
||||
info.NoRangeAcks = bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -104,6 +106,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
rc += TightMarshalObjectArray1(wireFormat, info.BrokerPath, bs);
|
||||
rc += TightMarshalNestedObject1(wireFormat, (DataStructure)info.AdditionalPredicate, bs);
|
||||
bs.WriteBoolean(info.NetworkSubscription);
|
||||
bs.WriteBoolean(info.OptimizedAcknowledge);
|
||||
bs.WriteBoolean(info.NoRangeAcks);
|
||||
|
||||
return rc + 9;
|
||||
}
|
||||
|
@ -130,6 +134,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs);
|
||||
TightMarshalNestedObject2(wireFormat, (DataStructure)info.AdditionalPredicate, dataOut, bs);
|
||||
bs.ReadBoolean();
|
||||
bs.ReadBoolean();
|
||||
bs.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -167,6 +173,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
}
|
||||
info.AdditionalPredicate = (BooleanExpression) LooseUnmarshalNestedObject(wireFormat, dataIn);
|
||||
info.NetworkSubscription = dataIn.ReadBoolean();
|
||||
info.OptimizedAcknowledge = dataIn.ReadBoolean();
|
||||
info.NoRangeAcks = dataIn.ReadBoolean();
|
||||
|
||||
}
|
||||
|
||||
|
@ -193,6 +201,8 @@ namespace ActiveMQ.OpenWire.V1
|
|||
LooseMarshalObjectArray(wireFormat, info.BrokerPath, dataOut);
|
||||
LooseMarshalNestedObject(wireFormat, (DataStructure)info.AdditionalPredicate, dataOut);
|
||||
dataOut.Write(info.NetworkSubscription);
|
||||
dataOut.Write(info.OptimizedAcknowledge);
|
||||
dataOut.Write(info.NoRangeAcks);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -81,8 +81,10 @@ namespace ActiveMQ.OpenWire.V1
|
|||
format.addMarshaller(new DestinationInfoMarshaller());
|
||||
format.addMarshaller(new ShutdownInfoMarshaller());
|
||||
format.addMarshaller(new DataResponseMarshaller());
|
||||
format.addMarshaller(new ConnectionControlMarshaller());
|
||||
format.addMarshaller(new KeepAliveInfoMarshaller());
|
||||
format.addMarshaller(new FlushCommandMarshaller());
|
||||
format.addMarshaller(new ConsumerControlMarshaller());
|
||||
format.addMarshaller(new JournalTopicAckMarshaller());
|
||||
format.addMarshaller(new BrokerIdMarshaller());
|
||||
format.addMarshaller(new MessageDispatchMarshaller());
|
||||
|
|
|
@ -1179,6 +1179,7 @@ apr_status_t ow_marshal1_ConsumerInfo(ow_bit_buffer *buffer, ow_ConsumerInfo *ob
|
|||
SUCCESS_CHECK(ow_marshal1_nested_object(buffer, (ow_DataStructure*)object->additionalPredicate));
|
||||
ow_bit_buffer_append(buffer, object->networkSubscription);
|
||||
ow_bit_buffer_append(buffer, object->optimizedAcknowledge);
|
||||
ow_bit_buffer_append(buffer, object->noRangeAcks);
|
||||
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
@ -1200,6 +1201,7 @@ apr_status_t ow_marshal2_ConsumerInfo(ow_byte_buffer *buffer, ow_bit_buffer *bit
|
|||
SUCCESS_CHECK(ow_marshal2_DataStructure_array(buffer, bitbuffer, object->brokerPath));
|
||||
SUCCESS_CHECK(ow_marshal2_nested_object(buffer, bitbuffer, (ow_DataStructure*)object->additionalPredicate));
|
||||
ow_bit_buffer_read(bitbuffer);
|
||||
ow_bit_buffer_read(bitbuffer);
|
||||
ow_bit_buffer_read(bitbuffer);
|
||||
|
||||
return APR_SUCCESS;
|
||||
|
@ -1224,6 +1226,7 @@ apr_status_t ow_unmarshal_ConsumerInfo(ow_byte_array *buffer, ow_bit_buffer *bit
|
|||
SUCCESS_CHECK(ow_unmarshal_nested_object(buffer, bitbuffer, (ow_DataStructure**)&object->additionalPredicate, pool));
|
||||
object->networkSubscription = ow_bit_buffer_read(bitbuffer);
|
||||
object->optimizedAcknowledge = ow_bit_buffer_read(bitbuffer);
|
||||
object->noRangeAcks = ow_bit_buffer_read(bitbuffer);
|
||||
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -372,6 +372,7 @@ typedef struct ow_ConsumerInfo {
|
|||
struct ow_BooleanExpression *additionalPredicate;
|
||||
ow_boolean networkSubscription;
|
||||
ow_boolean optimizedAcknowledge;
|
||||
ow_boolean noRangeAcks;
|
||||
|
||||
} ow_ConsumerInfo;
|
||||
ow_ConsumerInfo *ow_ConsumerInfo_create(apr_pool_t *pool);
|
||||
|
|
Loading…
Reference in New Issue