From 08e0c5e4f1c34b6fe82bf232ce87fd5bc774def4 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 14 Dec 2016 10:20:18 -0500 Subject: [PATCH] Revert "ARTEMIS-888 - AMQP headers arent always set" This reverts commit d471f6b15fab7d7afad8e34635869df18ac0cef4. --- .../JMSMappingOutboundTransformer.java | 38 ++++- .../transport/amqp/client/AmqpMessage.java | 19 --- .../integration/amqp/AmqpSendReceiveTest.java | 157 ------------------ 3 files changed, 29 insertions(+), 185 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index baec5f9c83..2fa714506e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -54,6 +54,7 @@ import java.util.Set; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageEOFException; import javax.jms.Queue; import javax.jms.TemporaryQueue; @@ -130,7 +131,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } long messageFormat = 0; - Header header = new Header(); + Header header = null; Properties properties = null; Map daMap = null; Map maMap = null; @@ -139,12 +140,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { Section body = convertBody(message); - header.setDurable(message.getInnerMessage().isDurable()); - + if (message.getInnerMessage().isDurable()) { + if (header == null) { + header = new Header(); + } + header.setDurable(true); + } byte priority = (byte) message.getJMSPriority(); - - header.setPriority(UnsignedByte.valueOf(priority)); - + if (priority != Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(UnsignedByte.valueOf(priority)); + } String type = message.getJMSType(); if (type != null) { if (properties == null) { @@ -152,7 +160,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } properties.setSubject(type); } - String messageId = message.getJMSMessageID(); if (messageId != null) { if (properties == null) { @@ -204,6 +211,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { ttl = 1; } + if (header == null) { + header = new Header(); + } header.setTtl(new UnsignedInteger((int) ttl)); if (properties == null) { @@ -227,6 +237,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // whereas JMSXDeliveryCount includes the first/current delivery attempt. int amqpDeliveryCount = message.getDeliveryCount() - 1; if (amqpDeliveryCount > 0) { + if (header == null) { + header = new Header(); + } header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); } continue; @@ -264,9 +277,15 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // skip..internal use only continue; } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { + if (header == null) { + header = new Header(); + } header.setFirstAcquirer(message.getBooleanProperty(key)); continue; } else if (key.equals(JMS_AMQP_HEADER)) { + if (header == null) { + header = new Header(); + } continue; } else if (key.startsWith(JMS_AMQP_PROPERTIES)) { if (properties == null) { @@ -346,8 +365,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { EncoderImpl encoder = tlsCodec.get().encoder; encoder.setByteBuffer(buffer); - encoder.writeObject(header); - + if (header != null) { + encoder.writeObject(header); + } if (daMap != null) { encoder.writeObject(new DeliveryAnnotations(daMap)); } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 5cf2c0aa12..f7a9364ffb 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -401,25 +401,6 @@ public class AmqpMessage { return message.getHeader().getDurable(); } - /** - * Sets the priority header on the outgoing message. - * - * @param priority the priority value to set. - */ - public void setPriority(short priority) { - checkReadOnly(); - lazyCreateHeader(); - getWrappedMessage().setPriority(priority); - } - - /** - * Sets the priority header on the outgoing message. - */ - public short getPriority() { - return getWrappedMessage().getPriority(); - } - - /** * Sets a given application property on an outbound message. * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 3e9072c863..aae265050e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -174,126 +174,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } - @Test(timeout = 60000) - public void testMessageDurableFalse() throws Exception { - sendMessages(getTestName(), 1, false); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertFalse(receive.isDurable()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageDurableTrue() throws Exception { - sendMessages(getTestName(), 1, true); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertTrue(receive.isDurable()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageDefaultPriority() throws Exception { - sendMessages(getTestName(), 1, (short) 4); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 4, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageNonDefaultPriority() throws Exception { - sendMessages(getTestName(), 1, (short) 0); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 0, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageNoPriority() throws Exception { - sendMessages(getTestName(), 1); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 4, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; @@ -969,41 +849,4 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } } - - - public void sendMessages(String destinationName, int count, boolean durable) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - message.setDurable(durable); - sender.send(message); - } - } finally { - connection.close(); - } - } - - public void sendMessages(String destinationName, int count, short priority) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - message.setPriority(priority); - sender.send(message); - } - } finally { - connection.close(); - } - } }