diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index e3e9681531..2d95d29e70 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -128,11 +128,23 @@ public class AmqpMessage { * @throws Exception if an error occurs during the accept. */ public void accept() throws Exception { + accept(true); + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param settle + * true if the client should also settle the delivery when sending the accept. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery); + receiver.accept(delivery, settle); } /** @@ -142,11 +154,23 @@ public class AmqpMessage { * @throws Exception if an error occurs during the accept. */ public void accept(AmqpSession txnSession) throws Exception { + accept(txnSession, true); + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(AmqpSession txnSession, boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery, txnSession); + receiver.accept(delivery, txnSession, settle); } /** diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 88267133c8..b6d2ba1915 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.transport.amqp.client; -import javax.jms.InvalidDestinationException; +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; @@ -27,6 +30,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.InvalidDestinationException; + import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; @@ -52,10 +57,6 @@ import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; -import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; -import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; - /** * Receiver class that manages a Proton receiver endpoint. */ @@ -390,13 +391,47 @@ public class AmqpReceiver extends AmqpAbstractResource { } /** - * Accepts a message that was dispatched under the given Delivery instance. + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. + * + * @param delivery + * the Delivery instance to accept. * - * @param delivery the Delivery instance to accept. * @throws IOException if an error occurs while sending the accept. */ - public void accept(final Delivery delivery) throws IOException { - accept(delivery, this.session); + public void accept(Delivery delivery) throws IOException { + accept(delivery, this.session, true); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * @param delivery + * the Delivery instance to accept. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(Delivery delivery, boolean settle) throws IOException { + accept(delivery, this.session, settle); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session) throws IOException { + accept(delivery, session, true); } /** @@ -406,11 +441,16 @@ public class AmqpReceiver extends AmqpAbstractResource { * caller. This allows for an accepted message to be involved in a transaction that is * being managed by some other session other than the one that created this receiver. * - * @param delivery the Delivery instance to accept. - * @param session the session under which the message is being accepted. + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * * @throws IOException if an error occurs while sending the accept. */ - public void accept(final Delivery delivery, final AmqpSession session) throws IOException { + public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException { checkClosed(); if (delivery == null) { @@ -440,11 +480,13 @@ public class AmqpReceiver extends AmqpAbstractResource { txState.setOutcome(Accepted.getInstance()); txState.setTxnId(txnId); delivery.disposition(txState); - delivery.settle(); session.getTransactionContext().registerTxConsumer(AmqpReceiver.this); } } else { delivery.disposition(Accepted.getInstance()); + } + + if (settle) { delivery.settle(); } } @@ -462,8 +504,8 @@ public class AmqpReceiver extends AmqpAbstractResource { /** * Mark a message that was dispatched under the given Delivery instance as Modified. * - * @param delivery the Delivery instance to mark modified. - * @param deliveryFailed indicates that the delivery failed for some reason. + * @param delivery the Delivery instance to mark modified. + * @param deliveryFailed indicates that the delivery failed for some reason. * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to. * @throws IOException if an error occurs while sending the reject. */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index e84534f476..e82e7b334d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -622,4 +622,166 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName()); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages [0, 1, 2, 3, 4] + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); + messages.get(i).accept(txnSession, false); + } + txnSession.commit(); + + // Rollback the other half the consumed messages [5, 6, 7, 8, 9] + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); + messages.get(i).accept(txnSession, false); + } + txnSession.rollback(); + + // After rollback messages should still be acquired so we read last sent message [10] + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // The final message [10] should still be pending as we released it previously and committed + // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX + { + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.accept(); + } + + // We should have now drained the Queue + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + if (message != null) { + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + } + assertNull(message); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); + + // Accept the first one in a TXN and send a new message in that TXN as well + txnSession.begin(); + { + // This will result in message [0[ being consumed once we commit. + message1.accept(txnSession, false); + System.out.println("Commit: accepting message: " + message1.getApplicationProperty("msgId")); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES); + + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Accept the second one in a TXN and send a new message in that TXN as well but rollback + txnSession.begin(); + { + message2.accept(txnSession, false); + System.out.println("Rollback: accepting message: " + message2.getApplicationProperty("msgId")); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES + 1); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + // This releases message [1] + message2.release(); + + // Should be ten message available for dispatch given that we sent and committed one, and + // releases another we had previously received. + receiver.flow(10); + for (int i = 1; i <= NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Expected a message for: " + i, message); + System.out.println("Accepting message: " + message.getApplicationProperty("msgId")); + assertEquals(i, message.getApplicationProperty("msgId")); + message.accept(); + } + + // Should be nothing left. + receiver.flow(1); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } }