diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 9dc1138798..559364e63c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -45,7 +45,11 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; import org.jgroups.util.UUID; import org.junit.Assert; import org.junit.Test; @@ -770,20 +774,25 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception { - doTestBrokerRestartAndDurability(true, true); + doTestBrokerRestartAndDurability(true, true, false); } @Test(timeout = 60000) public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception { - doTestBrokerRestartAndDurability(false, true); + doTestBrokerRestartAndDurability(false, true, true); + } + + @Test(timeout = 60000) + public void testMessageWithHeaderDefaultedNonDurableIsNotPersisted() throws Exception { + doTestBrokerRestartAndDurability(false, true, false); } @Test(timeout = 60000) public void testMessageWithNoHeaderIsNotPersisted() throws Exception { - doTestBrokerRestartAndDurability(false, false); + doTestBrokerRestartAndDurability(false, false, false); } - private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader) throws Exception { + private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader, boolean explicitSetNonDurable) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); @@ -792,28 +801,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { final Queue queueView1 = getProxyToQueue(getQueueName()); - // Create default message that should be sent as non-durable - AmqpMessage message = new AmqpMessage(); - message.setText("Test-Message -> non-durable"); - message.setMessageId("ID:Message:1"); - - if (durable) { - message.setDurable(true); - } else { - if (enforceHeader) { - message.setDurable(false); - assertNotNull(message.getWrappedMessage().getHeader()); + Message protonMessage = Message.Factory.create(); + protonMessage.setMessageId("ID:Message:1"); + protonMessage.setBody(new AmqpValue("Test-Message -> " + (durable ? "durable" : "non-durable"))); + if (durable || enforceHeader) { + Header header = new Header(); + if (durable) { + header.setDurable(true); } else { - assertNull(message.getWrappedMessage().getHeader()); + if (explicitSetNonDurable) { + header.setDurable(false); + } else { + // Set priority so the durable field gets defaulted + header.setPriority(UnsignedByte.valueOf((byte) 5)); + assertNull(header.getDurable()); + } } + + protonMessage.setHeader(header); + } else { + assertNull("Should not have a header", protonMessage.getHeader()); } + AmqpMessage message = new AmqpMessage(protonMessage); + sender.send(message); connection.close(); Wait.assertEquals(1, queueView1::getMessageCount); // Restart the server and the Queue should be empty + // if the message was non-durable server.stop(); server.start();