AMQ-7052 - Fix JdbcXARecoveryBrokerTest and mLevelDBXARecoveryBrokerTest tests

Signed-off-by: gtully <gary.tully@gmail.com>
(cherry picked from commit b92aaa2f58)
This commit is contained in:
Alan Protasio 2018-09-14 01:39:17 -07:00 committed by gtully
parent 241b1562ab
commit 840c0c46d2
4 changed files with 183 additions and 142 deletions

View File

@ -770,6 +770,7 @@
<exclude>org/apache/activemq/broker/jmx/MBeanTest.*</exclude> <exclude>org/apache/activemq/broker/jmx/MBeanTest.*</exclude>
<exclude>org/apache/activemq/broker/jmx/PurgeTest.*</exclude> <exclude>org/apache/activemq/broker/jmx/PurgeTest.*</exclude>
<exclude>org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.*</exclude> <exclude>org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.*</exclude>
<exclude>org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.*</exclude>
<exclude>org/apache/activemq/broker/policy/AbortSlowConsumerTest.*</exclude> <exclude>org/apache/activemq/broker/policy/AbortSlowConsumerTest.*</exclude>
<exclude>org/apache/activemq/broker/region/DestinationGCTest.*</exclude> <exclude>org/apache/activemq/broker/region/DestinationGCTest.*</exclude>
<exclude>org/apache/activemq/broker/region/DestinationRemoveRestartTest.*</exclude> <exclude>org/apache/activemq/broker/region/DestinationRemoveRestartTest.*</exclude>

View File

@ -0,0 +1,172 @@
package org.apache.activemq.broker;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
public class KahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
@Override
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
broker.setPersistenceAdapter(persistenceAdapter);
}
public static Test suite() {
return suite(KahaDBXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
protected ActiveMQDestination createDestination() {
return new ActiveMQQueue("test");
}
public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since committed ... they should get delivered.
for (int i = 0; i < 4; i++) {
assertNotNull(receiveMessage(connection));
}
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since rolledback but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
}

View File

@ -267,140 +267,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ(); assertEmptyDLQ();
} }
public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since rolledback but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since committed ... they should get delivered.
for (int i = 0; i < 4; i++) {
assertNotNull(receiveMessage(connection));
}
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
private void assertEmptyDLQ() throws Exception { private void assertEmptyDLQ() throws Exception {
try { try {
DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));

View File

@ -63,10 +63,10 @@ public class AMQ6463Test extends JmsTestSupport {
TextMessage message = session.createTextMessage("test msg"); TextMessage message = session.createTextMessage("test msg");
final int numMessages = 20; final int numMessages = 20;
long time = 5;
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, numMessages - 1); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, numMessages - 1);
producer.send(message); producer.send(message);
@ -78,14 +78,14 @@ public class AMQ6463Test extends JmsTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return gotUsageBlocked.get(); return gotUsageBlocked.get();
} }
})); }, 60000));
MessageConsumer consumer = session.createConsumer(queueA); MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg; TextMessage msg;
for (int idx = 0; idx < numMessages; ++idx) { for (int idx = 0; idx < numMessages; ++idx) {
msg = (TextMessage) consumer.receive(10000); msg = (TextMessage) consumer.receive(10000);
assertNotNull("received: " + idx, msg); assertNotNull("received: " + idx, msg);
msg.acknowledge(); msg.acknowledge();
} }
assertTrue("no errors in the log", errors.get() == 0); assertTrue("no errors in the log", errors.get() == 0);
assertTrue("got blocked message", gotUsageBlocked.get()); assertTrue("got blocked message", gotUsageBlocked.get());
@ -99,6 +99,8 @@ public class AMQ6463Test extends JmsTestSupport {
service.setSchedulerSupport(true); service.setSchedulerSupport(true);
service.setDeleteAllMessagesOnStartup(true); service.setDeleteAllMessagesOnStartup(true);
service.getSystemUsage().getMemoryUsage().setLimit(512);
// Setup a destination policy where it takes only 1 message at a time. // Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap(); PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
@ -131,7 +133,7 @@ public class AMQ6463Test extends JmsTestSupport {
super.setUp(); super.setUp();
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
rootLogger.removeAppender(appender); rootLogger.removeAppender(appender);