From d717a495c74573ad8fce17fe4e7fac9b7f759c14 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 27 Sep 2011 14:13:16 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3465 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1176393 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/TransactionContext.java | 55 ++--- .../org/apache/activemq/bugs/AMQ3465Test.java | 195 ++++++++++++++++++ 2 files changed, 223 insertions(+), 27 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java index ae323a333b..dc6f602475 100755 --- a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java @@ -56,8 +56,8 @@ import org.slf4j.LoggerFactory; * and so on. A JTA aware JMS provider must fully implement this functionality. * This could be done by using the services of a database that supports XA, or a * JMS provider may choose to implement this functionality from scratch.

- * - * + * + * * @see javax.jms.Session * @see javax.jms.QueueSession * @see javax.jms.TopicSession @@ -88,7 +88,8 @@ public class TransactionContext implements XAResource { } public boolean isInXATransaction() { - return (transactionId != null && transactionId.isXATransaction()) || !ENDED_XA_TRANSACTION_CONTEXTS.isEmpty(); + return (transactionId != null && transactionId.isXATransaction()) || + (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() && ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this)); } public boolean isInLocalTransaction() { @@ -98,7 +99,7 @@ public class TransactionContext implements XAResource { public boolean isInTransaction() { return transactionId != null; } - + /** * @return Returns the localTransactionEventListener. */ @@ -108,7 +109,7 @@ public class TransactionContext implements XAResource { /** * Used by the resource adapter to listen to transaction events. - * + * * @param localTransactionEventListener The localTransactionEventListener to * set. */ @@ -212,7 +213,7 @@ public class TransactionContext implements XAResource { if (isInXATransaction()) { throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); } - + if (transactionId == null) { synchronizations = null; beforeEndIndex = 0; @@ -229,13 +230,13 @@ public class TransactionContext implements XAResource { LOG.debug("Begin:" + transactionId); } } - + } /** * Rolls back any work done in this transaction and releases any locks * currently held. - * + * * @throws JMSException if the JMS provider fails to roll back the * transaction due to some internal error. * @throws javax.jms.IllegalStateException if the method is not called by a @@ -245,7 +246,7 @@ public class TransactionContext implements XAResource { if (isInXATransaction()) { throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); } - + try { beforeEnd(); } catch (TransactionRolledBackException canOcurrOnFailover) { @@ -254,7 +255,7 @@ public class TransactionContext implements XAResource { if (transactionId != null) { if (LOG.isDebugEnabled()) { LOG.debug("Rollback: " + transactionId - + " syncCount: " + + " syncCount: " + (synchronizations != null ? synchronizations.size() : 0)); } @@ -274,7 +275,7 @@ public class TransactionContext implements XAResource { /** * Commits all work done in this transaction and releases any locks * currently held. - * + * * @throws JMSException if the JMS provider fails to commit the transaction * due to some internal error. * @throws javax.jms.IllegalStateException if the method is not called by a @@ -284,7 +285,7 @@ public class TransactionContext implements XAResource { if (isInXATransaction()) { throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); } - + try { beforeEnd(); } catch (JMSException e) { @@ -296,7 +297,7 @@ public class TransactionContext implements XAResource { if (transactionId != null) { if (LOG.isDebugEnabled()) { LOG.debug("Commit: " + transactionId - + " syncCount: " + + " syncCount: " + (synchronizations != null ? synchronizations.size() : 0)); } @@ -317,7 +318,7 @@ public class TransactionContext implements XAResource { afterRollback(); throw cause; } - + } } @@ -367,11 +368,11 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("End: " + xid); } - + if (isInLocalTransaction()) { throw new XAException(XAException.XAER_PROTO); } - + if ((flags & (TMSUSPEND | TMFAIL)) != 0) { // You can only suspend the associated xid. if (!equals(associatedXid, xid)) { @@ -416,7 +417,7 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("Prepare: " + xid); } - + // We allow interleaving multiple transactions, so // we don't limit prepare to the associated xid. XATransactionId x; @@ -471,7 +472,7 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("Rollback: " + xid); } - + // We allow interleaving multiple transactions, so // we don't limit rollback to the associated xid. XATransactionId x; @@ -512,7 +513,7 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); } - + // We allow interleaving multiple transactions, so // we don't limit commit to the associated xid. XATransactionId x; @@ -569,7 +570,7 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("Forget: " + xid); } - + // We allow interleaving multiple transactions, so // we don't limit forget to the associated xid. XATransactionId x; @@ -613,7 +614,7 @@ public class TransactionContext implements XAResource { if (LOG.isDebugEnabled()) { LOG.debug("Recover: " + flag); } - + TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); try { this.connection.checkClosedOrFailed(); @@ -709,9 +710,9 @@ public class TransactionContext implements XAResource { /** * Sends the given command. Also sends the command in case of interruption, * so that important commands like rollback and commit are never interrupted. - * If interruption occurred, set the interruption state of the current - * after performing the action again. - * + * If interruption occurred, set the interruption state of the current + * after performing the action again. + * * @return the response */ private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { @@ -724,9 +725,9 @@ public class TransactionContext implements XAResource { return this.connection.syncSendPacket(command); } finally { Thread.currentThread().interrupt(); - } + } } - + throw e; } } @@ -734,7 +735,7 @@ public class TransactionContext implements XAResource { /** * Converts a JMSException from the server to an XAException. if the * JMSException contained a linked XAException that is returned instead. - * + * * @param e JMSException to convert * @return XAException wrapping original exception or its message */ diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java new file mode 100644 index 0000000000..bac3829bb0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3465Test +{ + private final String xaDestinationName = "DestinationXA"; + private final String destinationName = "Destination"; + private BrokerService broker; + private String connectionUri; + private long txGenerator = System.currentTimeMillis(); + + private XAConnectionFactory xaConnectionFactory; + private ConnectionFactory connectionFactory; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://0.0.0.0:0"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testMixedXAandNonXAorTXSessions() throws Exception { + + XAConnection xaConnection = xaConnectionFactory.createXAConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(xaDestinationName); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("Some Text"); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = xaConnection.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals("Some Text", receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + + // Test that a normal session doesn't operate on XASession state. + Connection connection2 = connectionFactory.createConnection(); + connection2.start(); + ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (session2.isTransacted()) { + session2.rollback(); + } + + session2.close(); + + resource.commit(tid, true); + } + + @Test + public void testMixedXAandNonXALocalTXSessions() throws Exception { + + XAConnection xaConnection = xaConnectionFactory.createXAConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(xaDestinationName); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("Some Text"); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = xaConnection.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals("Some Text", receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + + // Test that a normal session doesn't operate on XASession state. + Connection connection2 = connectionFactory.createConnection(); + connection2.start(); + ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = new ActiveMQQueue(destinationName); + ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination); + producer2.send(session2.createTextMessage("Local-TX")); + + if (session2.isTransacted()) { + session2.rollback(); + } + + session2.close(); + + resource.commit(tid, true); + } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } +}