diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageBufferTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageBufferTest.java index 41fdaadf72..bf36711495 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageBufferTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageBufferTest.java @@ -19,8 +19,13 @@ package org.apache.activemq.artemis.tests.integration.client; import java.util.UUID; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -82,4 +87,50 @@ public class MessageBufferTest extends ActiveMQTestBase { message.acknowledge(); assertEquals(data, message.getBodyBuffer().readString()); } + + @Test + public void simpleTestBytes() throws Exception { + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(1500); + for (int i = 0; i < 1024; i++) { + buf.writeByte(getSamplebyte(i)); + } + final String queueName = "simpleQueue"; + final String addressName = "simpleAddress"; + + session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); + ClientProducer producer = session.createProducer(addressName); + + { + ClientMessage message = (ClientMessageImpl) session.createMessage(true); + Assert.assertEquals(1024, buf.readableBytes()); + message.getBodyBuffer().writeBytes(buf, 0, buf.readableBytes()); + producer.send(message); + } + + session.commit(); + + producer.close(); + + ClientConsumer consumer = session.createConsumer(queueName); + session.start(); + + { + ClientMessage message = consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(1024, message.getBodySize()); + ActiveMQBuffer buffer = message.getDataBuffer(); + Assert.assertEquals(1024, message.getBodySize()); + ActiveMQBuffer bodyBuffer = message.getBodyBuffer(); + Assert.assertEquals(1024, message.getBodySize()); + + ActiveMQBuffer result = ActiveMQBuffers.fixedBuffer(message.getBodyBufferSize()); + buffer.readBytes(result); + Assert.assertEquals(1024, result.readableBytes()); + for (int i = 0; i < 1024; i++) { + Assert.assertEquals(getSamplebyte(i), result.readByte()); + } + assertNotNull(message); + message.acknowledge(); + } + } }