This commit is contained in:
Justin Bertram 2021-12-13 21:58:03 -06:00
commit 1b32004c29
2 changed files with 49 additions and 1 deletions

View File

@ -1378,7 +1378,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
rc.put(CompositeDataConstants.TYPE, m.getType()); rc.put(CompositeDataConstants.TYPE, m.getType());
if (!m.isLargeMessage()) { if (!m.isLargeMessage()) {
ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; int arraySize;
if (valueSizeLimit == -1 || bodyCopy.readableBytes() <= valueSizeLimit) {
arraySize = bodyCopy.readableBytes();
} else {
arraySize = valueSizeLimit;
}
byte[] bytes = new byte[arraySize];
bodyCopy.readBytes(bytes); bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
} else { } else {

View File

@ -577,6 +577,48 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
@Test
public void testBytesMessageBodyWithoutLimits() throws Exception {
final int BYTE_COUNT = 2048;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(-1);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
byte[] randomBytes = RandomUtil.randomBytes(BYTE_COUNT);
ClientMessage clientMessage = session.createMessage(false);
clientMessage.getBodyBuffer().writeBytes(randomBytes);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, getMessageCount(queueControl));
ClientProducer producer = session.createProducer(address);
producer.send(clientMessage);
Wait.assertEquals(1, () -> getMessageCount(queueControl));
CompositeData[] browseResult = queueControl.browse(1, 1);
boolean tested = false;
for (CompositeData compositeData : browseResult) {
for (String key : compositeData.getCompositeType().keySet()) {
Object value = compositeData.get(key);
if (value != null) {
if (value instanceof byte[]) {
assertEqualsByteArrays(randomBytes, (byte[]) value);
tested = true;
}
}
}
}
assertTrue("Nothing tested!", tested);
session.deleteQueue(queue);
}
@Test @Test
public void testTextMessageAttributeLimits() throws Exception { public void testTextMessageAttributeLimits() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();