ARTEMIS-59 Accept transacted message using AMQP TransactionState

When a message is sent to the broker with a TransactionState indicating
that the message should be included in a transaction the disposition from
the broker indicating acceptance of the message should be done using a
TransactionState value that contained the TX ID and the Accepted
disposition.
This commit is contained in:
Timothy Bish 2017-03-20 12:33:22 -04:00
parent e29c463732
commit 29796151c3
2 changed files with 62 additions and 5 deletions

View File

@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
@ -406,7 +407,15 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void done() {
synchronized (connection.getLock()) {
delivery.disposition(Accepted.getInstance());
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
delivery.disposition(txAccepted);
} else {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
}
connection.flush();

View File

@ -17,6 +17,11 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@ -24,10 +29,6 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -38,15 +39,23 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test various aspects of Transaction support.
*/
public class AmqpTransactionTest extends AmqpClientTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionTest.class);
@Test(timeout = 30000)
public void testBeginAndCommitTransaction() throws Exception {
AmqpClient client = createAmqpClient();
@ -77,6 +86,45 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 30000)
public void testSentTransactionalMessageIsSettleWithTransactionalDisposition() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
AmqpSender sender = session.createSender(getTestName());
sender.setStateInspector(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Delivery delivery) {
if (delivery.remotelySettled()) {
DeliveryState state = delivery.getRemoteState();
if (state instanceof TransactionalState) {
LOG.debug("Remote settled with TX state: {}", state);
} else {
LOG.warn("Remote settled with non-TX state: {}", state);
markAsInvalid("Remote did not settled with TransactionState.");
}
}
}
});
session.begin();
assertTrue(session.isInTransaction());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
session.commit();
sender.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 30000)
public void testBeginAndRollbackTransaction() throws Exception {
AmqpClient client = createAmqpClient();