Revert "ARTEMIS-888 - AMQP headers arent always set"

This reverts commit d471f6b15f.
This commit is contained in:
Clebert Suconic 2016-12-14 10:20:18 -05:00
parent 9954b42b72
commit 08e0c5e4f1
3 changed files with 29 additions and 185 deletions

View File

@ -54,6 +54,7 @@ import java.util.Set;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageEOFException; import javax.jms.MessageEOFException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
@ -130,7 +131,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
} }
long messageFormat = 0; long messageFormat = 0;
Header header = new Header(); Header header = null;
Properties properties = null; Properties properties = null;
Map<Symbol, Object> daMap = null; Map<Symbol, Object> daMap = null;
Map<Symbol, Object> maMap = null; Map<Symbol, Object> maMap = null;
@ -139,12 +140,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
Section body = convertBody(message); Section body = convertBody(message);
header.setDurable(message.getInnerMessage().isDurable()); if (message.getInnerMessage().isDurable()) {
if (header == null) {
header = new Header();
}
header.setDurable(true);
}
byte priority = (byte) message.getJMSPriority(); byte priority = (byte) message.getJMSPriority();
if (priority != Message.DEFAULT_PRIORITY) {
if (header == null) {
header = new Header();
}
header.setPriority(UnsignedByte.valueOf(priority)); header.setPriority(UnsignedByte.valueOf(priority));
}
String type = message.getJMSType(); String type = message.getJMSType();
if (type != null) { if (type != null) {
if (properties == null) { if (properties == null) {
@ -152,7 +160,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
} }
properties.setSubject(type); properties.setSubject(type);
} }
String messageId = message.getJMSMessageID(); String messageId = message.getJMSMessageID();
if (messageId != null) { if (messageId != null) {
if (properties == null) { if (properties == null) {
@ -204,6 +211,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
ttl = 1; ttl = 1;
} }
if (header == null) {
header = new Header();
}
header.setTtl(new UnsignedInteger((int) ttl)); header.setTtl(new UnsignedInteger((int) ttl));
if (properties == null) { if (properties == null) {
@ -227,6 +237,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// whereas JMSXDeliveryCount includes the first/current delivery attempt. // whereas JMSXDeliveryCount includes the first/current delivery attempt.
int amqpDeliveryCount = message.getDeliveryCount() - 1; int amqpDeliveryCount = message.getDeliveryCount() - 1;
if (amqpDeliveryCount > 0) { if (amqpDeliveryCount > 0) {
if (header == null) {
header = new Header();
}
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
} }
continue; continue;
@ -264,9 +277,15 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// skip..internal use only // skip..internal use only
continue; continue;
} else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) { } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
if (header == null) {
header = new Header();
}
header.setFirstAcquirer(message.getBooleanProperty(key)); header.setFirstAcquirer(message.getBooleanProperty(key));
continue; continue;
} else if (key.equals(JMS_AMQP_HEADER)) { } else if (key.equals(JMS_AMQP_HEADER)) {
if (header == null) {
header = new Header();
}
continue; continue;
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) { } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
if (properties == null) { if (properties == null) {
@ -346,8 +365,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
EncoderImpl encoder = tlsCodec.get().encoder; EncoderImpl encoder = tlsCodec.get().encoder;
encoder.setByteBuffer(buffer); encoder.setByteBuffer(buffer);
if (header != null) {
encoder.writeObject(header); encoder.writeObject(header);
}
if (daMap != null) { if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap)); encoder.writeObject(new DeliveryAnnotations(daMap));
} }

View File

@ -401,25 +401,6 @@ public class AmqpMessage {
return message.getHeader().getDurable(); return message.getHeader().getDurable();
} }
/**
* Sets the priority header on the outgoing message.
*
* @param priority the priority value to set.
*/
public void setPriority(short priority) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setPriority(priority);
}
/**
* Sets the priority header on the outgoing message.
*/
public short getPriority() {
return getWrappedMessage().getPriority();
}
/** /**
* Sets a given application property on an outbound message. * Sets a given application property on an outbound message.
* *

View File

@ -174,126 +174,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive);
assertFalse(receive.isDurable());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testMessageDurableTrue() throws Exception {
sendMessages(getTestName(), 1, true);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive);
assertTrue(receive.isDurable());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testMessageDefaultPriority() throws Exception {
sendMessages(getTestName(), 1, (short) 4);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive);
assertEquals((short) 4, receive.getPriority());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testMessageNonDefaultPriority() throws Exception {
sendMessages(getTestName(), 1, (short) 0);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive);
assertEquals((short) 0, receive.getPriority());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testMessageNoPriority() throws Exception {
sendMessages(getTestName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
Queue queueView = getProxyToQueue(getTestName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive);
assertEquals((short) 4, receive.getPriority());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4; int MSG_COUNT = 4;
@ -969,41 +849,4 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
} }
public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(destinationName);
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
message.setDurable(durable);
sender.send(message);
}
} finally {
connection.close();
}
}
public void sendMessages(String destinationName, int count, short priority) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(destinationName);
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
message.setPriority(priority);
sender.send(message);
}
} finally {
connection.close();
}
}
} }