From 97143be5456a6d954b755ad68425a1403b91c493 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 18 Jan 2010 13:19:27 +0000 Subject: [PATCH] merge -c 898797 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2566, inflight out of sync after delivery to dlq. Also tidy failover transaction test from: https://issues.apache.org/activemq/browse/AMQ-2560 git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@900390 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 11 +++--- .../broker/policy/DeadLetterTest.java | 1 + .../broker/policy/DeadLetterTestSupport.java | 18 +++++++++- .../broker/policy/NoRetryDeadLetterTest.java | 35 +++++++++++++++++++ .../failover/FailoverTransactionTest.java | 12 ++++--- 5 files changed, 67 insertions(+), 10 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 35d6fa58d6..00452923c4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } if (LOG.isTraceEnabled()) { - LOG.info("ack:" + ack); + LOG.trace("ack:" + ack); } synchronized(dispatchLock) { if (ack.isStandardAck()) { @@ -241,7 +241,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void afterRollback() throws Exception { synchronized(dispatchLock) { - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + if (isSlave()) { + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + } else { + // poisionAck will decrement - otherwise still inflight on client + } } } }); @@ -362,8 +366,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (inAckRange) { sendToDLQ(context, node); node.getRegionDestination().getDestinationStatistics() - .getInflight().increment(); - + .getInflight().decrement(); removeList.add(node); dequeueCounter++; index++; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java index b4ba8b38c4..c5245313ea 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java @@ -57,6 +57,7 @@ public class DeadLetterTest extends DeadLetterTestSupport { assertMessage(msg, i); assertNotNull("Should be a DLQ message for loop: " + i, msg); } + session.commit(); } protected void consumeAndRollback(int messageCounter) throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index f4479d34d4..eeb447c24d 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -31,6 +31,8 @@ import javax.jms.Topic; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -169,13 +171,15 @@ public abstract class DeadLetterTestSupport extends TestSupport { deliveryMode = DeliveryMode.NON_PERSISTENT; durableSubscriber = false; doTest(); + validateConsumerPrefetch(this.getDestinationString(), 0); } - + public void testDurableQueueMessage() throws Exception { super.topic = false; deliveryMode = DeliveryMode.PERSISTENT; durableSubscriber = false; doTest(); + validateConsumerPrefetch(this.getDestinationString(), 0); } public Destination getDestination() { @@ -184,4 +188,16 @@ public abstract class DeadLetterTestSupport extends TestSupport { } return destination; } + + private void validateConsumerPrefetch(String destination, long expectedCount) { + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + if (dest.getName().equals(destination)) { + DestinationStatistics stats = dest.getDestinationStatistics(); + LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount()); + assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", + expectedCount, stats.getInflight().getCount()); + } + } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java new file mode 100644 index 0000000000..cf774096d6 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class NoRetryDeadLetterTest extends DeadLetterTest { + private static final Log LOG = LogFactory.getLog(NoRetryDeadLetterTest.class); + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory connectionFactory = super.createConnectionFactory(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setMaximumRedeliveries(0); + connectionFactory.setRedeliveryPolicy(redeliveryPolicy); + return connectionFactory; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 35ef49ef6c..53d48b0d6d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -467,8 +467,8 @@ public class FailoverTransactionTest { TimeUnit.SECONDS.sleep(7); // should not get a second message as there are two messages and two consumers - // but with failover and unordered connection reinit it can get the second - // message which will have a problem for the ack + // but with failover and unordered connection restore it can get the second + // message which could create a problem for a pending ack msg = consumer1.receive(5000); LOG.info("consumer1 second attempt got message: " + msg); if (msg != null) { @@ -503,10 +503,12 @@ public class FailoverTransactionTest { assertNull("should be nothing left for consumer1", msg); consumerSession1.commit(); - // consumer2 should get other message + // consumer2 should get other message provided consumer1 did not get 2 msg = consumer2.receive(5000); LOG.info("post: from consumer2 received: " + msg); - assertNotNull("got message on consumer2", msg); + if (receivedMessages.size() == 1) { + assertNotNull("got second message on consumer2", msg); + } consumerSession2.commit(); for (Connection c: connections) { @@ -532,7 +534,7 @@ public class FailoverTransactionTest { if (msg == null) { msg = sweeper.receive(5000); } - LOG.info("Received: " + msg); + LOG.info("Sweep received: " + msg); assertNull("no messges left dangling but got: " + msg, msg); connection.close(); }