From 975fc55b2ae9de9ed630f69812914dc8267655df Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 20 May 2011 16:37:39 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3332 - Make optimizeAckTimeout configurable to allow for different network latencies. Added attribute optimizeAcknowledgeTimeOut to ActiveMQConnectionFactory and ActiveMQConnection to make this configurble. A value of 0 disables, default stays at 300ms git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1125454 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 13 +++ .../activemq/ActiveMQConnectionFactory.java | 13 +++ .../activemq/ActiveMQMessageConsumer.java | 7 +- .../org/apache/activemq/OptimizedAckTest.java | 101 ++++++++++++++++++ 4 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index d25962a060..4705bc46d6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -138,6 +138,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean objectMessageSerializationDefered; private boolean useAsyncSend; private boolean optimizeAcknowledge; + private long optimizeAcknowledgeTimeOut = 0; private boolean nestedMapAndListEnabled = true; private boolean useRetroactiveConsumer; private boolean exclusiveConsumer; @@ -1620,6 +1621,18 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.optimizeAcknowledge = optimizeAcknowledge; } + /** + * The max time in milliseconds between optimized ack batches + * @param optimizeAcknowledgeTimeOut + */ + public void setOptimizeAcknowledgeTimeOut(int optimizeAcknowledgeTimeOut) { + this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; + } + + public long getOptimizeAcknowledgeTimeOut() { + return optimizeAcknowledgeTimeOut; + } + public long getWarnAboutUnstartedConnectionTimeout() { return warnAboutUnstartedConnectionTimeout; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 54b5e3cf42..a121678d9d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -94,6 +94,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean disableTimeStampsByDefault; private boolean optimizedMessageDispatch = true; + private long optimizeAcknowledgeTimeOut = 300; private boolean copyMessageOnSend = true; private boolean useCompression; private boolean objectMessageSerializationDefered; @@ -803,6 +804,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.optimizeAcknowledge = optimizeAcknowledge; } + /** + * The max time in milliseconds between optimized ack batches + * @param optimizeAcknowledgeTimeOut + */ + public void setOptimizeAcknowledgeTimeOut(int optimizeAcknowledgeTimeOut) { + this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; + } + + public long getOptimizeAcknowledgeTimeOut() { + return optimizeAcknowledgeTimeOut; + } + public boolean isNestedMapAndListEnabled() { return nestedMapAndListEnabled; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 1a1b0c9ccf..77a54019a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -150,7 +150,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private IOException failureError; private long optimizeAckTimestamp = System.currentTimeMillis(); - private final long optimizeAckTimeout = 300; + private long optimizeAcknowledgeTimeOut = 0; private long failoverRedeliveryWaitPeriod = 0; /** @@ -244,6 +244,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !info.isBrowser(); + if (this.optimizeAcknowledge) { + this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); + } this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); if (messageListener != null) { @@ -855,7 +858,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; - if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) { + if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); diff --git a/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java b/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java new file mode 100644 index 0000000000..182041510b --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.util.Wait; + +public class OptimizedAckTest extends TestSupport { + + private ActiveMQConnection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = (ActiveMQConnection) createConnection(); + connection.setOptimizeAcknowledge(true); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(10); + connection.setPrefetchPolicy(prefetchPolicy); + } + + protected void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + public void testReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i=0; i<10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); + for (int i=0; i<10; i++) { + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + if (i<7) { + assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + } else { + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } + } + + + public void testVerySlowReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.setOptimizeAcknowledgeTimeOut(0); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i=0; i<10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); + for (int i=0; i<10; i++) { + Thread.sleep(400); + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + if (i<7) { + assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + } else { + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } + } +}