diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 59239eec4a..2e50839258 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -770,6 +770,7 @@ org/apache/activemq/broker/jmx/MBeanTest.* org/apache/activemq/broker/jmx/PurgeTest.* org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.* + org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.* org/apache/activemq/broker/policy/AbortSlowConsumerTest.* org/apache/activemq/broker/region/DestinationGCTest.* org/apache/activemq/broker/region/DestinationRemoveRestartTest.* diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java new file mode 100644 index 0000000000..2c4588ed53 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/KahaDBXARecoveryBrokerTest.java @@ -0,0 +1,172 @@ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DataArrayResponse; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; + +public class KahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest { + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + broker.setPersistenceAdapter(persistenceAdapter); + } + + public static Test suite() { + return suite(KahaDBXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test"); + } + + public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + stopBroker(); + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); + adapter.setPurgeRecoveredXATransactionStrategy("COMMIT"); + LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); + } + broker.start(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Since committed ... they should get delivered. + for (int i = 0; i < 4; i++) { + assertNotNull(receiveMessage(connection)); + } + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + + //These should be purged so expect 0 + assertEquals(0, dar.getData().length); + + } + + public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + stopBroker(); + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); + adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK"); + LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); + } + broker.start(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Since rolledback but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + + //These should be purged so expect 0 + assertEquals(0, dar.getData().length); + + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 034e1f2b83..9660ef0b0b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -267,140 +267,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEmptyDLQ(); } - public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception { - - ActiveMQDestination destination = createDestination(); - - // Setup the producer and send the message. - StubConnection connection = createConnection(); - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - - // Prepare 4 message sends. - for (int i = 0; i < 4; i++) { - // Begin the transaction. - XATransactionId txid = createXATransaction(sessionInfo); - connection.send(createBeginTransaction(connectionInfo, txid)); - - Message message = createMessage(producerInfo, destination); - message.setPersistent(true); - message.setTransactionId(txid); - connection.send(message); - - // Prepare - connection.send(createPrepareTransaction(connectionInfo, txid)); - } - - // Since prepared but not committed.. they should not get delivered. - assertNull(receiveMessage(connection)); - assertNoMessagesLeft(connection); - connection.request(closeConnectionInfo(connectionInfo)); - - // restart the broker. - stopBroker(); - if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { - KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); - adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK"); - LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); - } - broker.start(); - - // Setup the consumer and try receive the message. - connection = createConnection(); - connectionInfo = createConnectionInfo(); - sessionInfo = createSessionInfo(connectionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - - // Since rolledback but not committed.. they should not get delivered. - assertNull(receiveMessage(connection)); - assertNoMessagesLeft(connection); - - Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); - assertNotNull(response); - DataArrayResponse dar = (DataArrayResponse)response; - - //These should be purged so expect 0 - assertEquals(0, dar.getData().length); - - } - - public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception { - - ActiveMQDestination destination = createDestination(); - - // Setup the producer and send the message. - StubConnection connection = createConnection(); - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - - // Prepare 4 message sends. - for (int i = 0; i < 4; i++) { - // Begin the transaction. - XATransactionId txid = createXATransaction(sessionInfo); - connection.send(createBeginTransaction(connectionInfo, txid)); - - Message message = createMessage(producerInfo, destination); - message.setPersistent(true); - message.setTransactionId(txid); - connection.send(message); - - // Prepare - connection.send(createPrepareTransaction(connectionInfo, txid)); - } - - // Since prepared but not committed.. they should not get delivered. - assertNull(receiveMessage(connection)); - assertNoMessagesLeft(connection); - connection.request(closeConnectionInfo(connectionInfo)); - - // restart the broker. - stopBroker(); - if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { - KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); - adapter.setPurgeRecoveredXATransactionStrategy("COMMIT"); - LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); - } - broker.start(); - - // Setup the consumer and try receive the message. - connection = createConnection(); - connectionInfo = createConnectionInfo(); - sessionInfo = createSessionInfo(connectionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - - // Since committed ... they should get delivered. - for (int i = 0; i < 4; i++) { - assertNotNull(receiveMessage(connection)); - } - assertNoMessagesLeft(connection); - - Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); - assertNotNull(response); - DataArrayResponse dar = (DataArrayResponse)response; - - //These should be purged so expect 0 - assertEquals(0, dar.getData().length); - - } - private void assertEmptyDLQ() throws Exception { try { DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java index 96332442bb..e3581a30b4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java @@ -63,10 +63,10 @@ public class AMQ6463Test extends JmsTestSupport { TextMessage message = session.createTextMessage("test msg"); final int numMessages = 20; - long time = 5; + message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *"); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); - message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, numMessages - 1); producer.send(message); @@ -78,14 +78,14 @@ public class AMQ6463Test extends JmsTestSupport { public boolean isSatisified() throws Exception { return gotUsageBlocked.get(); } - })); + }, 60000)); MessageConsumer consumer = session.createConsumer(queueA); TextMessage msg; for (int idx = 0; idx < numMessages; ++idx) { - msg = (TextMessage) consumer.receive(10000); - assertNotNull("received: " + idx, msg); - msg.acknowledge(); + msg = (TextMessage) consumer.receive(10000); + assertNotNull("received: " + idx, msg); + msg.acknowledge(); } assertTrue("no errors in the log", errors.get() == 0); assertTrue("got blocked message", gotUsageBlocked.get()); @@ -99,6 +99,8 @@ public class AMQ6463Test extends JmsTestSupport { service.setSchedulerSupport(true); service.setDeleteAllMessagesOnStartup(true); + service.getSystemUsage().getMemoryUsage().setLimit(512); + // Setup a destination policy where it takes only 1 message at a time. PolicyMap policyMap = new PolicyMap(); PolicyEntry policy = new PolicyEntry(); @@ -131,7 +133,7 @@ public class AMQ6463Test extends JmsTestSupport { super.setUp(); } - + protected void tearDown() throws Exception { org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); rootLogger.removeAppender(appender);