resolve https://issues.apache.org/activemq/browse/AMQ-2590 - commit may throw a TransactionRolledBackException in the event that after a failover recovery, the same messages are not redispatched - the transaction cannot be fully recreated so it must rollback

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-01 22:41:17 +00:00
parent 722258e196
commit 62daac4b31
5 changed files with 193 additions and 24 deletions

View File

@ -46,7 +46,9 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -58,6 +60,7 @@ import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.TransactionRolledBackException;
/** /**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@ -109,6 +112,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// not been acknowledged. It's kept in reverse order since we // not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order. // Always walk list in reverse order.
private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
private HashMap<MessageId, Boolean> previouslyDeliveredMessages;
private int deliveredCounter; private int deliveredCounter;
private int additionalWindowSize; private int additionalWindowSize;
private long redeliveryDelay; private long redeliveryDelay;
@ -146,7 +150,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @param name * @param name
* @param selector * @param selector
* @param prefetch * @param prefetch
* @param maximumPendingMessageCount TODO * @param maximumPendingMessageCount
* @param noLocal * @param noLocal
* @param browser * @param browser
* @param dispatchAsync * @param dispatchAsync
@ -640,7 +644,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
void clearMessagesInProgress() { void clearMessagesInProgress() {
// deal with delivered messages async to avoid lock contention with in pogress acks // deal with delivered messages async to avoid lock contention with in progress acks
clearDispatchList = true; clearDispatchList = true;
synchronized (unconsumedMessages.getMutex()) { synchronized (unconsumedMessages.getMutex()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -951,6 +955,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return; // no msgs return; // no msgs
if (session.getTransacted()) { if (session.getTransacted()) {
rollbackOnFailedRecoveryRedelivery();
session.doStartTransaction(); session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId()); ack.setTransactionId(session.getTransactionContext().getTransactionId());
} }
@ -967,6 +972,51 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
/*
* called with deliveredMessages locked
*/
private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
if (previouslyDeliveredMessages != null) {
// if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
// as messages have been dispatched else where.
int numberNotReplayed = 0;
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
numberNotReplayed++;
// allow outstanding messages to get delivered again
removeFromDeliveredMessages(entry.getKey());
if (LOG.isDebugEnabled()) {
LOG.debug("previously delivered message has not been replayed in transaction, id: " + entry.getKey());
}
}
}
clearPreviouslyDelivered();
if (numberNotReplayed > 0) {
String message = "rolling back transaction post failover recovery. " + numberNotReplayed
+ " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
LOG.warn(message);
throw new TransactionRolledBackException(message);
}
}
}
/*
* called with deliveredMessages locked
*/
private void removeFromDeliveredMessages(MessageId key) {
ListIterator<MessageDispatch> iterator = deliveredMessages.listIterator(deliveredMessages.size());
while (iterator.hasPrevious()) {
MessageDispatch candidate = iterator.previous();
if (key.equals(candidate.getMessage().getMessageId())) {
session.connection.rollbackDuplicate(this, candidate.getMessage());
iterator.remove();
break;
}
}
}
void acknowledge(MessageDispatch md) throws JMSException { void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
session.sendAck(ack); session.sendAck(ack);
@ -978,6 +1028,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void commit() throws JMSException { public void commit() throws JMSException {
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
deliveredMessages.clear(); deliveredMessages.clear();
clearPreviouslyDelivered();
} }
redeliveryDelay = 0; redeliveryDelay = 0;
} }
@ -998,6 +1049,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
clearPreviouslyDelivered();
if (deliveredMessages.isEmpty()) { if (deliveredMessages.isEmpty()) {
return; return;
} }
@ -1073,6 +1125,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
/*
* called with deliveredMessages locked
*/
private void clearPreviouslyDelivered() {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.clear();
previouslyDeliveredMessages = null;
}
}
public void dispatch(MessageDispatch md) { public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get(); MessageListener listener = this.messageListener.get();
try { try {
@ -1106,11 +1168,23 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
} else { } else {
// ignore duplicate if (!session.isTransacted()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
}
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
session.sendAck(ack);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage());
}
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
}
}
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} }
acknowledge(md);
} }
} }
} }
@ -1126,13 +1200,27 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again
private void clearDispatchList() { private void clearDispatchList() {
if (clearDispatchList) { if (clearDispatchList) {
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
if (clearDispatchList) { if (clearDispatchList) {
deliveredMessages.clear(); if (!deliveredMessages.isEmpty()) {
pendingAck = null; if (session.isTransacted()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new HashMap<MessageId, Boolean>();
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
deliveredMessages.clear();
pendingAck = null;
}
}
clearDispatchList = false; clearDispatchList = false;
} }
} }

View File

@ -516,7 +516,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*/ */
public boolean getTransacted() throws JMSException { public boolean getTransacted() throws JMSException {
checkClosed(); checkClosed();
return (acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction()); return isTransacted();
} }
/** /**
@ -1784,7 +1784,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @return true - if the session uses transactions. * @return true - if the session uses transactions.
*/ */
public boolean isTransacted() { public boolean isTransacted() {
return this.acknowledgementMode == Session.SESSION_TRANSACTED; return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
} }
/** /**

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.TransactionInProgressException; import javax.jms.TransactionInProgressException;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
@ -235,7 +236,11 @@ public class TransactionContext implements XAResource {
throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
} }
beforeEnd(); try {
beforeEnd();
} catch (TransactionRolledBackException canOcurrOnFailover) {
LOG.warn("rollback processing error", canOcurrOnFailover);
}
if (transactionId != null) { if (transactionId != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Rollback: " + transactionId LOG.debug("Rollback: " + transactionId
@ -270,7 +275,12 @@ public class TransactionContext implements XAResource {
throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
} }
beforeEnd(); try {
beforeEnd();
} catch (JMSException e) {
rollback();
throw e;
}
// Only send commit if the transaction was started. // Only send commit if the transaction was started.
if (transactionId != null) { if (transactionId != null) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -239,6 +240,62 @@ public class FailoverConsumerOutstandingCommitTest {
connection.close(); connection.close();
} }
@Test
public void testRollbackFailoverConsumerTx() throws Exception {
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
assertNull("no message yet", testConsumer.receiveNoWait());
produceMessage(producerSession, destination, 1);
producerSession.close();
// consume then rollback after restart
Message msg = testConsumer.receive(5000);
assertNotNull(msg);
// restart with out standing delivered message
broker.stop();
broker.waitUntilStopped();
broker = createBroker(false);
broker.start();
consumerSession.rollback();
// receive again
msg = testConsumer.receive(10000);
assertNotNull("got message again after rollback", msg);
consumerSession.commit();
// close before sweep
consumerSession.close();
msg = receiveMessage(cf, destination);
assertNull("should be nothing left after commit", msg);
connection.close();
}
private Message receiveMessage(ActiveMQConnectionFactory cf,
Queue destination) throws Exception {
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final MessageConsumer consumer = consumerSession.createConsumer(destination);
Message msg = consumer.receive(5000);
consumerSession.commit();
connection.close();
return msg;
}
private void produceMessage(final Session producerSession, Queue destination, long count) private void produceMessage(final Session producerSession, Queue destination, long count)
throws JMSException { throws JMSException {
MessageProducer producer = producerSession.createProducer(destination); MessageProducer producer = producerSession.createProducer(destination);

View File

@ -36,6 +36,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
@ -468,7 +469,8 @@ public class FailoverTransactionTest {
// should not get a second message as there are two messages and two consumers // should not get a second message as there are two messages and two consumers
// but with failover and unordered connection restore it can get the second // but with failover and unordered connection restore it can get the second
// message which could create a problem for a pending ack // message which could create a problem for a pending ack and also invalidate
// the transaction in which the first was consumed and acked
msg = consumer1.receive(5000); msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg); LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) { if (msg != null) {
@ -476,7 +478,17 @@ public class FailoverTransactionTest {
} }
LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
consumerSession1.commit(); try {
consumerSession1.commit();
} catch (JMSException expectedSometimes) {
LOG.info("got rollback ex on commit", expectedSometimes);
if (expectedSometimes instanceof TransactionRolledBackException && receivedMessages.size() == 2) {
// ok, message one was not replayed so we expect the rollback
} else {
throw expectedSometimes;
}
}
commitDoneLatch.countDown(); commitDoneLatch.countDown();
LOG.info("done async commit"); LOG.info("done async commit");
} catch (Exception e) { } catch (Exception e) {
@ -494,21 +506,23 @@ public class FailoverTransactionTest {
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// getting 2 is indicative of a problem - proven with dangling message found after restart // getting 2 is indicative of orderiing issue. a problem if dangling message found after restart
LOG.info("received message count: " + receivedMessages.size()); LOG.info("received message count: " + receivedMessages.size());
// new transaction // new transaction
Message msg = consumer1.receive(2000); Message msg = consumer1.receive(2000);
LOG.info("post: from consumer1 received: " + msg); LOG.info("post: from consumer1 received: " + msg);
assertNull("should be nothing left for consumer1", msg); if (receivedMessages.size() == 1) {
assertNull("should be nothing left for consumer as recieve should have committed", msg);
} else {
assertNotNull("should be available again after commit rollback ex", msg);
}
consumerSession1.commit(); consumerSession1.commit();
// consumer2 should get other message provided consumer1 did not get 2 // consumer2 should get other message
msg = consumer2.receive(5000); msg = consumer2.receive(5000);
LOG.info("post: from consumer2 received: " + msg); LOG.info("post: from consumer2 received: " + msg);
if (receivedMessages.size() == 1) { assertNotNull("got second message on consumer2", msg);
assertNotNull("got second message on consumer2", msg);
}
consumerSession2.commit(); consumerSession2.commit();
for (Connection c: connections) { for (Connection c: connections) {