readLength counter not reset after reading the content of the first BytesMessage.

Added new test that sends multiple BytesMessages to cover this case.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@990827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-08-30 15:47:16 +00:00
parent f3c81eebb9
commit f8a601a85b
2 changed files with 53 additions and 20 deletions

View File

@ -45,14 +45,14 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* An implementation of the {@link Transport} interface for using Stomp over NIO
*
*
* @version $Revision$
*/
public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
boolean processedHeaders = false;
@ -94,10 +94,10 @@ public class StompNIOTransport extends TcpTransport {
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
}
private void serviceRead() {
try {
while (true) {
// read channel
int readSize = channel.read(inputBuffer);
@ -111,12 +111,12 @@ public class StompNIOTransport extends TcpTransport {
if (readSize == 0) {
break;
}
inputBuffer.flip();
int b;
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
int i = 0;
while(i++ < readSize) {
b = input.read();
@ -124,7 +124,7 @@ public class StompNIOTransport extends TcpTransport {
if (!processedHeaders && previousByte == 0 && b == 0) {
continue;
}
if (!processedHeaders) {
currentCommand.write(b);
// end of headers section, parse action and header
@ -144,7 +144,7 @@ public class StompNIOTransport extends TcpTransport {
currentCommand.reset();
}
} else {
if (contentLength == -1) {
// end of command reached, unmarshal
if (b == 0) {
@ -156,31 +156,32 @@ public class StompNIOTransport extends TcpTransport {
// read desired content length
if (readLength++ == contentLength) {
processCommand();
readLength = 0;
} else {
currentCommand.write(b);
}
}
}
previousByte = b;
}
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
onException(e);
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
private void processCommand() throws Exception {
StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
doConsume(frame);
processedHeaders = false;
currentCommand.reset();
contentLength = -1;
contentLength = -1;
}
protected void doStart() throws Exception {

View File

@ -397,7 +397,7 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
@ -408,21 +408,53 @@ public class StompTest extends CombinationTestSupport {
frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
assertEquals(5, message.getContent().length);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
}
public void testSendMultipleBytesMessages() throws Exception {
final int MSG_COUNT = 50;
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
for( int ix = 0; ix < MSG_COUNT; ix++) {
frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
for( int ix = 0; ix < MSG_COUNT; ix++) {
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
assertEquals(5, message.getContent().length);
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testSubscribeWithMessageSentWithProperties() throws Exception {