From ec14f6569d7c9b0dcffbb799a7fa19aafa25a210 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 7 Mar 2007 17:23:07 +0000 Subject: [PATCH] Added a new windowSize field to the ProducerInfo command and added a new ProducerAck command. These will be needed to implement better producer flow control where threads do not block on the broker side. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515654 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 10 ++- .../apache/activemq/command/CommandTypes.java | 1 + .../apache/activemq/command/ProducerAck.java | 82 +++++++++++++++++++ .../apache/activemq/command/ProducerInfo.java | 16 ++++ .../openwire/v3/MarshallerFactory.java | 1 + .../openwire/v3/ProducerAckMarshaller.java | 1 + .../openwire/v3/ProducerInfoMarshaller.java | 2 +- .../apache/activemq/state/CommandVisitor.java | 2 + .../state/ConnectionStateTracker.java | 6 ++ .../activemq/openwire/v3/ProducerAckTest.java | 1 + .../openwire/v3/ProducerInfoTest.java | 2 +- 11 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index ca6b5d47a4..3d03e2a854 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -51,6 +51,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -685,6 +686,12 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit return null; } + + public Response processProducerAck(ProducerAck ack) throws Exception { + // A broker should not get ProducerAck messages. + return null; + } + public Connector getConnector(){ return connector; } @@ -1150,5 +1157,6 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit log.debug("Could not stop transport: "+e,e); } } - } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java index c6df016539..42f4376eb8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java @@ -57,6 +57,7 @@ public interface CommandTypes { // and the server. // /////////////////////////////////////////////////// + byte PRODUCER_ACK = 19; byte MESSAGE_PULL = 20; byte MESSAGE_DISPATCH = 21; byte MESSAGE_ACK = 22; diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java b/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java new file mode 100644 index 0000000000..c0982410cb --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/command/ProducerAck.java @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.command; + +import org.apache.activemq.state.CommandVisitor; + + +/** + * A ProducerAck command is sent by a broker to a producer to let it know it has received and processed + * messages that it has produced. The producer will be flow controlled if it does not receive + * ProducerAck commands back from the broker. + * + * @openwire:marshaller code="19" version="3" + * @version $Revision: 1.11 $ + */ +public class ProducerAck extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.PRODUCER_ACK; + + protected ProducerId producerId; + protected int size; + + public ProducerAck() { + } + + public void copy(ProducerAck copy) { + super.copy(copy); + copy.producerId = producerId; + copy.size = size; + } + + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + public Response visit(CommandVisitor visitor) throws Exception { + return visitor.processProducerAck( this ); + } + + /** + * The producer id that this ack message is destined for. + * + * @openwire:property version=3 + */ + public ProducerId getProducerId() { + return producerId; + } + + public void setProducerId(ProducerId producerId) { + this.producerId = producerId; + } + + /** + * The number of bytes that are being acked. + * + * @openwire:property version=3 + */ + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java index 63b73d1cc0..b416c6399c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ProducerInfo.java @@ -32,6 +32,7 @@ public class ProducerInfo extends BaseCommand { protected ActiveMQDestination destination; protected BrokerId[] brokerPath; protected boolean dispatchAsync; + protected int windowSize; public ProducerInfo() { } @@ -117,4 +118,19 @@ public class ProducerInfo extends BaseCommand { this.dispatchAsync = dispatchAsync; } + /** + * Used to configure the producer window size. A producer will + * send up to the configured window size worth of payload data to + * the broker before waiting for an Ack that allows him to send more. + * + * @openwire:property version=3 + */ + public int getWindowSize() { + return windowSize; + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java index e322dc27fe..f6f1bdcae7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/MarshallerFactory.java @@ -82,6 +82,7 @@ public class MarshallerFactory { add(new MessagePullMarshaller()); add(new NetworkBridgeFilterMarshaller()); add(new PartialCommandMarshaller()); + add(new ProducerAckMarshaller()); add(new ProducerIdMarshaller()); add(new ProducerInfoMarshaller()); add(new RemoveInfoMarshaller()); diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java new file mode 100644 index 0000000000..a96a135d89 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerAckMarshaller.java @@ -0,0 +1 @@ +/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Marshalling code for Open Wire Format for ProducerAckMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class ProducerAckMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return ProducerAck.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new ProducerAck(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); ProducerAck info = (ProducerAck)o; info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalNestedObject(wireFormat, dataIn, bs)); info.setSize(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { ProducerAck info = (ProducerAck)o; int rc = super.tightMarshal1(wireFormat, o, bs); rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getProducerId(), bs); return rc + 4; } /** * Write a object instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); ProducerAck info = (ProducerAck)o; tightMarshalNestedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs); dataOut.writeInt(info.getSize()); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException { super.looseUnmarshal(wireFormat, o, dataIn); ProducerAck info = (ProducerAck)o; info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalNestedObject(wireFormat, dataIn)); info.setSize(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException { ProducerAck info = (ProducerAck)o; super.looseMarshal(wireFormat, o, dataOut); looseMarshalNestedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut); dataOut.writeInt(info.getSize()); } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java index 60dc336aec..4d23fa64e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v3/ProducerInfoMarshaller.java @@ -1 +1 @@ -/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Marshalling code for Open Wire Format for ProducerInfoMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class ProducerInfoMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return ProducerInfo.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new ProducerInfo(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); ProducerInfo info = (ProducerInfo)o; info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs)); info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs)); if (bs.readBoolean()) { short size = dataIn.readShort(); org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs); } info.setBrokerPath(value); } else { info.setBrokerPath(null); } info.setDispatchAsync(bs.readBoolean()); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { ProducerInfo info = (ProducerInfo)o; int rc = super.tightMarshal1(wireFormat, o, bs); rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs); rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs); rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs); bs.writeBoolean(info.isDispatchAsync()); return rc + 0; } /** * Write a object instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); ProducerInfo info = (ProducerInfo)o; tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs); tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs); tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs); bs.readBoolean(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException { super.looseUnmarshal(wireFormat, o, dataIn); ProducerInfo info = (ProducerInfo)o; info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject(wireFormat, dataIn)); info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn)); if (dataIn.readBoolean()) { short size = dataIn.readShort(); org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn); } info.setBrokerPath(value); } else { info.setBrokerPath(null); } info.setDispatchAsync(dataIn.readBoolean()); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException { ProducerInfo info = (ProducerInfo)o; super.looseMarshal(wireFormat, o, dataOut); looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut); looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut); looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut); dataOut.writeBoolean(info.isDispatchAsync()); } } \ No newline at end of file +/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Marshalling code for Open Wire Format for ProducerInfoMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class ProducerInfoMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return ProducerInfo.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new ProducerInfo(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); ProducerInfo info = (ProducerInfo)o; info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs)); info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs)); if (bs.readBoolean()) { short size = dataIn.readShort(); org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs); } info.setBrokerPath(value); } else { info.setBrokerPath(null); } info.setDispatchAsync(bs.readBoolean()); info.setWindowSize(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { ProducerInfo info = (ProducerInfo)o; int rc = super.tightMarshal1(wireFormat, o, bs); rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getProducerId(), bs); rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs); rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs); bs.writeBoolean(info.isDispatchAsync()); return rc + 4; } /** * Write a object instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); ProducerInfo info = (ProducerInfo)o; tightMarshalCachedObject2(wireFormat, (DataStructure)info.getProducerId(), dataOut, bs); tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs); tightMarshalObjectArray2(wireFormat, info.getBrokerPath(), dataOut, bs); bs.readBoolean(); dataOut.writeInt(info.getWindowSize()); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException { super.looseUnmarshal(wireFormat, o, dataIn); ProducerInfo info = (ProducerInfo)o; info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject(wireFormat, dataIn)); info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn)); if (dataIn.readBoolean()) { short size = dataIn.readShort(); org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn); } info.setBrokerPath(value); } else { info.setBrokerPath(null); } info.setDispatchAsync(dataIn.readBoolean()); info.setWindowSize(dataIn.readInt()); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException { ProducerInfo info = (ProducerInfo)o; super.looseMarshal(wireFormat, o, dataOut); looseMarshalCachedObject(wireFormat, (DataStructure)info.getProducerId(), dataOut); looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut); looseMarshalObjectArray(wireFormat, info.getBrokerPath(), dataOut); dataOut.writeBoolean(info.isDispatchAsync()); dataOut.writeInt(info.getWindowSize()); } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java index 61b48f0443..9a822f2626 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -75,6 +76,7 @@ public interface CommandVisitor { Response processForgetTransaction(TransactionInfo info) throws Exception; Response processEndTransaction(TransactionInfo info) throws Exception; Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception; + Response processProducerAck(ProducerAck ack) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index bcdda2112a..97ed30a8f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; @@ -480,6 +481,10 @@ public class ConnectionStateTracker implements CommandVisitor { return null; } + public Response processProducerAck(ProducerAck ack) throws Exception { + return null; + } + public boolean isRestoreConsumers() { return restoreConsumers; } @@ -520,4 +525,5 @@ public class ConnectionStateTracker implements CommandVisitor { this.restoreTransaction = restoreTransaction; } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java new file mode 100644 index 0000000000..93893ca5b0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerAckTest.java @@ -0,0 +1 @@ +/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for ProducerAck * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class ProducerAckTest extends BaseCommandTestSupport { public static ProducerAckTest SINGLETON = new ProducerAckTest(); public Object createObject() throws Exception { ProducerAck info = new ProducerAck(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); ProducerAck info = (ProducerAck) object; info.setProducerId(createProducerId("ProducerId:1")); info.setSize(1); } } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java index a8593b4478..b7d47f74ab 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v3/ProducerInfoTest.java @@ -1 +1 @@ -/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for ProducerInfo * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class ProducerInfoTest extends BaseCommandTestSupport { public static ProducerInfoTest SINGLETON = new ProducerInfoTest(); public Object createObject() throws Exception { ProducerInfo info = new ProducerInfo(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); ProducerInfo info = (ProducerInfo) object; info.setProducerId(createProducerId("ProducerId:1")); info.setDestination(createActiveMQDestination("Destination:2")); { BrokerId value[] = new BrokerId[2]; for( int i=0; i < 2; i++ ) { value[i] = createBrokerId("BrokerPath:3"); } info.setBrokerPath(value); } info.setDispatchAsync(true); } } \ No newline at end of file +/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activemq.command.*; /** * Test case for the OpenWire marshalling for ProducerInfo * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision: $ */ public class ProducerInfoTest extends BaseCommandTestSupport { public static ProducerInfoTest SINGLETON = new ProducerInfoTest(); public Object createObject() throws Exception { ProducerInfo info = new ProducerInfo(); populateObject(info); return info; } protected void populateObject(Object object) throws Exception { super.populateObject(object); ProducerInfo info = (ProducerInfo) object; info.setProducerId(createProducerId("ProducerId:1")); info.setDestination(createActiveMQDestination("Destination:2")); { BrokerId value[] = new BrokerId[2]; for( int i=0; i < 2; i++ ) { value[i] = createBrokerId("BrokerPath:3"); } info.setBrokerPath(value); } info.setDispatchAsync(true); info.setWindowSize(1); } } \ No newline at end of file