mirror of https://github.com/apache/activemq.git
AMQ-7067 - tidy up tests and add prepare variant, limit rollback location recording to xa case. There is still some work to do for the ack compaction case to make it aware of the tx records such that those are transferred as necessary
(cherry picked from commit 57c7939534
)
This commit is contained in:
parent
cbe486fb9d
commit
7fa85185aa
|
@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
updates = preparedTransactions.remove(key);
|
||||
}
|
||||
}
|
||||
if (updates != null) {
|
||||
if (key.isXATransaction() && updates != null) {
|
||||
for(Operation op : updates) {
|
||||
recordAckMessageReferenceLocation(location, op.getLocation());
|
||||
}
|
||||
|
|
|
@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.util.JMXSupport;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.*;
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.transaction.xa.XAException;
|
||||
|
@ -32,11 +34,14 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URI;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static javax.transaction.xa.XAResource.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class AMQ7067Test {
|
||||
|
||||
|
@ -82,11 +87,55 @@ public class AMQ7067Test {
|
|||
broker.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAPrepare() throws Exception {
|
||||
|
||||
setupXAConnection();
|
||||
|
||||
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
|
||||
|
||||
MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
|
||||
|
||||
XATransactionId txid = createXATransaction();
|
||||
System.out.println("****** create new txid = " + txid);
|
||||
xaRes.start(txid, TMNOFLAGS);
|
||||
|
||||
TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
|
||||
holdKahaDbProducer.send(helloMessage);
|
||||
xaRes.end(txid, TMSUCCESS);
|
||||
|
||||
Queue queue = xaSession.createQueue("test");
|
||||
|
||||
produce(xaRes, xaSession, queue, 100, 512 * 1024);
|
||||
|
||||
xaRes.prepare(txid);
|
||||
|
||||
produce(xaRes, xaSession, queue, 100, 512 * 1024);
|
||||
|
||||
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
|
||||
|
||||
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
|
||||
|
||||
//Should be 1 since we have only 1 prepared
|
||||
assertEquals(1, xids.length);
|
||||
connection.close();
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
createBroker();
|
||||
|
||||
setupXAConnection();
|
||||
xids = xaRes.recover(TMSTARTRSCAN);
|
||||
|
||||
System.out.println("****** recovered = " + xids);
|
||||
|
||||
// THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
|
||||
assertEquals(1, xids.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMQ7067XAcommit() throws Exception {
|
||||
public void testXAcommit() throws Exception {
|
||||
|
||||
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
|
||||
setupXAConnection();
|
||||
|
||||
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
|
||||
|
@ -124,15 +173,14 @@ public class AMQ7067Test {
|
|||
setupXAConnection();
|
||||
xids = xaRes.recover(TMSTARTRSCAN);
|
||||
|
||||
// THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
|
||||
// THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
|
||||
assertEquals(1, xids.length);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMQ7067XArollback() throws Exception {
|
||||
public void testXArollback() throws Exception {
|
||||
|
||||
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
|
||||
setupXAConnection();
|
||||
|
||||
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
|
||||
|
@ -176,11 +224,11 @@ public class AMQ7067Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAMQ7067commit() throws Exception {
|
||||
public void testCommit() throws Exception {
|
||||
final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue holdKahaDb = session.createQueue("holdKahaDb");
|
||||
MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
|
||||
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
|
||||
|
@ -192,14 +240,28 @@ public class AMQ7067Test {
|
|||
|
||||
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
|
||||
purgeQueue(queue.getQueueName());
|
||||
Thread.sleep(10000);
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == getQueueSize(queue.getQueueName());
|
||||
}
|
||||
});
|
||||
|
||||
// force gc
|
||||
broker.getPersistenceAdapter().checkpoint(true);
|
||||
|
||||
|
||||
connection.close();
|
||||
curruptIndexFile(getDataDirectory());
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
createBroker();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
while(true) {
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
|
||||
break;
|
||||
} catch (Exception ex) {
|
||||
|
@ -208,18 +270,16 @@ public class AMQ7067Test {
|
|||
}
|
||||
}
|
||||
|
||||
connection.close();
|
||||
|
||||
// THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
|
||||
assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMQ7067rollback() throws Exception {
|
||||
public void testRollback() throws Exception {
|
||||
final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue holdKahaDb = session.createQueue("holdKahaDb");
|
||||
MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
|
||||
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
|
||||
|
@ -231,26 +291,34 @@ public class AMQ7067Test {
|
|||
|
||||
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
|
||||
purgeQueue(queue.getQueueName());
|
||||
Thread.sleep(10000);
|
||||
|
||||
curruptIndexFile(getDataDirectory());
|
||||
|
||||
|
||||
while(true) {
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
|
||||
break;
|
||||
} catch (Exception ex) {
|
||||
System.out.println(ex.getMessage());
|
||||
break;
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == getQueueSize(queue.getQueueName());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// force gc
|
||||
broker.getPersistenceAdapter().checkpoint(true);
|
||||
|
||||
connection.close();
|
||||
curruptIndexFile(getDataDirectory());
|
||||
|
||||
// THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION!
|
||||
assertEquals(0, getQueueSize(holdKahaDb.getQueueName()));
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
createBroker();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
|
||||
// no sign of the test queue on recovery, rollback is the default for any inflight
|
||||
// this test serves as a sanity check on existing behaviour
|
||||
try {
|
||||
getQueueSize(holdKahaDb.getQueueName());
|
||||
fail("expect InstanceNotFoundException");
|
||||
} catch (UndeclaredThrowableException expected) {
|
||||
assertTrue(expected.getCause() instanceof InstanceNotFoundException);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
|
||||
|
@ -281,7 +349,7 @@ public class AMQ7067Test {
|
|||
}
|
||||
|
||||
protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException {
|
||||
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
|
|
Loading…
Reference in New Issue