From 195046c50359667fc6d9480cf13c3d9e1d123d66 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 21 Sep 2016 16:12:52 -0400 Subject: [PATCH] NO-JIRA: Adding an extra test on AmqpTransactionTest The test I'm adding was back ported from Artemis. It will validate if the ACKs are nacked in case of a connection.close(); To avoid a situation where the TX would sit on a Transaction Resource Manager somewhere like an XID. --- .../amqp/interop/AmqpTransactionTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 994a2e73e3..0815f8ab9a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -150,6 +150,51 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testReceiveAfterConnectionClose() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + final QueueViewMBean queue = getProxyToQueue(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + assertEquals(1, queue.getQueueSize()); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + session.begin(); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + // this will force a rollback on the TX (It should at least) + connection.close(); + + connection = client.connect(); + session = connection.createSession(); + receiver = session.createReceiver(getTestName()); + session.begin(); + receiver.flow(1); + + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + session.commit(); + + assertEquals(0, queue.getQueueSize()); + + connection.close(); + } + + @Test(timeout = 60000) public void testReceiveMessageWithRollback() throws Exception { AmqpClient client = createAmqpClient();