diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index bdb3a02223..25e5f7a372 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -459,7 +459,8 @@ public class ClientProducerImpl implements ClientProducerInternal { } pos += numberOfBytesRead; - } while (pos < minLargeMessageSize); + } + while (pos < minLargeMessageSize); totalSize += pos; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 2e00436088..b1e96713c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -2795,27 +2795,36 @@ public class LargeMessageTest extends LargeMessageTestBase { } @Test - public void testStream() throws Exception { + public void testStreamedMessage() throws Exception { + testStream(false); + } + + @Test + public void testStreamedMessageCompressed() throws Exception { + testStream(true); + } + + private void testStream(boolean compressed) throws Exception { ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); - locator.setCompressLargeMessage(true); + locator.setCompressLargeMessage(compressed); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); ClientSession session = sf.createSession(false, false); - final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE"); + session.createQueue(new QueueConfiguration(ADDRESS)); - server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST)); - - ClientProducer producer = session.createProducer(MY_QUEUE); + ClientProducer producer = session.createProducer(ADDRESS); AtomicBoolean closed = new AtomicBoolean(false); + final int BYTES = 1_000; + InputStream inputStream = new InputStream() { - int bytes = 10_000; + int bytes = BYTES; @Override public int read() throws IOException { if (bytes-- > 0) { @@ -2830,12 +2839,10 @@ public class LargeMessageTest extends LargeMessageTestBase { closed.set(true); } - @Override - public int available () throws IOException { + public int available() { return bytes; } - }; ClientMessage message = session.createMessage(true); @@ -2844,11 +2851,26 @@ public class LargeMessageTest extends LargeMessageTestBase { Wait.assertTrue(closed::get); + session.commit(); + + session.start(); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + + ClientMessage receivedMessage = consumer.receive(5000); + Assert.assertNotNull(receivedMessage); + + ActiveMQBuffer buffer = receivedMessage.getBodyBuffer(); + Assert.assertEquals(BYTES, buffer.readableBytes()); + + for (int i = 0; i < BYTES; i++) { + Assert.assertEquals((byte)10, buffer.readByte()); + } + + Assert.assertEquals(0, buffer.readableBytes()); + session.close(); } - - - } \ No newline at end of file