Fixed bad default for message persistence that breaks AMQP specification
defined behavior when the durable value is not present in the Header.
This commit is contained in:
Timothy Bish 2015-06-05 11:21:27 -04:00
parent a095e9b9dd
commit 11da37b991
5 changed files with 85 additions and 8 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.message;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Message;
public class AMQPRawInboundTransformer extends InboundTransformer {
@ -40,7 +41,9 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
rc.setJMSDeliveryMode(defaultDeliveryMode);
// We cannot decode the message headers to check so err on the side of caution
// and mark all messages as persistent.
rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
rc.setJMSPriority(defaultPriority);
final long now = System.currentTimeMillis();

View File

@ -51,7 +51,7 @@ public abstract class InboundTransformer {
String prefixMessageAnnotations = "MA_";
String prefixFooter = "FT_";
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;

View File

@ -25,6 +25,7 @@ import java.net.URI;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -89,7 +90,7 @@ public class AmqpTransformerTest {
Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
assertEquals(0L, messageFormat.longValue());
assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
assertEquals(2, message.getJMSDeliveryMode());
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
assertEquals(7, message.getJMSPriority());
c.close();
@ -138,10 +139,9 @@ public class AmqpTransformerTest {
Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
assertEquals(0L, messageFormat.longValue());
assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed);
assertEquals(2, message.getJMSDeliveryMode());
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
// should not equal 7 (should equal the default) because "raw" does not map
// headers
// should not equal 7 (should equal the default) because "raw" does not map headers
assertEquals(4, message.getJMSPriority());
c.close();
@ -187,7 +187,7 @@ public class AmqpTransformerTest {
Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT");
assertEquals(0L, messageFormat.longValue());
assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed);
assertEquals(2, message.getJMSDeliveryMode());
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
c.close();
session.close();

View File

@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.engine.Delivery;
@ -53,7 +54,6 @@ public class AmqpMessage {
delivery = null;
message = Proton.message();
message.setDurable(true);
}
/**
@ -246,6 +246,32 @@ public class AmqpMessage {
return message.getProperties().getGroupId();
}
/**
* Sets the durable header on the outgoing message.
*
* @param durable
* the boolean durable value to set.
*/
public void setDurable(boolean durable) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setDurable(durable);
}
/**
* Checks the durable value in the Message Headers to determine if
* the message was sent as a durable Message.
*
* @return true if the message is marked as being durable.
*/
public boolean isDurable() {
if (message.getHeader() == null) {
return false;
}
return message.getHeader().getDurable();
}
/**
* Sets a given application property on an outbound message.
*
@ -448,6 +474,11 @@ public class AmqpMessage {
}
}
private void lazyCreateHeader() {
if (message.getHeader() == null) {
message.setHeader(new Header());
}
}
private void lazyCreateProperties() {
if (message.getProperties() == null) {
message.setProperties(new Properties());

View File

@ -17,7 +17,9 @@
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
@ -154,4 +156,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver1.close();
receiver2.close();
}
@Test(timeout = 60000)
public void testMessageDurabliltyFollowsSpec() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
QueueViewMBean queue = getProxyToQueue(getTestName());
// Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage();
message1.setText("Test-Message -> non-durable");
message1.setDurable(false);
message1.setMessageId("ID:Message:1");
sender.send(message1);
assertEquals(1, queue.getQueueSize());
receiver1.flow(1);
message1 = receiver1.receive(50, TimeUnit.SECONDS);
assertFalse("First message sent should not be durable", message1.isDurable());
message1.accept();
// Create default message that should be sent as non-durable
AmqpMessage message2 = new AmqpMessage();
message2.setText("Test-Message -> durable");
message2.setDurable(true);
message2.setMessageId("ID:Message:2");
sender.send(message2);
assertEquals(1, queue.getQueueSize());
receiver1.flow(1);
message2 = receiver1.receive(50, TimeUnit.SECONDS);
assertTrue("Second message sent should be durable", message2.isDurable());
message2.accept();
sender.close();
connection.close();
}
}