diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 6486ced9b3..1b172291f1 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -361,10 +361,8 @@ **/MultipleTestsWithXBeanFactoryBeanTest.* **/MultipleTestsWithSpringXBeanFactoryBeanTest.* - - - **/PartialCommandTest.* - **/LastPartialCommandTest.* + + **/UdpSendReceiveWithTwoConnectionsTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java index b1b20b16dd..518fc64450 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java @@ -19,29 +19,29 @@ package org.apache.activemq.command; import org.apache.activemq.state.CommandVisitor; /** - * Represents a partial command; a large command that has been split up into - * pieces. + * Represents the end marker of a stream of {@link PartialCommand} instances. * * @openwire:marshaller code="61" * @version $Revision$ */ -public class LastPartialCommand extends PartialCommand { +public class LastPartialCommand extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND; public LastPartialCommand() { } + public LastPartialCommand(Command command) { + setCommandId(command.getCommandId()); + setResponseRequired(command.isResponseRequired()); + } + public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } - public boolean isLastPart() { - return true; - } - public Response visit(CommandVisitor visitor) throws Exception { - throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this); + throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java index f84f0d28a8..c098c3b936 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java @@ -25,12 +25,16 @@ import org.apache.activemq.state.CommandVisitor; * @openwire:marshaller code="60" * @version $Revision$ */ -public class PartialCommand extends BaseCommand { +public class PartialCommand implements Command { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND; + private int commandId; private byte[] data; + private transient Endpoint from; + private transient Endpoint to; + public PartialCommand() { } @@ -38,6 +42,17 @@ public class PartialCommand extends BaseCommand { return DATA_STRUCTURE_TYPE; } + /** + * @openwire:property version=1 + */ + public int getCommandId() { + return commandId; + } + + public void setCommandId(int commandId) { + this.commandId = commandId; + } + /** * The data for this part of the command * @@ -51,12 +66,66 @@ public class PartialCommand extends BaseCommand { this.data = data; } - public boolean isLastPart() { - return false; + public Endpoint getFrom() { + return from; + } + + public void setFrom(Endpoint from) { + this.from = from; + } + + public Endpoint getTo() { + return to; + } + + public void setTo(Endpoint to) { + this.to = to; } public Response visit(CommandVisitor visitor) throws Exception { throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this); } + public boolean isResponseRequired() { + return false; + } + + public boolean isResponse() { + return false; + } + + public boolean isBrokerInfo() { + return false; + } + + public boolean isMessageDispatch() { + return false; + } + + public boolean isMessage() { + return false; + } + + public boolean isMessageAck() { + return false; + } + + public boolean isMessageDispatchNotification() { + return false; + } + + public boolean isShutdownInfo() { + return false; + } + + public void setResponseRequired(boolean responseRequired) { + } + + public boolean isWireFormatInfo() { + return false; + } + + public boolean isMarshallAware() { + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java b/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java index ac6525915c..21e56bdecf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java @@ -19,6 +19,7 @@ package org.apache.activemq.openwire; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; final public class BooleanStream { @@ -74,6 +75,21 @@ final public class BooleanStream { clear(); } + public void marshal(ByteBuffer dataOut) { + if( arrayLimit < 64 ) { + dataOut.put((byte) arrayLimit); + } else if( arrayLimit < 256 ) { // max value of unsigned byte + dataOut.put((byte) 0xC0); + dataOut.put((byte) arrayLimit); + } else { + dataOut.put((byte) 0x80); + dataOut.putShort(arrayLimit); + } + + dataOut.put(data, 0, arrayLimit); + } + + public void unmarshal(DataInputStream dataIn) throws IOException { arrayLimit = (short) (dataIn.readByte() & 0xFF); diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index 971d0d09f5..7fcd313280 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.openwire; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.HashMap; - import org.activeio.ByteArrayOutputStream; import org.activeio.ByteSequence; import org.activeio.Packet; @@ -32,12 +26,16 @@ import org.activeio.command.WireFormat; import org.activeio.packet.ByteArrayPacket; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.MarshallAware; -import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.util.IdGenerator; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.HashMap; + /** * * @version $Revision$ @@ -227,13 +225,6 @@ final public class OpenWireFormat implements WireFormat { DataStructure c = (DataStructure) o; byte type = c.getDataStructureType(); - - // TODO - we could remove this if we have a way to disable BooleanStream on - // certain types of message - if (type == CommandTypes.PARTIAL_COMMAND || type == CommandTypes.PARTIAL_LAST_COMMAND) { - marshalPartialCommand((PartialCommand) o, dataOut); - return; - } DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; if( dsm == null ) throw new IOException("Unknown data type: "+type); @@ -344,13 +335,7 @@ final public class OpenWireFormat implements WireFormat { public Object doUnmarshal(DataInputStream dis) throws IOException { byte dataType = dis.readByte(); - - // TODO - we could remove this if we have a way to disable BooleanStream on - // certain types of message - if (dataType == CommandTypes.PARTIAL_COMMAND || dataType == CommandTypes.PARTIAL_LAST_COMMAND) { - return doUnmarshalPartialCommand(dataType, dis); - } - else if( dataType!=NULL_TYPE ) { + if( dataType!=NULL_TYPE ) { DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; if( dsm == null ) throw new IOException("Unknown data type: "+dataType); @@ -585,54 +570,4 @@ final public class OpenWireFormat implements WireFormat { this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); } - - - - // Partial command marshalling - // - // TODO - remove if we can figure out a clean way to disable BooleanStream in OpenWire on commands - // with no optional values (partial commands only have a mandatory byte[]) - // - - protected void marshalPartialCommand(PartialCommand command, DataOutputStream dataOut) throws IOException { - byte[] data = command.getData(); - int dataSize = data.length; - - if (!isSizePrefixDisabled()) { - int size = dataSize + 1 + 4; - dataOut.writeInt(size); - } - - if (command.isLastPart()) { - dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE); - } - else { - dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE); - } - - dataOut.writeInt(command.getCommandId()); - dataOut.writeInt(dataSize); - dataOut.write(data); - - } - - protected Object doUnmarshalPartialCommand(byte dataType, DataInputStream dis) throws IOException { - // size of entire command is already read - - PartialCommand answer = null; - if (dataType == LastPartialCommand.DATA_STRUCTURE_TYPE) { - answer = new LastPartialCommand(); - } - else { - answer = new PartialCommand(); - } - answer.setCommandId(dis.readInt()); - - int size = dis.readInt(); - byte[] data = new byte[size]; - dis.readFully(data); - answer.setData(data); - return answer; - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java index fff89afc74..05670bfb95 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java @@ -37,7 +37,7 @@ import org.apache.activemq.command.*; * * @version $Revision$ */ -public class LastPartialCommandMarshaller extends PartialCommandMarshaller { +public class LastPartialCommandMarshaller extends BaseCommandMarshaller { /** * Return the type of Data Structure we marshal diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java index a0b85cc1da..c9f0f4e625 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v1/PartialCommandMarshaller.java @@ -37,7 +37,7 @@ import org.apache.activemq.command.*; * * @version $Revision$ */ -public class PartialCommandMarshaller extends BaseCommandMarshaller { +public class PartialCommandMarshaller extends BaseDataStreamMarshaller { /** * Return the type of Data Structure we marshal @@ -65,6 +65,7 @@ public class PartialCommandMarshaller extends BaseCommandMarshaller { super.tightUnmarshal(wireFormat, o, dataIn, bs); PartialCommand info = (PartialCommand)o; + info.setCommandId(dataIn.readInt()); info.setData(tightUnmarshalByteArray(dataIn, bs)); } @@ -80,7 +81,7 @@ public class PartialCommandMarshaller extends BaseCommandMarshaller { int rc = super.tightMarshal1(wireFormat, o, bs); rc += tightMarshalByteArray1(info.getData(), bs); - return rc + 0; + return rc + 4; } /** @@ -94,6 +95,7 @@ public class PartialCommandMarshaller extends BaseCommandMarshaller { super.tightMarshal2(wireFormat, o, dataOut, bs); PartialCommand info = (PartialCommand)o; + dataOut.writeInt(info.getCommandId()); tightMarshalByteArray2(info.getData(), dataOut, bs); } @@ -109,6 +111,7 @@ public class PartialCommandMarshaller extends BaseCommandMarshaller { super.looseUnmarshal(wireFormat, o, dataIn); PartialCommand info = (PartialCommand)o; + info.setCommandId(dataIn.readInt()); info.setData(looseUnmarshalByteArray(dataIn)); } @@ -122,6 +125,7 @@ public class PartialCommandMarshaller extends BaseCommandMarshaller { PartialCommand info = (PartialCommand)o; super.looseMarshal(wireFormat, o, dataOut); + dataOut.writeInt(info.getCommandId()); looseMarshalByteArray(wireFormat, info.getData(), dataOut); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java index d3a42af98a..4589e0b6e6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java @@ -26,14 +26,14 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; - /** - * Joins together of partial commands which were split into individual chunks of data. + * Joins together of partial commands which were split into individual chunks of + * data. * * @version $Revision$ */ public class CommandJoiner extends TransportFilter { - + private ByteArrayOutputStream out = new ByteArrayOutputStream(); private OpenWireFormat wireFormat; @@ -41,21 +41,27 @@ public class CommandJoiner extends TransportFilter { super(next); this.wireFormat = wireFormat; } - + public void onCommand(Command command) { byte type = command.getDataStructureType(); - if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) { + if (type == PartialCommand.DATA_STRUCTURE_TYPE) { PartialCommand header = (PartialCommand) command; byte[] partialData = header.getData(); try { out.write(partialData); - - if (header.isLastPart()) { - byte[] fullData = out.toByteArray(); - Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData))); - resetBuffer(); - getTransportListener().onCommand(completeCommand); - } + } + catch (IOException e) { + getTransportListener().onException(e); + } + } + else if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) { + try { + byte[] fullData = out.toByteArray(); + Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData))); + completeCommand.setCommandId(command.getCommandId()); + completeCommand.setResponseRequired(command.isResponseRequired()); + resetBuffer(); + getTransportListener().onCommand(completeCommand); } catch (IOException e) { getTransportListener().onException(e); @@ -65,7 +71,7 @@ public class CommandJoiner extends TransportFilter { getTransportListener().onCommand(command); } } - + public void stop() throws Exception { super.stop(); resetBuffer(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java index a746a87db6..63f476be21 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java @@ -46,22 +46,17 @@ public class ReliableTransport extends TransportFilter { public void onCommand(Command command) { int actualCounter = command.getCommandId(); - boolean valid = expectedCounter != actualCounter; + boolean valid = expectedCounter == actualCounter; if (!valid) { - if (actualCounter < expectedCounter) { - log.warn("Ignoring out of step packet: " + command); - } - else { - // lets add it to the list for later on - headers.add(command); + // lets add it to the list for later on + headers.add(command); - try { - replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); - } - catch (IOException e) { - getTransportListener().onException(e); - } + try { + replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); + } + catch (IOException e) { + getTransportListener().onException(e); } if (!headers.isEmpty()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 3284dd0e32..7b6f72aab8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -21,6 +21,7 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.PartialCommand; +import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -100,7 +101,8 @@ public class CommandChannel implements Service { readBuffer.get(data); // TODO could use a DataInput implementation that talks direct to - // the ByteBuffer to avoid object allocation and unnecessary buffering? + // the ByteBuffer to avoid object allocation and unnecessary + // buffering? DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); answer = (Command) wireFormat.unmarshal(dataIn); answer.setFrom(from); @@ -125,15 +127,7 @@ public class CommandChannel implements Service { byte[] data = largeBuffer.toByteArray(); int size = data.length; - if (size < datagramSize) { - writeBuffer.clear(); - headerMarshaller.writeHeader(command, writeBuffer); - - writeBuffer.put(data); - - sendWriteBuffer(address); - } - else { + if (size >= datagramSize) { // lets split the command up into chunks int offset = 0; boolean lastFragment = false; @@ -141,45 +135,80 @@ public class CommandChannel implements Service { // write the header writeBuffer.clear(); headerMarshaller.writeHeader(command, writeBuffer); - + int chunkSize = writeBuffer.remaining(); - // we need to remove the amount of overhead to write the partial command + // we need to remove the amount of overhead to write the + // partial command + + // lets write the flags in there + BooleanStream bs = null; + if (wireFormat.isTightEncodingEnabled()) { + bs = new BooleanStream(); + bs.writeBoolean(true); // the partial data byte[] is + // never null + } // lets remove the header of the partial command - // which is the byte for the type and an int for the size of the byte[] - chunkSize -= 1 + 4 + 4; - + // which is the byte for the type and an int for the size of + // the byte[] + chunkSize -= 1 // the data type + + 4 // the command ID + + 4; // the size of the partial data + + // the boolean flags + if (bs != null) { + chunkSize -= bs.marshalledSize(); + } + else { + chunkSize -= 1; + } + if (!wireFormat.isSizePrefixDisabled()) { // lets write the size of the command buffer writeBuffer.putInt(chunkSize); chunkSize -= 4; } - + lastFragment = offset + chunkSize >= length; if (chunkSize + offset > length) { chunkSize = length - offset; } - if (lastFragment) { - writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE); + writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); + + if (bs != null) { + bs.marshal(writeBuffer); } - else { - writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); - } - + writeBuffer.putInt(command.getCommandId()); - + if (bs == null) { + writeBuffer.put((byte) 1); + } + // size of byte array writeBuffer.putInt(chunkSize); - + // now the data writeBuffer.put(data, offset, chunkSize); offset += chunkSize; sendWriteBuffer(address); } + + // now lets write the last partial command + command = new LastPartialCommand(command); + largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); + wireFormat.marshal(command, new DataOutputStream(largeBuffer)); + data = largeBuffer.toByteArray(); } + + writeBuffer.clear(); + headerMarshaller.writeHeader(command, writeBuffer); + + writeBuffer.put(data); + + sendWriteBuffer(address); } } @@ -216,7 +245,6 @@ public class CommandChannel implements Service { this.headerMarshaller = headerMarshaller; } - // Implementation methods // ------------------------------------------------------------------------- protected void sendWriteBuffer(SocketAddress address) throws IOException { diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/multicast b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/multicast new file mode 100644 index 0000000000..59e96589b3 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/multicast @@ -0,0 +1 @@ +class=org.apache.activemq.transport.multicast.MulticastTransportFactory diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java index 05203b2890..275b5f69d4 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java @@ -89,7 +89,9 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis message.setIntProperty("intProperty",i); if (verbose) { - log.info("About to send a message: " + message + " with text: " + data[i]); + if (log.isDebugEnabled()) { + log.debug("About to send a message: " + message + " with text: " + data[i]); + } } producer.send(producerDestination, message); @@ -123,7 +125,9 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis if (data.length != copyOfMessages.size()) { for (Iterator iter = copyOfMessages.iterator(); iter.hasNext();) { TextMessage message = (TextMessage) iter.next(); - log.info("<== " + counter++ + " = " + message); + if (log.isDebugEnabled()) { + log.info("<== " + counter++ + " = " + message); + } } } @@ -136,7 +140,9 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis int intProperty = received.getIntProperty("intProperty"); if (verbose) { - log.info("Received Text: " + text); + if (log.isDebugEnabled()) { + log.info("Received Text: " + text); + } } assertEquals("Message: " + i, data[i], text); @@ -182,7 +188,9 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis */ protected void consumeMessage(Message message, List messageList) { if (verbose) { - log.info("Received message: " + message); + if (log.isDebugEnabled()) { + log.info("Received message: " + message); + } } messageList.add(message); diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java index e7845ae283..f566a8b190 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java @@ -35,7 +35,7 @@ import org.apache.activemq.command.*; * * @version $Revision: $ */ -public class LastPartialCommandTest extends PartialCommandTest { +public class LastPartialCommandTest extends BaseCommandTestSupport { public static LastPartialCommandTest SINGLETON = new LastPartialCommandTest(); diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java index 5522d1f2fb..18d1f9b7a4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v1/PartialCommandTest.java @@ -35,7 +35,7 @@ import org.apache.activemq.command.*; * * @version $Revision: $ */ -public class PartialCommandTest extends BaseCommandTestSupport { +public class PartialCommandTest extends DataFileGeneratorTestSupport { public static PartialCommandTest SINGLETON = new PartialCommandTest(); @@ -50,6 +50,7 @@ public class PartialCommandTest extends BaseCommandTestSupport { protected void populateObject(Object object) throws Exception { super.populateObject(object); PartialCommand info = (PartialCommand) object; + info.setCommandId(1); info.setData("Data:1".getBytes()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java index 0ed6ea80c8..4b3952e26f 100755 --- a/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java @@ -54,6 +54,8 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis protected final Object lock = new Object(); protected boolean verbose = false; protected boolean useSeparateSession = false; + protected boolean largeMessages = false; + protected int largeMessageLoopSize = 4 * 1024; /* * @see junit.framework.TestCase#setUp() @@ -73,11 +75,27 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis data = new String[messageCount]; for (int i = 0; i < messageCount; i++) { - data[i] = "Text for message: " + i + " at " + new Date(); + data[i] = createMessageText(i); } } - - + + + protected String createMessageText(int i) { + if (largeMessages) { + return createMessageBodyText(); + } + else { + return "Text for message: " + i + " at " + new Date(); + } + } + + protected String createMessageBodyText() { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < largeMessageLoopSize; i++) { + buffer.append("0123456789"); + } + return buffer.toString(); + } /** * Test if all the messages sent are being received. diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java new file mode 100644 index 0000000000..56ce0dfb64 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java @@ -0,0 +1,87 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import edu.emory.mathcs.backport.java.util.Queue; + +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy; +import org.apache.activemq.transport.replay.ReplayStrategy; + +import junit.framework.TestCase; + +/** + * + * @version $Revision$ + */ +public class ReliableTransportTest extends TestCase { + + protected TransportFilter transport; + protected StubTransportListener listener = new StubTransportListener(); + protected ReplayStrategy replayStrategy; + + public void testValidSequenceOfPackets() throws Exception { + int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 }; + + sendStreamOfCommands(sequenceNumbers, true); + } + + public void testInvalidSequenceOfPackets() throws Exception { + int[] sequenceNumbers = { 1, 2, /* 3, */ 4, 5, 6, 7 }; + + sendStreamOfCommands(sequenceNumbers, false); + } + + protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) { + for (int i = 0; i < sequenceNumbers.length; i++) { + int commandId = sequenceNumbers[i]; + + ConsumerInfo info = new ConsumerInfo(); + info.setSelector("Cheese: " + commandId); + info.setCommandId(commandId); + + transport.onCommand(info); + } + + Queue exceptions = listener.getExceptions(); + Queue commands = listener.getCommands(); + if (expected) { + if (!exceptions.isEmpty()) { + Exception e = (Exception) exceptions.remove(); + e.printStackTrace(); + fail("Caught exception: " + e); + } + assertEquals("number of messages received", sequenceNumbers.length, commands.size()); + } + else { + assertTrue("Should have received an exception!", exceptions.size() > 0); + Exception e = (Exception) exceptions.remove(); + System.out.println("Caught expected response: " + e); + } + + } + + protected void setUp() throws Exception { + if (replayStrategy == null) { + replayStrategy = new ExceptionIfDroppedReplayStrategy(); + } + transport = new ReliableTransport(new StubTransport(), replayStrategy); + transport.setTransportListener(listener); + transport.start(); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java b/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java new file mode 100644 index 0000000000..707b8cb252 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java @@ -0,0 +1,50 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import edu.emory.mathcs.backport.java.util.Queue; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.activemq.command.Command; +import org.apache.activemq.util.ServiceStopper; + +import java.io.IOException; + +/** + * + * @version $Revision$ + */ +public class StubTransport extends TransportSupport { + + private Queue queue = new ConcurrentLinkedQueue(); + + protected void doStop(ServiceStopper stopper) throws Exception { + } + + protected void doStart() throws Exception { + } + + public void oneway(Command command) throws IOException { + queue.add(command); + } + + public Queue getQueue() { + return queue; + } + + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/StubTransportListener.java b/activemq-core/src/test/java/org/apache/activemq/transport/StubTransportListener.java new file mode 100644 index 0000000000..14955ece08 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/StubTransportListener.java @@ -0,0 +1,57 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import edu.emory.mathcs.backport.java.util.Queue; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.activemq.command.Command; + +import java.io.IOException; + +/** + * + * @version $Revision$ + */ +public class StubTransportListener implements TransportListener { + + private Queue commands = new ConcurrentLinkedQueue(); + private Queue exceptions = new ConcurrentLinkedQueue(); + + public Queue getCommands() { + return commands; + } + + public Queue getExceptions() { + return exceptions; + } + + public void onCommand(Command command) { + commands.add(command); + } + + public void onException(IOException error) { + exceptions.add(error); + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java index 55bdb321f6..a8fd6f42f3 100755 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java @@ -29,6 +29,7 @@ public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWit protected BrokerService broker; protected void setUp() throws Exception { + largeMessages = true; broker = createBroker(); broker.start();