resolve potential lost ack with failover and an in progress consumer transaction that results in an Unmatched ack exception - resolve: https://issues.apache.org/activemq/browse/AMQ-2560

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@897061 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-08 00:17:37 +00:00
parent 6e5c9340c7
commit 0bc545b5d1
4 changed files with 307 additions and 53 deletions

View File

@ -755,6 +755,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* broker to pull a message we are about to receive
*/
protected void sendPullCommand(long timeout) throws JMSException {
synchronized (unconsumedMessages.getMutex()) {
clearDispatchListOnReconnect();
}
if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
@ -1067,25 +1070,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageListener listener = this.messageListener.get();
try {
synchronized (unconsumedMessages.getMutex()) {
if (clearDispatchList) {
// we are reconnecting so lets flush the in progress
// messages
clearDispatchList = false;
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
if (!session.isTransacted()) {
// clean, so we don't have duplicates with optimizeAcknowledge
synchronized (deliveredMessages) {
deliveredMessages.clear();
}
}
pendingAck = null;
}
clearDispatchListOnReconnect();
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
@ -1118,13 +1103,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
}
// in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
// the normal ack will happen in the transaction.
if (session.isTransacted()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
acknowledge(md);
}
acknowledge(md);
}
}
}
@ -1137,6 +1116,28 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
// called holding unconsumedMessages.getMutex()
private void clearDispatchListOnReconnect() {
if (clearDispatchList) {
// we are reconnecting so lets flush the in progress
// messages
clearDispatchList = false;
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
// clean, so we don't have duplicates with optimizeAcknowledge
synchronized (deliveredMessages) {
deliveredMessages.clear();
}
pendingAck = null;
}
}
public int getMessageSize() {
return unconsumedMessages.size();
}

View File

@ -437,15 +437,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
if (!checkFoundStart && firstAckedMsg != null)
throw new JMSException("Unmatched acknowledege: " + ack
throw new JMSException("Unmatched acknowledge: " + ack
+ "; Could not find Message-ID " + firstAckedMsg
+ " in dispatched-list (start of ack)");
if (!checkFoundEnd && lastAckedMsg != null)
throw new JMSException("Unmatched acknowledege: " + ack
throw new JMSException("Unmatched acknowledge: " + ack
+ "; Could not find Message-ID " + lastAckedMsg
+ " in dispatched-list (end of ack)");
if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
throw new JMSException("Unmatched acknowledege: " + ack
throw new JMSException("Unmatched acknowledge: " + ack
+ "; Expected message count (" + ack.getMessageCount()
+ ") differs from count in dispatched-list (" + checkCount
+ ")");
@ -663,9 +663,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
if (LOG.isTraceEnabled()) {
LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId()
+ ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount()
+ ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount());
LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId()
+ ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
}
}
}

View File

@ -336,11 +336,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s.setMaxRows(limit);
rs = s.executeQuery();
// jdbc scrollable cursor requires jdbc ver > 1.0 andis often implemented locally so avoid
// jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
while (rs.next()) {
reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
}
if (LOG.isDebugEnabled()) {
LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
}
for (MessageId id : reverseOrderIds) {
listener.messageId(id);
}

View File

@ -16,17 +16,20 @@
*/
package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -39,6 +42,8 @@ import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -51,7 +56,7 @@ import org.junit.Test;
public class FailoverTransactionTest {
private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "test.FailoverTransactionTest";
private static final String QUEUE_NAME = "FailoverWithTx";
private String url = "tcp://localhost:61616";
BrokerService broker;
@ -79,7 +84,7 @@ public class FailoverTransactionTest {
return broker;
}
//@Test
@Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@ -89,13 +94,7 @@ public class FailoverTransactionTest {
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
// close producer before commit, emulate jmstemplate
producer.close();
produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
@ -157,10 +156,7 @@ public class FailoverTransactionTest {
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
produceMessage(session, destination);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
@ -243,13 +239,7 @@ public class FailoverTransactionTest {
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
// close producer before commit, emulate jmstemplate
producer.close();
produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
@ -294,4 +284,265 @@ public class FailoverTransactionTest {
session.commit();
connection.close();
}
@Test
public void testFailoverConsumerCommitLost() throws Exception {
final int adapter = 0;
broker = createBroker(true);
setPersistenceAdapter(adapter);
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
@Override
public void commitTransaction(ConnectionContext context,
TransactionId xid, boolean onePhase) throws Exception {
super.commitTransaction(context, xid, onePhase);
// so commit will hang as if reply is lost
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker post commit...");
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = producerSession.createQueue(QUEUE_NAME);
final MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit after consume...");
try {
Message msg = consumer.receive(20000);
LOG.info("Got message: " + msg);
receivedMessages.add(msg);
consumerSession.commit();
commitDoneLatch.countDown();
LOG.info("done async commit");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
assertEquals("we got a message", 1, receivedMessages.size());
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNull("we did not get a duplicate message", msg);
consumerSession.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createConsumer(destination);
msg = consumer2.receive(1000);
if (msg == null) {
msg = consumer2.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
@Test
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times
for (int i=0; i<4; i++) {
try {
doTestFailoverConsumerAckLost();
} finally {
stopBroker();
}
}
}
public void doTestFailoverConsumerAckLost() throws Exception {
final int adapter = 0;
broker = createBroker(true);
setPersistenceAdapter(adapter);
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
// broker is killed on delivered ack as prefetch is 1
@Override
public void acknowledge(
ConsumerBrokerExchange consumerExchange,
final MessageAck ack) throws Exception {
consumerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker on ack: " + ack);
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
broker.start();
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
produceMessage(producerSession, destination);
produceMessage(producerSession, destination);
final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit after consume...");
try {
Message msg = consumer1.receive(20000);
LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg);
TimeUnit.SECONDS.sleep(7);
// should not get a second message as there are two messages and two consumers
// but with failover and unordered connection reinit it can get the second
// message which will have a problem for the ack
msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) {
receivedMessages.add(msg);
}
LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
consumerSession1.commit();
commitDoneLatch.countDown();
LOG.info("done async commit");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// getting 2 is indicative of a problem - proven with dangling message found after restart
LOG.info("received message count: " + receivedMessages.size());
// new transaction
Message msg = consumer1.receive(2000);
LOG.info("post: from consumer1 received: " + msg);
assertNull("should be nothing left for consumer1", msg);
consumerSession1.commit();
// consumer2 should get other message
msg = consumer2.receive(5000);
LOG.info("post: from consumer2 received: " + msg);
assertNotNull("got message on consumer2", msg);
consumerSession2.commit();
for (Connection c: connections) {
c.close();
}
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false);
setPersistenceAdapter(adapter);
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sweeper = sweeperSession.createConsumer(destination);
msg = sweeper.receive(1000);
if (msg == null) {
msg = sweeper.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
private void produceMessage(final Session producerSession, Queue destination)
throws JMSException {
MessageProducer producer = producerSession.createProducer(destination);
TextMessage message = producerSession.createTextMessage("Test message");
producer.send(message);
producer.close();
}
}