diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java index f29f605218..cf33ecec70 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java @@ -16,6 +16,19 @@ */ package org.apache.activemq.bugs; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.StreamMessage; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Rule; @@ -24,40 +37,44 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import static org.junit.Assert.*; - public class AMQ4887Test { private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4887Test.class); private static final Integer ITERATIONS = 10; + @Rule public TestName name = new TestName(); @Test - public void testSetPropertyBeforeCopy() throws Exception { + public void testBytesMessageSetPropertyBeforeCopy() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); connection.start(); + doTestBytesMessageSetPropertyBeforeCopy(connection); + } + + @Test + public void testBytesMessageSetPropertyBeforeCopyCompressed() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + connectionFactory.setUseCompression(true); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + doTestBytesMessageSetPropertyBeforeCopy(connection); + } + + public void doTestBytesMessageSetPropertyBeforeCopy(Connection connection) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(name.toString()); MessageConsumer consumer = session.createConsumer(destination); MessageProducer producer = session.createProducer(destination); - byte[] messageContent = "bytes message".getBytes(); - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes(messageContent); + BytesMessage message = session.createBytesMessage(); for (int i=0; i < ITERATIONS; i++) { + long sendTime = System.currentTimeMillis(); - bytesMessage.setLongProperty("sendTime", sendTime); - producer.send(bytesMessage); + message.setLongProperty("sendTime", sendTime); + producer.send(message); LOG.debug("Receiving message " + i); Message receivedMessage = consumer.receive(5000); @@ -65,16 +82,83 @@ public class AMQ4887Test { assertTrue("On message " + i, receivedMessage instanceof BytesMessage); BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage; - byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()]; - receivedBytesMessage.readBytes(receivedBytes); - LOG.debug("Message " + i + " content [" + new String(receivedBytes) + "]"); - assertEquals("On message " + i, messageContent.length, receivedBytes.length); - assertArrayEquals("On message " + i, messageContent, receivedBytes); + + int numElements = 0; + try { + while (true) { + receivedBytesMessage.readBoolean(); + numElements++; + } + } catch (Exception ex) { + } + + LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements); + assertEquals(i, numElements); long receivedSendTime = receivedBytesMessage.getLongProperty("sendTime"); assertEquals("On message " + i, receivedSendTime, sendTime); - Thread.sleep(10); + // Add a new bool value on each iteration. + message.writeBoolean(true); + } + } + + @Test + public void testStreamMessageSetPropertyBeforeCopy() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + doTestStreamMessageSetPropertyBeforeCopy(connection); + } + + @Test + public void testStreamMessageSetPropertyBeforeCopyCompressed() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + connectionFactory.setUseCompression(true); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + doTestStreamMessageSetPropertyBeforeCopy(connection); + } + + public void doTestStreamMessageSetPropertyBeforeCopy(Connection connection) throws Exception { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.toString()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + StreamMessage message = session.createStreamMessage(); + + for (int i=0; i < ITERATIONS; i++) { + + long sendTime = System.currentTimeMillis(); + message.setLongProperty("sendTime", sendTime); + producer.send(message); + + LOG.debug("Receiving message " + i); + Message receivedMessage = consumer.receive(5000); + assertNotNull("On message " + i, receivedMessage); + assertTrue("On message " + i, receivedMessage instanceof StreamMessage); + + StreamMessage receivedStreamMessage = (StreamMessage) receivedMessage; + + int numElements = 0; + try { + while (true) { + receivedStreamMessage.readBoolean(); + numElements++; + } + } catch (Exception ex) { + } + + LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements); + assertEquals(i, numElements); + + long receivedSendTime = receivedStreamMessage.getLongProperty("sendTime"); + assertEquals("On message " + i, receivedSendTime, sendTime); + + // Add a new bool value on each iteration. + message.writeBoolean(true); } }