From 83df5cef5421aeb6db04c04377ed960c07e26f6c Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 23 Nov 2009 18:43:01 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2489 - duplicate delivery acks resulted in broker exceptions with client or inividual ack - delivery acks now only for unacked messages - down side is pending messages in broker remain on expiry awaiting ack from ackLaer that dependes on prefetch value - but this is reasonable and to be expected. they will be removed on close or subsequent acks in any event git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@883458 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 8 +- .../org/apache/activemq/bugs/AMQ2489Test.java | 226 ++++++++++++++++++ .../ExpiredMessagesWithNoConsumerTest.java | 21 +- 3 files changed, 250 insertions(+), 5 deletions(-) create mode 100755 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 683d484f2d..153e50b215 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -831,7 +831,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { - ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + boolean messageUnackedByConsumer = false; + synchronized (deliveredMessages) { + messageUnackedByConsumer = deliveredMessages.contains(md); + } + if (messageUnackedByConsumer) { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } } else { throw new IllegalStateException("Invalid session state."); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java new file mode 100755 index 0000000000..b581e6deb8 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.TestSupport; +import org.apache.activemq.command.ActiveMQQueue; + +/** + * In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes following exception + * occurs when ASYNCH consumers acknowledges messages in not in order they + * received the messages. + *

+ * Exception thrown on broker side: + *

+ * {@code javax.jms.JMSException: Could not correlate acknowledgment with + * dispatched message: MessageAck} + * + * @author daroo + */ +public class AMQ2489Test extends TestSupport { + private final static String SEQ_NUM_PROPERTY = "seqNum"; + + private final static int TOTAL_MESSAGES_CNT = 2; + private final static int CONSUMERS_CNT = 2; + + private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + public void testUnorderedClientAcknowledge() throws Exception { + doUnorderedAck(Session.CLIENT_ACKNOWLEDGE); + } + + public void testUnorderedIndividualAcknowledge() throws Exception { + doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + } + + /** + * Main test method + * + * @param acknowledgmentMode + * - ACK mode to be used by consumers + * @throws Exception + */ + protected void doUnorderedAck(int acknowledgmentMode) throws Exception { + List consumers = null; + Session producerSession = null; + + connection.start(); + // Because exception is thrown on broker side only, let's set up + // exception listener to get it + final TestExceptionListener exceptionListener = new TestExceptionListener(); + connection.setExceptionListener(exceptionListener); + try { + consumers = new ArrayList(); + // start customers + for (int i = 0; i < CONSUMERS_CNT; i++) { + consumers.add(new Consumer(acknowledgmentMode)); + } + + // produce few test messages + producerSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producerSession + .createProducer(new ActiveMQQueue(getQueueName())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) { + final Message message = producerSession + .createTextMessage("test"); + // assign each message sequence number + message.setIntProperty(SEQ_NUM_PROPERTY, i); + producer.send(message); + } + + // during each onMessage() calls consumers decreases the LATCH + // counter. + // + // so, let's wait till all messages are consumed. + // + LATCH.await(); + + // wait a bit more to give exception listener a chance be populated + // with + // broker's error + TimeUnit.SECONDS.sleep(1); + + assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions()); + + } finally { + if (producerSession != null) + producerSession.close(); + + if (consumers != null) { + for (Consumer c : consumers) { + c.close(); + } + } + } + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + + public final class Consumer implements MessageListener { + final Session session; + + private Consumer(int acknowledgmentMode) { + try { + session = connection.createSession(false, acknowledgmentMode); + final Queue queue = session.createQueue(getQueueName() + + "?consumer.prefetchSize=1"); + final MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void onMessage(Message message) { + try { + // retrieve sequence number assigned by producer... + final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY); + + // ...and let's delay every second message a little bit before + // acknowledgment + if ((seqNum % 2) == 0) { + System.out.println("Delayed message sequence numeber: " + + seqNum); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + message.acknowledge(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + // decrease LATCH counter in the main test method. + LATCH.countDown(); + } + } + + private void close() { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + public final class TestExceptionListener implements ExceptionListener { + private final java.util.Queue exceptions = new ConcurrentLinkedQueue(); + + public void onException(JMSException e) { + exceptions.add(e); + } + + public boolean hasExceptions() { + return exceptions.isEmpty() == false; + } + + public String getStatusText() { + final StringBuilder str = new StringBuilder(); + str.append("Exceptions count on broker side: " + exceptions.size() + + ".\nMessages:\n"); + for (Exception e : exceptions) { + str.append(e.getMessage() + "\n\n"); + } + return str.toString(); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 032998c260..b8371854af 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -159,7 +159,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { // first ack delivered after expiry public void testExpiredMessagesWithVerySlowConsumer() throws Exception { createBroker(); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + final long queuePrefetch = 600; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); producer = session.createProducer(destination); @@ -222,7 +223,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return 1000 == view.getDispatchCount(); + return queuePrefetch == view.getDispatchCount(); } })); assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() { @@ -240,17 +241,29 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return 0 == view.getInFlightCount(); + // consumer ackLater(delivery ack for expired messages) is based on half the prefetch value + // which will leave half of the prefetch pending till consumer close + return (queuePrefetch/2) -1 == view.getInFlightCount(); } }); LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount()); + + + assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount()); assertEquals("size gets back to 0 ", 0, view.getQueueSize()); assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); consumer.close(); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return 0 == view.getInFlightCount(); + } + }); + assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount()); + LOG.info("done: " + getName()); }