From 004be56127e7e949c8ffff8c36435ea0f27d32d4 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 13 Jun 2018 11:48:43 +0100 Subject: [PATCH] AMQ-2659 - make configurable based on xaAckMode, otherwise pure xa case can default to autoack in error. Additional tests --- .../apache/activemq/ActiveMQXASession.java | 19 +- .../jms/pool/XAConnectionPoolTest.java | 42 ++- .../ConnectionFailureEvictsFromPoolTest.java | 2 +- .../activemq/pool/XAConnectionPoolTest.java | 38 ++- .../apache/activemq/JMSXAConsumerTest.java | 6 +- .../org/apache/activemq/XAConsumerTest.java | 298 ++++++++++++++++++ .../store/jdbc/JDBCXACommitExceptionTest.java | 4 +- .../activemq/store/jdbc/XACompletionTest.java | 8 +- 8 files changed, 395 insertions(+), 22 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/XAConsumerTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java index 6e92416c03..6c928833de 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java @@ -68,11 +68,6 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession, super(connection, sessionId, theAcknowlegeMode, dispatchAsync); } - public boolean getTransacted() throws JMSException { - checkClosed(); - return getTransactionContext().isInXATransaction(); - } - public void rollback() throws JMSException { checkClosed(); throw new TransactionInProgressException("Cannot rollback() inside an XASession"); @@ -99,16 +94,12 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession, return new ActiveMQTopicSession(this); } - /* - * when there is no XA transaction it is auto ack - */ - public boolean isAutoAcknowledge() { - return true; - } - protected void doStartTransaction() throws JMSException { - // allow non transactional auto ack work on an XASession - // Seems ok by the spec that an XAConnection can be used without an XA tx + if (acknowledgementMode != SESSION_TRANSACTED) { + // ok once the factory XaAckMode has been explicitly set to allow use outside an XA tx + } else if (!getTransactionContext().isInXATransaction()) { + throw new JMSException("Session's XAResource has not been enlisted in a distributed transaction."); + } } } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java index be13d37f89..d793759d36 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Hashtable; import java.util.Vector; @@ -47,7 +50,9 @@ import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.ActiveMQXASession; @@ -63,6 +68,9 @@ public class XAConnectionPoolTest extends JmsPoolTestSupport { ActiveMQTopic topic = new ActiveMQTopic("test"); XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); pcf.setConnectionFactory(new XAConnectionFactoryOnly(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"))); + + final Xid xid = createXid(); + // simple TM that is in a tx and will track syncs pcf.setTransactionManager(new TransactionManager(){ @Override @@ -92,7 +100,12 @@ public class XAConnectionPoolTest extends JmsPoolTestSupport { @Override public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException { - return false; + try { + xaRes.start(xid, 0); + } catch (XAException e) { + throw new SystemException(e.getMessage()); + } + return true; } @Override @@ -160,6 +173,33 @@ public class XAConnectionPoolTest extends JmsPoolTestSupport { pcf.stop(); } + static long txGenerator = 22; + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } + + + @Test(timeout = 60000) public void testAckModeOfPoolNonXAWithTM() throws Exception { final Vector syncs = new Vector(); diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java index 8c07111683..35b2af9891 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java @@ -68,7 +68,7 @@ public class ConnectionFailureEvictsFromPoolTest extends TestSupport { public void testEvictionXA() throws Exception { XaPooledConnectionFactory pooledFactory = - new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false")); + new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:(" + connector.getConnectUri() + "?closeAsync=false)?jms.xaAckMode=1")); doTestEviction(pooledFactory); } diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java index 9c85cdeca5..a048944a4c 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.pool; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Hashtable; import java.util.Vector; @@ -39,7 +42,9 @@ import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; @@ -55,6 +60,7 @@ public class XAConnectionPoolTest extends TestSupport { XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false")); + final Xid xid = createXid(); // simple TM that is in a tx and will track syncs pcf.setTransactionManager(new TransactionManager() { @Override @@ -85,7 +91,12 @@ public class XAConnectionPoolTest extends TestSupport { @Override public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException { - return false; + try { + xaRes.start(xid, 0); + } catch (XAException e) { + throw new SystemException(e.getMessage()); + } + return true; } @Override @@ -148,6 +159,31 @@ public class XAConnectionPoolTest extends TestSupport { connection.close(); } + static long txGenerator = 22; + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } + public void testAckModeOfPoolNonXAWithTM() throws Exception { final Vector syncs = new Vector(); ActiveMQTopic topic = new ActiveMQTopic("test"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java index e50e35ff2b..af46014c50 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java @@ -17,6 +17,8 @@ package org.apache.activemq; import javax.jms.ConnectionFactory; +import javax.jms.Session; + import junit.framework.Test; /* @@ -31,7 +33,9 @@ public class JMSXAConsumerTest extends JMSConsumerTest { @Override protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQXAConnectionFactory("vm://localhost"); + ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory("vm://localhost?jms.xaAckMode=1"); + activeMQXAConnectionFactory.setXaAckMode(Session.AUTO_ACKNOWLEDGE); + return activeMQXAConnectionFactory; } // some tests use transactions, these will not work unless an XA transaction is in place diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/XAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/XAConsumerTest.java new file mode 100644 index 0000000000..98ca091787 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/XAConsumerTest.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +import junit.framework.TestCase; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.XATransactionId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class XAConsumerTest extends TestCase { + + static final Logger LOG = LoggerFactory.getLogger(XAConsumerTest.class); + private static final String TEST_AMQ_BROKER_URI = "tcp://localhost:0"; + private String brokerUri; + private static long txGenerator = 21; + + private BrokerService broker; + + + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + brokerUri = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + } + + protected void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + + public void testPullRequestXAConsumer() throws Exception { + + ActiveMQXAConnectionFactory activeMQConnectionFactory = + new ActiveMQXAConnectionFactory("admin", "admin", brokerUri + "?trace=true&jms.prefetchPolicy.all=0"); + XAConnection connection = activeMQConnectionFactory.createXAConnection(); + connection.start(); + + ActiveMQXAConnectionFactory activeMQConnectionFactoryAutoAck = + new ActiveMQXAConnectionFactory("admin", "admin", brokerUri + "?trace=true&jms.prefetchPolicy.all=0"); + // allow non xa use of connections + activeMQConnectionFactoryAutoAck.setXaAckMode(Session.AUTO_ACKNOWLEDGE); + Connection autoAckConnection = activeMQConnectionFactoryAutoAck.createConnection(); + autoAckConnection.start(); + + try { + + LOG.info(">>>INVOKE XA receive with PullRequest Consumer..."); + + XASession xaSession = connection.createXASession(); + XAResource xaResource = xaSession.getXAResource(); + Xid xid = createXid(); + xaResource.start(xid, 0); + + Destination destination = xaSession.createQueue("TEST.T2"); + + final MessageConsumer messageConsumer = xaSession.createConsumer(destination); + final CountDownLatch receiveThreadDone = new CountDownLatch(1); + + final CountDownLatch receiveLatch = new CountDownLatch(1); + // do a message receive + + Thread receiveThread = new Thread(new Runnable() { + public void run() { + try { + messageConsumer.receive(600000); + } catch (JMSException expected) { + receiveLatch.countDown(); + LOG.info("got expected ex: ", expected); + } finally { + receiveThreadDone.countDown(); + } + } + }); + + receiveThread.start(); + + LOG.info(">>>simulate Transaction Rollback"); + xaResource.end(xid, XAResource.TMFAIL); + xaResource.rollback(xid); + + // send a message after transaction is rolled back. + LOG.info(">>>Sending message..."); + + Session session = autoAckConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Message messageToSend = session.createMessage(); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.send(messageToSend); + + receiveThreadDone.await(30, TimeUnit.SECONDS); + receiveLatch.await(5, TimeUnit.SECONDS); + + + // consume with non transacted consumer to verify not autoacked + messageConsumer.close(); + xaSession.close(); + + MessageConsumer messageConsumer1 = session.createConsumer(destination); + javax.jms.Message message = messageConsumer1.receive(5000); + + assertNotNull("Got message", message); + LOG.info("Got message on new session", message); + message.acknowledge(); + + } finally { + LOG.info(">>>Closing Connection"); + if (connection != null) { + connection.close(); + } + if (autoAckConnection != null) { + autoAckConnection.close(); + } + } + + } + + + public void testPullRequestXAConsumerSingleConsumer() throws Exception { + + ActiveMQXAConnectionFactory activeMQConnectionFactory = + new ActiveMQXAConnectionFactory("admin", "admin", brokerUri + "?trace=true&jms.prefetchPolicy.all=0"); + XAConnection connection = activeMQConnectionFactory.createXAConnection(); + connection.start(); + + try { + + LOG.info(">>>INVOKE XA receive with PullRequest Consumer..."); + + XASession xaSession = connection.createXASession(); + XAResource xaResource = xaSession.getXAResource(); + Xid xid = createXid(); + xaResource.start(xid, 0); + + Destination destination = xaSession.createQueue("TEST.T2"); + + final MessageConsumer messageConsumer = xaSession.createConsumer(destination); + final CountDownLatch receiveThreadDone = new CountDownLatch(1); + + final CountDownLatch receiveLatch = new CountDownLatch(1); + // do a message receive + + Thread receiveThread = new Thread(new Runnable() { + public void run() { + try { + messageConsumer.receive(600000); + } catch (JMSException expected) { + receiveLatch.countDown(); + LOG.info("got expected ex: ", expected); + } finally { + receiveThreadDone.countDown(); + } + } + }); + + receiveThread.start(); + + LOG.info(">>>simulate Transaction Rollback"); + xaResource.end(xid, XAResource.TMFAIL); + xaResource.rollback(xid); + + { + XASession xaSessionSend = connection.createXASession(); + XAResource xaResourceSend = xaSessionSend.getXAResource(); + Xid xidSend = createXid(); + xaResourceSend.start(xidSend, 0); + + // send a message after transaction is rolled back. + LOG.info(">>>Sending message..."); + + ActiveMQMessage messageToSend = (ActiveMQMessage) xaSessionSend.createMessage(); + messageToSend.setTransactionId(new XATransactionId(xidSend)); + MessageProducer messageProducer = xaSessionSend.createProducer(destination); + messageProducer.send(messageToSend); + + xaResourceSend.end(xidSend, XAResource.TMSUCCESS); + xaResourceSend.commit(xidSend, true); + } + + receiveThreadDone.await(30, TimeUnit.SECONDS); + receiveLatch.await(5, TimeUnit.SECONDS); + + // after jms exception we need to close + messageConsumer.close(); + + MessageConsumer messageConsumerTwo = xaSession.createConsumer(destination); + Xid xidReceiveOk = createXid(); + xaResource.start(xidReceiveOk, 0); + + javax.jms.Message message = messageConsumerTwo.receive(10000); + + assertNotNull("Got message", message); + LOG.info("Got message on new session", message); + + xaResource.end(xidReceiveOk, XAResource.TMSUCCESS); + xaResource.commit(xidReceiveOk, true); + + } finally { + LOG.info(">>>Closing Connection"); + if (connection != null) { + connection.close(); + } + } + + } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + + public int getFormatId() { + return 86; + } + + + public byte[] getGlobalTransactionId() { + return bs; + } + + + public byte[] getBranchQualifier() { + return bs; + } + }; + } + + + private BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + PolicyEntry pe = new PolicyEntry(); + + + pe.setProducerFlowControl(true); + pe.setUseCache(true); + + pe.setPrioritizedMessages(false); + pe.setExpireMessagesPeriod(0); + pe.setQueuePrefetch(0); + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + + broker.addConnector(TEST_AMQ_BROKER_URI); + broker.deleteAllMessages(); + return broker; + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java index 1529515b76..9fad049ad8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java @@ -363,10 +363,10 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest { getAutoCommitErrors.add(10); - factory = new ActiveMQXAConnectionFactory(connectionUri); + ActiveMQConnectionFactory nonTxFactory = new ActiveMQConnectionFactory(connectionUri); for (int i = 0; i < 10; i++) { - XAConnection connection = factory.createXAConnection(); + javax.jms.Connection connection = nonTxFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index 6aef5335c2..a0c49cba24 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -919,7 +919,9 @@ public class XACompletionTest extends TestSupport { } private Message regularBrowseFirst() throws Exception { - javax.jms.Connection connection = factory.createConnection(); + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(connectionUri); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + javax.jms.Connection connection = activeMQConnectionFactory.createConnection(); try { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -936,7 +938,9 @@ public class XACompletionTest extends TestSupport { } protected void sendMessages(int messagesExpected) throws Exception { - sendMessagesWith(factory, messagesExpected); + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(connectionUri); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + sendMessagesWith(activeMQConnectionFactory, messagesExpected); } protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception {