diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java index 9eab737100..8c4612df43 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java @@ -69,12 +69,16 @@ public final class AMQPMessageSupport { public static final String CONTENT_TYPE = "ContentType"; public static final String CONTENT_ENCODING = "ContentEncoding"; public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; + public static final String DURABLE = "DURABLE"; + public static final String PRIORITY = "PRIORITY"; public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; public static final String FOOTER_PREFIX = "FT_"; public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; + public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; + public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY; public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java index 5094af5abe..9c40cd80c7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java @@ -22,6 +22,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; @@ -76,12 +78,14 @@ public abstract class InboundTransformer { jms.setBooleanProperty(JMS_AMQP_HEADER, true); if (header.getDurable() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); } else { jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); } if (header.getPriority() != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); jms.setJMSPriority(header.getPriority().intValue()); } else { jms.setJMSPriority(Message.DEFAULT_PRIORITY); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 2fa714506e..7dbc6d4868 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -32,6 +32,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE; @@ -287,6 +289,18 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { header = new Header(); } continue; + } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) { + if (header == null) { + header = new Header(); + } + header.setDurable(message.getInnerMessage().isDurable()); + continue; + } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + continue; } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { if (properties == null) { properties = new Properties(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index f7a9364ffb..5cf2c0aa12 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -401,6 +401,25 @@ public class AmqpMessage { return message.getHeader().getDurable(); } + /** + * Sets the priority header on the outgoing message. + * + * @param priority the priority value to set. + */ + public void setPriority(short priority) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setPriority(priority); + } + + /** + * Sets the priority header on the outgoing message. + */ + public short getPriority() { + return getWrappedMessage().getPriority(); + } + + /** * Sets a given application property on an outbound message. * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index e102c77038..b9d550440e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -238,6 +238,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); } + @Test(timeout = 60000) + public void testMessageDurableFalse() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertFalse(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDurableTrue() throws Exception { + sendMessages(getTestName(), 1, true); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertTrue(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 4); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNonDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 0); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 0, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNoPriority() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; @@ -940,4 +1060,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } } + + + public void sendMessages(String destinationName, int count, boolean durable) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setDurable(durable); + sender.send(message); + } + } finally { + connection.close(); + } + } + + public void sendMessages(String destinationName, int count, short priority) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setPriority(priority); + sender.send(message); + } + } finally { + connection.close(); + } + } }