https://issues.apache.org/jira/browse/AMQ-4791 - fix and test. removed the delay but left the warn if dispatch ocurrs before interruption processing is complete. Problem was a race between consumer close and sessions copy on write dispatchers list

This commit is contained in:
gtully 2013-10-08 21:28:30 +01:00
parent 3a0a9aae05
commit dc0291b290
7 changed files with 180 additions and 37 deletions

View File

@ -198,7 +198,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
protected volatile CountDownLatch transportInterruptionProcessingComplete;
protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
private Scheduler scheduler;
private boolean messagePrioritySupported = true;
@ -2023,19 +2023,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
@Override
public void transportInterupted() {
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
}
signalInterruptionProcessingNeeded();
transportInterruptionProcessingComplete.set(1);
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
s.clearMessagesInProgress(transportInterruptionProcessingComplete);
}
for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
connectionConsumer.clearMessagesInProgress();
connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
}
if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
}
signalInterruptionProcessingNeeded();
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
@ -2462,33 +2464,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
cdl.await(10, TimeUnit.SECONDS);
}
if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
signalInterruptionProcessingComplete();
}
}
protected void transportInterruptionProcessingComplete() {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
cdl.countDown();
try {
if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
signalInterruptionProcessingComplete();
} catch (InterruptedException ignored) {}
}
}
private void signalInterruptionProcessingComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl.getCount()==0) {
private void signalInterruptionProcessingComplete() {
if (LOG.isDebugEnabled()) {
LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
+ " for:" + this.getConnectionInfo().getConnectionId());
}
this.transportInterruptionProcessingComplete = null;
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) {
@ -2498,8 +2490,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
+ ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
}
}
}
transportInterruptionProcessingComplete.set(0);
}
private void signalInterruptionProcessingNeeded() {

View File

@ -20,6 +20,8 @@ package org.apache.activemq;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
@ -155,11 +157,9 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQD
return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
}
public void clearMessagesInProgress() {
public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
// future: may want to deal with rollback of in progress messages to track re deliveries
// before indicating that all is complete.
// Till there is a need, lets immediately allow dispatch
this.connection.transportInterruptionProcessingComplete();
}
public ConsumerInfo getConsumerInfo() {

View File

@ -687,7 +687,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
void doClose() throws JMSException {
// Store interrupted state and clear so that Transport operations don't
// throw InterruptedException and we ensure that resources are clened up.
// throw InterruptedException and we ensure that resources are cleaned up.
boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
@ -1584,4 +1584,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
public boolean hasMessageListener() {
return messageListener.get() != null;
}
}

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@ -647,7 +648,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
void clearMessagesInProgress() {
void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
executor.clearMessagesInProgress();
// we are called from inside the transport reconnection logic
// which involves us clearing all the connections' consumers
@ -659,6 +660,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
//
for (final ActiveMQMessageConsumer consumer : consumers) {
consumer.inProgressClearRequired();
transportInterruptionProcessingComplete.incrementAndGet();
try {
connection.getScheduler().executeAfterDelay(new Runnable() {
public void run() {
@ -2012,7 +2014,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
ActiveMQMessageConsumer consumer = i.next();
if (consumer.getMessageListener() != null) {
if (consumer.hasMessageListener()) {
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
}
}

View File

@ -406,6 +406,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
if (ss != null) {
ss.removeConsumer(id);
}
cs.getRecoveringPullConsumers().remove(id);
}
}
}

View File

@ -167,6 +167,7 @@ public class FailoverConsumerOutstandingCommitTest {
LOG.info("producer started");
try {
produceMessage(producerSession, destination, prefetch * 2);
} catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown) {
} catch (JMSException e) {
e.printStackTrace();
fail("unexpceted ex on producer: " + e);
@ -273,6 +274,7 @@ public class FailoverConsumerOutstandingCommitTest {
LOG.info("producer started");
try {
produceMessage(producerSession, destination, prefetch * 2);
} catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown) {
} catch (JMSException e) {
e.printStackTrace();
fail("unexpceted ex on producer: " + e);

View File

@ -17,7 +17,9 @@
package org.apache.activemq.transport.failover;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
@ -29,10 +31,12 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,9 +52,15 @@ import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;
import java.util.Stack;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -874,6 +884,139 @@ public class FailoverTransactionTest extends TestSupport {
connection.close();
}
public void testPoolingNConsumesAfterReconnect() throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
broker.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
int count = 0;
@Override
public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
if (count++ == 1) {
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("Stopping broker on removeConsumer: " + info);
try {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
broker.start();
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
produceMessage(producerSession, destination);
connection.close();
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final int sessionCount = 10;
final Stack<Session> sessions = new Stack<Session>();
for (int i = 0; i < sessionCount; i++) {
sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
}
final int consumerCount = 1000;
final Deque<MessageConsumer> consumers = new ArrayDeque<MessageConsumer>();
for (int i = 0; i < consumerCount; i++) {
consumers.push(consumerSession.createConsumer(destination));
}
final ExecutorService executorService = Executors.newCachedThreadPool();
final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
final TransportListener delegate = failoverTransport.getTransportListener();
failoverTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
delegate.onCommand(command);
}
@Override
public void onException(IOException error) {
delegate.onException(error);
}
@Override
public void transportInterupted() {
LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
executorService.execute(new Runnable() {
public void run() {
MessageConsumer localConsumer = null;
try {
synchronized (delegate) {
localConsumer = consumers.pop();
}
localConsumer.receive(1);
LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
localConsumer.close();
} catch (NoSuchElementException nse) {
} catch (Exception ignored) {
LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
}
}
});
}
delegate.transportInterupted();
}
@Override
public void transportResumed() {
delegate.transportResumed();
}
});
MessageConsumer consumer = null;
synchronized (delegate) {
consumer = consumers.pop();
}
LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
consumer.close();
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
setDefaultPersistenceAdapter(broker);
broker.start();
consumer = consumerSession.createConsumer(destination);
LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
Message msg = null;
for (int i = 0; i < 4 && msg == null; i++) {
msg = consumer.receive(1000);
}
LOG.info("post: from consumer1 received: " + msg);
assertNotNull("got message after failover", msg);
msg.acknowledge();
for (Connection c : connections) {
c.close();
}
}
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
broker = createBroker(true);
broker.start();
@ -991,8 +1134,8 @@ public class FailoverTransactionTest extends TestSupport {
final Vector<Exception> exceptions = new Vector<Exception>();
// commit may fail if other consumer gets the message on restart, it will be seen a a duplicate on teh connection
// but with no transaciton and it pending on another consumer it will be posion
// commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection
// but with no transaction and it pending on another consumer it will be poison
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");