fix intermittent failure, consume messages from all destinations, sorts out the composite case so that there are no dangling messages

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244443 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-15 11:21:43 +00:00
parent 1638801522
commit 98b7dcdd1d
2 changed files with 25 additions and 18 deletions

View File

@ -519,21 +519,25 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(message);
}
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message m = null;
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
Message message = null;
for (ActiveMQDestination dest : destinationList(destination)) {
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Don't commit
// restart the broker.
@ -545,13 +549,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// All messages should be re-delivered.
for (int i = 0; i < 4; i++) {
m = receiveMessage(connection);
assertNotNull(m);
for (ActiveMQDestination dest : destinationList(destination)) {
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
}
assertNoMessagesLeft(connection);

View File

@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG