From fb2270dc057c773f99e679132fd8ce926ff2fc58 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 29 Oct 2021 11:01:05 -0500 Subject: [PATCH] ARTEMIS-3535 bytes messages not obeying management limit --- .../core/message/impl/CoreMessage.java | 8 +++- .../management/QueueControlTest.java | 42 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 6c55c3f075..48ea5fdf46 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1378,7 +1378,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { rc.put(CompositeDataConstants.TYPE, m.getType()); if (!m.isLargeMessage()) { ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); - byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; + int arraySize; + if (valueSizeLimit == -1 || bodyCopy.readableBytes() <= valueSizeLimit) { + arraySize = bodyCopy.readableBytes(); + } else { + arraySize = valueSizeLimit; + } + byte[] bytes = new byte[arraySize]; bodyCopy.readBytes(bytes); rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 7d4804c89d..aa7c897dfa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -577,6 +577,48 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testBytesMessageBodyWithoutLimits() throws Exception { + final int BYTE_COUNT = 2048; + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(-1); + server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + byte[] randomBytes = RandomUtil.randomBytes(BYTE_COUNT); + + ClientMessage clientMessage = session.createMessage(false); + clientMessage.getBodyBuffer().writeBytes(randomBytes); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, getMessageCount(queueControl)); + + ClientProducer producer = session.createProducer(address); + producer.send(clientMessage); + + Wait.assertEquals(1, () -> getMessageCount(queueControl)); + + CompositeData[] browseResult = queueControl.browse(1, 1); + boolean tested = false; + for (CompositeData compositeData : browseResult) { + for (String key : compositeData.getCompositeType().keySet()) { + Object value = compositeData.get(key); + if (value != null) { + if (value instanceof byte[]) { + assertEqualsByteArrays(randomBytes, (byte[]) value); + tested = true; + } + } + } + } + + assertTrue("Nothing tested!", tested); + session.deleteQueue(queue); + } + @Test public void testTextMessageAttributeLimits() throws Exception { SimpleString address = RandomUtil.randomSimpleString();