AMQ-7067 - tidy up tests and add prepare variant, limit rollback location recording to xa case. There is still some work to do for the ack compaction case to make it aware of the tx records such that those are transferred as necessary

This commit is contained in:
gtully 2018-10-09 12:01:47 +01:00
parent 99db9ef301
commit 57c7939534
2 changed files with 99 additions and 31 deletions

View File

@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updates = preparedTransactions.remove(key);
}
}
if (updates != null) {
if (key.isXATransaction() && updates != null) {
for(Operation op : updates) {
recordAckMessageReferenceLocation(location, op.getLocation());
}

View File

@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.transaction.xa.XAException;
@ -32,11 +34,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static javax.transaction.xa.XAResource.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AMQ7067Test {
@ -82,11 +87,55 @@ public class AMQ7067Test {
broker.start();
}
@Test
public void testXAPrepare() throws Exception {
setupXAConnection();
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
XATransactionId txid = createXATransaction();
System.out.println("****** create new txid = " + txid);
xaRes.start(txid, TMNOFLAGS);
TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
holdKahaDbProducer.send(helloMessage);
xaRes.end(txid, TMSUCCESS);
Queue queue = xaSession.createQueue("test");
produce(xaRes, xaSession, queue, 100, 512 * 1024);
xaRes.prepare(txid);
produce(xaRes, xaSession, queue, 100, 512 * 1024);
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
//Should be 1 since we have only 1 prepared
assertEquals(1, xids.length);
connection.close();
broker.stop();
broker.waitUntilStopped();
createBroker();
setupXAConnection();
xids = xaRes.recover(TMSTARTRSCAN);
System.out.println("****** recovered = " + xids);
// THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
assertEquals(1, xids.length);
}
@Test
public void testAMQ7067XAcommit() throws Exception {
public void testXAcommit() throws Exception {
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
setupXAConnection();
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@ -124,15 +173,14 @@ public class AMQ7067Test {
setupXAConnection();
xids = xaRes.recover(TMSTARTRSCAN);
// THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
// THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
assertEquals(1, xids.length);
}
@Test
public void testAMQ7067XArollback() throws Exception {
public void testXArollback() throws Exception {
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
setupXAConnection();
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@ -176,11 +224,11 @@ public class AMQ7067Test {
}
@Test
public void testAMQ7067commit() throws Exception {
public void testCommit() throws Exception {
final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue holdKahaDb = session.createQueue("holdKahaDb");
MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
@ -192,14 +240,28 @@ public class AMQ7067Test {
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
purgeQueue(queue.getQueueName());
Thread.sleep(10000);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == getQueueSize(queue.getQueueName());
}
});
// force gc
broker.getPersistenceAdapter().checkpoint(true);
connection.close();
curruptIndexFile(getDataDirectory());
broker.stop();
broker.waitUntilStopped();
createBroker();
broker.waitUntilStarted();
while(true) {
try {
Thread.sleep(10000);
TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
break;
} catch (Exception ex) {
@ -208,18 +270,16 @@ public class AMQ7067Test {
}
}
connection.close();
// THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
}
@Test
public void testAMQ7067rollback() throws Exception {
public void testRollback() throws Exception {
final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue holdKahaDb = session.createQueue("holdKahaDb");
MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
@ -231,26 +291,34 @@ public class AMQ7067Test {
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
purgeQueue(queue.getQueueName());
Thread.sleep(10000);
curruptIndexFile(getDataDirectory());
while(true) {
try {
Thread.sleep(10000);
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
break;
} catch (Exception ex) {
System.out.println(ex.getMessage());
break;
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == getQueueSize(queue.getQueueName());
}
}
});
// force gc
broker.getPersistenceAdapter().checkpoint(true);
connection.close();
curruptIndexFile(getDataDirectory());
// THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION!
assertEquals(0, getQueueSize(holdKahaDb.getQueueName()));
broker.stop();
broker.waitUntilStopped();
createBroker();
broker.waitUntilStarted();
// no sign of the test queue on recovery, rollback is the default for any inflight
// this test serves as a sanity check on existing behaviour
try {
getQueueSize(holdKahaDb.getQueueName());
fail("expect InstanceNotFoundException");
} catch (UndeclaredThrowableException expected) {
assertTrue(expected.getCause() instanceof InstanceNotFoundException);
}
}
protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
@ -281,7 +349,7 @@ public class AMQ7067Test {
}
protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException {
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < messageCount; i++) {