diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 577fcad2bc..c04a61f32e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -60,11 +60,13 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.AmqpHeader; import org.apache.activemq.transport.amqp.AmqpInactivityMonitor; @@ -142,10 +144,12 @@ public class AmqpConnection implements AmqpProtocolConverter { private final ConnectionInfo connectionInfo = new ConnectionInfo(); private long nextSessionId; private long nextTempDestinationId; + private long nextTransactionId; private boolean closing; private boolean closedSocket; private AmqpAuthenticator authenticator; + private final Map transactions = new HashMap(); private final ConcurrentMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentMap subscriptionsByConsumerId = new ConcurrentHashMap(); @@ -667,6 +671,22 @@ public class AmqpConnection implements AmqpProtocolConverter { subscriptionsByConsumerId.remove(consumerId); } + void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) { + transactions.put(txId, coordinator); + } + + void unregisterTransaction(TransactionId txId) { + transactions.remove(txId); + } + + AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) { + return transactions.get(txId); + } + + LocalTransactionId getNextTransactionId() { + return new LocalTransactionId(getConnectionId(), ++nextTransactionId); + } + ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException { ConsumerInfo result = null; RegionBroker regionBroker; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 6e52fec282..e62ad04a7a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; @@ -205,9 +206,10 @@ public class AmqpReceiver extends AmqpAbstractReceiver { final DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState s = (TransactionalState) remoteState; - long txid = toLong(s.getTxnId()); - message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid)); + TransactionalState txState = (TransactionalState) remoteState; + TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); + session.enlist(txId); + message.setTransactionId(txId); } message.onSend(); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 1dd99d2146..4cbf744f4c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -34,6 +34,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; @@ -447,14 +448,13 @@ public class AmqpSender extends AmqpAbstractLink { DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState s = (TransactionalState) remoteState; - long txid = toLong(s.getTxnId()); - LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid); - ack.setTransactionId(localTxId); + TransactionalState txState = (TransactionalState) remoteState; + TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); + ack.setTransactionId(txId); - // Store the message sent in this TX we might need to - // re-send on rollback - md.getMessage().setTransactionId(localTxId); + // Store the message sent in this TX we might need to re-send on rollback + session.enlist(txId); + md.getMessage().setTransactionId(txId); dispatchedInTx.addFirst(md); } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index ca3a90fb7f..20a8b9f480 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -41,6 +41,7 @@ import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.AmqpProtocolException; @@ -72,6 +73,7 @@ public class AmqpSession implements AmqpResource { private final Session protonSession; private final SessionId sessionId; + private boolean enlisted; private long nextProducerId = 0; private long nextConsumerId = 0; @@ -122,6 +124,8 @@ public class AmqpSession implements AmqpResource { for (AmqpSender consumer : consumers.values()) { consumer.commit(); } + + enlisted = false; } /** @@ -133,6 +137,8 @@ public class AmqpSession implements AmqpResource { for (AmqpSender consumer : consumers.values()) { consumer.rollback(); } + + enlisted = false; } /** @@ -367,6 +373,13 @@ public class AmqpSession implements AmqpResource { connection.unregisterSender(consumerId); } + public void enlist(TransactionId txId) { + if (!enlisted) { + connection.getTxCoordinator(txId).enlist(this); + enlisted = true; + } + } + //----- Configuration accessors ------------------------------------------// public AmqpConnection getConnection() { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java index 576ce2059b..40bcda5d47 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java @@ -20,6 +20,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionId; @@ -54,7 +56,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class); - private long nextTransactionId; + private final Set txSessions = new HashSet(); /** * Creates a new Transaction coordinator used to manage AMQP transactions. @@ -82,7 +84,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext(); - ConnectionId connectionId = session.getConnection().getConnectionId(); + final ConnectionId connectionId = session.getConnection().getConnectionId(); final Object action = ((AmqpValue) message.getBody()).getValue(); LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes); @@ -92,35 +94,41 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { throw new Exception("don't know how to handle a declare /w a set GlobalId"); } - long txid = getNextTransactionId(); - TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN); - sendToActiveMQ(txinfo, null); - LOG.trace("started transaction {}", txid); + LocalTransactionId txId = session.getConnection().getNextTransactionId(); + TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN); + session.getConnection().registerTransaction(txId, this); + sendToActiveMQ(txInfo, null); + LOG.trace("started transaction {}", txId.getValue()); Declared declared = new Declared(); - declared.setTxnId(new Binary(toBytes(txid))); + declared.setTxnId(new Binary(toBytes(txId.getValue()))); delivery.disposition(declared); delivery.settle(); } else if (action instanceof Discharge) { - Discharge discharge = (Discharge) action; - long txid = toLong(discharge.getTxnId()); - + final Discharge discharge = (Discharge) action; + final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId())); final byte operation; + if (discharge.getFail()) { - LOG.trace("rollback transaction {}", txid); + LOG.trace("rollback transaction {}", txId.getValue()); operation = TransactionInfo.ROLLBACK; } else { - LOG.trace("commit transaction {}", txid); + LOG.trace("commit transaction {}", txId.getValue()); operation = TransactionInfo.COMMIT_ONE_PHASE; } - if (operation == TransactionInfo.ROLLBACK) { - session.rollback(); - } else { - session.commit(); + for (AmqpSession txSession : txSessions) { + if (operation == TransactionInfo.ROLLBACK) { + txSession.rollback(); + } else { + txSession.commit(); + } } - TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation); + txSessions.clear(); + session.getConnection().unregisterTransaction(txId); + + TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation); sendToActiveMQ(txinfo, new ResponseHandler() { @Override public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { @@ -132,6 +140,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } else { delivery.disposition(Accepted.getInstance()); } + LOG.debug("TX: {} settling {}", operation, action); delivery.settle(); session.pumpProtonToSocket(); @@ -157,10 +166,6 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } } - private long getNextTransactionId() { - return ++nextTransactionId; - } - @Override public ActiveMQDestination getDestination() { return null; @@ -169,4 +174,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { @Override public void setDestination(ActiveMQDestination destination) { } + + public void enlist(AmqpSession session) { + txSessions.add(session); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 508638e18f..560edda9a8 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -43,6 +43,38 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { private final int MSG_COUNT = 1000; + @Test(timeout = 60000) + public void testProduceOneConsumeOneInTx() throws Exception { + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = session.createQueue(getTestName()); + MessageProducer messageProducer = session.createProducer(queue); + + messageProducer.send(session.createMessage()); + session.rollback(); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getQueueSize()); + + messageProducer.send(session.createMessage()); + session.commit(); + + assertEquals(1, queueView.getQueueSize()); + + MessageConsumer messageConsumer = session.createConsumer(queue); + assertNotNull(messageConsumer.receive(5000)); + session.rollback(); + + assertEquals(1, queueView.getQueueSize()); + + assertNotNull(messageConsumer.receive(5000)); + session.commit(); + + assertEquals(0, queueView.getQueueSize()); + } + @Test(timeout = 60000) public void testSingleConsumedMessagePerTxCase() throws Exception { connection = createConnection();