From 0ab75b9968fe115aead128076341b46161756f2d Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Mon, 18 Nov 2019 10:36:05 +0000 Subject: [PATCH] ARTEMIS-2554 - Queue control browse broken with large messages https://issues.apache.org/jira/browse/ARTEMIS-2554 --- .../impl/openmbean/OpenTypeSupport.java | 10 ++- .../client/LargeMessageCompressTest.java | 63 +++++++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index dfd8ce3cab..027bbdb608 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -302,10 +302,14 @@ public final class OpenTypeSupport { Map rc = super.getFields(ref); ICoreMessage m = ref.getMessage().toCore(); if (!m.isLargeMessage()) { - SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); - rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); + if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { + rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); + } else { + SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); + rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); + } } else { - rc.put(CompositeDataConstants.TEXT_BODY, ""); + rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); } return rc; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index 26454823e6..117ca75ae5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -33,11 +33,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Test; +import javax.management.openmbean.CompositeData; + /** * A LargeMessageCompressTest *
@@ -61,6 +64,66 @@ public class LargeMessageCompressTest extends LargeMessageTest { return super.createFactory(isNetty).setCompressLargeMessage(true); } + @Test + public void testLargeMessageCompressionNotCompressedAndBrowsed() throws Exception { + final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + ActiveMQServer server = createServer(true, isNetty()); + + server.start(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, false, false)); + + session.createTemporaryQueue(ADDRESS, ADDRESS); + + ClientProducer producer = session.createProducer(ADDRESS); + + Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); + + clientFile.setType(Message.TEXT_TYPE); + + producer.send(clientFile); + + session.commit(); + + session.close(); + + QueueControlImpl queueControl = (QueueControlImpl) server.getManagementService().getResource("queue.SimpleAddress"); + + CompositeData[] browse = queueControl.browse(); + + Assert.assertNotNull(browse); + + Assert.assertEquals(browse.length, 1); + + Assert.assertEquals(browse[0].get("text"), "[compressed]"); + + //clean up + session = addClientSession(sf.createSession(false, false, false)); + + session.start(); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + ClientMessage msg1 = consumer.receive(1000); + Assert.assertNotNull(msg1); + + for (int i = 0; i < messageSize; i++) { + byte b = msg1.getBodyBuffer().readByte(); + assertEquals("position = " + i, getSamplebyte(i), b); + } + + msg1.acknowledge(); + session.commit(); + + consumer.close(); + + session.close(); + + validateNoFilesOnLargeDir(); + } + @Test public void testLargeMessageCompression() throws Exception { final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);