This commit is contained in:
Clebert Suconic 2017-05-03 16:33:12 -04:00
commit cdaae1578b
1 changed files with 74 additions and 1 deletions

View File

@ -679,7 +679,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertFalse("First message sent should not be durable", message1.isDurable()); assertFalse("First message sent should not be durable", message1.isDurable());
message1.accept(); message1.accept();
// Create default message that should be sent as non-durable // Create default message that should be sent as durable
AmqpMessage message2 = new AmqpMessage(); AmqpMessage message2 = new AmqpMessage();
message2.setText("Test-Message -> durable"); message2.setText("Test-Message -> durable");
message2.setDurable(true); message2.setDurable(true);
@ -697,6 +697,79 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception {
doTestBrokerRestartAndDurability(true, true);
}
@Test(timeout = 60000)
public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception {
doTestBrokerRestartAndDurability(false, true);
}
@Test(timeout = 60000)
public void testMessageWithNoHeaderIsNotPersisted() throws Exception {
doTestBrokerRestartAndDurability(false, false);
}
private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
final Queue queueView1 = getProxyToQueue(getQueueName());
// Create default message that should be sent as non-durable
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message -> non-durable");
message.setMessageId("ID:Message:1");
if (durable) {
message.setDurable(true);
} else {
if (enforceHeader) {
message.setDurable(false);
assertNotNull(message.getWrappedMessage().getHeader());
} else {
assertNull(message.getWrappedMessage().getHeader());
}
}
sender.send(message);
connection.close();
assertTrue("Message did not arrive", Wait.waitFor(() -> queueView1.getMessageCount() == 1));
// Restart the server and the Queue should be empty
server.stop();
server.start();
// Reconnect now
connection = addConnection(client.connect());
session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
final Queue queueView2 = getProxyToQueue(getQueueName());
if (durable) {
assertTrue("Message should not have returned", Wait.waitFor(() -> queueView2.getMessageCount() == 1));
} else {
assertTrue("Message should have been restored", Wait.waitFor(() -> queueView2.getMessageCount() == 0));
}
receiver.flow(1);
message = receiver.receive(1, TimeUnit.SECONDS);
if (durable) {
assertNotNull("Should have read a message", message);
} else {
assertNull("Should not have read a message", message);
}
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testReceiveMessageBeyondAckedAmountQueue() throws Exception { public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
final int MSG_COUNT = 50; final int MSG_COUNT = 50;