From 592b31343e62d909be52be79845a55833813c2ce Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 20 Sep 2017 15:49:53 -0400 Subject: [PATCH] AMQ-6809 Fix issue where stream message getBytes returned wrong value StreamMessage implementation should return zero when the full value of a written byte array has been consumed before then returning -1 on the next read. Ands fix and additional testing of ActiveMQStreamMessage (cherry picked from commit 61d3231d3647fc136b12fd88a9165b783e36585b) --- .../command/ActiveMQStreamMessage.java | 2 +- .../command/ActiveMQStreamMessageTest.java | 155 +++++++++++++----- 2 files changed, 116 insertions(+), 41 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java index 67159104b0..e30d355a23 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java @@ -748,7 +748,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess // big buffer int rc = this.dataIn.read(value, 0, remainingBytes); remainingBytes = 0; - return rc; + return rc != -1 ? rc : 0; } } catch (EOFException e) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java index 9e0f468416..1c9b3b1a65 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,52 +16,31 @@ */ package org.apache.activemq.command; -import java.io.Serializable; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import javax.jms.JMSException; +import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import junit.framework.TestCase; +import org.junit.Test; /** - * + * Tests for the ActiveMQ StreamMessage implementation */ -public class ActiveMQStreamMessageTest extends TestCase { - - /** - * Constructor for ActiveMQStreamMessageTest. - * - * @param name - */ - public ActiveMQStreamMessageTest(String name) { - super(name); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(ActiveMQStreamMessageTest.class); - } - - /* - * @see TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - } - - /* - * @see TestCase#tearDown() - */ - protected void tearDown() throws Exception { - super.tearDown(); - } +public class ActiveMQStreamMessageTest { + @Test public void testGetDataStructureType() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); assertEquals(msg.getDataStructureType(), CommandTypes.ACTIVEMQ_STREAM_MESSAGE); } + @Test public void testReadBoolean() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -124,6 +103,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testreadByte() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -175,6 +155,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadShort() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -230,6 +211,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadChar() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -293,6 +275,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadInt() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -352,6 +335,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadLong() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -420,6 +404,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadFloat() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -479,6 +464,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadDouble() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -540,9 +526,9 @@ public class ActiveMQStreamMessageTest extends TestCase { jmsEx.printStackTrace(); assertTrue(false); } - } + @Test public void testReadString() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -602,13 +588,14 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadBigString() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { // Test with a 1Meg String StringBuffer bigSB = new StringBuffer(1024 * 1024); for (int i = 0; i < 1024 * 1024; i++) { - bigSB.append((char)'a' + i % 26); + bigSB.append('a' + i % 26); } String bigString = bigSB.toString(); @@ -622,6 +609,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadBytes() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -684,6 +672,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadObject() { ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); try { @@ -744,13 +733,13 @@ public class ActiveMQStreamMessageTest extends TestCase { msg.writeBoolean(true); msg.reset(); assertTrue(((Boolean)msg.readObject()).booleanValue()); - } catch (JMSException jmsEx) { jmsEx.printStackTrace(); assertTrue(false); } } + @Test public void testClearBody() throws JMSException { ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage(); try { @@ -766,6 +755,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReset() throws JMSException { ActiveMQStreamMessage streamMessage = new ActiveMQStreamMessage(); try { @@ -789,6 +779,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testReadOnlyBody() throws JMSException { ActiveMQStreamMessage message = new ActiveMQStreamMessage(); try { @@ -888,6 +879,7 @@ public class ActiveMQStreamMessageTest extends TestCase { } } + @Test public void testWriteOnlyBody() throws JMSException { ActiveMQStreamMessage message = new ActiveMQStreamMessage(); message.clearBody(); @@ -968,7 +960,8 @@ public class ActiveMQStreamMessageTest extends TestCase { } catch (MessageNotReadableException e) { } } - + + @Test public void testWriteObject() { try { ActiveMQStreamMessage message = new ActiveMQStreamMessage(); @@ -982,7 +975,7 @@ public class ActiveMQStreamMessageTest extends TestCase { message.writeObject(new Long(2l)); message.writeObject(new Float(2.0f)); message.writeObject(new Double(2.0d)); - }catch(Exception e) { + } catch(Exception e) { fail(e.getMessage()); } try { @@ -990,11 +983,93 @@ public class ActiveMQStreamMessageTest extends TestCase { message.clearBody(); message.writeObject(new Object()); fail("should throw an exception"); - }catch(MessageFormatException e) { - - }catch(Exception e) { + } catch(MessageFormatException e) { + } catch(Exception e) { fail(e.getMessage()); } } + @Test + public void testReadEmptyBufferFromStream() throws JMSException { + ActiveMQStreamMessage message = new ActiveMQStreamMessage(); + message.clearBody(); + + final byte[] BYTE_LIST = {1, 2, 4}; + + byte[] readList = new byte[BYTE_LIST.length - 1]; + byte[] emptyList = {}; + + message.writeBytes(emptyList); + message.reset(); + + // First call should return zero as the array written was zero sized. + assertEquals(0, message.readBytes(readList)); + + // Second call should return -1 as we've reached the end of element. + assertEquals(-1, message.readBytes(readList)); + } + + @Test + public void testReadMixBufferValuesFromStream() throws JMSException { + ActiveMQStreamMessage message = new ActiveMQStreamMessage(); + message.clearBody(); + + final int size = 3; + + final byte[] BYTE_LIST_1 = {1, 2, 3}; + final byte[] BYTE_LIST_2 = {4, 5, 6}; + final byte[] EMPTY_LIST = {}; + + byte[] bigBuffer = new byte[size + size]; + byte[] smallBuffer = new byte[size - 1]; + + message.writeBytes(BYTE_LIST_1); + message.writeBytes(EMPTY_LIST); + message.writeBytes(BYTE_LIST_2); + message.writeBytes(EMPTY_LIST); + message.reset(); + + // Read first with big buffer + assertEquals(size, message.readBytes(bigBuffer)); + assertEquals(1, bigBuffer[0]); + assertEquals(2, bigBuffer[1]); + assertEquals(3, bigBuffer[2]); + assertEquals(-1, message.readBytes(bigBuffer)); + + // Read the empty buffer, should not be able to read anything else until + // the bytes read is completed. + assertEquals(0, message.readBytes(bigBuffer)); + try { + message.readBoolean(); + } catch (JMSException ex) {} + assertEquals(-1, message.readBytes(bigBuffer)); + + // Read the third buffer with small buffer, anything that is attempted + // to be read in between reads or before read completion should throw. + assertEquals(smallBuffer.length, message.readBytes(smallBuffer)); + assertEquals(4, smallBuffer[0]); + assertEquals(5, smallBuffer[1]); + try { + message.readByte(); + } catch (JMSException ex) {} + assertEquals(1, message.readBytes(smallBuffer)); + assertEquals(6, smallBuffer[0]); + try { + message.readBoolean(); + } catch (JMSException ex) {} + assertEquals(-1, message.readBytes(bigBuffer)); + + // Read the empty buffer, should not be able to read anything else until + // the bytes read is completed. + assertEquals(0, message.readBytes(bigBuffer)); + try { + message.readBoolean(); + } catch (JMSException ex) {} + assertEquals(-1, message.readBytes(bigBuffer)); + + // Message should be empty now + try { + message.readBoolean(); + } catch (MessageEOFException ex) {} + } }