From 7fa85185aae02414ff023727efc46680f5ead66a Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 9 Oct 2018 12:01:47 +0100 Subject: [PATCH] AMQ-7067 - tidy up tests and add prepare variant, limit rollback location recording to xa case. There is still some work to do for the ack compaction case to make it aware of the tx records such that those are transferred as necessary (cherry picked from commit 57c7939534a927bfc2d1b0454aac7ef8d804532b) --- .../store/kahadb/MessageDatabase.java | 2 +- .../org/apache/activemq/bugs/AMQ7067Test.java | 128 ++++++++++++++---- 2 files changed, 99 insertions(+), 31 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 82c486595c..86dfcac65a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updates = preparedTransactions.remove(key); } } - if (updates != null) { + if (key.isXATransaction() && updates != null) { for(Operation op : updates) { recordAckMessageReferenceLocation(location, op.getLocation()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java index c1f34d08a2..d00ee41c40 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java @@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; import org.apache.commons.lang.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import javax.jms.*; +import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.transaction.xa.XAException; @@ -32,11 +34,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.net.URI; +import java.lang.reflect.UndeclaredThrowableException; import java.util.Random; +import java.util.concurrent.TimeUnit; import static javax.transaction.xa.XAResource.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AMQ7067Test { @@ -82,11 +87,55 @@ public class AMQ7067Test { broker.start(); } + @Test + public void testXAPrepare() throws Exception { + + setupXAConnection(); + + Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); + + MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb); + + XATransactionId txid = createXATransaction(); + System.out.println("****** create new txid = " + txid); + xaRes.start(txid, TMNOFLAGS); + + TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10)); + holdKahaDbProducer.send(helloMessage); + xaRes.end(txid, TMSUCCESS); + + Queue queue = xaSession.createQueue("test"); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + xaRes.prepare(txid); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge(); + + Xid[] xids = xaRes.recover(TMSTARTRSCAN); + + //Should be 1 since we have only 1 prepared + assertEquals(1, xids.length); + connection.close(); + + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + + setupXAConnection(); + xids = xaRes.recover(TMSTARTRSCAN); + + System.out.println("****** recovered = " + xids); + + // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION! + assertEquals(1, xids.length); + } @Test - public void testAMQ7067XAcommit() throws Exception { + public void testXAcommit() throws Exception { - PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); setupXAConnection(); Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); @@ -124,15 +173,14 @@ public class AMQ7067Test { setupXAConnection(); xids = xaRes.recover(TMSTARTRSCAN); - // THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION! + // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION! assertEquals(1, xids.length); } @Test - public void testAMQ7067XArollback() throws Exception { + public void testXArollback() throws Exception { - PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); setupXAConnection(); Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); @@ -176,11 +224,11 @@ public class AMQ7067Test { } @Test - public void testAMQ7067commit() throws Exception { + public void testCommit() throws Exception { final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection(); connection.start(); - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue holdKahaDb = session.createQueue("holdKahaDb"); MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb); TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10)); @@ -192,14 +240,28 @@ public class AMQ7067Test { System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); purgeQueue(queue.getQueueName()); - Thread.sleep(10000); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); + } + }); + // force gc + broker.getPersistenceAdapter().checkpoint(true); + + + connection.close(); curruptIndexFile(getDataDirectory()); + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + broker.waitUntilStarted(); while(true) { try { - Thread.sleep(10000); + TimeUnit.SECONDS.sleep(1); System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); break; } catch (Exception ex) { @@ -208,18 +270,16 @@ public class AMQ7067Test { } } - connection.close(); - // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION! assertEquals(1, getQueueSize(holdKahaDb.getQueueName())); } @Test - public void testAMQ7067rollback() throws Exception { + public void testRollback() throws Exception { final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection(); connection.start(); - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue holdKahaDb = session.createQueue("holdKahaDb"); MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb); TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10)); @@ -231,26 +291,34 @@ public class AMQ7067Test { System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); purgeQueue(queue.getQueueName()); - Thread.sleep(10000); - curruptIndexFile(getDataDirectory()); - - - while(true) { - try { - Thread.sleep(10000); - System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); - break; - } catch (Exception ex) { - System.out.println(ex.getMessage()); - break; + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); } - } + }); + + // force gc + broker.getPersistenceAdapter().checkpoint(true); connection.close(); + curruptIndexFile(getDataDirectory()); - // THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION! - assertEquals(0, getQueueSize(holdKahaDb.getQueueName())); + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + broker.waitUntilStarted(); + + + // no sign of the test queue on recovery, rollback is the default for any inflight + // this test serves as a sanity check on existing behaviour + try { + getQueueSize(holdKahaDb.getQueueName()); + fail("expect InstanceNotFoundException"); + } catch (UndeclaredThrowableException expected) { + assertTrue(expected.getCause() instanceof InstanceNotFoundException); + } } protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException { @@ -281,7 +349,7 @@ public class AMQ7067Test { } protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException { - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = session.createProducer(queue); for (int i = 0; i < messageCount; i++) {