resolve https://issues.apache.org/activemq/browse/AMQ-2573 - rollback of audit check needs to be synced with redispatch after failover transport resumption, otherwise some redispatched unconsumed messages can get auto-acked as duplicates in error

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901273 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-20 16:49:22 +00:00
parent 8de3bd29bf
commit 862cd718ed
6 changed files with 352 additions and 54 deletions

View File

@ -187,6 +187,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private DestinationSource destinationSource; private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object(); private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
protected CountDownLatch transportInterruptionProcessingComplete;
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -1674,6 +1675,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
command.visit(new CommandVisitorAdapter() { command.visit(new CommandVisitorAdapter() {
@Override @Override
public Response processMessageDispatch(MessageDispatch md) throws Exception { public Response processMessageDispatch(MessageDispatch md) throws Exception {
waitForTransportInterruptionProcessing();
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) { if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via // Copy in case a embedded broker is dispatching via
@ -1837,6 +1839,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
public void transportInterupted() { public void transportInterupted() {
transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
}
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next(); ActiveMQSession s = i.next();
s.clearMessagesInProgress(); s.clearMessagesInProgress();
@ -2235,4 +2241,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public IOException getFirstFailureError() { public IOException getFirstFailureError() {
return firstFailureError; return firstFailureError;
} }
protected void waitForTransportInterruptionProcessing() throws InterruptedException {
if (transportInterruptionProcessingComplete != null) {
while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
}
synchronized (this) {
transportInterruptionProcessingComplete = null;
}
}
}
protected synchronized void transportInterruptionProcessingComplete() {
if (transportInterruptionProcessingComplete != null) {
transportInterruptionProcessingComplete.countDown();
}
}
} }

View File

@ -103,7 +103,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
protected final ConsumerInfo info; protected final ConsumerInfo info;
// These are the messages waiting to be delivered to the client // These are the messages waiting to be delivered to the client
private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
// The are the messages that were delivered to the consumer but that have // The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we // not been acknowledged. It's kept in reverse order since we
@ -640,14 +640,22 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
void clearMessagesInProgress() { void clearMessagesInProgress() {
// we are called from inside the transport reconnection logic // deal with delivered messages async to avoid lock contention with in pogress acks
// which involves us clearing all the connections' consumers
// dispatch lists and clearing them
// so rather than trying to grab a mutex (which could be already
// owned by the message listener calling the send) we will just set
// a flag so that the list can be cleared as soon as the
// dispatch thread is ready to flush the dispatch list
clearDispatchList = true; clearDispatchList = true;
synchronized (unconsumedMessages.getMutex()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
}
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
}
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
} }
void deliverAcks() { void deliverAcks() {
@ -755,9 +763,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* broker to pull a message we are about to receive * broker to pull a message we are about to receive
*/ */
protected void sendPullCommand(long timeout) throws JMSException { protected void sendPullCommand(long timeout) throws JMSException {
synchronized (unconsumedMessages.getMutex()) { clearDispatchList();
clearDispatchListOnReconnect();
}
if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull(); MessagePull messagePull = new MessagePull();
messagePull.configure(info); messagePull.configure(info);
@ -937,9 +943,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @throws JMSException * @throws JMSException
*/ */
public void acknowledge() throws JMSException { public void acknowledge() throws JMSException {
synchronized (unconsumedMessages.getMutex()) { clearDispatchList();
clearDispatchListOnReconnect();
}
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
// Acknowledge all messages so far. // Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@ -1072,8 +1076,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispatch(MessageDispatch md) { public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get(); MessageListener listener = this.messageListener.get();
try { try {
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) { synchronized (unconsumedMessages.getMutex()) {
clearDispatchListOnReconnect();
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) { if (listener != null && unconsumedMessages.isRunning()) {
@ -1119,25 +1123,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
// called holding unconsumedMessages.getMutex() // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again
private void clearDispatchListOnReconnect() { private void clearDispatchList() {
if (clearDispatchList) { if (clearDispatchList) {
// we are reconnecting so lets flush the in progress
// messages
clearDispatchList = false;
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
// clean, so we don't have duplicates with optimizeAcknowledge
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
deliveredMessages.clear(); if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
} }
if (clearDispatchList) {
deliveredMessages.clear();
pendingAck = null; pendingAck = null;
clearDispatchList = false;
}
}
} }
} }

View File

@ -16,9 +16,71 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader; import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl; import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
@ -30,20 +92,6 @@ import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.blob.BlobDownloader;
/** /**
* <P> * <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing * A <CODE>Session</CODE> object is a single-threaded context for producing
@ -592,9 +640,19 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
void clearMessagesInProgress() { void clearMessagesInProgress() {
executor.clearMessagesInProgress(); executor.clearMessagesInProgress();
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { // we are called from inside the transport reconnection logic
ActiveMQMessageConsumer consumer = iter.next(); // which involves us clearing all the connections' consumers
// dispatch and delivered lists. So rather than trying to
// grab a mutex (which could be already owned by the message
// listener calling the send or an ack) we allow it to complete in
// a separate thread via the scheduler and notify us via
// connection.transportInterruptionProcessingComplete()
//
for (final ActiveMQMessageConsumer consumer : consumers) {
scheduler.executeAfterDelay(new Runnable() {
public void run() {
consumer.clearMessagesInProgress(); consumer.clearMessagesInProgress();
}}, 0l);
} }
} }

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertTrue;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2573
public class FailoverConsumerUnconsumedTest {
private static final Log LOG = LogFactory.getLog(FailoverConsumerUnconsumedTest.class);
private static final String QUEUE_NAME = "FailoverWithUnconsumed";
private String url = "tcp://localhost:61616";
final int prefetch = 10;
BrokerService broker;
public void startCleanBroker() throws Exception {
startBroker(true);
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.addConnector(url);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
return broker;
}
@Test
public void testFailoverConsumerDups() throws Exception {
doTestFailoverConsumerDups(true);
}
@Test
public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
doTestFailoverConsumerDups(false);
}
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
final int maxConsumers = 4;
broker = createBroker(true);
broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() {
int consumerCount;
// broker is killed on x create consumer
@Override
public Subscription addConsumer(ConnectionContext context,
final ConsumerInfo info) throws Exception {
if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1:0)) {
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker on consumer: " + info.getConsumerId());
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
return super.addConsumer(context, info);
}
}
});
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
for (int i=0; i<maxConsumers -1; i++) {
testConsumers.add(new TestConsumer(consumerSession, destination, connection));
}
produceMessage(consumerSession, destination, maxConsumers * prefetch);
assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
int totalUnconsumed = 0;
for (TestConsumer testConsumer : testConsumers) {
long unconsumed = testConsumer.unconsumedSize();
LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumed);
totalUnconsumed += unconsumed;
}
return totalUnconsumed == (maxConsumers-1) * prefetch;
}
}));
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
try {
LOG.info("add last consumer...");
testConsumers.add(new TestConsumer(consumerSession, destination, connection));
commitDoneLatch.countDown();
LOG.info("done add last consumer");
} catch (Exception e) {
e.printStackTrace();
}
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false);
broker.start();
assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
int totalUnconsumed = 0;
for (TestConsumer testConsumer : testConsumers) {
long unconsumed = testConsumer.unconsumedSize();
LOG.info(testConsumer.getConsumerId() + " after restart: unconsumed: " + unconsumed);
totalUnconsumed += unconsumed;
}
return totalUnconsumed == (maxConsumers) * prefetch;
}
}));
connection.close();
}
private void produceMessage(final Session producerSession, Queue destination, long count)
throws JMSException {
MessageProducer producer = producerSession.createProducer(destination);
for (int i=0; i<count; i++) {
TextMessage message = producerSession.createTextMessage("Test message " + i);
producer.send(message);
}
producer.close();
}
// allow access to unconsumedMessages
class TestConsumer extends ActiveMQMessageConsumer {
TestConsumer(Session consumerSession, Destination destination, ActiveMQConnection connection) throws Exception {
super((ActiveMQSession) consumerSession,
new ConsumerId(new SessionId(connection.getConnectionInfo().getConnectionId(),1), nextGen()),
ActiveMQMessageTransformation.transformDestination(destination), null, "",
prefetch, -1, false, false, true, null);
}
public int unconsumedSize() {
return unconsumedMessages.size();
}
}
static long idGen = 100;
private static long nextGen() {
idGen -=5;
return idGen;
}
}

View File

@ -390,7 +390,7 @@ public class FailoverTransactionTest {
@Test @Test
public void testFailoverConsumerAckLost() throws Exception { public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times // as failure depends on hash order, do a few times
for (int i=0; i<4; i++) { for (int i=0; i<3; i++) {
try { try {
doTestFailoverConsumerAckLost(); doTestFailoverConsumerAckLost();
} finally { } finally {

View File

@ -22,9 +22,6 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG #log4j.logger.org.apache.activemq=DEBUG
# get to the bottom of jmx related intermittent failures
log4j.logger.org.apache.activemq.broker.jmx.ManagementContext=DEBUG
# CONSOLE appender not used by default # CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout=org.apache.log4j.PatternLayout