ARTEMIS-1508 Adding a test with Expiry, to make sure the message copy works correctly
This commit is contained in:
parent
ce9e824297
commit
f2b5114af8
|
@ -216,11 +216,11 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
}
|
||||
|
||||
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
|
||||
// Default DLQ
|
||||
// Default Queue
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST, SimpleString.toSimpleString(getQueueName()), null, true, false, -1, false, true);
|
||||
|
||||
// Default Queue
|
||||
// Default DLQ
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST, SimpleString.toSimpleString(getDeadLetterAddress()), null, true, false, -1, false, true);
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
AmqpMessage received = receiver.receiveNoWait();
|
||||
assertNull(received);
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
@ -63,6 +63,60 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testExpiryThroughTTL() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setTimeToLive(1);
|
||||
message.setText("Test-Message");
|
||||
message.setDurable(true);
|
||||
message.setApplicationProperty("key1", "Value1");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Thread.sleep(100);
|
||||
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receiveNoWait();
|
||||
assertNull(received);
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
|
||||
// This will stop and start the server
|
||||
// to make sure the message is decoded again from its binary format
|
||||
// avoiding any parsing cached at the server.
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
client = createAmqpClient();
|
||||
connection = addConnection(client.connect());
|
||||
session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress());
|
||||
receiverDLQ.flow(1);
|
||||
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(1, received.getTimeToLive());
|
||||
System.out.println("received.heandler.TTL" + received.getTimeToLive());
|
||||
Assert.assertNotNull(received);
|
||||
Assert.assertEquals("Value1", received.getApplicationProperty("key1"));
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
@ -119,7 +173,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
AmqpMessage received = receiver.receiveNoWait();
|
||||
assertNull(received);
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
@ -154,7 +208,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
AmqpMessage received = receiver.receiveNoWait();
|
||||
assertNull(received);
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
@ -253,7 +307,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
AmqpMessage received = receiver.receiveNoWait();
|
||||
assertNull(received);
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||
|
|
Loading…
Reference in New Issue