diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 52b3d946ae..025e6f5a83 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1538,7 +1538,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION); } - // JMS Style property access methods. These can result in additional decode of AMQP message // data from Application properties. Updates to application properties puts the message in a // dirty state and requires a re-encode of the data to update all buffer state data otherwise @@ -1617,14 +1616,18 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. private Object getApplicationObjectProperty(String key) { Object value = getApplicationPropertiesMap(false).get(key); if (value instanceof Number) { - // slow path + // AMQP Numeric types must be converted to a compatible value. if (value instanceof UnsignedInteger || - value instanceof UnsignedByte || - value instanceof UnsignedLong || - value instanceof UnsignedShort) { + value instanceof UnsignedByte || + value instanceof UnsignedLong || + value instanceof UnsignedShort) { return ((Number) value).longValue(); } + } else if (value instanceof Binary) { + // Binary wrappers must be unwrapped into a byte[] form. + return getBytesProperty(key); } + return value; } @@ -1671,7 +1674,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public final byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { - return (byte[]) getApplicationPropertiesMap(false).get(key); + final Object value = getApplicationPropertiesMap(false).get(key); + + if (value instanceof Binary) { + final Binary binary = (Binary) value; + + if (binary.getArray() == null) { + return null; + } else if (binary.getArrayOffset() == 0 && binary.getLength() == binary.getArray().length) { + return binary.getArray(); + } else { + final byte[] payload = new byte[binary.getLength()]; + + System.arraycopy(binary.getArray(), binary.getArrayOffset(), payload, 0, binary.getLength()); + + return payload; + } + } else { + return (byte[]) value; + } } @Override @@ -1741,7 +1762,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public final org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) { - getApplicationPropertiesMap(true).put(key, value); + Binary payload = null; + + if (value != null) { + payload = new Binary(value); + } + + getApplicationPropertiesMap(true).put(key, payload); + return this; } @@ -1835,7 +1863,13 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public final org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException { - getApplicationPropertiesMap(true).put(key, value); + if (value instanceof byte[]) { + // Prevent error from proton encoding, byte array must be wrapped in a Binary type. + putBytesProperty(key, (byte[]) value); + } else { + getApplicationPropertiesMap(true).put(key, value); + } + return this; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 26722de5d2..6f54217304 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -2690,8 +2690,143 @@ public class AMQPMessageTest { assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma"))); } + @Test + public void testPutAndGetOfMessageBytesProperties() { + final byte[] empty = new byte[0]; + final byte[] array = new byte[] {0, 1, 2, 3}; + + final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null); + + assertNull(message.getBytesProperty("test")); + + // Empty byte array + message.putBytesProperty("test", empty); + final byte[] emptyResult = message.getBytesProperty("test"); + assertArrayEquals(empty, emptyResult); + + // Populated byte array + message.putBytesProperty("test", array); + final byte[] arrayResult = message.getBytesProperty("test"); + assertArrayEquals(array, arrayResult); + + // null value set and get + message.putBytesProperty("test", null); + + assertNull(message.getBytesProperty("test")); + } + + @Test + public void testPutAndGetOfMessageBytesPropertiesViaObjectPropertyAccessor() { + final byte[] empty = new byte[0]; + final byte[] array = new byte[] {0, 1, 2, 3}; + + final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null); + + assertNull(message.getObjectProperty("test")); + + // Empty byte array + message.putObjectProperty("test", empty); + final byte[] emptyResult = (byte[]) message.getObjectProperty("test"); + assertArrayEquals(empty, emptyResult); + + // Populated byte array + message.putObjectProperty("test", array); + final byte[] arrayResult = (byte[]) message.getObjectProperty("test"); + assertArrayEquals(array, arrayResult); + + // null value set and get + message.putObjectProperty("test", null); + + assertNull(message.getObjectProperty("test")); + } + + @Test + public void testGetBytesPropertyHandlesOffsetArrayInBinary() { + final byte[] array = new byte[] {0, 1, 2, 3}; + final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4}; + final Binary offsetBinary = new Binary(offsetArray, 1, array.length); + + final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, encodedProtonMessage, null); + + assertNull(message.getBytesProperty("test")); + + final ApplicationProperties applicationProperties = message.getDecodedApplicationProperties(); + assertNotNull(applicationProperties.getValue()); + + applicationProperties.getValue().put("test", offsetBinary); + + assertTrue(message.containsProperty("test")); + + final byte[] arrayResult = message.getBytesProperty("test"); + + assertArrayEquals(array, arrayResult); + } + + @Test + public void testGetBytesFromObjectPropertyHandlesOffsetArrayInBinary() { + final byte[] array = new byte[] {0, 1, 2, 3}; + final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4}; + final Binary offsetBinary = new Binary(offsetArray, 1, array.length); + + final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, encodedProtonMessage, null); + + assertNull(message.getObjectProperty("test")); + + final ApplicationProperties applicationProperties = message.getDecodedApplicationProperties(); + assertNotNull(applicationProperties.getValue()); + + applicationProperties.getValue().put("test", offsetBinary); + + assertTrue(message.containsProperty("test")); + + final Object result = message.getObjectProperty("test"); + + assertTrue(result instanceof byte[]); + assertArrayEquals(array, (byte[]) result); + } + + @Test + public void testBytesPropertySetAndReencode() { + final byte[] array = new byte[] {0, 1, 2, 3}; + + final AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null); + + assertNull(message.getBytesProperty("test")); + + message.putBytesProperty("test", array); + message.reencode(); + + assertTrue(message.containsProperty("test")); + + // Decodes the value from the AMQP data encoding and then checks on the contents + final Binary result = (Binary) message.getApplicationProperties().getValue().get("test"); + + assertNotNull(result); + assertEquals(array.length, result.getLength()); + + // Safe copy to account for possible offset array + final byte[] copy = new byte[array.length]; + System.arraycopy(result.getArray(), result.getArrayOffset(), copy, 0, result.getLength()); + + assertArrayEquals(array, copy); + } + //----- Test Support ------------------------------------------------------// + // Extension allows public access to decoded application properties that is otherwise + // not accessible directly. + private class AMQPMessageTestExtension extends AMQPStandardMessage { + + AMQPMessageTestExtension(long messageFormat, byte[] data, TypedProperties extraProperties) { + super(messageFormat, data, extraProperties, null); + } + + @Override + public ApplicationProperties getDecodedApplicationProperties() { + return applicationProperties; + } + } + private MessageImpl createProtonMessage() { MessageImpl message = (MessageImpl) Proton.message(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index 5a04044e22..33c0109bc2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -19,13 +19,18 @@ package org.apache.activemq.artemis.tests.integration.amqp; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; - +import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.Test; @@ -40,6 +45,13 @@ import org.junit.jupiter.api.Timeout; public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { + private final int MIN_LARGE_MESSAGE_SIZE = 2048; + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE); + } + @Test @Timeout(30) public void testReleasedDisposition() throws Exception { @@ -187,4 +199,68 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { connection.close(); } + + @Test + @Timeout(30) + public void testReplayMessageForFQQNRejectedMessage() throws Exception { + doTestReplayMessageForFQQNRejectedMessage(10); + } + + @Test + @Timeout(30) + public void testReplayMessageForFQQNRejectedLargeMessage() throws Exception { + doTestReplayMessageForFQQNRejectedMessage(MIN_LARGE_MESSAGE_SIZE); + } + + public void doTestReplayMessageForFQQNRejectedMessage(int payloadSize) throws Exception { + server.createQueue(QueueConfiguration.of("A1").setAddress("A") + .setRoutingType(RoutingType.MULTICAST) + .setDurable(true)); + + final String targetFQQN = "A::A1"; + final AmqpClient client = createAmqpClient(); + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(targetFQQN); + final AmqpMessage message = new AmqpMessage(); + + final String payload = "#".repeat(payloadSize); + + message.setMessageId("MSG:1"); + message.setText("Test-Message: " + payload); + message.setDurable(true); + + sender.send(message); + + final AmqpReceiver receiver = session.createReceiver(targetFQQN); + receiver.flow(1); + + final AmqpMessage rejected = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(rejected, "Did not receive message that we want to reject"); + + rejected.reject(); + + final Queue queueView = server.locateQueue(targetFQQN); + final Queue dlqView = server.locateQueue("ActiveMQ.DLQ"); + + Wait.assertEquals(0L, () -> queueView.getMessageCount(), 5000, 100); + Wait.assertEquals(1L, () -> dlqView.getMessageCount(), 5000, 100); + + // This call with the message sent to an FQQN results in a new application + // property being added whose payload is a byte array + + dlqView.retryMessages(null); // No filter so all should match + + Wait.assertEquals(0L, () -> dlqView.getMessageCount(), 5000, 100); + Wait.assertEquals(1L, () -> queueView.getMessageCount(), 5000, 100); + + receiver.flow(2); + + final AmqpMessage retriedMessage = receiver.receive(5, TimeUnit.SECONDS); + + assertNotNull(retriedMessage); + assertEquals("MSG:1", retriedMessage.getMessageId()); + + connection.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java index 04563d00d6..31c1fde12f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -32,15 +33,18 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Properties; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Test basic send and receive scenarios using only AMQP sender and receiver links. @@ -166,7 +170,6 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport { private static final String REPLY_TO = "replyTo"; private static final String TIME_TO_LIVE = "timeToLive"; - private boolean checkMessageProperties(AMQPMessage message, Map expectedProperties) { assertNotNull(message); assertNotNull(server.getNodeID()); @@ -301,4 +304,151 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } + + @Test + @Timeout(60) + public void testReencodeMessageWithByteArrayPropertyAddedOnIngress() throws Exception { + final CountDownLatch arrived = new CountDownLatch(1); + final CountDownLatch departed = new CountDownLatch(1); + final AtomicBoolean propertyAddedOnReceive = new AtomicBoolean(false); + final AtomicBoolean propertyFoundOnDispatch = new AtomicBoolean(false); + + final String BYTE_PROPERTY_KEY = "byte-property"; + final byte[] BYTE_PROPERTY_VALUE = "test".getBytes(StandardCharsets.UTF_8); + + server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { + + @Override + public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException { + message.putBytesProperty(BYTE_PROPERTY_KEY, BYTE_PROPERTY_VALUE); + + try { + message.reencode(); + propertyAddedOnReceive.set(true); + } catch (Exception ex) { + return false; // Reject message if updated encode fails, test will fail + } finally { + arrived.countDown(); + } + + return true; + } + }); + + final AmqpClient client = createAmqpClient(); + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(getTestName()); + final AmqpMessage message = new AmqpMessage(); + + message.setMessageId("MSG:1"); + message.setText("Test-Message"); + + sender.send(message); + + assertTrue(arrived.await(2, TimeUnit.SECONDS)); + assertTrue(propertyAddedOnReceive.get()); + + server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() { + + @Override + public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException { + if (packet.containsProperty(BYTE_PROPERTY_KEY)) { + propertyFoundOnDispatch.set(true); + } + + departed.countDown(); + return true; + } + }); + + final AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + + final AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(departed.getCount(), 0); + assertTrue(propertyFoundOnDispatch.get()); + assertNotNull(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY)); + assertTrue(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY) instanceof Binary); + + final Binary binary = (Binary) amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY); + assertEquals(BYTE_PROPERTY_VALUE.length, binary.getLength()); + + // Make a safe copy in case binary payload is not in exact sized array. + final byte[] copy = new byte[BYTE_PROPERTY_VALUE.length]; + System.arraycopy(binary.getArray(), binary.getArrayOffset(), copy, 0, binary.getLength()); + + assertArrayEquals(BYTE_PROPERTY_VALUE, copy); + + sender.close(); + receiver.close(); + connection.close(); + } + + @Test + @Timeout(60) + public void testGetBytesPropertyReturnsByteArray() throws Exception { + doTestGetBytesPropertyReturnsByteArray("array-payload".getBytes(StandardCharsets.UTF_8)); + } + + @Test + @Timeout(60) + public void testGetBytesPropertyReturnsEmptyByteArray() throws Exception { + doTestGetBytesPropertyReturnsByteArray(new byte[0]); + } + + public void doTestGetBytesPropertyReturnsByteArray(byte[] array) throws Exception { + + final CountDownLatch arrived = new CountDownLatch(1); + final AtomicBoolean bytesReturnedFromBrokerMessage = new AtomicBoolean(false); + + final String BYTE_PROPERTY_KEY = "byte-property"; + + server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { + + @Override + public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException { + try { + final Object appProperty = message.getApplicationProperties().getValue().get(BYTE_PROPERTY_KEY); + + // The application property should return the encoded Binary value + assertNotNull(appProperty); + assertTrue(appProperty instanceof Binary); + + final byte[] payload = message.getBytesProperty(BYTE_PROPERTY_KEY); + + // The getBytesProperty API should unwrap and return the byte array + assertEquals(array.length, payload.length); + assertArrayEquals(array, payload); + + bytesReturnedFromBrokerMessage.set(true); + } catch (Exception ex) { + return false; // Reject message if updated encode fails, test will fail + } finally { + arrived.countDown(); + } + + return true; + } + }); + + final AmqpClient client = createAmqpClient(); + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(getTestName()); + final AmqpMessage message = new AmqpMessage(); + + message.setMessageId("MSG:1"); + message.setText("Test-Message"); + message.setApplicationProperty(BYTE_PROPERTY_KEY, new Binary(array)); + + sender.send(message); + + assertTrue(arrived.await(2, TimeUnit.SECONDS)); + assertTrue(bytesReturnedFromBrokerMessage.get()); + + sender.close(); + connection.close(); + } }