ARTEMIS-301 - Fixing concurrent issues over closing consumer during failover and reconnect

This commit is contained in:
Clebert Suconic 2015-11-10 14:29:25 -05:00
parent f0f886f53a
commit a21a447b4c
2 changed files with 54 additions and 27 deletions

View File

@ -40,9 +40,9 @@ import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -304,7 +304,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("Handling reference " + ref); ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);
} }
if (!browseOnly) { if (!browseOnly) {
if (!preAcknowledge) { if (!preAcknowledge) {
@ -379,6 +379,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void close(final boolean failed) throws Exception { public void close(final boolean failed) throws Exception {
if (isTrace)
{
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
callback.removeReadyListener(this); callback.removeReadyListener(this);
setStarted(false); setStarted(false);
@ -400,6 +405,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (isTrace)
{
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref);
}
ref.getQueue().cancel(tx, ref, true); ref.getQueue().cancel(tx, ref, true);
} }
@ -507,30 +517,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
LinkedList<MessageReference> refs = new LinkedList<MessageReference>(); LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
synchronized (lock) {
if (!deliveringRefs.isEmpty()) { if (!deliveringRefs.isEmpty()) {
for (MessageReference ref : deliveringRefs) { for (MessageReference ref : deliveringRefs) {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Cancelling reference for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref);
}
if (performACK) { if (performACK) {
acknowledge(tx, ref.getMessage().getMessageID()); ackReference(tx, ref);
performACK = false; performACK = false;
} }
else { else {
refs.add(ref);
if (!failed) { if (!failed) {
// We don't decrement delivery count if the client failed, since there's a possibility that refs // We don't decrement delivery count if the client failed, since there's a possibility that refs
// were actually delivered but we just didn't get any acks for them // were actually delivered but we just didn't get any acks for them
// before failure // before failure
ref.decrementDeliveryCount(); ref.decrementDeliveryCount();
} }
}
refs.add(ref); if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref);
} }
} }
deliveringRefs.clear(); deliveringRefs.clear();
} }
}
return refs; return refs;
} }
@ -662,7 +674,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
} }
ref.getQueue().acknowledge(tx, ref); ackReference(tx, ref);
acks++; acks++;
} while (ref.getMessage().getMessageID() != messageID); } while (ref.getMessage().getMessageID() != messageID);
@ -692,6 +704,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
private void ackReference(Transaction tx, MessageReference ref) throws Exception {
if (tx == null) {
ref.getQueue().acknowledge(ref);
}
else {
ref.getQueue().acknowledge(tx, ref);
}
}
public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception { public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception {
if (browseOnly) { if (browseOnly) {
return; return;
@ -703,12 +724,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw new IllegalStateException("Cannot find ref to ack " + messageID); throw new IllegalStateException("Cannot find ref to ack " + messageID);
} }
if (tx == null) { ackReference(tx, ref);
ref.getQueue().acknowledge(ref);
}
else {
ref.getQueue().acknowledge(tx, ref);
}
acks++; acks++;
} }

View File

@ -32,6 +32,7 @@ import javax.jms.TopicSubscriber;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -779,6 +780,15 @@ public class AcknowledgementTest extends JMSTestCase {
ProxyAssertSupport.assertEquals("two", messageReceived.getText()); ProxyAssertSupport.assertEquals("two", messageReceived.getText());
messageReceived = (TextMessage)consumer.receiveNoWait();
if (messageReceived != null)
{
System.out.println("Message received " + messageReceived.getText());
}
Assert.assertNull(messageReceived);
consumer.close(); consumer.close();
// I can't call xasession.close for this test as JCA layer would cache the session // I can't call xasession.close for this test as JCA layer would cache the session