From 4516c8df3f45db4ea3495e397744e9c235b68d7f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 15 Sep 2016 13:24:18 -0400 Subject: [PATCH] NO-JIRA: Add some additional tests ported from the .NET AMQP client Adds some transaction tests ported from AMQP .NET client with some variances based on the way the test client works and limitations in the brokers handling of Transacted sends. --- .../amqp/interop/AmqpTransactionTest.java | 202 ++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 97089a931f..7cf6026924 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -18,8 +18,10 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.QueueViewMBean; @@ -372,4 +374,204 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(0, queue.getQueueSize()); } + + //----- Tests Ported from AmqpNetLite client -----------------------------// + + @Test(timeout = 60000) + public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 5; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + // Commit TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Rollback an additional batch of TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + // Commit more TXN work from a sender. + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(NUM_MESSAGES * 2); + for (int i = 0; i < NUM_MESSAGES * 2; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(txnSession); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // Rollback the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.rollback(); + + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages + // This is a variation from the .NET client tests which doesn't settle the + // messages in the TX until commit is called but on ActiveMQ they will be + // redispatched regardless and not stay in the acquired state. + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + txnSession.commit(); + + // The final message should still be pending. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(2); + AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); + + // Accept the first one in a TXN and send a new message in that TXN as well + txnSession.begin(); + { + message1.accept(txnSession); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES); + + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Accept the second one in a TXN and send a new message in that TXN as well but rollback + txnSession.begin(); + { + message2.accept(txnSession); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES + 1); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + // Variation here from .NET code, the client settles the accepted message where + // the .NET client does not and instead releases here to have it redelivered. + + receiver.flow(NUM_MESSAGES); + for (int i = 1; i <= NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(i, message.getApplicationProperty("msgId")); + message.accept(); + } + + // Should be nothing left. + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } }