NO-JIRA: Add some additional tests ported from the .NET AMQP client

Adds some transaction tests ported from AMQP .NET client with some
variances based on the way the test client works and limitations in the
brokers handling of Transacted sends.
This commit is contained in:
Timothy Bish 2016-09-15 13:24:18 -04:00
parent b4ab0e1af9
commit 4516c8df3f
1 changed files with 202 additions and 0 deletions

View File

@ -18,8 +18,10 @@ package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
@ -372,4 +374,204 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(0, queue.getQueueSize());
}
//----- Tests Ported from AmqpNetLite client -----------------------------//
@Test(timeout = 60000)
public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
final int NUM_MESSAGES = 5;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
// Commit TXN work from a sender.
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message, txnSession.getTransactionId());
}
txnSession.commit();
// Rollback an additional batch of TXN work from a sender.
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message, txnSession.getTransactionId());
}
txnSession.rollback();
// Commit more TXN work from a sender.
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message, txnSession.getTransactionId());
}
txnSession.commit();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(NUM_MESSAGES * 2);
for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept(txnSession);
}
connection.close();
}
@Test(timeout = 60000)
public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
messages.add(message);
}
// Commit half the consumed messages
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// Rollback the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.rollback();
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// Commit the other half the consumed messages
// This is a variation from the .NET client tests which doesn't settle the
// messages in the TX until commit is called but on ActiveMQ they will be
// redispatched regardless and not stay in the acquired state.
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
txnSession.commit();
// The final message should still be pending.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
receiver.flow(1);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
connection.close();
}
@Test(timeout = 60000)
public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
// Accept the first one in a TXN and send a new message in that TXN as well
txnSession.begin();
{
message1.accept(txnSession);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", NUM_MESSAGES);
sender.send(message, txnSession.getTransactionId());
}
txnSession.commit();
// Accept the second one in a TXN and send a new message in that TXN as well but rollback
txnSession.begin();
{
message2.accept(txnSession);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
sender.send(message, txnSession.getTransactionId());
}
txnSession.rollback();
// Variation here from .NET code, the client settles the accepted message where
// the .NET client does not and instead releases here to have it redelivered.
receiver.flow(NUM_MESSAGES);
for (int i = 1; i <= NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(i, message.getApplicationProperty("msgId"));
message.accept();
}
// Should be nothing left.
assertNull(receiver.receive(1, TimeUnit.SECONDS));
connection.close();
}
}