NO-JIRA: add test of defaulted durable field
Verifies broker isnt affected by same behaviour from 5.x issue AMQ 7189
This commit is contained in:
parent
0dae70bc51
commit
bfe0c5cc10
|
@ -45,7 +45,11 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
|
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||||
import org.apache.qpid.proton.engine.Sender;
|
import org.apache.qpid.proton.engine.Sender;
|
||||||
|
import org.apache.qpid.proton.message.Message;
|
||||||
import org.jgroups.util.UUID;
|
import org.jgroups.util.UUID;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -770,20 +774,25 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception {
|
public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception {
|
||||||
doTestBrokerRestartAndDurability(true, true);
|
doTestBrokerRestartAndDurability(true, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception {
|
public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception {
|
||||||
doTestBrokerRestartAndDurability(false, true);
|
doTestBrokerRestartAndDurability(false, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageWithHeaderDefaultedNonDurableIsNotPersisted() throws Exception {
|
||||||
|
doTestBrokerRestartAndDurability(false, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testMessageWithNoHeaderIsNotPersisted() throws Exception {
|
public void testMessageWithNoHeaderIsNotPersisted() throws Exception {
|
||||||
doTestBrokerRestartAndDurability(false, false);
|
doTestBrokerRestartAndDurability(false, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader) throws Exception {
|
private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader, boolean explicitSetNonDurable) throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
AmqpConnection connection = addConnection(client.connect());
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
AmqpSession session = connection.createSession();
|
AmqpSession session = connection.createSession();
|
||||||
|
@ -792,28 +801,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
final Queue queueView1 = getProxyToQueue(getQueueName());
|
final Queue queueView1 = getProxyToQueue(getQueueName());
|
||||||
|
|
||||||
// Create default message that should be sent as non-durable
|
Message protonMessage = Message.Factory.create();
|
||||||
AmqpMessage message = new AmqpMessage();
|
protonMessage.setMessageId("ID:Message:1");
|
||||||
message.setText("Test-Message -> non-durable");
|
protonMessage.setBody(new AmqpValue("Test-Message -> " + (durable ? "durable" : "non-durable")));
|
||||||
message.setMessageId("ID:Message:1");
|
if (durable || enforceHeader) {
|
||||||
|
Header header = new Header();
|
||||||
if (durable) {
|
if (durable) {
|
||||||
message.setDurable(true);
|
header.setDurable(true);
|
||||||
} else {
|
|
||||||
if (enforceHeader) {
|
|
||||||
message.setDurable(false);
|
|
||||||
assertNotNull(message.getWrappedMessage().getHeader());
|
|
||||||
} else {
|
} else {
|
||||||
assertNull(message.getWrappedMessage().getHeader());
|
if (explicitSetNonDurable) {
|
||||||
|
header.setDurable(false);
|
||||||
|
} else {
|
||||||
|
// Set priority so the durable field gets defaulted
|
||||||
|
header.setPriority(UnsignedByte.valueOf((byte) 5));
|
||||||
|
assertNull(header.getDurable());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protonMessage.setHeader(header);
|
||||||
|
} else {
|
||||||
|
assertNull("Should not have a header", protonMessage.getHeader());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage(protonMessage);
|
||||||
|
|
||||||
sender.send(message);
|
sender.send(message);
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
||||||
Wait.assertEquals(1, queueView1::getMessageCount);
|
Wait.assertEquals(1, queueView1::getMessageCount);
|
||||||
|
|
||||||
// Restart the server and the Queue should be empty
|
// Restart the server and the Queue should be empty
|
||||||
|
// if the message was non-durable
|
||||||
server.stop();
|
server.stop();
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue