resolve https://issues.apache.org/activemq/browse/AMQ-2566, inflight out of sync after delivery to dlq. Also tidy failover transaction test from: https://issues.apache.org/activemq/browse/AMQ-2560

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@898797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-13 15:18:24 +00:00
parent db6827346b
commit 80f7e38807
5 changed files with 67 additions and 10 deletions

View File

@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.info("ack:" + ack); LOG.trace("ack:" + ack);
} }
synchronized(dispatchLock) { synchronized(dispatchLock) {
if (ack.isStandardAck()) { if (ack.isStandardAck()) {
@ -241,7 +241,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
synchronized(dispatchLock) { synchronized(dispatchLock) {
if (isSlave()) {
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
} else {
// poisionAck will decrement - otherwise still inflight on client
}
} }
} }
}); });
@ -362,8 +366,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (inAckRange) { if (inAckRange) {
sendToDLQ(context, node); sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics() node.getRegionDestination().getDestinationStatistics()
.getInflight().increment(); .getInflight().decrement();
removeList.add(node); removeList.add(node);
dequeueCounter++; dequeueCounter++;
index++; index++;

View File

@ -57,6 +57,7 @@ public class DeadLetterTest extends DeadLetterTestSupport {
assertMessage(msg, i); assertMessage(msg, i);
assertNotNull("Should be a DLQ message for loop: " + i, msg); assertNotNull("Should be a DLQ message for loop: " + i, msg);
} }
session.commit();
} }
protected void consumeAndRollback(int messageCounter) throws Exception { protected void consumeAndRollback(int messageCounter) throws Exception {

View File

@ -31,6 +31,8 @@ import javax.jms.Topic;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -169,6 +171,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
deliveryMode = DeliveryMode.NON_PERSISTENT; deliveryMode = DeliveryMode.NON_PERSISTENT;
durableSubscriber = false; durableSubscriber = false;
doTest(); doTest();
validateConsumerPrefetch(this.getDestinationString(), 0);
} }
public void testDurableQueueMessage() throws Exception { public void testDurableQueueMessage() throws Exception {
@ -176,6 +179,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
deliveryMode = DeliveryMode.PERSISTENT; deliveryMode = DeliveryMode.PERSISTENT;
durableSubscriber = false; durableSubscriber = false;
doTest(); doTest();
validateConsumerPrefetch(this.getDestinationString(), 0);
} }
public Destination getDestination() { public Destination getDestination() {
@ -184,4 +188,16 @@ public abstract class DeadLetterTestSupport extends TestSupport {
} }
return destination; return destination;
} }
private void validateConsumerPrefetch(String destination, long expectedCount) {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
if (dest.getName().equals(destination)) {
DestinationStatistics stats = dest.getDestinationStatistics();
LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
expectedCount, stats.getInflight().getCount());
}
}
}
} }

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class NoRetryDeadLetterTest extends DeadLetterTest {
private static final Log LOG = LogFactory.getLog(NoRetryDeadLetterTest.class);
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory connectionFactory = super.createConnectionFactory();
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return connectionFactory;
}
}

View File

@ -467,8 +467,8 @@ public class FailoverTransactionTest {
TimeUnit.SECONDS.sleep(7); TimeUnit.SECONDS.sleep(7);
// should not get a second message as there are two messages and two consumers // should not get a second message as there are two messages and two consumers
// but with failover and unordered connection reinit it can get the second // but with failover and unordered connection restore it can get the second
// message which will have a problem for the ack // message which could create a problem for a pending ack
msg = consumer1.receive(5000); msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg); LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) { if (msg != null) {
@ -503,10 +503,12 @@ public class FailoverTransactionTest {
assertNull("should be nothing left for consumer1", msg); assertNull("should be nothing left for consumer1", msg);
consumerSession1.commit(); consumerSession1.commit();
// consumer2 should get other message // consumer2 should get other message provided consumer1 did not get 2
msg = consumer2.receive(5000); msg = consumer2.receive(5000);
LOG.info("post: from consumer2 received: " + msg); LOG.info("post: from consumer2 received: " + msg);
assertNotNull("got message on consumer2", msg); if (receivedMessages.size() == 1) {
assertNotNull("got second message on consumer2", msg);
}
consumerSession2.commit(); consumerSession2.commit();
for (Connection c: connections) { for (Connection c: connections) {
@ -532,7 +534,7 @@ public class FailoverTransactionTest {
if (msg == null) { if (msg == null) {
msg = sweeper.receive(5000); msg = sweeper.receive(5000);
} }
LOG.info("Received: " + msg); LOG.info("Sweep received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg); assertNull("no messges left dangling but got: " + msg, msg);
connection.close(); connection.close();
} }