ARTEMIS-5074 Fix encoding of bytes properties as Binary in AMQPMessage

When a bytes property is added to an AMQPMessage and it is then reencoded it
will fail without first wrapping the byte array in an AMQP binary as required
by the ApplicationProperties section specification defined type allowances.
This commit is contained in:
Timothy Bish 2024-09-27 12:08:45 -04:00 committed by Robbie Gemmell
parent 1f90b5be0d
commit 65db6c60c9
4 changed files with 405 additions and 10 deletions

View File

@ -1538,7 +1538,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION);
}
// JMS Style property access methods. These can result in additional decode of AMQP message
// data from Application properties. Updates to application properties puts the message in a
// dirty state and requires a re-encode of the data to update all buffer state data otherwise
@ -1617,14 +1616,18 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
private Object getApplicationObjectProperty(String key) {
Object value = getApplicationPropertiesMap(false).get(key);
if (value instanceof Number) {
// slow path
// AMQP Numeric types must be converted to a compatible value.
if (value instanceof UnsignedInteger ||
value instanceof UnsignedByte ||
value instanceof UnsignedLong ||
value instanceof UnsignedShort) {
value instanceof UnsignedByte ||
value instanceof UnsignedLong ||
value instanceof UnsignedShort) {
return ((Number) value).longValue();
}
} else if (value instanceof Binary) {
// Binary wrappers must be unwrapped into a byte[] form.
return getBytesProperty(key);
}
return value;
}
@ -1671,7 +1674,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public final byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
return (byte[]) getApplicationPropertiesMap(false).get(key);
final Object value = getApplicationPropertiesMap(false).get(key);
if (value instanceof Binary) {
final Binary binary = (Binary) value;
if (binary.getArray() == null) {
return null;
} else if (binary.getArrayOffset() == 0 && binary.getLength() == binary.getArray().length) {
return binary.getArray();
} else {
final byte[] payload = new byte[binary.getLength()];
System.arraycopy(binary.getArray(), binary.getArrayOffset(), payload, 0, binary.getLength());
return payload;
}
} else {
return (byte[]) value;
}
}
@Override
@ -1741,7 +1762,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public final org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
getApplicationPropertiesMap(true).put(key, value);
Binary payload = null;
if (value != null) {
payload = new Binary(value);
}
getApplicationPropertiesMap(true).put(key, payload);
return this;
}
@ -1835,7 +1863,13 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public final org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
getApplicationPropertiesMap(true).put(key, value);
if (value instanceof byte[]) {
// Prevent error from proton encoding, byte array must be wrapped in a Binary type.
putBytesProperty(key, (byte[]) value);
} else {
getApplicationPropertiesMap(true).put(key, value);
}
return this;
}

View File

@ -2690,8 +2690,143 @@ public class AMQPMessageTest {
assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
}
@Test
public void testPutAndGetOfMessageBytesProperties() {
final byte[] empty = new byte[0];
final byte[] array = new byte[] {0, 1, 2, 3};
final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
assertNull(message.getBytesProperty("test"));
// Empty byte array
message.putBytesProperty("test", empty);
final byte[] emptyResult = message.getBytesProperty("test");
assertArrayEquals(empty, emptyResult);
// Populated byte array
message.putBytesProperty("test", array);
final byte[] arrayResult = message.getBytesProperty("test");
assertArrayEquals(array, arrayResult);
// null value set and get
message.putBytesProperty("test", null);
assertNull(message.getBytesProperty("test"));
}
@Test
public void testPutAndGetOfMessageBytesPropertiesViaObjectPropertyAccessor() {
final byte[] empty = new byte[0];
final byte[] array = new byte[] {0, 1, 2, 3};
final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
assertNull(message.getObjectProperty("test"));
// Empty byte array
message.putObjectProperty("test", empty);
final byte[] emptyResult = (byte[]) message.getObjectProperty("test");
assertArrayEquals(empty, emptyResult);
// Populated byte array
message.putObjectProperty("test", array);
final byte[] arrayResult = (byte[]) message.getObjectProperty("test");
assertArrayEquals(array, arrayResult);
// null value set and get
message.putObjectProperty("test", null);
assertNull(message.getObjectProperty("test"));
}
@Test
public void testGetBytesPropertyHandlesOffsetArrayInBinary() {
final byte[] array = new byte[] {0, 1, 2, 3};
final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4};
final Binary offsetBinary = new Binary(offsetArray, 1, array.length);
final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, encodedProtonMessage, null);
assertNull(message.getBytesProperty("test"));
final ApplicationProperties applicationProperties = message.getDecodedApplicationProperties();
assertNotNull(applicationProperties.getValue());
applicationProperties.getValue().put("test", offsetBinary);
assertTrue(message.containsProperty("test"));
final byte[] arrayResult = message.getBytesProperty("test");
assertArrayEquals(array, arrayResult);
}
@Test
public void testGetBytesFromObjectPropertyHandlesOffsetArrayInBinary() {
final byte[] array = new byte[] {0, 1, 2, 3};
final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4};
final Binary offsetBinary = new Binary(offsetArray, 1, array.length);
final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, encodedProtonMessage, null);
assertNull(message.getObjectProperty("test"));
final ApplicationProperties applicationProperties = message.getDecodedApplicationProperties();
assertNotNull(applicationProperties.getValue());
applicationProperties.getValue().put("test", offsetBinary);
assertTrue(message.containsProperty("test"));
final Object result = message.getObjectProperty("test");
assertTrue(result instanceof byte[]);
assertArrayEquals(array, (byte[]) result);
}
@Test
public void testBytesPropertySetAndReencode() {
final byte[] array = new byte[] {0, 1, 2, 3};
final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
assertNull(message.getBytesProperty("test"));
message.putBytesProperty("test", array);
message.reencode();
assertTrue(message.containsProperty("test"));
// Decodes the value from the AMQP data encoding and then checks on the contents
final Binary result = (Binary) message.getApplicationProperties().getValue().get("test");
assertNotNull(result);
assertEquals(array.length, result.getLength());
// Safe copy to account for possible offset array
final byte[] copy = new byte[array.length];
System.arraycopy(result.getArray(), result.getArrayOffset(), copy, 0, result.getLength());
assertArrayEquals(array, copy);
}
//----- Test Support ------------------------------------------------------//
// Extension allows public access to decoded application properties that is otherwise
// not accessible directly.
private class AMQPMessageTestExtension extends AMQPStandardMessage {
AMQPMessageTestExtension(long messageFormat, byte[] data, TypedProperties extraProperties) {
super(messageFormat, data, extraProperties, null);
}
@Override
public ApplicationProperties getDecodedApplicationProperties() {
return applicationProperties;
}
}
private MessageImpl createProtonMessage() {
MessageImpl message = (MessageImpl) Proton.message();

View File

@ -19,13 +19,18 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.Test;
@ -40,6 +45,13 @@ import org.junit.jupiter.api.Timeout;
public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
private final int MIN_LARGE_MESSAGE_SIZE = 2048;
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
}
@Test
@Timeout(30)
public void testReleasedDisposition() throws Exception {
@ -187,4 +199,68 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
connection.close();
}
@Test
@Timeout(30)
public void testReplayMessageForFQQNRejectedMessage() throws Exception {
doTestReplayMessageForFQQNRejectedMessage(10);
}
@Test
@Timeout(30)
public void testReplayMessageForFQQNRejectedLargeMessage() throws Exception {
doTestReplayMessageForFQQNRejectedMessage(MIN_LARGE_MESSAGE_SIZE);
}
public void doTestReplayMessageForFQQNRejectedMessage(int payloadSize) throws Exception {
server.createQueue(QueueConfiguration.of("A1").setAddress("A")
.setRoutingType(RoutingType.MULTICAST)
.setDurable(true));
final String targetFQQN = "A::A1";
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(targetFQQN);
final AmqpMessage message = new AmqpMessage();
final String payload = "#".repeat(payloadSize);
message.setMessageId("MSG:1");
message.setText("Test-Message: " + payload);
message.setDurable(true);
sender.send(message);
final AmqpReceiver receiver = session.createReceiver(targetFQQN);
receiver.flow(1);
final AmqpMessage rejected = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(rejected, "Did not receive message that we want to reject");
rejected.reject();
final Queue queueView = server.locateQueue(targetFQQN);
final Queue dlqView = server.locateQueue("ActiveMQ.DLQ");
Wait.assertEquals(0L, () -> queueView.getMessageCount(), 5000, 100);
Wait.assertEquals(1L, () -> dlqView.getMessageCount(), 5000, 100);
// This call with the message sent to an FQQN results in a new application
// property being added whose payload is a byte array
dlqView.retryMessages(null); // No filter so all should match
Wait.assertEquals(0L, () -> dlqView.getMessageCount(), 5000, 100);
Wait.assertEquals(1L, () -> queueView.getMessageCount(), 5000, 100);
receiver.flow(2);
final AmqpMessage retriedMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(retriedMessage);
assertEquals("MSG:1", retriedMessage.getMessageId());
connection.close();
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -32,15 +33,18 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
@ -166,7 +170,6 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
private static final String REPLY_TO = "replyTo";
private static final String TIME_TO_LIVE = "timeToLive";
private boolean checkMessageProperties(AMQPMessage message, Map<String, Object> expectedProperties) {
assertNotNull(message);
assertNotNull(server.getNodeID());
@ -301,4 +304,151 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
@Test
@Timeout(60)
public void testReencodeMessageWithByteArrayPropertyAddedOnIngress() throws Exception {
final CountDownLatch arrived = new CountDownLatch(1);
final CountDownLatch departed = new CountDownLatch(1);
final AtomicBoolean propertyAddedOnReceive = new AtomicBoolean(false);
final AtomicBoolean propertyFoundOnDispatch = new AtomicBoolean(false);
final String BYTE_PROPERTY_KEY = "byte-property";
final byte[] BYTE_PROPERTY_VALUE = "test".getBytes(StandardCharsets.UTF_8);
server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
message.putBytesProperty(BYTE_PROPERTY_KEY, BYTE_PROPERTY_VALUE);
try {
message.reencode();
propertyAddedOnReceive.set(true);
} catch (Exception ex) {
return false; // Reject message if updated encode fails, test will fail
} finally {
arrived.countDown();
}
return true;
}
});
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(getTestName());
final AmqpMessage message = new AmqpMessage();
message.setMessageId("MSG:1");
message.setText("Test-Message");
sender.send(message);
assertTrue(arrived.await(2, TimeUnit.SECONDS));
assertTrue(propertyAddedOnReceive.get());
server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
if (packet.containsProperty(BYTE_PROPERTY_KEY)) {
propertyFoundOnDispatch.set(true);
}
departed.countDown();
return true;
}
});
final AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(2);
final AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(departed.getCount(), 0);
assertTrue(propertyFoundOnDispatch.get());
assertNotNull(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY));
assertTrue(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY) instanceof Binary);
final Binary binary = (Binary) amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY);
assertEquals(BYTE_PROPERTY_VALUE.length, binary.getLength());
// Make a safe copy in case binary payload is not in exact sized array.
final byte[] copy = new byte[BYTE_PROPERTY_VALUE.length];
System.arraycopy(binary.getArray(), binary.getArrayOffset(), copy, 0, binary.getLength());
assertArrayEquals(BYTE_PROPERTY_VALUE, copy);
sender.close();
receiver.close();
connection.close();
}
@Test
@Timeout(60)
public void testGetBytesPropertyReturnsByteArray() throws Exception {
doTestGetBytesPropertyReturnsByteArray("array-payload".getBytes(StandardCharsets.UTF_8));
}
@Test
@Timeout(60)
public void testGetBytesPropertyReturnsEmptyByteArray() throws Exception {
doTestGetBytesPropertyReturnsByteArray(new byte[0]);
}
public void doTestGetBytesPropertyReturnsByteArray(byte[] array) throws Exception {
final CountDownLatch arrived = new CountDownLatch(1);
final AtomicBoolean bytesReturnedFromBrokerMessage = new AtomicBoolean(false);
final String BYTE_PROPERTY_KEY = "byte-property";
server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
@Override
public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
try {
final Object appProperty = message.getApplicationProperties().getValue().get(BYTE_PROPERTY_KEY);
// The application property should return the encoded Binary value
assertNotNull(appProperty);
assertTrue(appProperty instanceof Binary);
final byte[] payload = message.getBytesProperty(BYTE_PROPERTY_KEY);
// The getBytesProperty API should unwrap and return the byte array
assertEquals(array.length, payload.length);
assertArrayEquals(array, payload);
bytesReturnedFromBrokerMessage.set(true);
} catch (Exception ex) {
return false; // Reject message if updated encode fails, test will fail
} finally {
arrived.countDown();
}
return true;
}
});
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(getTestName());
final AmqpMessage message = new AmqpMessage();
message.setMessageId("MSG:1");
message.setText("Test-Message");
message.setApplicationProperty(BYTE_PROPERTY_KEY, new Binary(array));
sender.send(message);
assertTrue(arrived.await(2, TimeUnit.SECONDS));
assertTrue(bytesReturnedFromBrokerMessage.get());
sender.close();
connection.close();
}
}