From 98b7dcdd1da63e2c43656a248895b39740774553 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 15 Feb 2012 11:21:43 +0000 Subject: [PATCH] fix intermittent failure, consume messages from all destinations, sorts out the composite case so that there are no dangling messages git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244443 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/XARecoveryBrokerTest.java | 41 +++++++++++-------- .../src/test/resources/log4j.properties | 2 +- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index fac399a498..435753de74 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -519,21 +519,25 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { connection.send(message); } - // Setup the consumer and receive the message. - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - // Begin the transaction. XATransactionId txid = createXATransaction(sessionInfo); connection.send(createBeginTransaction(connectionInfo, txid)); - Message m = null; - for (int i = 0; i < 4; i++) { - m = receiveMessage(connection); - assertNotNull(m); + + Message message = null; + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); } - MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE); - ack.setTransactionId(txid); - connection.send(ack); + // Don't commit // restart the broker. @@ -545,13 +549,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { sessionInfo = createSessionInfo(connectionInfo); connection.send(connectionInfo); connection.send(sessionInfo); - consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - // All messages should be re-delivered. - for (int i = 0; i < 4; i++) { - m = receiveMessage(connection); - assertNotNull(m); + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } } assertNoMessagesLeft(connection); diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties index c77a5c0e27..7cc19418fd 100755 --- a/activemq-core/src/test/resources/log4j.properties +++ b/activemq-core/src/test/resources/log4j.properties @@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG -log4j.logger.org.apache.activemq.transport.failover=TRACE +#log4j.logger.org.apache.activemq.transport.failover=TRACE #log4j.logger.org.apache.activemq.store.jdbc=TRACE #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG