diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 1c974cc0e7..46696f45ea 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updates = preparedTransactions.remove(key); } } - if (updates != null) { + if (key.isXATransaction() && updates != null) { for(Operation op : updates) { recordAckMessageReferenceLocation(location, op.getLocation()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java index c1f34d08a2..d00ee41c40 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java @@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; import org.apache.commons.lang.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import javax.jms.*; +import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.transaction.xa.XAException; @@ -32,11 +34,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.net.URI; +import java.lang.reflect.UndeclaredThrowableException; import java.util.Random; +import java.util.concurrent.TimeUnit; import static javax.transaction.xa.XAResource.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AMQ7067Test { @@ -82,11 +87,55 @@ public class AMQ7067Test { broker.start(); } + @Test + public void testXAPrepare() throws Exception { + + setupXAConnection(); + + Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); + + MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb); + + XATransactionId txid = createXATransaction(); + System.out.println("****** create new txid = " + txid); + xaRes.start(txid, TMNOFLAGS); + + TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10)); + holdKahaDbProducer.send(helloMessage); + xaRes.end(txid, TMSUCCESS); + + Queue queue = xaSession.createQueue("test"); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + xaRes.prepare(txid); + + produce(xaRes, xaSession, queue, 100, 512 * 1024); + + ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge(); + + Xid[] xids = xaRes.recover(TMSTARTRSCAN); + + //Should be 1 since we have only 1 prepared + assertEquals(1, xids.length); + connection.close(); + + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + + setupXAConnection(); + xids = xaRes.recover(TMSTARTRSCAN); + + System.out.println("****** recovered = " + xids); + + // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION! + assertEquals(1, xids.length); + } @Test - public void testAMQ7067XAcommit() throws Exception { + public void testXAcommit() throws Exception { - PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); setupXAConnection(); Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); @@ -124,15 +173,14 @@ public class AMQ7067Test { setupXAConnection(); xids = xaRes.recover(TMSTARTRSCAN); - // THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION! + // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION! assertEquals(1, xids.length); } @Test - public void testAMQ7067XArollback() throws Exception { + public void testXArollback() throws Exception { - PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); setupXAConnection(); Queue holdKahaDb = xaSession.createQueue("holdKahaDb"); @@ -176,11 +224,11 @@ public class AMQ7067Test { } @Test - public void testAMQ7067commit() throws Exception { + public void testCommit() throws Exception { final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection(); connection.start(); - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue holdKahaDb = session.createQueue("holdKahaDb"); MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb); TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10)); @@ -192,14 +240,28 @@ public class AMQ7067Test { System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); purgeQueue(queue.getQueueName()); - Thread.sleep(10000); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); + } + }); + // force gc + broker.getPersistenceAdapter().checkpoint(true); + + + connection.close(); curruptIndexFile(getDataDirectory()); + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + broker.waitUntilStarted(); while(true) { try { - Thread.sleep(10000); + TimeUnit.SECONDS.sleep(1); System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); break; } catch (Exception ex) { @@ -208,18 +270,16 @@ public class AMQ7067Test { } } - connection.close(); - // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION! assertEquals(1, getQueueSize(holdKahaDb.getQueueName())); } @Test - public void testAMQ7067rollback() throws Exception { + public void testRollback() throws Exception { final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection(); connection.start(); - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue holdKahaDb = session.createQueue("holdKahaDb"); MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb); TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10)); @@ -231,26 +291,34 @@ public class AMQ7067Test { System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); purgeQueue(queue.getQueueName()); - Thread.sleep(10000); - curruptIndexFile(getDataDirectory()); - - - while(true) { - try { - Thread.sleep(10000); - System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName()))); - break; - } catch (Exception ex) { - System.out.println(ex.getMessage()); - break; + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == getQueueSize(queue.getQueueName()); } - } + }); + + // force gc + broker.getPersistenceAdapter().checkpoint(true); connection.close(); + curruptIndexFile(getDataDirectory()); - // THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION! - assertEquals(0, getQueueSize(holdKahaDb.getQueueName())); + broker.stop(); + broker.waitUntilStopped(); + createBroker(); + broker.waitUntilStarted(); + + + // no sign of the test queue on recovery, rollback is the default for any inflight + // this test serves as a sanity check on existing behaviour + try { + getQueueSize(holdKahaDb.getQueueName()); + fail("expect InstanceNotFoundException"); + } catch (UndeclaredThrowableException expected) { + assertTrue(expected.getCause() instanceof InstanceNotFoundException); + } } protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException { @@ -281,7 +349,7 @@ public class AMQ7067Test { } protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException { - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = session.createProducer(queue); for (int i = 0; i < messageCount; i++) {