NO-JIRA: Add some more variants of the .NET transaction tests

Adds ability to not settle accepted messages on the client to enable
creation of tests that are equivalent to the AmqpNetLite client's
transaction tests which hold settlement and expect the resource to
handle it on successful discharge.
This commit is contained in:
Timothy Bish 2016-09-19 17:36:58 -04:00
parent 9211661244
commit 0bb76c7fb4
3 changed files with 221 additions and 7 deletions

View File

@ -132,11 +132,36 @@ public class AmqpMessage {
* @throws Exception if an error occurs during the accept.
*/
public void accept() throws Exception {
accept(true);
}
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @param settle
* true if the client should also settle the delivery when sending the accept.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
receiver.accept(delivery);
receiver.accept(delivery, settle);
}
/**
* Accepts the message marking it as consumed on the remote peer. This method
* will automatically settle the accepted delivery.
*
* @param session
* The session that is used to manage acceptance of the message.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession) throws Exception {
accept(txnSession, true);
}
/**
@ -147,12 +172,12 @@ public class AmqpMessage {
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession) throws Exception {
public void accept(AmqpSession txnSession, boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
receiver.accept(delivery, txnSession);
receiver.accept(delivery, txnSession, settle);
}
/**

View File

@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
* Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
*
* @param delivery
* the Delivery instance to accept.
*
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery) throws IOException {
accept(delivery, this.session);
public void accept(Delivery delivery) throws IOException {
accept(delivery, this.session, true);
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
*
* @param delivery
* the Delivery instance to accept.
* @param settle
* true if the receiver should settle the delivery or just send the disposition.
*
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(Delivery delivery, boolean settle) throws IOException {
accept(delivery, this.session, settle);
}
/**
* Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
*
* This method allows for the session that is used in the accept to be specified by the
* caller. This allows for an accepted message to be involved in a transaction that is
* being managed by some other session other than the one that created this receiver.
@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
accept(delivery, session, true);
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
*
* This method allows for the session that is used in the accept to be specified by the
* caller. This allows for an accepted message to be involved in a transaction that is
* being managed by some other session other than the one that created this receiver.
*
* @param delivery
* the Delivery instance to accept.
* @param session
* the session under which the message is being accepted.
* @param settle
* true if the receiver should settle the delivery or just send the disposition.
*
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
checkClosed();
if (delivery == null) {
@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
txState.setOutcome(Accepted.getInstance());
txState.setTxnId(txnId);
delivery.disposition(txState);
delivery.settle();
session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
}
} else {
delivery.disposition(Accepted.getInstance());
}
if (settle) {
delivery.settle();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Ignore;
import org.junit.Test;
/**
@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
// TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker.
@Ignore("Fails due to no support for TX enrollment without settlement.")
@Test(timeout = 60000)
public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
messages.add(message);
}
// Commit half the consumed messages
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
messages.get(i).accept(txnSession, false);
}
txnSession.commit();
// Rollback the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession, false);
}
txnSession.rollback();
// After rollback message should still be acquired so we read last sent message.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// Commit the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// The final message should still be pending.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
receiver.flow(1);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.accept();
}
// We should have now drained the Queue
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
receiver.flow(1);
assertNull(message);
connection.close();
}
@Ignore("Fails due to no support for TX enrollment without settlement.")
@Test(timeout = 60000)
public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
// Accept the first one in a TXN and send a new message in that TXN as well
txnSession.begin();
{
message1.accept(txnSession, false);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", NUM_MESSAGES);
sender.send(message, txnSession.getTransactionId());
}
txnSession.commit();
// Accept the second one in a TXN and send a new message in that TXN as well but rollback
txnSession.begin();
{
message2.accept(txnSession, false);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
sender.send(message, txnSession.getTransactionId());
}
txnSession.rollback();
message2.release();
// Should be two message available for dispatch given that we sent and committed one, and
// releases another we had previously received.
receiver.flow(2);
for (int i = 1; i <= NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(i, message.getApplicationProperty("msgId"));
message.accept();
}
// Should be nothing left.
receiver.flow(1);
assertNull(receiver.receive(1, TimeUnit.SECONDS));
connection.close();
}
}