diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 8b378e1b9f..2b1b874975 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -132,11 +132,36 @@ public class AmqpMessage { * @throws Exception if an error occurs during the accept. */ public void accept() throws Exception { + accept(true); + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param settle + * true if the client should also settle the delivery when sending the accept. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery); + receiver.accept(delivery, settle); + } + + /** + * Accepts the message marking it as consumed on the remote peer. This method + * will automatically settle the accepted delivery. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(AmqpSession txnSession) throws Exception { + accept(txnSession, true); } /** @@ -147,12 +172,12 @@ public class AmqpMessage { * * @throws Exception if an error occurs during the accept. */ - public void accept(AmqpSession txnSession) throws Exception { + public void accept(AmqpSession txnSession, boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery, txnSession); + receiver.accept(delivery, txnSession, settle); } /** diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 999e033f89..3543ae3d5d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource { } /** - * Accepts a message that was dispatched under the given Delivery instance. + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. * * @param delivery * the Delivery instance to accept. * * @throws IOException if an error occurs while sending the accept. */ - public void accept(final Delivery delivery) throws IOException { - accept(delivery, this.session); + public void accept(Delivery delivery) throws IOException { + accept(delivery, this.session, true); } /** * Accepts a message that was dispatched under the given Delivery instance. * + * @param delivery + * the Delivery instance to accept. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(Delivery delivery, boolean settle) throws IOException { + accept(delivery, this.session, settle); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. + * * This method allows for the session that is used in the accept to be specified by the * caller. This allows for an accepted message to be involved in a transaction that is * being managed by some other session other than the one that created this receiver. @@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource { * @throws IOException if an error occurs while sending the accept. */ public void accept(final Delivery delivery, final AmqpSession session) throws IOException { + accept(delivery, session, true); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException { checkClosed(); if (delivery == null) { @@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource { txState.setOutcome(Accepted.getInstance()); txState.setTxnId(txnId); delivery.disposition(txState); - delivery.settle(); session.getTransactionContext().registerTxConsumer(AmqpReceiver.this); } } else { delivery.disposition(Accepted.getInstance()); + } + + if (settle) { delivery.settle(); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 7cf6026924..994a2e73e3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Ignore; import org.junit.Test; /** @@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + + // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker. + + @Ignore("Fails due to no support for TX enrollment without settlement.") + @Test(timeout = 60000) + public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + messages.get(i).accept(txnSession, false); + } + txnSession.commit(); + + // Rollback the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession, false); + } + txnSession.rollback(); + + // After rollback message should still be acquired so we read last sent message. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // The final message should still be pending. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.accept(); + } + + // We should have now drained the Queue + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNull(message); + + connection.close(); + } + + @Ignore("Fails due to no support for TX enrollment without settlement.") + @Test(timeout = 60000) + public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(2); + AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); + + // Accept the first one in a TXN and send a new message in that TXN as well + txnSession.begin(); + { + message1.accept(txnSession, false); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES); + + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Accept the second one in a TXN and send a new message in that TXN as well but rollback + txnSession.begin(); + { + message2.accept(txnSession, false); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES + 1); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + message2.release(); + + // Should be two message available for dispatch given that we sent and committed one, and + // releases another we had previously received. + receiver.flow(2); + for (int i = 1; i <= NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(i, message.getApplicationProperty("msgId")); + message.accept(); + } + + // Should be nothing left. + receiver.flow(1); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } }