mirror of https://github.com/apache/activemq.git
AMQ-6809 Fix issue where stream message getBytes returned wrong value
StreamMessage implementation should return zero when the full value of a
written byte array has been consumed before then returning -1 on the
next read. Ands fix and additional testing of ActiveMQStreamMessage
(cherry picked from commit 61d3231d36
)
This commit is contained in:
parent
db9b1a5569
commit
592b31343e
|
@ -748,7 +748,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
|
||||||
// big buffer
|
// big buffer
|
||||||
int rc = this.dataIn.read(value, 0, remainingBytes);
|
int rc = this.dataIn.read(value, 0, remainingBytes);
|
||||||
remainingBytes = 0;
|
remainingBytes = 0;
|
||||||
return rc;
|
return rc != -1 ? rc : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
* this work for additional information regarding copyright ownership.
|
* this work for additional information regarding copyright ownership.
|
||||||
|
@ -16,52 +16,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.command;
|
package org.apache.activemq.command;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageEOFException;
|
||||||
import javax.jms.MessageFormatException;
|
import javax.jms.MessageFormatException;
|
||||||
import javax.jms.MessageNotReadableException;
|
import javax.jms.MessageNotReadableException;
|
||||||
import javax.jms.MessageNotWriteableException;
|
import javax.jms.MessageNotWriteableException;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
* Tests for the ActiveMQ StreamMessage implementation
|
||||||
*/
|
*/
|
||||||
public class ActiveMQStreamMessageTest extends TestCase {
|
public class ActiveMQStreamMessageTest {
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for ActiveMQStreamMessageTest.
|
|
||||||
*
|
|
||||||
* @param name
|
|
||||||
*/
|
|
||||||
public ActiveMQStreamMessageTest(String name) {
|
|
||||||
super(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
junit.textui.TestRunner.run(ActiveMQStreamMessageTest.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* @see TestCase#setUp()
|
|
||||||
*/
|
|
||||||
protected void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* @see TestCase#tearDown()
|
|
||||||
*/
|
|
||||||
protected void tearDown() throws Exception {
|
|
||||||
super.tearDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testGetDataStructureType() {
|
public void testGetDataStructureType() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
assertEquals(msg.getDataStructureType(), CommandTypes.ACTIVEMQ_STREAM_MESSAGE);
|
assertEquals(msg.getDataStructureType(), CommandTypes.ACTIVEMQ_STREAM_MESSAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadBoolean() {
|
public void testReadBoolean() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -124,6 +103,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testreadByte() {
|
public void testreadByte() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -175,6 +155,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadShort() {
|
public void testReadShort() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -230,6 +211,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadChar() {
|
public void testReadChar() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -293,6 +275,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadInt() {
|
public void testReadInt() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -352,6 +335,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadLong() {
|
public void testReadLong() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -420,6 +404,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadFloat() {
|
public void testReadFloat() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -479,6 +464,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadDouble() {
|
public void testReadDouble() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -540,9 +526,9 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
jmsEx.printStackTrace();
|
jmsEx.printStackTrace();
|
||||||
assertTrue(false);
|
assertTrue(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadString() {
|
public void testReadString() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -602,13 +588,14 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadBigString() {
|
public void testReadBigString() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
// Test with a 1Meg String
|
// Test with a 1Meg String
|
||||||
StringBuffer bigSB = new StringBuffer(1024 * 1024);
|
StringBuffer bigSB = new StringBuffer(1024 * 1024);
|
||||||
for (int i = 0; i < 1024 * 1024; i++) {
|
for (int i = 0; i < 1024 * 1024; i++) {
|
||||||
bigSB.append((char)'a' + i % 26);
|
bigSB.append('a' + i % 26);
|
||||||
}
|
}
|
||||||
String bigString = bigSB.toString();
|
String bigString = bigSB.toString();
|
||||||
|
|
||||||
|
@ -622,6 +609,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadBytes() {
|
public void testReadBytes() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -684,6 +672,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadObject() {
|
public void testReadObject() {
|
||||||
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -744,13 +733,13 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
msg.writeBoolean(true);
|
msg.writeBoolean(true);
|
||||||
msg.reset();
|
msg.reset();
|
||||||
assertTrue(((Boolean)msg.readObject()).booleanValue());
|
assertTrue(((Boolean)msg.readObject()).booleanValue());
|
||||||
|
|
||||||
} catch (JMSException jmsEx) {
|
} catch (JMSException jmsEx) {
|
||||||
jmsEx.printStackTrace();
|
jmsEx.printStackTrace();
|
||||||
assertTrue(false);
|
assertTrue(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testClearBody() throws JMSException {
|
public void testClearBody() throws JMSException {
|
||||||
ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -766,6 +755,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReset() throws JMSException {
|
public void testReset() throws JMSException {
|
||||||
ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -789,6 +779,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testReadOnlyBody() throws JMSException {
|
public void testReadOnlyBody() throws JMSException {
|
||||||
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
||||||
try {
|
try {
|
||||||
|
@ -888,6 +879,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testWriteOnlyBody() throws JMSException {
|
public void testWriteOnlyBody() throws JMSException {
|
||||||
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
||||||
message.clearBody();
|
message.clearBody();
|
||||||
|
@ -969,6 +961,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testWriteObject() {
|
public void testWriteObject() {
|
||||||
try {
|
try {
|
||||||
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
||||||
|
@ -982,7 +975,7 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
message.writeObject(new Long(2l));
|
message.writeObject(new Long(2l));
|
||||||
message.writeObject(new Float(2.0f));
|
message.writeObject(new Float(2.0f));
|
||||||
message.writeObject(new Double(2.0d));
|
message.writeObject(new Double(2.0d));
|
||||||
}catch(Exception e) {
|
} catch(Exception e) {
|
||||||
fail(e.getMessage());
|
fail(e.getMessage());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -990,11 +983,93 @@ public class ActiveMQStreamMessageTest extends TestCase {
|
||||||
message.clearBody();
|
message.clearBody();
|
||||||
message.writeObject(new Object());
|
message.writeObject(new Object());
|
||||||
fail("should throw an exception");
|
fail("should throw an exception");
|
||||||
}catch(MessageFormatException e) {
|
} catch(MessageFormatException e) {
|
||||||
|
} catch(Exception e) {
|
||||||
}catch(Exception e) {
|
|
||||||
fail(e.getMessage());
|
fail(e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadEmptyBufferFromStream() throws JMSException {
|
||||||
|
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
||||||
|
message.clearBody();
|
||||||
|
|
||||||
|
final byte[] BYTE_LIST = {1, 2, 4};
|
||||||
|
|
||||||
|
byte[] readList = new byte[BYTE_LIST.length - 1];
|
||||||
|
byte[] emptyList = {};
|
||||||
|
|
||||||
|
message.writeBytes(emptyList);
|
||||||
|
message.reset();
|
||||||
|
|
||||||
|
// First call should return zero as the array written was zero sized.
|
||||||
|
assertEquals(0, message.readBytes(readList));
|
||||||
|
|
||||||
|
// Second call should return -1 as we've reached the end of element.
|
||||||
|
assertEquals(-1, message.readBytes(readList));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadMixBufferValuesFromStream() throws JMSException {
|
||||||
|
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
|
||||||
|
message.clearBody();
|
||||||
|
|
||||||
|
final int size = 3;
|
||||||
|
|
||||||
|
final byte[] BYTE_LIST_1 = {1, 2, 3};
|
||||||
|
final byte[] BYTE_LIST_2 = {4, 5, 6};
|
||||||
|
final byte[] EMPTY_LIST = {};
|
||||||
|
|
||||||
|
byte[] bigBuffer = new byte[size + size];
|
||||||
|
byte[] smallBuffer = new byte[size - 1];
|
||||||
|
|
||||||
|
message.writeBytes(BYTE_LIST_1);
|
||||||
|
message.writeBytes(EMPTY_LIST);
|
||||||
|
message.writeBytes(BYTE_LIST_2);
|
||||||
|
message.writeBytes(EMPTY_LIST);
|
||||||
|
message.reset();
|
||||||
|
|
||||||
|
// Read first with big buffer
|
||||||
|
assertEquals(size, message.readBytes(bigBuffer));
|
||||||
|
assertEquals(1, bigBuffer[0]);
|
||||||
|
assertEquals(2, bigBuffer[1]);
|
||||||
|
assertEquals(3, bigBuffer[2]);
|
||||||
|
assertEquals(-1, message.readBytes(bigBuffer));
|
||||||
|
|
||||||
|
// Read the empty buffer, should not be able to read anything else until
|
||||||
|
// the bytes read is completed.
|
||||||
|
assertEquals(0, message.readBytes(bigBuffer));
|
||||||
|
try {
|
||||||
|
message.readBoolean();
|
||||||
|
} catch (JMSException ex) {}
|
||||||
|
assertEquals(-1, message.readBytes(bigBuffer));
|
||||||
|
|
||||||
|
// Read the third buffer with small buffer, anything that is attempted
|
||||||
|
// to be read in between reads or before read completion should throw.
|
||||||
|
assertEquals(smallBuffer.length, message.readBytes(smallBuffer));
|
||||||
|
assertEquals(4, smallBuffer[0]);
|
||||||
|
assertEquals(5, smallBuffer[1]);
|
||||||
|
try {
|
||||||
|
message.readByte();
|
||||||
|
} catch (JMSException ex) {}
|
||||||
|
assertEquals(1, message.readBytes(smallBuffer));
|
||||||
|
assertEquals(6, smallBuffer[0]);
|
||||||
|
try {
|
||||||
|
message.readBoolean();
|
||||||
|
} catch (JMSException ex) {}
|
||||||
|
assertEquals(-1, message.readBytes(bigBuffer));
|
||||||
|
|
||||||
|
// Read the empty buffer, should not be able to read anything else until
|
||||||
|
// the bytes read is completed.
|
||||||
|
assertEquals(0, message.readBytes(bigBuffer));
|
||||||
|
try {
|
||||||
|
message.readBoolean();
|
||||||
|
} catch (JMSException ex) {}
|
||||||
|
assertEquals(-1, message.readBytes(bigBuffer));
|
||||||
|
|
||||||
|
// Message should be empty now
|
||||||
|
try {
|
||||||
|
message.readBoolean();
|
||||||
|
} catch (MessageEOFException ex) {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue