From 73433372e9bacff78406c64b644c9b1202a0a2c0 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 13 Jul 2010 22:40:58 +0000 Subject: [PATCH] rework fix for https://issues.apache.org/activemq/browse/AMQ-1730 - better support for unordered message consumption when there are multiple short lived consumers, eg with spring mlc and concurrent consumers > 1 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@963894 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 30 +++- .../org/apache/activemq/JMSConsumerTest.java | 2 +- .../activemq/JmsRollbackRedeliveryTest.java | 2 +- .../java/org/apache/bugs/AMQ1730Test.java | 165 ++++++++++++++++++ 4 files changed, 193 insertions(+), 6 deletions(-) create mode 100644 activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 4d0cbf1b07..3f7fcfbd9d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -467,15 +467,37 @@ public class Queue extends BaseDestination implements Task, UsageListener { // redeliver inflight messages - for (MessageReference ref : sub.remove(context, this)) { + boolean markAsRedelivered = false; + MessageReference lastDeliveredRef = null; + List unAckedMessages = sub.remove(context, this); + + // locate last redelivered in unconsumed list (list in delivery rather than seq order) + if (lastDeiveredSequenceId != 0) { + for (MessageReference ref : unAckedMessages) { + if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { + lastDeliveredRef = ref; + markAsRedelivered = true; + LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId()); + break; + } + } + } + for (MessageReference ref : unAckedMessages) { QueueMessageReference qmr = (QueueMessageReference) ref; if (qmr.getLockOwner() == sub) { qmr.unlock(); - // only increment redelivery if it was delivered or we + // have no delivery information - if (lastDeiveredSequenceId == 0 - || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) { + if (lastDeiveredSequenceId == 0) { qmr.incrementRedeliveryCounter(); + } else { + if (markAsRedelivered) { + qmr.incrementRedeliveryCounter(); + } + if (ref == lastDeliveredRef) { + // all that follow were not redelivered + markAsRedelivered = false; + } } } redeliveredWaitingDispatch.add(qmr); diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 48d4ff0433..40a3fbd983 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -807,7 +807,7 @@ public class JMSConsumerTest extends JmsTestSupport { Message msg = redispatchConsumer.receive(1000); assertNotNull(msg); - assertTrue(msg.getJMSRedelivered()); + assertTrue("redelivered flag set", msg.getJMSRedelivered()); assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); msg = redispatchConsumer.receive(1000); diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java index 6c0b5e6154..faa25b6c1f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -98,7 +98,7 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { session.commit(); } else { LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); - assertFalse(msg.getJMSRedelivered()); + assertFalse("should not have redelivery flag set, id: " + msg.getJMSMessageID(), msg.getJMSRedelivered()); session.rollback(); } } diff --git a/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java b/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java new file mode 100644 index 0000000000..6f85099eb8 --- /dev/null +++ b/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bugs; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; + + +public class AMQ1730Test extends TestCase { + + private static final Log log = LogFactory.getLog(AMQ1730Test.class); + + + private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount"; + + + BrokerService brokerService; + + private static final int MESSAGE_COUNT = 250; + + public AMQ1730Test() { + super(); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + brokerService = new BrokerService(); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setUseJmx(false); + brokerService.start(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + brokerService.stop(); + } + + public void testRedelivery() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + brokerService.getTransportConnectors().get(0).getConnectUri().toString() + "?jms.prefetchPolicy.queuePrefetch=100"); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("queue.test"); + + MessageProducer producer = session.createProducer(queue); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + log.info("Sending message " + i); + TextMessage message = session.createTextMessage("Message " + i); + producer.send(message); + } + + producer.close(); + session.close(); + connection.stop(); + connection.close(); + + final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT); + + final ValueHolder messageRedelivered = new ValueHolder(false); + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + messageListenerContainer.setConnectionFactory(connectionFactory); + messageListenerContainer.setDestination(queue); + messageListenerContainer.setAutoStartup(false); + messageListenerContainer.setConcurrentConsumers(1); + messageListenerContainer.setMaxConcurrentConsumers(16); + messageListenerContainer.setMaxMessagesPerTask(10); + messageListenerContainer.setReceiveTimeout(10000); + messageListenerContainer.setRecoveryInterval(5000); + messageListenerContainer.setAcceptMessagesWhileStopping(false); + messageListenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE); + messageListenerContainer.setSessionTransacted(false); + messageListenerContainer.setMessageListener(new MessageListener() { + + + public void onMessage(Message message) { + if (!(message instanceof TextMessage)) { + throw new RuntimeException(); + } + try { + TextMessage textMessage = (TextMessage) message; + String text = textMessage.getText(); + int messageDeliveryCount = message.getIntProperty(JMSX_DELIVERY_COUNT); + if (messageDeliveryCount > 1) { + messageRedelivered.set(true); + } + log.info("[Count down latch: " + countDownLatch.getCount() + "][delivery count: " + messageDeliveryCount + "] - " + "Received message with id: " + message.getJMSMessageID() + " with text: " + text); + + } catch (JMSException e) { + e.printStackTrace(); + } + finally { + countDownLatch.countDown(); + } + } + + }); + messageListenerContainer.afterPropertiesSet(); + + messageListenerContainer.start(); + + countDownLatch.await(); + messageListenerContainer.stop(); + messageListenerContainer.destroy(); + + assertFalse("no message has redelivery > 1", messageRedelivered.get()); + } + + private class ValueHolder { + + private T value; + + public ValueHolder(T value) { + super(); + this.value = value; + } + + void set(T value) { + this.value = value; + } + + T get() { + return value; + } + + } + +}