diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 2fa714506e..baec5f9c83 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -54,7 +54,6 @@ import java.util.Set; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageEOFException; import javax.jms.Queue; import javax.jms.TemporaryQueue; @@ -131,7 +130,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } long messageFormat = 0; - Header header = null; + Header header = new Header(); Properties properties = null; Map daMap = null; Map maMap = null; @@ -140,19 +139,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { Section body = convertBody(message); - if (message.getInnerMessage().isDurable()) { - if (header == null) { - header = new Header(); - } - header.setDurable(true); - } + header.setDurable(message.getInnerMessage().isDurable()); + byte priority = (byte) message.getJMSPriority(); - if (priority != Message.DEFAULT_PRIORITY) { - if (header == null) { - header = new Header(); - } - header.setPriority(UnsignedByte.valueOf(priority)); - } + + header.setPriority(UnsignedByte.valueOf(priority)); + String type = message.getJMSType(); if (type != null) { if (properties == null) { @@ -160,6 +152,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } properties.setSubject(type); } + String messageId = message.getJMSMessageID(); if (messageId != null) { if (properties == null) { @@ -211,9 +204,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { ttl = 1; } - if (header == null) { - header = new Header(); - } header.setTtl(new UnsignedInteger((int) ttl)); if (properties == null) { @@ -237,9 +227,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // whereas JMSXDeliveryCount includes the first/current delivery attempt. int amqpDeliveryCount = message.getDeliveryCount() - 1; if (amqpDeliveryCount > 0) { - if (header == null) { - header = new Header(); - } header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); } continue; @@ -277,15 +264,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // skip..internal use only continue; } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { - if (header == null) { - header = new Header(); - } header.setFirstAcquirer(message.getBooleanProperty(key)); continue; } else if (key.equals(JMS_AMQP_HEADER)) { - if (header == null) { - header = new Header(); - } continue; } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { if (properties == null) { @@ -365,9 +346,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { EncoderImpl encoder = tlsCodec.get().encoder; encoder.setByteBuffer(buffer); - if (header != null) { - encoder.writeObject(header); - } + encoder.writeObject(header); + if (daMap != null) { encoder.writeObject(new DeliveryAnnotations(daMap)); } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index f7a9364ffb..5cf2c0aa12 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -401,6 +401,25 @@ public class AmqpMessage { return message.getHeader().getDurable(); } + /** + * Sets the priority header on the outgoing message. + * + * @param priority the priority value to set. + */ + public void setPriority(short priority) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setPriority(priority); + } + + /** + * Sets the priority header on the outgoing message. + */ + public short getPriority() { + return getWrappedMessage().getPriority(); + } + + /** * Sets a given application property on an outbound message. * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index aae265050e..3e9072c863 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -174,6 +174,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testMessageDurableFalse() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertFalse(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDurableTrue() throws Exception { + sendMessages(getTestName(), 1, true); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertTrue(receive.isDurable()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 4); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNonDefaultPriority() throws Exception { + sendMessages(getTestName(), 1, (short) 0); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 0, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNoPriority() throws Exception { + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; @@ -849,4 +969,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } } + + + public void sendMessages(String destinationName, int count, boolean durable) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setDurable(durable); + sender.send(message); + } + } finally { + connection.close(); + } + } + + public void sendMessages(String destinationName, int count, short priority) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(destinationName); + + for (int i = 0; i < count; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setPriority(priority); + sender.send(message); + } + } finally { + connection.close(); + } + } }