From ed266835b5aabfcb05e382f3056353a72347f158 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 16 Jun 2015 16:41:18 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5847 Add workarounds to allow for TX work to take place in multiple sessions on the same connection. Future work needed to properly support TXN Capabilities defined in the spec and support checking of violations of expected behavior. --- .../amqp/protocol/AmqpConnection.java | 20 ++++++++ .../transport/amqp/protocol/AmqpReceiver.java | 8 +-- .../transport/amqp/protocol/AmqpSender.java | 14 ++--- .../transport/amqp/protocol/AmqpSession.java | 13 +++++ .../protocol/AmqpTransactionCoordinator.java | 51 +++++++++++-------- .../amqp/JMSClientTransactionTest.java | 32 ++++++++++++ 6 files changed, 107 insertions(+), 31 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 577fcad2bc..c04a61f32e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -60,11 +60,13 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.AmqpHeader; import org.apache.activemq.transport.amqp.AmqpInactivityMonitor; @@ -142,10 +144,12 @@ public class AmqpConnection implements AmqpProtocolConverter { private final ConnectionInfo connectionInfo = new ConnectionInfo(); private long nextSessionId; private long nextTempDestinationId; + private long nextTransactionId; private boolean closing; private boolean closedSocket; private AmqpAuthenticator authenticator; + private final Map transactions = new HashMap(); private final ConcurrentMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentMap subscriptionsByConsumerId = new ConcurrentHashMap(); @@ -667,6 +671,22 @@ public class AmqpConnection implements AmqpProtocolConverter { subscriptionsByConsumerId.remove(consumerId); } + void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) { + transactions.put(txId, coordinator); + } + + void unregisterTransaction(TransactionId txId) { + transactions.remove(txId); + } + + AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) { + return transactions.get(txId); + } + + LocalTransactionId getNextTransactionId() { + return new LocalTransactionId(getConnectionId(), ++nextTransactionId); + } + ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException { ConsumerInfo result = null; RegionBroker regionBroker; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 6e52fec282..e62ad04a7a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; @@ -205,9 +206,10 @@ public class AmqpReceiver extends AmqpAbstractReceiver { final DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState s = (TransactionalState) remoteState; - long txid = toLong(s.getTxnId()); - message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid)); + TransactionalState txState = (TransactionalState) remoteState; + TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); + session.enlist(txId); + message.setTransactionId(txId); } message.onSend(); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 1dd99d2146..4cbf744f4c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -34,6 +34,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; @@ -447,14 +448,13 @@ public class AmqpSender extends AmqpAbstractLink { DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState s = (TransactionalState) remoteState; - long txid = toLong(s.getTxnId()); - LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid); - ack.setTransactionId(localTxId); + TransactionalState txState = (TransactionalState) remoteState; + TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); + ack.setTransactionId(txId); - // Store the message sent in this TX we might need to - // re-send on rollback - md.getMessage().setTransactionId(localTxId); + // Store the message sent in this TX we might need to re-send on rollback + session.enlist(txId); + md.getMessage().setTransactionId(txId); dispatchedInTx.addFirst(md); } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index ca3a90fb7f..20a8b9f480 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -41,6 +41,7 @@ import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.AmqpProtocolException; @@ -72,6 +73,7 @@ public class AmqpSession implements AmqpResource { private final Session protonSession; private final SessionId sessionId; + private boolean enlisted; private long nextProducerId = 0; private long nextConsumerId = 0; @@ -122,6 +124,8 @@ public class AmqpSession implements AmqpResource { for (AmqpSender consumer : consumers.values()) { consumer.commit(); } + + enlisted = false; } /** @@ -133,6 +137,8 @@ public class AmqpSession implements AmqpResource { for (AmqpSender consumer : consumers.values()) { consumer.rollback(); } + + enlisted = false; } /** @@ -367,6 +373,13 @@ public class AmqpSession implements AmqpResource { connection.unregisterSender(consumerId); } + public void enlist(TransactionId txId) { + if (!enlisted) { + connection.getTxCoordinator(txId).enlist(this); + enlisted = true; + } + } + //----- Configuration accessors ------------------------------------------// public AmqpConnection getConnection() { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java index 576ce2059b..40bcda5d47 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java @@ -20,6 +20,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionId; @@ -54,7 +56,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class); - private long nextTransactionId; + private final Set txSessions = new HashSet(); /** * Creates a new Transaction coordinator used to manage AMQP transactions. @@ -82,7 +84,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext(); - ConnectionId connectionId = session.getConnection().getConnectionId(); + final ConnectionId connectionId = session.getConnection().getConnectionId(); final Object action = ((AmqpValue) message.getBody()).getValue(); LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes); @@ -92,35 +94,41 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { throw new Exception("don't know how to handle a declare /w a set GlobalId"); } - long txid = getNextTransactionId(); - TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN); - sendToActiveMQ(txinfo, null); - LOG.trace("started transaction {}", txid); + LocalTransactionId txId = session.getConnection().getNextTransactionId(); + TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN); + session.getConnection().registerTransaction(txId, this); + sendToActiveMQ(txInfo, null); + LOG.trace("started transaction {}", txId.getValue()); Declared declared = new Declared(); - declared.setTxnId(new Binary(toBytes(txid))); + declared.setTxnId(new Binary(toBytes(txId.getValue()))); delivery.disposition(declared); delivery.settle(); } else if (action instanceof Discharge) { - Discharge discharge = (Discharge) action; - long txid = toLong(discharge.getTxnId()); - + final Discharge discharge = (Discharge) action; + final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId())); final byte operation; + if (discharge.getFail()) { - LOG.trace("rollback transaction {}", txid); + LOG.trace("rollback transaction {}", txId.getValue()); operation = TransactionInfo.ROLLBACK; } else { - LOG.trace("commit transaction {}", txid); + LOG.trace("commit transaction {}", txId.getValue()); operation = TransactionInfo.COMMIT_ONE_PHASE; } - if (operation == TransactionInfo.ROLLBACK) { - session.rollback(); - } else { - session.commit(); + for (AmqpSession txSession : txSessions) { + if (operation == TransactionInfo.ROLLBACK) { + txSession.rollback(); + } else { + txSession.commit(); + } } - TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation); + txSessions.clear(); + session.getConnection().unregisterTransaction(txId); + + TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation); sendToActiveMQ(txinfo, new ResponseHandler() { @Override public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { @@ -132,6 +140,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } else { delivery.disposition(Accepted.getInstance()); } + LOG.debug("TX: {} settling {}", operation, action); delivery.settle(); session.pumpProtonToSocket(); @@ -157,10 +166,6 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { } } - private long getNextTransactionId() { - return ++nextTransactionId; - } - @Override public ActiveMQDestination getDestination() { return null; @@ -169,4 +174,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { @Override public void setDestination(ActiveMQDestination destination) { } + + public void enlist(AmqpSession session) { + txSessions.add(session); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 508638e18f..560edda9a8 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -43,6 +43,38 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { private final int MSG_COUNT = 1000; + @Test(timeout = 60000) + public void testProduceOneConsumeOneInTx() throws Exception { + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = session.createQueue(getTestName()); + MessageProducer messageProducer = session.createProducer(queue); + + messageProducer.send(session.createMessage()); + session.rollback(); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getQueueSize()); + + messageProducer.send(session.createMessage()); + session.commit(); + + assertEquals(1, queueView.getQueueSize()); + + MessageConsumer messageConsumer = session.createConsumer(queue); + assertNotNull(messageConsumer.receive(5000)); + session.rollback(); + + assertEquals(1, queueView.getQueueSize()); + + assertNotNull(messageConsumer.receive(5000)); + session.commit(); + + assertEquals(0, queueView.getQueueSize()); + } + @Test(timeout = 60000) public void testSingleConsumedMessagePerTxCase() throws Exception { connection = createConnection();