This commit is contained in:
gtully 2015-09-30 11:40:30 +01:00
parent 8514e38135
commit fc25535748
2 changed files with 22 additions and 16 deletions

View File

@ -313,7 +313,7 @@ public class TransactionContext implements XAResource {
throw e;
}
if (rollbackOnly) {
if (transactionId != null && rollbackOnly) {
final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks";
try {
rollback();

View File

@ -1120,6 +1120,8 @@ public class FailoverTransactionTest extends TestSupport {
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final Session secondConsumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
@ -1130,22 +1132,25 @@ public class FailoverTransactionTest extends TestSupport {
assertNotNull("got message just produced", msg);
// add another consumer into the mix that may get the message after restart
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
MessageConsumer consumer2 = secondConsumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop();
broker = createBroker(false, url);
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotRollback = new CountDownLatch(1);
final Vector<Exception> exceptions = new Vector<Exception>();
// commit may fail if other consumer gets the message on restart
// commit will fail due to failover with outstanding ack
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
} catch (TransactionRolledBackException ex) {
gotRollback.countDown();
} catch (JMSException ex) {
exceptions.add(ex);
} finally {
@ -1156,23 +1161,24 @@ public class FailoverTransactionTest extends TestSupport {
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
assertTrue("got Rollback", gotRollback.await(15, TimeUnit.SECONDS));
// either message redelivered in existing tx or consumed by consumer2
// should not be available again in any event
assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
assertTrue("no other exceptions", exceptions.isEmpty());
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
if (exceptions.isEmpty()) {
LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
} else {
LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
assertNotNull("consumer2 got message", consumer2.receive(2000));
// consume message from one of the consumers
Message message = consumer2.receive(2000);
if (message == null) {
message = consumer.receive(2000);
}
consumerSession.commit();
secondConsumerSession.commit();
assertNotNull("got message after rollback", message);
// no message should be in dlq
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
assertNull("nothing in the dlq", dlqConsumer.receive(5000));
}
assertNull("nothing in the dlq", dlqConsumer.receive(2000));
connection.close();
}