diff --git a/activemq-core/project.xml b/activemq-core/project.xml
index 79510c07d3..6486ced9b3 100755
--- a/activemq-core/project.xml
+++ b/activemq-core/project.xml
@@ -360,6 +360,11 @@
**/MultipleTestsWithSpringFactoryBeanTest.*
**/MultipleTestsWithXBeanFactoryBeanTest.*
**/MultipleTestsWithSpringXBeanFactoryBeanTest.*
+
+
+
+ **/PartialCommandTest.*
+ **/LastPartialCommandTest.*
diff --git a/activemq-core/src/gram/java/org/apache/activemq/openwire/tool/OpenWireJavaMarshallingScript.java b/activemq-core/src/gram/java/org/apache/activemq/openwire/tool/OpenWireJavaMarshallingScript.java
index 08ca1e7fe0..d49bd5b28e 100644
--- a/activemq-core/src/gram/java/org/apache/activemq/openwire/tool/OpenWireJavaMarshallingScript.java
+++ b/activemq-core/src/gram/java/org/apache/activemq/openwire/tool/OpenWireJavaMarshallingScript.java
@@ -288,6 +288,7 @@ public abstract class OpenWireJavaMarshallingScript extends OpenWireClassesScrip
out.println(" tightMarshalString2(" + getter + ", dataOut, bs);");
}
else if (type.equals("byte[]")) {
+ String mandatory = getMandatoryFlag(annotation);
if (size != null) {
out.println(" tightMarshalConstByteArray2(" + getter + ", dataOut, bs, " + size.asInt() + ");");
}
@@ -321,7 +322,6 @@ public abstract class OpenWireJavaMarshallingScript extends OpenWireClassesScrip
}
-
protected void generateLooseMarshalBody(PrintWriter out) {
List properties = getProperties();
for (Iterator iter = properties.iterator(); iter.hasNext();) {
@@ -480,4 +480,18 @@ public abstract class OpenWireJavaMarshallingScript extends OpenWireClassesScrip
out.println(" }");
}
}
+
+ /**
+ * Returns whether or not the given annotation has a mandatory flag on it or not
+ */
+ protected String getMandatoryFlag(JAnnotation annotation) {
+ JAnnotationValue value = annotation.getValue("mandatory");
+ if (value != null) {
+ String text = value.asString();
+ if (text != null && text.equalsIgnoreCase("true")) {
+ return "true";
+ }
+ }
+ return "false";
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
index 69d0c8882e..d01fbadb4b 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
@@ -186,7 +186,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
Response response=null;
boolean responseRequired = command.isResponseRequired();
- short commandId = command.getCommandId();
+ int commandId = command.getCommandId();
try {
response = command.visit(this);
} catch ( Throwable e ) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
index d52ff7a180..9cf173cc9d 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
@@ -193,7 +193,7 @@ public class MasterConnector implements Service{
}else{
boolean responseRequired = command.isResponseRequired();
- short commandId = command.getCommandId();
+ int commandId = command.getCommandId();
localBroker.oneway(command);
if (responseRequired){
Response response=new Response();
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
index c4fef98327..0da43479b4 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
@@ -26,9 +26,12 @@ import org.apache.activemq.util.IntrospectionSupport;
*/
abstract public class BaseCommand implements Command {
- protected short commandId;
+ protected int commandId;
protected boolean responseRequired;
+ private transient Endpoint from;
+ private transient Endpoint to;
+
public void copy(BaseCommand copy) {
copy.commandId = commandId;
copy.responseRequired = responseRequired;
@@ -37,11 +40,11 @@ abstract public class BaseCommand implements Command {
/**
* @openwire:property version=1
*/
- public short getCommandId() {
+ public int getCommandId() {
return commandId;
}
- public void setCommandId(short commandId) {
+ public void setCommandId(int commandId) {
this.commandId = commandId;
}
@@ -95,4 +98,28 @@ abstract public class BaseCommand implements Command {
public boolean isShutdownInfo() {
return false;
}
+
+ /**
+ * The endpoint within the transport where this message came from.
+ */
+ public Endpoint getFrom() {
+ return from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * The endpoint within the transport where this message is going to - null means all endpoints.
+ */
+ public Endpoint getTo() {
+ return to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
similarity index 70%
rename from activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
rename to activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
index cbdfac3e6e..2c90ab2720 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
@@ -14,19 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.udp;
-
-import org.apache.activemq.command.Command;
-
-import java.io.IOException;
+package org.apache.activemq.command;
/**
- * A callback used to process inbound commands
+ * A default endpoint.
*
* @version $Revision$
*/
-public interface CommandProcessor {
+public class BaseEndpoint implements Endpoint {
- void process(Command command, DatagramHeader header) throws IOException;
+ private String name;
+
+ public BaseEndpoint(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Command.java b/activemq-core/src/main/java/org/apache/activemq/command/Command.java
index 33fe9ad2b3..88c87931d8 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/Command.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/Command.java
@@ -26,12 +26,12 @@ import org.apache.activemq.state.CommandVisitor;
*/
public interface Command extends DataStructure {
- void setCommandId(short value);
+ void setCommandId(int value);
/**
* @return the unique ID of this request used to map responses to requests
*/
- short getCommandId();
+ int getCommandId();
void setResponseRequired(boolean responseRequired);
boolean isResponseRequired();
@@ -44,6 +44,21 @@ public interface Command extends DataStructure {
boolean isMessageAck();
boolean isMessageDispatchNotification();
boolean isShutdownInfo();
-
+
Response visit( CommandVisitor visitor) throws Exception;
+
+ /**
+ * The endpoint within the transport where this message came from which could be null if the
+ * transport only supports a single endpoint.
+ */
+ public Endpoint getFrom();
+
+ public void setFrom(Endpoint from);
+
+ /**
+ * The endpoint within the transport where this message is going to - null means all endpoints.
+ */
+ public Endpoint getTo();
+
+ public void setTo(Endpoint to);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
index 7971e68da5..264d8402d5 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
@@ -76,14 +76,6 @@ public interface CommandTypes {
byte INTEGER_RESPONSE = 34;
- ///////////////////////////////////////////////////
- //
- // Optional additional responses
- //
- ///////////////////////////////////////////////////
- byte REPLAY = 38;
-
-
///////////////////////////////////////////////////
//
// Used by discovery
@@ -102,6 +94,20 @@ public interface CommandTypes {
byte JOURNAL_TRANSACTION = 54;
byte DURABLE_SUBSCRIPTION_INFO = 55;
+
+ ///////////////////////////////////////////////////
+ //
+ // Reliability and fragmentation
+ //
+ ///////////////////////////////////////////////////
+ byte PARTIAL_COMMAND = 60;
+ byte PARTIAL_LAST_COMMAND = 61;
+
+ byte REPLAY = 65;
+
+
+
+
///////////////////////////////////////////////////
//
// Types used represent basic Java types.
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java b/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java
new file mode 100644
index 0000000000..af98eeea90
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java
@@ -0,0 +1,35 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * Represents the logical endpoint where commands come from or are sent to.
+ *
+ * For connection based transports like TCP / VM then there is a single endpoint
+ * for all commands. For transports like multicast there could be different
+ * endpoints being used on the same transport.
+ *
+ * @version $Revision$
+ */
+public interface Endpoint {
+
+ /**
+ * Returns the name of the endpoint.
+ */
+ public String getName();
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
index 6c9c23408f..2c8496df9c 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
@@ -26,15 +26,18 @@ import org.apache.activemq.util.IntrospectionSupport;
public class KeepAliveInfo implements Command {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.KEEP_ALIVE_INFO;
-
+
+ private transient Endpoint from;
+ private transient Endpoint to;
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
- public void setCommandId(short value) {
+ public void setCommandId(int value) {
}
- public short getCommandId() {
+ public int getCommandId() {
return 0;
}
@@ -69,6 +72,29 @@ public class KeepAliveInfo implements Command {
return false;
}
+ /**
+ * The endpoint within the transport where this message came from.
+ */
+ public Endpoint getFrom() {
+ return from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * The endpoint within the transport where this message is going to - null means all endpoints.
+ */
+ public Endpoint getTo() {
+ return to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
+
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processKeepAlive( this );
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
new file mode 100644
index 0000000000..b1b20b16dd
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
@@ -0,0 +1,47 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents a partial command; a large command that has been split up into
+ * pieces.
+ *
+ * @openwire:marshaller code="61"
+ * @version $Revision$
+ */
+public class LastPartialCommand extends PartialCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND;
+
+ public LastPartialCommand() {
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isLastPart() {
+ return true;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
new file mode 100644
index 0000000000..f84f0d28a8
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents a partial command; a large command that has been split up into
+ * pieces.
+ *
+ * @openwire:marshaller code="60"
+ * @version $Revision$
+ */
+public class PartialCommand extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND;
+
+ private byte[] data;
+
+ public PartialCommand() {
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * The data for this part of the command
+ *
+ * @openwire:property version=1 mandatory=true
+ */
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public boolean isLastPart() {
+ return false;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
index b8611cb0d4..26174eb750 100644
--- a/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
@@ -24,7 +24,7 @@ import org.apache.activemq.state.CommandVisitor;
* non-reliable transport such as UDP or multicast but could also be used on
* TCP/IP if a socket has been re-established.
*
- * @openwire:marshaller code="38"
+ * @openwire:marshaller code="65"
* @version $Revision$
*/
public class ReplayCommand extends BaseCommand {
@@ -32,8 +32,10 @@ public class ReplayCommand extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
private String producerId;
- private long firstSequenceNumber;
- private long lastSequenceNumber;
+ private int firstAckNumber;
+ private int lastAckNumber;
+ private int firstNakNumber;
+ private int lastNakNumber;
public ReplayCommand() {
}
@@ -55,8 +57,36 @@ public class ReplayCommand extends BaseCommand {
this.producerId = producerId;
}
- public long getFirstSequenceNumber() {
- return firstSequenceNumber;
+ public int getFirstAckNumber() {
+ return firstAckNumber;
+ }
+
+ /**
+ * Is used to specify the first sequence number being acknowledged as delivered on the transport
+ * so that it can be removed from cache
+ *
+ * @openwire:property version=1
+ */
+ public void setFirstAckNumber(int firstSequenceNumber) {
+ this.firstAckNumber = firstSequenceNumber;
+ }
+
+ public int getLastAckNumber() {
+ return lastAckNumber;
+ }
+
+ /**
+ * Is used to specify the last sequence number being acknowledged as delivered on the transport
+ * so that it can be removed from cache
+ *
+ * @openwire:property version=1
+ */
+ public void setLastAckNumber(int lastSequenceNumber) {
+ this.lastAckNumber = lastSequenceNumber;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return null;
}
/**
@@ -64,12 +94,12 @@ public class ReplayCommand extends BaseCommand {
*
* @openwire:property version=1
*/
- public void setFirstSequenceNumber(long firstSequenceNumber) {
- this.firstSequenceNumber = firstSequenceNumber;
+ public int getFirstNakNumber() {
+ return firstNakNumber;
}
- public long getLastSequenceNumber() {
- return lastSequenceNumber;
+ public void setFirstNakNumber(int firstNakNumber) {
+ this.firstNakNumber = firstNakNumber;
}
/**
@@ -77,12 +107,13 @@ public class ReplayCommand extends BaseCommand {
*
* @openwire:property version=1
*/
- public void setLastSequenceNumber(long lastSequenceNumber) {
- this.lastSequenceNumber = lastSequenceNumber;
+ public int getLastNakNumber() {
+ return lastNakNumber;
}
- public Response visit(CommandVisitor visitor) throws Exception {
- return null;
+ public void setLastNakNumber(int lastNakNumber) {
+ this.lastNakNumber = lastNakNumber;
}
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Response.java b/activemq-core/src/main/java/org/apache/activemq/command/Response.java
index 83dcd58b58..aacb9415e3 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/Response.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/Response.java
@@ -25,7 +25,7 @@ import org.apache.activemq.state.CommandVisitor;
public class Response extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.RESPONSE;
- short correlationId;
+ int correlationId;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -34,11 +34,11 @@ public class Response extends BaseCommand {
/**
* @openwire:property version=1
*/
- public short getCorrelationId() {
+ public int getCorrelationId() {
return correlationId;
}
- public void setCorrelationId(short responseId) {
+ public void setCorrelationId(int responseId) {
this.correlationId = responseId;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
index e66d323646..449098c339 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
@@ -44,8 +44,11 @@ public class WireFormatInfo implements Command, MarshallAware {
protected byte magic[] = MAGIC;
protected int version;
- protected transient HashMap properties;
protected ByteSequence marshalledProperties;
+
+ protected transient HashMap properties;
+ private transient Endpoint from;
+ private transient Endpoint to;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -59,7 +62,6 @@ public class WireFormatInfo implements Command, MarshallAware {
return true;
}
-
/**
* @openwire:property version=1 size=8 testSize=-1
*/
@@ -90,6 +92,28 @@ public class WireFormatInfo implements Command, MarshallAware {
this.marshalledProperties = marshalledProperties;
}
+ /**
+ * The endpoint within the transport where this message came from.
+ */
+ public Endpoint getFrom() {
+ return from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * The endpoint within the transport where this message is going to - null means all endpoints.
+ */
+ public Endpoint getTo() {
+ return to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
//////////////////////
//
// Implementation Methods.
@@ -249,9 +273,9 @@ public class WireFormatInfo implements Command, MarshallAware {
//
///////////////////////////////////////////////////////////////
- public void setCommandId(short value) {
+ public void setCommandId(int value) {
}
- public short getCommandId() {
+ public int getCommandId() {
return 0;
}
public boolean isResponseRequired() {
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java b/activemq-core/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java
new file mode 100644
index 0000000000..9ee62097e6
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/CommandIdComparator.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import org.apache.activemq.command.Command;
+
+import java.util.Comparator;
+
+/**
+ * A @{link Comparator} of commands using their {@link Command#getCommandId()}
+ *
+ * @version $Revision$
+ */
+public class CommandIdComparator implements Comparator {
+
+ public int compare(Object o1, Object o2) {
+ assert o1 instanceof Command;
+ assert o2 instanceof Command;
+
+ Command c1 = (Command) o1;
+ Command c2 = (Command) o2;
+ return c1.getCommandId() - c2.getCommandId();
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index e29063a869..971d0d09f5 100755
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -32,7 +32,9 @@ import org.activeio.command.WireFormat;
import org.activeio.packet.ByteArrayPacket;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.MarshallAware;
+import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.IdGenerator;
@@ -225,6 +227,13 @@ final public class OpenWireFormat implements WireFormat {
DataStructure c = (DataStructure) o;
byte type = c.getDataStructureType();
+
+ // TODO - we could remove this if we have a way to disable BooleanStream on
+ // certain types of message
+ if (type == CommandTypes.PARTIAL_COMMAND || type == CommandTypes.PARTIAL_LAST_COMMAND) {
+ marshalPartialCommand((PartialCommand) o, dataOut);
+ return;
+ }
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if( dsm == null )
throw new IOException("Unknown data type: "+type);
@@ -264,7 +273,7 @@ final public class OpenWireFormat implements WireFormat {
dataOut.writeByte(NULL_TYPE);
}
}
-
+
public Object unmarshal(DataInputStream dis) throws IOException {
if( !sizePrefixDisabled ) {
dis.readInt();
@@ -335,7 +344,13 @@ final public class OpenWireFormat implements WireFormat {
public Object doUnmarshal(DataInputStream dis) throws IOException {
byte dataType = dis.readByte();
- if( dataType!=NULL_TYPE ) {
+
+ // TODO - we could remove this if we have a way to disable BooleanStream on
+ // certain types of message
+ if (dataType == CommandTypes.PARTIAL_COMMAND || dataType == CommandTypes.PARTIAL_LAST_COMMAND) {
+ return doUnmarshalPartialCommand(dataType, dis);
+ }
+ else if( dataType!=NULL_TYPE ) {
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if( dsm == null )
throw new IOException("Unknown data type: "+dataType);
@@ -352,6 +367,7 @@ final public class OpenWireFormat implements WireFormat {
return null;
}
}
+
// public void debug(String msg) {
// String t = (Thread.currentThread().getName()+" ").substring(0, 40);
// System.out.println(t+": "+msg);
@@ -570,4 +586,53 @@ final public class OpenWireFormat implements WireFormat {
}
+
+
+ // Partial command marshalling
+ //
+ // TODO - remove if we can figure out a clean way to disable BooleanStream in OpenWire on commands
+ // with no optional values (partial commands only have a mandatory byte[])
+ //
+
+ protected void marshalPartialCommand(PartialCommand command, DataOutputStream dataOut) throws IOException {
+ byte[] data = command.getData();
+ int dataSize = data.length;
+
+ if (!isSizePrefixDisabled()) {
+ int size = dataSize + 1 + 4;
+ dataOut.writeInt(size);
+ }
+
+ if (command.isLastPart()) {
+ dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
+ }
+ else {
+ dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
+ }
+
+ dataOut.writeInt(command.getCommandId());
+ dataOut.writeInt(dataSize);
+ dataOut.write(data);
+
+ }
+
+ protected Object doUnmarshalPartialCommand(byte dataType, DataInputStream dis) throws IOException {
+ // size of entire command is already read
+
+ PartialCommand answer = null;
+ if (dataType == LastPartialCommand.DATA_STRUCTURE_TYPE) {
+ answer = new LastPartialCommand();
+ }
+ else {
+ answer = new PartialCommand();
+ }
+ answer.setCommandId(dis.readInt());
+
+ int size = dis.readInt();
+ byte[] data = new byte[size];
+ dis.readFully(data);
+ answer.setData(data);
+ return answer;
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseCommandMarshaller.java
index a1c213b6f4..4697816ea5 100755
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseCommandMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/BaseCommandMarshaller.java
@@ -50,7 +50,7 @@ public abstract class BaseCommandMarshaller extends BaseDataStreamMarshaller {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
BaseCommand info = (BaseCommand)o;
- info.setCommandId(dataIn.readShort());
+ info.setCommandId(dataIn.readInt());
info.setResponseRequired(bs.readBoolean());
}
@@ -66,7 +66,7 @@ public abstract class BaseCommandMarshaller extends BaseDataStreamMarshaller {
int rc = super.tightMarshal1(wireFormat, o, bs);
bs.writeBoolean(info.isResponseRequired());
- return rc + 2;
+ return rc + 4;
}
/**
@@ -80,7 +80,7 @@ public abstract class BaseCommandMarshaller extends BaseDataStreamMarshaller {
super.tightMarshal2(wireFormat, o, dataOut, bs);
BaseCommand info = (BaseCommand)o;
- dataOut.writeShort(info.getCommandId());
+ dataOut.writeInt(info.getCommandId());
bs.readBoolean();
}
@@ -96,7 +96,7 @@ public abstract class BaseCommandMarshaller extends BaseDataStreamMarshaller {
super.looseUnmarshal(wireFormat, o, dataIn);
BaseCommand info = (BaseCommand)o;
- info.setCommandId(dataIn.readShort());
+ info.setCommandId(dataIn.readInt());
info.setResponseRequired(dataIn.readBoolean());
}
@@ -110,7 +110,7 @@ public abstract class BaseCommandMarshaller extends BaseDataStreamMarshaller {
BaseCommand info = (BaseCommand)o;
super.looseMarshal(wireFormat, o, dataOut);
- dataOut.writeShort(info.getCommandId());
+ dataOut.writeInt(info.getCommandId());
dataOut.writeBoolean(info.isResponseRequired());
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java
new file mode 100644
index 0000000000..fff89afc74
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.openwire.v1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+
+/**
+ * Marshalling code for Open Wire Format for LastPartialCommandMarshaller
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ * if you need to make a change, please see the modify the groovy scripts in the
+ * under src/gram/script and then use maven openwire:generate to regenerate
+ * this file.
+ *
+ * @version $Revision$
+ */
+public class LastPartialCommandMarshaller extends PartialCommandMarshaller {
+
+ /**
+ * Return the type of Data Structure we marshal
+ * @return short representation of the type data structure
+ */
+ public byte getDataStructureType() {
+ return LastPartialCommand.DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @return a new object instance
+ */
+ public DataStructure createObject() {
+ return new LastPartialCommand();
+ }
+
+ /**
+ * Un-marshal an object instance from the data input stream
+ *
+ * @param o the object to un-marshal
+ * @param dataIn the data input stream to build the object from
+ * @throws IOException
+ */
+ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
+ super.tightUnmarshal(wireFormat, o, dataIn, bs);
+
+ }
+
+
+ /**
+ * Write the booleans that this object uses to a BooleanStream
+ */
+ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
+
+ int rc = super.tightMarshal1(wireFormat, o, bs);
+
+ return rc + 0;
+ }
+
+ /**
+ * Write a object instance to data output stream
+ *
+ * @param o the instance to be marshaled
+ * @param dataOut the output stream
+ * @throws IOException thrown if an error occurs
+ */
+ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
+ super.tightMarshal2(wireFormat, o, dataOut, bs);
+
+ }
+
+ /**
+ * Un-marshal an object instance from the data input stream
+ *
+ * @param o the object to un-marshal
+ * @param dataIn the data input stream to build the object from
+ * @throws IOException
+ */
+ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
+ super.looseUnmarshal(wireFormat, o, dataIn);
+
+ }
+
+
+ /**
+ * Write the booleans that this object uses to a BooleanStream
+ */
+ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {
+
+ super.looseMarshal(wireFormat, o, dataOut);
+
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
index 72012de26e..d733158a23 100755
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
@@ -40,6 +40,7 @@ public class MarshallerFactory {
static {
add(new LocalTransactionIdMarshaller());
+ add(new PartialCommandMarshaller());
add(new IntegerResponseMarshaller());
add(new ActiveMQQueueMarshaller());
add(new ActiveMQObjectMessageMarshaller());
@@ -68,6 +69,7 @@ public class MarshallerFactory {
add(new SubscriptionInfoMarshaller());
add(new JournalTransactionMarshaller());
add(new ControlCommandMarshaller());
+ add(new LastPartialCommandMarshaller());
add(new NetworkBridgeFilterMarshaller());
add(new ActiveMQBytesMessageMarshaller());
add(new WireFormatInfoMarshaller());
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java
new file mode 100644
index 0000000000..a0b85cc1da
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java
@@ -0,0 +1,128 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.openwire.v1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+
+/**
+ * Marshalling code for Open Wire Format for PartialCommandMarshaller
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ * if you need to make a change, please see the modify the groovy scripts in the
+ * under src/gram/script and then use maven openwire:generate to regenerate
+ * this file.
+ *
+ * @version $Revision$
+ */
+public class PartialCommandMarshaller extends BaseCommandMarshaller {
+
+ /**
+ * Return the type of Data Structure we marshal
+ * @return short representation of the type data structure
+ */
+ public byte getDataStructureType() {
+ return PartialCommand.DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @return a new object instance
+ */
+ public DataStructure createObject() {
+ return new PartialCommand();
+ }
+
+ /**
+ * Un-marshal an object instance from the data input stream
+ *
+ * @param o the object to un-marshal
+ * @param dataIn the data input stream to build the object from
+ * @throws IOException
+ */
+ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
+ super.tightUnmarshal(wireFormat, o, dataIn, bs);
+
+ PartialCommand info = (PartialCommand)o;
+ info.setData(tightUnmarshalByteArray(dataIn, bs));
+
+ }
+
+
+ /**
+ * Write the booleans that this object uses to a BooleanStream
+ */
+ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
+
+ PartialCommand info = (PartialCommand)o;
+
+ int rc = super.tightMarshal1(wireFormat, o, bs);
+ rc += tightMarshalByteArray1(info.getData(), bs);
+
+ return rc + 0;
+ }
+
+ /**
+ * Write a object instance to data output stream
+ *
+ * @param o the instance to be marshaled
+ * @param dataOut the output stream
+ * @throws IOException thrown if an error occurs
+ */
+ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
+ super.tightMarshal2(wireFormat, o, dataOut, bs);
+
+ PartialCommand info = (PartialCommand)o;
+ tightMarshalByteArray2(info.getData(), dataOut, bs);
+
+ }
+
+ /**
+ * Un-marshal an object instance from the data input stream
+ *
+ * @param o the object to un-marshal
+ * @param dataIn the data input stream to build the object from
+ * @throws IOException
+ */
+ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
+ super.looseUnmarshal(wireFormat, o, dataIn);
+
+ PartialCommand info = (PartialCommand)o;
+ info.setData(looseUnmarshalByteArray(dataIn));
+
+ }
+
+
+ /**
+ * Write the booleans that this object uses to a BooleanStream
+ */
+ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {
+
+ PartialCommand info = (PartialCommand)o;
+
+ super.looseMarshal(wireFormat, o, dataOut);
+ looseMarshalByteArray(wireFormat, info.getData(), dataOut);
+
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ReplayCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ReplayCommandMarshaller.java
index 5d3951c924..a618d7b7df 100644
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ReplayCommandMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ReplayCommandMarshaller.java
@@ -64,6 +64,10 @@ public class ReplayCommandMarshaller extends BaseCommandMarshaller {
public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
+ ReplayCommand info = (ReplayCommand)o;
+ info.setFirstNakNumber(dataIn.readInt());
+ info.setLastNakNumber(dataIn.readInt());
+
}
@@ -72,9 +76,11 @@ public class ReplayCommandMarshaller extends BaseCommandMarshaller {
*/
public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
+ ReplayCommand info = (ReplayCommand)o;
+
int rc = super.tightMarshal1(wireFormat, o, bs);
- return rc + 0;
+ return rc + 8;
}
/**
@@ -87,6 +93,10 @@ public class ReplayCommandMarshaller extends BaseCommandMarshaller {
public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
super.tightMarshal2(wireFormat, o, dataOut, bs);
+ ReplayCommand info = (ReplayCommand)o;
+ dataOut.writeInt(info.getFirstNakNumber());
+ dataOut.writeInt(info.getLastNakNumber());
+
}
/**
@@ -99,6 +109,10 @@ public class ReplayCommandMarshaller extends BaseCommandMarshaller {
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn) throws IOException {
super.looseUnmarshal(wireFormat, o, dataIn);
+ ReplayCommand info = (ReplayCommand)o;
+ info.setFirstNakNumber(dataIn.readInt());
+ info.setLastNakNumber(dataIn.readInt());
+
}
@@ -107,7 +121,11 @@ public class ReplayCommandMarshaller extends BaseCommandMarshaller {
*/
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut) throws IOException {
+ ReplayCommand info = (ReplayCommand)o;
+
super.looseMarshal(wireFormat, o, dataOut);
+ dataOut.writeInt(info.getFirstNakNumber());
+ dataOut.writeInt(info.getLastNakNumber());
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ResponseMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ResponseMarshaller.java
index 5131911549..d998949846 100755
--- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ResponseMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/ResponseMarshaller.java
@@ -65,7 +65,7 @@ public class ResponseMarshaller extends BaseCommandMarshaller {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
Response info = (Response)o;
- info.setCorrelationId(dataIn.readShort());
+ info.setCorrelationId(dataIn.readInt());
}
@@ -79,7 +79,7 @@ public class ResponseMarshaller extends BaseCommandMarshaller {
int rc = super.tightMarshal1(wireFormat, o, bs);
- return rc + 2;
+ return rc + 4;
}
/**
@@ -93,7 +93,7 @@ public class ResponseMarshaller extends BaseCommandMarshaller {
super.tightMarshal2(wireFormat, o, dataOut, bs);
Response info = (Response)o;
- dataOut.writeShort(info.getCorrelationId());
+ dataOut.writeInt(info.getCorrelationId());
}
@@ -108,7 +108,7 @@ public class ResponseMarshaller extends BaseCommandMarshaller {
super.looseUnmarshal(wireFormat, o, dataIn);
Response info = (Response)o;
- info.setCorrelationId(dataIn.readShort());
+ info.setCorrelationId(dataIn.readInt());
}
@@ -121,7 +121,7 @@ public class ResponseMarshaller extends BaseCommandMarshaller {
Response info = (Response)o;
super.looseMarshal(wireFormat, o, dataOut);
- dataOut.writeShort(info.getCorrelationId());
+ dataOut.writeInt(info.getCorrelationId());
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
new file mode 100644
index 0000000000..d3a42af98a
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.LastPartialCommand;
+import org.apache.activemq.command.PartialCommand;
+import org.apache.activemq.openwire.OpenWireFormat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+
+/**
+ * Joins together of partial commands which were split into individual chunks of data.
+ *
+ * @version $Revision$
+ */
+public class CommandJoiner extends TransportFilter {
+
+ private ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private OpenWireFormat wireFormat;
+
+ public CommandJoiner(Transport next, OpenWireFormat wireFormat) {
+ super(next);
+ this.wireFormat = wireFormat;
+ }
+
+ public void onCommand(Command command) {
+ byte type = command.getDataStructureType();
+ if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
+ PartialCommand header = (PartialCommand) command;
+ byte[] partialData = header.getData();
+ try {
+ out.write(partialData);
+
+ if (header.isLastPart()) {
+ byte[] fullData = out.toByteArray();
+ Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData)));
+ resetBuffer();
+ getTransportListener().onCommand(completeCommand);
+ }
+ }
+ catch (IOException e) {
+ getTransportListener().onException(e);
+ }
+ }
+ else {
+ getTransportListener().onCommand(command);
+ }
+ }
+
+ public void stop() throws Exception {
+ super.stop();
+ resetBuffer();
+ }
+
+ public String toString() {
+ return next.toString();
+ }
+
+ protected void resetBuffer() {
+ out.reset();
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java
new file mode 100644
index 0000000000..a746a87db6
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.CommandIdComparator;
+import org.apache.activemq.transport.replay.ReplayStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * This interceptor deals with out of order commands together with being able to
+ * handle dropped commands and the re-requesting dropped commands.
+ *
+ * @version $Revision$
+ */
+public class ReliableTransport extends TransportFilter {
+ private static final Log log = LogFactory.getLog(ReliableTransport.class);
+
+ private ReplayStrategy replayStrategy;
+ private SortedSet headers = new TreeSet(new CommandIdComparator());
+ private int expectedCounter = 1;
+
+ public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
+ super(next);
+ this.replayStrategy = replayStrategy;
+ }
+
+ public void onCommand(Command command) {
+ int actualCounter = command.getCommandId();
+ boolean valid = expectedCounter != actualCounter;
+
+ if (!valid) {
+ if (actualCounter < expectedCounter) {
+ log.warn("Ignoring out of step packet: " + command);
+ }
+ else {
+ // lets add it to the list for later on
+ headers.add(command);
+
+ try {
+ replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
+ }
+ catch (IOException e) {
+ getTransportListener().onException(e);
+ }
+ }
+
+ if (!headers.isEmpty()) {
+ // lets see if the first item in the set is the next header
+ command = (Command) headers.first();
+ valid = expectedCounter == command.getCommandId();
+ }
+ }
+
+ if (valid) {
+ // we've got a valid header so increment counter
+ replayStrategy.onReceivedPacket(this, expectedCounter);
+ expectedCounter++;
+ getTransportListener().onCommand(command);
+ }
+ }
+
+ public String toString() {
+ return next.toString();
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
index 3cfe0036ab..9bed7cbb5f 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
@@ -27,9 +27,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
- * Creates a {@see org.activeio.RequestChannel} out of a {@see org.activeio.AsynchChannel}. This
- * {@see org.activeio.RequestChannel} is thread safe and mutiplexes concurrent requests and responses over
- * the underlying {@see org.activeio.AsynchChannel}.
+ * Adds the incrementing sequence number to commands along with performing the corelation of
+ * responses to requests to create a blocking request-response semantics.
*
* @version $Revision: 1.4 $
*/
@@ -38,9 +37,9 @@ final public class ResponseCorrelator extends TransportFilter {
private static final Log log = LogFactory.getLog(ResponseCorrelator.class);
private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
- private short lastCommandId = 0;
+ private int lastCommandId = 0;
- synchronized short getNextCommandId() {
+ synchronized int getNextCommandId() {
return ++lastCommandId;
}
@@ -58,7 +57,7 @@ final public class ResponseCorrelator extends TransportFilter {
command.setCommandId(getNextCommandId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse();
- requestMap.put(new Short(command.getCommandId()), future);
+ requestMap.put(new Integer(command.getCommandId()), future);
next.oneway(command);
return future;
}
@@ -72,7 +71,7 @@ final public class ResponseCorrelator extends TransportFilter {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
Response response = (Response) command;
- FutureResponse future = (FutureResponse) requestMap.remove(new Short(response.getCorrelationId()));
+ FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if( future!=null ) {
future.set(response);
} else {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
index cbb0f26239..85bc12e3e1 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
@@ -53,7 +53,7 @@ public class TransportLogger extends TransportFilter {
public void onCommand(Command command) {
if( log.isDebugEnabled() ) {
- log.debug("RECEIVED: "+command);
+ log.debug("RECEIVED: from: "+ command.getFrom() + " : " + command);
}
getTransportListener().onCommand(command);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 6ecb8895b5..9547c5892e 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -83,7 +83,7 @@ public class FailoverTransport implements CompositeTransport {
return;
}
if (command.isResponse()) {
- requestMap.remove(new Short(((Response) command).getCorrelationId()));
+ requestMap.remove(new Integer(((Response) command).getCorrelationId()));
}
if (!initialized){
if (command.isBrokerInfo()){
@@ -343,7 +343,7 @@ public class FailoverTransport implements CompositeTransport {
// then hold it in the requestMap so that we can replay
// it later.
if (!stateTracker.track(command) && command.isResponseRequired()) {
- requestMap.put(new Short(command.getCommandId()), command);
+ requestMap.put(new Integer(command.getCommandId()), command);
}
// Send the message.
@@ -352,7 +352,7 @@ public class FailoverTransport implements CompositeTransport {
} catch (IOException e) {
// If there is an IOException in the send, remove the command from the requestMap
if (!stateTracker.track(command) && command.isResponseRequired()) {
- requestMap.remove(new Short(command.getCommandId()), command);
+ requestMap.remove(new Integer(command.getCommandId()), command);
}
// Rethrow the exception so it will handled by the outer catch
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index e75c37466a..34bd6bd357 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -107,7 +107,7 @@ public class FanoutTransport implements CompositeTransport {
public void onCommand(Command command) {
if (command.isResponse()) {
- Short id = new Short(((Response) command).getCorrelationId());
+ Integer id = new Integer(((Response) command).getCorrelationId());
RequestCounter rc = (RequestCounter) requestMap.get(id);
if( rc != null ) {
if( rc.ackCount.decrementAndGet() <= 0 ) {
@@ -340,7 +340,7 @@ public class FanoutTransport implements CompositeTransport {
boolean fanout = isFanoutCommand(command);
if (!stateTracker.track(command) && command.isResponseRequired() ) {
int size = fanout ? minAckCount : 1;
- requestMap.put(new Short(command.getCommandId()), new RequestCounter(command, size));
+ requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}
// Wait for transport to be connected.
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
similarity index 66%
rename from activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
rename to activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
index da215b82a5..166c46c3b2 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
@@ -14,20 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.udp;
+package org.apache.activemq.transport.multicast;
-import org.apache.activemq.command.Command;
-
-import java.io.IOException;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
/**
- * Represents an inbound buffer of datagrams for dealing with out of order
- * or fragmented commands.
- *
+ *
* @version $Revision$
*/
-public interface DatagramReadBuffer {
+public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller {
- Command read(DatagramHeader header) throws IOException;
-
-}
\ No newline at end of file
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
new file mode 100644
index 0000000000..bab5d89246
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.multicast;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.UdpTransport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+/**
+ * A multicast based transport.
+ *
+ * @version $Revision$
+ */
+public class MulticastTransport extends UdpTransport {
+
+ public MulticastTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
+ super(wireFormat, port);
+ }
+
+ public MulticastTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
+ super(wireFormat, socketAddress);
+ }
+
+ public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
+ super(wireFormat, remoteLocation);
+ }
+
+ public MulticastTransport(OpenWireFormat wireFormat) throws IOException {
+ super(wireFormat);
+ }
+
+ protected String getProtocolName() {
+ return "Multicast";
+ }
+
+ protected String getProtocolUriScheme() {
+ return "multicast://";
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java
new file mode 100644
index 0000000000..7ecd6abe86
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransportFactory.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.multicast;
+
+import org.activeio.command.WireFormat;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.UdpTransportFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+/**
+ * A factory of multicast transport classes
+ *
+ * @version $Revision$
+ */
+public class MulticastTransportFactory extends UdpTransportFactory {
+
+ protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+ OpenWireFormat wireFormat = asOpenWireFormat(wf);
+ return new MulticastTransport(wireFormat, location);
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/package.html
new file mode 100755
index 0000000000..5547b0abbf
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/package.html
@@ -0,0 +1,9 @@
+
+
+
+
+
+A Multicast based Transport implementation.
+
+
+
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java
similarity index 60%
rename from activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
rename to activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java
index e6c38bcc24..c0454de31b 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.udp.replay;
+package org.apache.activemq.transport.replay;
+
+import org.apache.activemq.transport.ReliableTransport;
import java.io.IOException;
@@ -23,14 +25,14 @@ import java.io.IOException;
*
* @version $Revision$
*/
-public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy {
+public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
- public void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException {
+ public void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException {
long count = actualCounter - expectedCounter;
- throw new IOException(name + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter);
+ throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
}
- public void onReceivedPacket(String name, long expectedCounter) {
+ public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java
similarity index 69%
rename from activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
rename to activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java
index 2a408c9efb..350a86b6b8 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.udp.replay;
+package org.apache.activemq.transport.replay;
+
+import org.apache.activemq.transport.ReliableTransport;
import java.io.IOException;
@@ -23,11 +25,11 @@ import java.io.IOException;
*
* @version $Revision$
*/
-public interface DatagramReplayStrategy {
+public interface ReplayStrategy {
- void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException;
+ void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException;
- void onReceivedPacket(String name, long expectedCounter);
+ void onReceivedPacket(ReliableTransport transport, long expectedCounter);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
index dc617c2770..7546b73b3a 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
@@ -16,11 +16,15 @@
*/
package org.apache.activemq.transport.udp;
+import org.activeio.ByteSequence;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.LastPartialCommand;
+import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
+import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,34 +51,27 @@ public class CommandChannel implements Service {
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
- private DatagramReplayStrategy replayStrategy;
private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller;
- private final boolean checkSequenceNumbers;
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
- private DatagramReadBuffer readStack;
private SocketAddress lastReadDatagramAddress;
// writing
private Object writeLock = new Object();
private ByteBuffer writeBuffer;
- private BooleanStream bs = new BooleanStream();
- private int largeMessageBufferSize = 128 * 1024;
- private DatagramHeader header = new DatagramHeader();
+ private int defaultMarshalBufferSize = 64 * 1024;
public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
- DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers, DatagramHeaderMarshaller headerMarshaller) {
+ SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
- this.replayStrategy = replayStrategy;
this.targetAddress = targetAddress;
- this.checkSequenceNumbers = checkSequenceNumbers;
this.headerMarshaller = headerMarshaller;
}
@@ -87,9 +84,6 @@ public class CommandChannel implements Service {
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
- if (checkSequenceNumbers) {
- readStack = new CommandReadBuffer(name, wireFormat, replayStrategy);
- }
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
@@ -100,8 +94,7 @@ public class CommandChannel implements Service {
bufferPool.stop();
}
- public void read(CommandProcessor processor) throws IOException {
- DatagramHeader header = null;
+ public Command read() throws IOException {
Command answer = null;
lastReadDatagramAddress = null;
synchronized (readLock) {
@@ -109,53 +102,26 @@ public class CommandChannel implements Service {
lastReadDatagramAddress = channel.receive(readBuffer);
readBuffer.flip();
- header = headerMarshaller.readHeader(readBuffer);
- header.setFromAddress(lastReadDatagramAddress);
+ Endpoint from = headerMarshaller.createEndpoint(readBuffer, lastReadDatagramAddress);
- if (log.isDebugEnabled()) {
- log.debug("Received datagram on: " + name + " from: " + lastReadDatagramAddress + " header: " + header);
- }
int remaining = readBuffer.remaining();
- int size = header.getDataSize();
- /*
- * if (size > remaining) { throw new IOException("Invalid command
- * size: " + size + " when there are only: " + remaining + " byte(s)
- * remaining"); } else if (size < remaining) { log.warn("Extra bytes
- * in buffer. Expecting: " + size + " but has: " + remaining); }
- */
- if (size != remaining) {
- log.warn("Expecting: " + size + " but has: " + remaining);
- }
- if (header.isPartial()) {
- byte[] data = new byte[size];
- readBuffer.get(data);
- header.setPartialData(data);
- }
- else {
- byte[] data = new byte[remaining];
- readBuffer.get(data);
+
+ byte[] data = new byte[remaining];
+ readBuffer.get(data);
- // TODO use a DataInput implementation that talks direct to the
- // ByteBuffer
- DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
- Command command = (Command) wireFormat.unmarshal(dataIn);
- // Command command = (Command) wireFormat.doUnmarshal(dataIn);
- header.setCommand(command);
- }
-
- if (readStack != null) {
- answer = readStack.read(header);
- }
- else {
- answer = header.getCommand();
- }
+ // TODO could use a DataInput implementation that talks direct to
+ // the
+ // ByteBuffer
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
+ answer = (Command) wireFormat.unmarshal(dataIn);
+ answer.setFrom(from);
}
if (answer != null) {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
- processor.process(answer, header);
}
+ return answer;
}
/**
@@ -164,13 +130,7 @@ public class CommandChannel implements Service {
*
* @throws IOException
*/
- public Command onDatagramReceived(DatagramHeader header) throws IOException {
- if (readStack != null) {
- return readStack.read(header);
- }
- else {
- return header.getCommand();
- }
+ public void setWireFormatInfoEndpoint(DatagramEndpoint endpoint) throws IOException {
}
public void write(Command command) throws IOException {
@@ -180,46 +140,62 @@ public class CommandChannel implements Service {
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
- ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
+ ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
int size = data.length;
if (size < datagramSize) {
- header.incrementCounter();
- header.setPartial(false);
- header.setComplete(true);
- header.setDataSize(size);
writeBuffer.clear();
- headerMarshaller.writeHeader(header, writeBuffer);
+ headerMarshaller.writeHeader(command, writeBuffer);
writeBuffer.put(data);
sendWriteBuffer(address);
}
else {
- header.setPartial(true);
- header.setComplete(false);
-
// lets split the command up into chunks
-
int offset = 0;
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
// write the header
writeBuffer.clear();
- int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
+ headerMarshaller.writeHeader(command, writeBuffer);
+
+ int chunkSize = writeBuffer.remaining();
+
+ // we need to remove the amount of overhead to write the partial command
+
+ // lets remove the header of the partial command
+ // which is the byte for the type and an int for the size of the byte[]
+ chunkSize -= 1 + 4 + 4;
+
+ if (!wireFormat.isSizePrefixDisabled()) {
+ // lets write the size of the command buffer
+ writeBuffer.putInt(chunkSize);
+ chunkSize -= 4;
+ }
+
lastFragment = offset + chunkSize >= length;
if (chunkSize + offset > length) {
chunkSize = length - offset;
}
- header.incrementCounter();
- header.setDataSize(chunkSize);
- header.setComplete(lastFragment);
- headerMarshaller.writeHeader(header, writeBuffer);
+ if (lastFragment) {
+ writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
+ }
+ else {
+ writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
+ }
+
+ writeBuffer.putInt(command.getCommandId());
+
+ // size of byte array
+ writeBuffer.putInt(chunkSize);
+
// now the data
writeBuffer.put(data, offset, chunkSize);
+
offset += chunkSize;
sendWriteBuffer(address);
}
@@ -272,7 +248,7 @@ public class CommandChannel implements Service {
writeBuffer.flip();
if (log.isDebugEnabled()) {
- log.debug("Channel: " + name + " sending datagram to: " + address + " header: " + header);
+ log.debug("Channel: " + name + " sending datagram to: " + address);
}
channel.send(writeBuffer, address);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
deleted file mode 100644
index a532bfd5ad..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.udp;
-
-import org.apache.activemq.command.Command;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * Buffers up incoming headers to reorder them. This class is only accessed by
- * one thread at once.
- *
- * @version $Revision$
- */
-public class CommandReadBuffer implements DatagramReadBuffer {
- private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
-
- private OpenWireFormat wireFormat;
- private DatagramReplayStrategy replayStrategy;
- private SortedSet headers = new TreeSet();
- private long expectedCounter = 1;
- private ByteArrayOutputStream out = new ByteArrayOutputStream();
- private final String name;
-
- public CommandReadBuffer(String name, OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
- this.name = name;
- this.wireFormat = wireFormat;
- this.replayStrategy = replayStrategy;
- }
-
-
- public Command read(DatagramHeader header) throws IOException {
- long actualCounter = header.getCounter();
- if (expectedCounter != actualCounter) {
- if (actualCounter < expectedCounter) {
- log.warn("Ignoring out of step packet: " + header);
- }
- else {
- replayStrategy.onDroppedPackets(name, expectedCounter, actualCounter);
-
- // lets add it to the list for later on
- headers.add(header);
- }
-
- // lets see if the first item in the set is the next header
- if (headers.isEmpty()) {
- return null;
- }
- header = (DatagramHeader) headers.first();
- if (expectedCounter != header.getCounter()) {
- return null;
- }
- }
-
- // we've got a valid header so increment counter
- replayStrategy.onReceivedPacket(name, expectedCounter);
- expectedCounter++;
-
- Command answer = null;
- if (!header.isPartial()) {
- answer = header.getCommand();
- if (answer == null) {
- throw new IllegalStateException("The header should have a command!: " + header);
- }
- }
- else {
- byte[] data = header.getPartialData();
- out.write(data);
-
- if (header.isComplete()) {
- answer = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
- out.reset();
- }
- }
- return answer;
- }
-
-
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
new file mode 100644
index 0000000000..7c9674ed9b
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
@@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.BaseEndpoint;
+
+import java.net.SocketAddress;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class DatagramEndpoint extends BaseEndpoint {
+
+ private final SocketAddress address;
+
+ public DatagramEndpoint(String name, SocketAddress address) {
+ super(name);
+ this.address = address;
+ }
+
+ public SocketAddress getAddress() {
+ return address;
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
deleted file mode 100644
index bb3e548f5c..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.udp;
-
-import org.apache.activemq.command.Command;
-
-import java.net.SocketAddress;
-
-/**
- * Represents a header used when sending data grams
- *
- * @version $Revision$
- */
-public class DatagramHeader implements Comparable {
-
- private String producerId;
- private long counter;
- private boolean partial;
- private boolean complete;
- private int dataSize;
-
- // transient caches
- private transient SocketAddress fromAddress;
- private transient byte[] partialData;
- private transient Command command;
-
- public int hashCode() {
- final int PRIME = 31;
- int result = 1;
- result = PRIME * result + (int) (counter ^ (counter >>> 32));
- return result;
- }
-
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- final DatagramHeader other = (DatagramHeader) obj;
- if (counter != other.counter)
- return false;
- return true;
- }
-
- public int compareTo(DatagramHeader that) {
- return (int) (this.counter - that.counter);
- }
-
- public int compareTo(Object that) {
- if (that instanceof DatagramHeader) {
- return compareTo((DatagramHeader) that);
- }
- return getClass().getName().compareTo(that.getClass().getName());
- }
-
-
- public String toString() {
- return "DatagramHeader[producer: " + producerId + " counter: " + counter + " flags: " + getFlags();
- }
-
- public boolean isComplete() {
- return complete;
- }
-
- public void setComplete(boolean complete) {
- this.complete = complete;
- }
-
- public long getCounter() {
- return counter;
- }
-
- public void setCounter(long counter) {
- this.counter = counter;
- }
-
- public boolean isPartial() {
- return partial;
- }
-
- public void setPartial(boolean partial) {
- this.partial = partial;
- }
-
- public String getProducerId() {
- return producerId;
- }
-
- public void setProducerId(String producerId) {
- this.producerId = producerId;
- }
-
- public int getDataSize() {
- return dataSize;
- }
-
- public void setDataSize(int dataSize) {
- this.dataSize = dataSize;
- }
-
- public void incrementCounter() {
- counter++;
- }
-
- public byte getFlags() {
- byte answer = 0;
- if (partial) {
- answer |= 0x1;
- }
- if (complete) {
- answer |= 0x2;
- }
- return answer;
- }
-
- public void setFlags(byte flags) {
- partial = (flags & 0x1) != 0;
- complete = (flags & 0x2) != 0;
- }
-
- // Transient cached properties
-
- public Command getCommand() {
- return command;
- }
-
- public void setCommand(Command command) {
- this.command = command;
- }
-
- public byte[] getPartialData() {
- return partialData;
- }
-
- public void setPartialData(byte[] partialData) {
- this.partialData = partialData;
- }
-
- public SocketAddress getFromAddress() {
- return fromAddress;
- }
-
- public void setFromAddress(SocketAddress fromAddress) {
- this.fromAddress = fromAddress;
- }
-
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
index e65c34e297..f3344f1af1 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
@@ -17,6 +17,10 @@
package org.apache.activemq.transport.udp;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
/**
@@ -25,29 +29,20 @@ import java.nio.ByteBuffer;
*/
public class DatagramHeaderMarshaller {
- public DatagramHeader createDatagramHeader() {
- return new DatagramHeader();
+ /**
+ * Reads any header if applicable and then creates an endpoint object
+ */
+ public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
+ return new DatagramEndpoint(address.toString(), address);
}
- public DatagramHeader readHeader(ByteBuffer readBuffer) {
- DatagramHeader answer = createDatagramHeader();
- answer.setCounter(readBuffer.getLong());
- answer.setDataSize(readBuffer.getInt());
- byte flags = readBuffer.get();
- answer.setFlags(flags);
- //System.out.println("Read header with counter: " + answer.getCounter() + "size: " + answer.getDataSize() + " with flags: " + flags);
- return answer;
- }
-
- public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
- writeBuffer.putLong(header.getCounter());
- writeBuffer.putInt(header.getDataSize());
- byte flags = header.getFlags();
+ public void writeHeader(Command command, ByteBuffer writeBuffer) {
+ /*
+ writeBuffer.putLong(command.getCounter());
+ writeBuffer.putInt(command.getDataSize());
+ byte flags = command.getFlags();
//System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
writeBuffer.put(flags);
- }
-
- public int getHeaderSize(DatagramHeader header) {
- return 8 + 4 + 1;
+ */
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
index b3e5633af6..fc226e0daa 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
@@ -21,8 +21,8 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
-import org.apache.activemq.transport.udp.replay.ExceptionIfDroppedPacketStrategy;
+import org.apache.activemq.transport.replay.ReplayStrategy;
+import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,7 +48,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private CommandChannel commandChannel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
- private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
+ private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
private int datagramSize = 4 * 1024;
private long maxInactivityDuration = 0; // 30000;
private SocketAddress targetAddress;
@@ -60,14 +60,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private int port;
private int minmumWireFormatVersion;
private String description = null;
-
- private CommandProcessor commandProcessor = new CommandProcessor() {
- public void process(Command command, DatagramHeader header) {
- doConsume(command);
- }
- };
-
- private DatagramHeader wireFormatHeader;
+ private DatagramEndpoint wireFormatHeader;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@@ -113,8 +106,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
commandChannel.write(command, address);
}
- public void receivedHeader(DatagramHeader header) {
- wireFormatHeader = header;
+ public void receivedHeader(DatagramEndpoint endpoint) {
+ wireFormatHeader = endpoint;
}
/**
@@ -136,7 +129,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
log.trace("Consumer thread starting for: " + toString());
while (!isStopped()) {
try {
- commandChannel.read(commandProcessor);
+ Command command = commandChannel.read();
+ doConsume(command);
}
/*
* catch (SocketTimeoutException e) { } catch
@@ -237,14 +231,14 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.commandChannel = commandChannel;
}
- public DatagramReplayStrategy getReplayStrategy() {
+ public ReplayStrategy getReplayStrategy() {
return replayStrategy;
}
/**
* Sets the strategy used to replay missed datagrams
*/
- public void setReplayStrategy(DatagramReplayStrategy replayStrategy) {
+ public void setReplayStrategy(ReplayStrategy replayStrategy) {
this.replayStrategy = replayStrategy;
}
@@ -281,13 +275,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// Implementation methods
// -------------------------------------------------------------------------
- protected CommandProcessor getCommandProcessor() {
- return commandProcessor;
- }
-
- protected void setCommandProcessor(CommandProcessor commandProcessor) {
- this.commandProcessor = commandProcessor;
- }
/**
* Creates an address from the given URI
@@ -328,13 +315,13 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers(), createDatagramHeaderMarshaller());
+ commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
commandChannel.start();
// lets pass the header & address into the channel so it avoids a
// re-request
if (wireFormatHeader != null) {
- commandChannel.onDatagramReceived(wireFormatHeader);
+ commandChannel.setWireFormatInfoEndpoint(wireFormatHeader);
}
super.doStart();
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
index 73b536c9b4..8edd359f92 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.udp;
import org.activeio.command.WireFormat;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
@@ -47,11 +48,11 @@ public class UdpTransportFactory extends TransportFactory {
}
WireFormat wf = createWireFormat(options);
int port = location.getPort();
- UdpTransport transport = new UdpTransport(asOpenWireFormat(wf), port);
+ OpenWireFormat openWireFormat = asOpenWireFormat(wf);
+ UdpTransport transport = new UdpTransport(openWireFormat, port);
Transport configuredTransport = configure(transport, wf, options, true);
UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport);
- transport.setCommandProcessor(server);
return server;
}
catch (URISyntaxException e) {
@@ -69,6 +70,10 @@ public class UdpTransportFactory extends TransportFactory {
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
final UdpTransport udpTransport = (UdpTransport) transport;
+
+ // deal with fragmentation
+ transport = new CommandJoiner(transport, asOpenWireFormat(format));
+
if (udpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
@@ -105,6 +110,10 @@ public class UdpTransportFactory extends TransportFactory {
}
transport = new ResponseCorrelator(transport);
+
+ // deal with fragmentation
+ transport = new CommandJoiner(transport, asOpenWireFormat(format));
+
return transport;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index 5809390ef1..098bf27c05 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -18,8 +18,10 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
@@ -43,7 +45,7 @@ import java.util.Map;
* @version $Revision$
*/
-public class UdpTransportServer extends TransportServerSupport implements CommandProcessor {
+public class UdpTransportServer extends TransportServerSupport {
private static final Log log = LogFactory.getLog(UdpTransportServer.class);
private UdpTransport serverTransport;
@@ -53,6 +55,8 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) {
super(connectURI);
this.serverTransport = serverTransport;
+
+
this.configuredTransport = configuredTransport;
// lets disable the incremental checking of the sequence numbers
@@ -79,6 +83,7 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
configuredTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
+ processInboundConnection(command);
}
public void onException(IOException error) {
@@ -97,24 +102,29 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
configuredTransport.stop();
}
- public void process(Command command, DatagramHeader header) throws IOException {
- SocketAddress address = header.getFromAddress();
+ protected void processInboundConnection(Command command) {
+ DatagramEndpoint endpoint = (DatagramEndpoint) command.getFrom();
if (log.isDebugEnabled()) {
- log.debug("Received command on: " + this + " from address: " + address + " command: " + command);
+ log.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
}
Transport transport = null;
synchronized (transports) {
- transport = (Transport) transports.get(address);
+ transport = (Transport) transports.get(endpoint);
if (transport == null) {
if (log.isDebugEnabled()) {
log.debug("Creating a new UDP server connection");
}
- transport = createTransport(command, header);
- transport = configureTransport(transport);
- transports.put(address, transport);
+ try {
+ transport = createTransport(command, endpoint);
+ transport = configureTransport(transport);
+ transports.put(endpoint, transport);
+ }
+ catch (IOException e) {
+ getAcceptListener().onAcceptError(e);
+ }
}
else {
- log.warn("Discarding duplicate command to server: " + command + " from: " + address);
+ log.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
}
}
}
@@ -125,19 +135,22 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
if (serverTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
}
-
+
getAcceptListener().onAccept(transport);
return transport;
}
- protected Transport createTransport(final Command command, DatagramHeader header) throws IOException {
- final SocketAddress address = header.getFromAddress();
+ protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
+ final SocketAddress address = endpoint.getAddress();
final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
- transport.receivedHeader(header);
+ // TODO - is this still required?
+ transport.receivedHeader(endpoint);
- return new WireFormatNegotiator(transport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
+ Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat);
+
+ return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
public void start() throws Exception {
super.start();
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
index 3973744e45..878cfd4fc0 100644
--- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.1 $
*/
-public abstract class ServiceSupport {
+public abstract class ServiceSupport implements Service {
private static final Log log = LogFactory.getLog(ServiceSupport.class);
private AtomicBoolean started = new AtomicBoolean(false);
diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
index 286a070cfb..398a49a4c4 100755
--- a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
@@ -71,7 +71,7 @@ public class BrokerTestSupport extends CombinationTestSupport {
protected int tempDestGenerator=0;
protected PersistenceAdapter persistenceAdapter;
- protected int MAX_WAIT = 1000;
+ protected int MAX_WAIT = 4000;
protected UsageManager memoryManager;
diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
index a69c13d59a..bae069c39b 100755
--- a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
@@ -207,7 +207,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
// Message should have been dropped due to broker restart.
Message m = receiveMessage(connection);
- assertNotNull(m);
+ assertNotNull("Should have received a message by now!", m);
assertEquals( m.getMessageId(), message.getMessageId() );
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java
index f324d1937f..2b2c1049ba 100755
--- a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java
+++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java
@@ -23,6 +23,7 @@ import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
@@ -38,7 +39,8 @@ public class StubConnection implements Service {
private Connection connection;
private Transport transport;
boolean shuttingDown = false;
-
+ private OpenWireFormat wireFormat = new OpenWireFormat();
+
public StubConnection(BrokerService broker) throws Exception {
this(broker, null);
}
@@ -53,6 +55,10 @@ public class StubConnection implements Service {
serviceException(e);
}
}
+
+ protected OpenWireFormat getWireFormat() {
+ return wireFormat;
+ }
};
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java
index 0ecfafc091..aa13731116 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQBytesMessageTest extends ActiveMQMessageTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java
index 9af8252a2e..62694c6f3e 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public abstract class ActiveMQDestinationTestSupport extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java
index ec7c3f963b..a0714a8f4b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQMapMessageTest extends ActiveMQMessageTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java
index bc4048ce1f..2f2037b4c3 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQMessageTest extends MessageTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java
index e88dbef3d2..1b874e4779 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQObjectMessageTest extends ActiveMQMessageTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java
index a0ad108772..6516221b64 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQQueueTest extends ActiveMQDestinationTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java
index 1870f005fe..c39cd9c60a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQStreamMessageTest extends ActiveMQMessageTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java
index 102cb53793..0ec0d93b3b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public abstract class ActiveMQTempDestinationTestSupport extends ActiveMQDestinationTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java
index 9dfd271020..3bcee31b0e 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQTempQueueTest extends ActiveMQTempDestinationTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java
index c0e9ec3712..dc4b00d600 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQTempTopicTest extends ActiveMQTempDestinationTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
index 783a167651..33cc69c173 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQTextMessageTest extends ActiveMQMessageTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java
index be43de2159..9c7ba1b064 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ActiveMQTopicTest extends ActiveMQDestinationTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java
index c9b3066255..7b9d9afc86 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public abstract class BaseCommandTestSupport extends DataFileGeneratorTestSupport {
@@ -41,7 +41,7 @@ public abstract class BaseCommandTestSupport extends DataFileGeneratorTestSuppor
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
BaseCommand info = (BaseCommand) object;
- info.setCommandId((short) 1);
+ info.setCommandId(1);
info.setResponseRequired(true);
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java
index b11771b8e6..59cb8f9de5 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class BrokerIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
index bfe675f4cc..8bc3b5fd7a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class BrokerInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java
index 641191d970..f501335de2 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ConnectionErrorTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java
index 86d0a94632..7545e9aa2c 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ConnectionIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
index 0273c689d4..6b1103b349 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ConnectionInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java
index 733be2bf52..d1a8179016 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ConsumerIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
index d55607e722..b7d5cad6c1 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ConsumerInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java
index 59febc60c9..0fb7297d84 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ControlCommandTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java
index dd63c62ef7..7fc957c05d 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class DataArrayResponseTest extends ResponseTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java
index 9f9700c38b..e6dfa3d3a4 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class DataResponseTest extends ResponseTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java
index a138760845..3dd1630f40 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class DestinationInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java
index 913b38a8ac..2ec41f321f 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class DiscoveryEventTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java
index 753504e393..f5fdb1e1ac 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ExceptionResponseTest extends ResponseTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java
index 550deae206..bff4893f4c 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class FlushCommandTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java
index ccdad39a02..eacf61f6d0 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class IntegerResponseTest extends ResponseTest {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java
index 2bb4d63726..8e473b7042 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class JournalQueueAckTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java
index c26851791d..da09663d0e 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class JournalTopicAckTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java
index 00c323aa46..c442747e40 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class JournalTraceTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTransactionTest.java
index c29e9856aa..0888375614 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTransactionTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTransactionTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class JournalTransactionTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/KeepAliveInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/KeepAliveInfoTest.java
index 3b6d466c83..0e7a1509e0 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/KeepAliveInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/KeepAliveInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class KeepAliveInfoTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java
new file mode 100644
index 0000000000..e7845ae283
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.v1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+/**
+ * Test case for the OpenWire marshalling for LastPartialCommand
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ * if you need to make a change, please see the modify the groovy scripts in the
+ * under src/gram/script and then use maven openwire:generate to regenerate
+ * this file.
+ *
+ * @version $Revision: $
+ */
+public class LastPartialCommandTest extends PartialCommandTest {
+
+
+ public static LastPartialCommandTest SINGLETON = new LastPartialCommandTest();
+
+ public Object createObject() throws Exception {
+ LastPartialCommand info = new LastPartialCommand();
+ populateObject(info);
+ return info;
+ }
+
+
+ protected void populateObject(Object object) throws Exception {
+ super.populateObject(object);
+ LastPartialCommand info = (LastPartialCommand) object;
+
+ }
+ }
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LocalTransactionIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LocalTransactionIdTest.java
index 5a5b363493..756a5f634b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LocalTransactionIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LocalTransactionIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class LocalTransactionIdTest extends TransactionIdTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java
index 07c6fcbd35..7cf1d5c653 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class MessageAckTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationTest.java
index 08218d54bc..45ee2f122f 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class MessageDispatchNotificationTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchTest.java
index d0c10a60e9..e9b4416656 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class MessageDispatchTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageIdTest.java
index 97333f75a0..588e8d099d 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class MessageIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java
index 8ea87f3f2a..fa5f353016 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public abstract class MessageTestSupport extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java
new file mode 100644
index 0000000000..5522d1f2fb
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.v1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+/**
+ * Test case for the OpenWire marshalling for PartialCommand
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ * if you need to make a change, please see the modify the groovy scripts in the
+ * under src/gram/script and then use maven openwire:generate to regenerate
+ * this file.
+ *
+ * @version $Revision: $
+ */
+public class PartialCommandTest extends BaseCommandTestSupport {
+
+
+ public static PartialCommandTest SINGLETON = new PartialCommandTest();
+
+ public Object createObject() throws Exception {
+ PartialCommand info = new PartialCommand();
+ populateObject(info);
+ return info;
+ }
+
+
+ protected void populateObject(Object object) throws Exception {
+ super.populateObject(object);
+ PartialCommand info = (PartialCommand) object;
+ info.setData("Data:1".getBytes());
+
+ }
+ }
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerIdTest.java
index f22fa43df9..faedb172f8 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ProducerIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerInfoTest.java
index 178b426ee8..c0310e561f 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ProducerInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveInfoTest.java
index a57bd37da3..2e604a898c 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class RemoveInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveSubscriptionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveSubscriptionInfoTest.java
index 0406719a4a..46b1bb8d2d 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveSubscriptionInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveSubscriptionInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class RemoveSubscriptionInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ReplayCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ReplayCommandTest.java
index 963012e67c..bf25042770 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ReplayCommandTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ReplayCommandTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ReplayCommandTest extends BaseCommandTestSupport {
@@ -50,6 +50,8 @@ public class ReplayCommandTest extends BaseCommandTestSupport {
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
ReplayCommand info = (ReplayCommand) object;
+ info.setFirstNakNumber(1);
+ info.setLastNakNumber(2);
}
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ResponseTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ResponseTest.java
index 477f37d06f..e123ae77eb 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ResponseTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ResponseTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ResponseTest extends BaseCommandTestSupport {
@@ -50,7 +50,7 @@ public class ResponseTest extends BaseCommandTestSupport {
protected void populateObject(Object object) throws Exception {
super.populateObject(object);
Response info = (Response) object;
- info.setCorrelationId((short) 1);
+ info.setCorrelationId(1);
}
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionIdTest.java
index 0714c5e06d..c8c6607bc5 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class SessionIdTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionInfoTest.java
index f40deaf6e4..086d840ec4 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class SessionInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ShutdownInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ShutdownInfoTest.java
index f9c2f81aff..31d7a25336 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ShutdownInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ShutdownInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class ShutdownInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SubscriptionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SubscriptionInfoTest.java
index 0611c2edd9..5c65552bd1 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SubscriptionInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SubscriptionInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class SubscriptionInfoTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionIdTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionIdTestSupport.java
index 1fc6e494e8..489808af0d 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionIdTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionIdTestSupport.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public abstract class TransactionIdTestSupport extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionInfoTest.java
index 264e232560..1e910b6cc9 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class TransactionInfoTest extends BaseCommandTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java
index 0e09956ee9..4f64585954 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class WireFormatInfoTest extends DataFileGeneratorTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/XATransactionIdTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/XATransactionIdTest.java
index a9b47be6b5..25337bfc04 100644
--- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/XATransactionIdTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/XATransactionIdTest.java
@@ -33,7 +33,7 @@ import org.apache.activemq.command.*;
* under src/gram/script and then use maven openwire:generate to regenerate
* this file.
*
- * @version $Revision$
+ * @version $Revision: $
*/
public class XATransactionIdTest extends TransactionIdTestSupport {
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
index 26c0b33d94..eb4b54b51a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
@@ -218,7 +218,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
synchronized (lock) {
answer = receivedCommand;
if (answer == null) {
- lock.wait(10000);
+ lock.wait(5000);
}
answer = receivedCommand;
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
index be8d188a1f..c5267330f5 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
@@ -17,8 +17,8 @@
package org.apache.activemq.transport.udp;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
import java.net.URI;
@@ -30,22 +30,24 @@ public class UdpTransportTest extends UdpTestSupport {
protected int consumerPort = 8830;
protected String producerURI = "udp://localhost:" + consumerPort;
- //protected String producerURI = "udp://localhost:8830";
- //protected String consumerURI = "udp://localhost:8831?port=8830";
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI);
- // The WireFormatNegotiator means we can only connect to servers
- return new UdpTransport(createWireFormat(), new URI(producerURI));
+ // we are not using the TransportFactory as this assumes that
+ // UDP transports talk to a server using a WireFormat Negotiation step
+ // rather than talking directly to each other
- //return TransportFactory.connect(new URI(producerURI));
+ OpenWireFormat wireFormat = createWireFormat();
+ UdpTransport transport = new UdpTransport(wireFormat, new URI(producerURI));
+ return new CommandJoiner(transport, wireFormat);
}
protected Transport createConsumer() throws Exception {
System.out.println("Consumer on port: " + consumerPort);
- return new UdpTransport(createWireFormat(), consumerPort);
- //return TransportFactory.connect(new URI(consumerURI));
+ OpenWireFormat wireFormat = createWireFormat();
+ UdpTransport transport = new UdpTransport(wireFormat, consumerPort);
+ return new CommandJoiner(transport, wireFormat);
}
protected OpenWireFormat createWireFormat() {