diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 12ddb94e1b..2bafa1faa1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -40,9 +40,9 @@ import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -304,7 +304,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { - ActiveMQServerLogger.LOGGER.trace("Handling reference " + ref); + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " Handling reference " + ref); } if (!browseOnly) { if (!preAcknowledge) { @@ -379,6 +379,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void close(final boolean failed) throws Exception { + if (isTrace) + { + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); + } + callback.removeReadyListener(this); setStarted(false); @@ -400,6 +405,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { while (iter.hasNext()) { MessageReference ref = iter.next(); + if (isTrace) + { + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); + } + ref.getQueue().cancel(tx, ref, true); } @@ -507,29 +517,31 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { LinkedList refs = new LinkedList(); - if (!deliveringRefs.isEmpty()) { - for (MessageReference ref : deliveringRefs) { - if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("Cancelling reference for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref); - } - if (performACK) { - acknowledge(tx, ref.getMessage().getMessageID()); + synchronized (lock) { + if (!deliveringRefs.isEmpty()) { + for (MessageReference ref : deliveringRefs) { + if (performACK) { + ackReference(tx, ref); - performACK = false; - } - else { - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); + performACK = false; + } + else { + refs.add(ref); + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } } - refs.add(ref); + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref); + } } - } - deliveringRefs.clear(); + deliveringRefs.clear(); + } } return refs; @@ -662,7 +674,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); } - ref.getQueue().acknowledge(tx, ref); + ackReference(tx, ref); acks++; } while (ref.getMessage().getMessageID() != messageID); @@ -692,6 +704,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void ackReference(Transaction tx, MessageReference ref) throws Exception { + if (tx == null) { + ref.getQueue().acknowledge(ref); + } + else { + ref.getQueue().acknowledge(tx, ref); + } + } + public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception { if (browseOnly) { return; @@ -703,12 +724,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw new IllegalStateException("Cannot find ref to ack " + messageID); } - if (tx == null) { - ref.getQueue().acknowledge(ref); - } - else { - ref.getQueue().acknowledge(tx, ref); - } + ackReference(tx, ref); + acks++; } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java index 1ce2bf948a..d46761602e 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java @@ -32,6 +32,7 @@ import javax.jms.TopicSubscriber; import java.util.concurrent.CountDownLatch; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; +import org.junit.Assert; import org.junit.Test; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -779,6 +780,15 @@ public class AcknowledgementTest extends JMSTestCase { ProxyAssertSupport.assertEquals("two", messageReceived.getText()); + messageReceived = (TextMessage)consumer.receiveNoWait(); + + if (messageReceived != null) + { + System.out.println("Message received " + messageReceived.getText()); + } + Assert.assertNull(messageReceived); + + consumer.close(); // I can't call xasession.close for this test as JCA layer would cache the session