diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 3592dbc77c..034cb720e2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; @@ -406,7 +407,15 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void done() { synchronized (connection.getLock()) { - delivery.disposition(Accepted.getInstance()); + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); + + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); + } delivery.settle(); } connection.flush(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 41bc5e782c..d49d499158 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -17,6 +17,11 @@ package org.apache.activemq.artemis.tests.integration.amqp; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; @@ -24,10 +29,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -38,15 +39,23 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.engine.Delivery; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test various aspects of Transaction support. */ public class AmqpTransactionTest extends AmqpClientTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionTest.class); + @Test(timeout = 30000) public void testBeginAndCommitTransaction() throws Exception { AmqpClient client = createAmqpClient(); @@ -77,6 +86,45 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 30000) + public void testSentTransactionalMessageIsSettleWithTransactionalDisposition() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + AmqpSender sender = session.createSender(getTestName()); + sender.setStateInspector(new AmqpValidator() { + + @Override + public void inspectDeliveryUpdate(Delivery delivery) { + if (delivery.remotelySettled()) { + DeliveryState state = delivery.getRemoteState(); + if (state instanceof TransactionalState) { + LOG.debug("Remote settled with TX state: {}", state); + } else { + LOG.warn("Remote settled with non-TX state: {}", state); + markAsInvalid("Remote did not settled with TransactionState."); + } + } + } + }); + + session.begin(); + + assertTrue(session.isInTransaction()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + session.commit(); + + sender.getStateInspector().assertValid(); + + connection.close(); + } + @Test(timeout = 30000) public void testBeginAndRollbackTransaction() throws Exception { AmqpClient client = createAmqpClient();