This closes #917
This commit is contained in:
commit
ceb2b38c83
|
@ -69,12 +69,16 @@ public final class AMQPMessageSupport {
|
||||||
public static final String CONTENT_TYPE = "ContentType";
|
public static final String CONTENT_TYPE = "ContentType";
|
||||||
public static final String CONTENT_ENCODING = "ContentEncoding";
|
public static final String CONTENT_ENCODING = "ContentEncoding";
|
||||||
public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
|
public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
|
||||||
|
public static final String DURABLE = "DURABLE";
|
||||||
|
public static final String PRIORITY = "PRIORITY";
|
||||||
|
|
||||||
public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
|
public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
|
||||||
public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
|
public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
|
||||||
public static final String FOOTER_PREFIX = "FT_";
|
public static final String FOOTER_PREFIX = "FT_";
|
||||||
|
|
||||||
public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
|
public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
|
||||||
|
public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
|
||||||
|
public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
|
||||||
public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
|
public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
|
||||||
public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
|
public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
|
||||||
public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
|
public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
|
||||||
|
|
|
@ -22,6 +22,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
|
||||||
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
|
||||||
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
||||||
|
|
||||||
|
@ -76,12 +78,14 @@ public abstract class InboundTransformer {
|
||||||
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
|
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
|
||||||
|
|
||||||
if (header.getDurable() != null) {
|
if (header.getDurable() != null) {
|
||||||
|
jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
|
||||||
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||||
} else {
|
} else {
|
||||||
jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
|
jms.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (header.getPriority() != null) {
|
if (header.getPriority() != null) {
|
||||||
|
jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
|
||||||
jms.setJMSPriority(header.getPriority().intValue());
|
jms.setJMSPriority(header.getPriority().intValue());
|
||||||
} else {
|
} else {
|
||||||
jms.setJMSPriority(Message.DEFAULT_PRIORITY);
|
jms.setJMSPriority(Message.DEFAULT_PRIORITY);
|
||||||
|
|
|
@ -32,6 +32,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMe
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
|
||||||
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
|
||||||
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
|
||||||
|
@ -287,6 +289,18 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||||
header = new Header();
|
header = new Header();
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
} else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
|
||||||
|
if (header == null) {
|
||||||
|
header = new Header();
|
||||||
|
}
|
||||||
|
header.setDurable(message.getInnerMessage().isDurable());
|
||||||
|
continue;
|
||||||
|
} else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
|
||||||
|
if (header == null) {
|
||||||
|
header = new Header();
|
||||||
|
}
|
||||||
|
header.setPriority(UnsignedByte.valueOf(priority));
|
||||||
|
continue;
|
||||||
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
|
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
|
||||||
if (properties == null) {
|
if (properties == null) {
|
||||||
properties = new Properties();
|
properties = new Properties();
|
||||||
|
|
|
@ -401,6 +401,25 @@ public class AmqpMessage {
|
||||||
return message.getHeader().getDurable();
|
return message.getHeader().getDurable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the priority header on the outgoing message.
|
||||||
|
*
|
||||||
|
* @param priority the priority value to set.
|
||||||
|
*/
|
||||||
|
public void setPriority(short priority) {
|
||||||
|
checkReadOnly();
|
||||||
|
lazyCreateHeader();
|
||||||
|
getWrappedMessage().setPriority(priority);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the priority header on the outgoing message.
|
||||||
|
*/
|
||||||
|
public short getPriority() {
|
||||||
|
return getWrappedMessage().getPriority();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets a given application property on an outbound message.
|
* Sets a given application property on an outbound message.
|
||||||
*
|
*
|
||||||
|
|
|
@ -238,6 +238,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
|
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageDurableFalse() throws Exception {
|
||||||
|
sendMessages(getTestName(), 1, false);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertFalse(receive.isDurable());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageDurableTrue() throws Exception {
|
||||||
|
sendMessages(getTestName(), 1, true);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertTrue(receive.isDurable());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageDefaultPriority() throws Exception {
|
||||||
|
sendMessages(getTestName(), 1, (short) 4);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertEquals((short) 4, receive.getPriority());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageNonDefaultPriority() throws Exception {
|
||||||
|
sendMessages(getTestName(), 1, (short) 0);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertEquals((short) 0, receive.getPriority());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageNoPriority() throws Exception {
|
||||||
|
sendMessages(getTestName(), 1);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertEquals((short) 4, receive.getPriority());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
|
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
|
||||||
int MSG_COUNT = 4;
|
int MSG_COUNT = 4;
|
||||||
|
@ -940,4 +1060,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(destinationName);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setMessageId("MessageID:" + i);
|
||||||
|
message.setDurable(durable);
|
||||||
|
sender.send(message);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessages(String destinationName, int count, short priority) throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(destinationName);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setMessageId("MessageID:" + i);
|
||||||
|
message.setPriority(priority);
|
||||||
|
sender.send(message);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue