diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 5dccc56d86..b4e59f48df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -625,7 +625,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if(optimizeAcknowledge){ if(deliveryingAcknowledgements.compareAndSet(false,true)){ ackCounter++; - if(ackCounter>=(info.getCurrentPrefetchSize()*.50)){ + if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); session.asyncSendPacket(ack); ackCounter=0; diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java index dfd4af94b7..cf776f112f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java @@ -32,6 +32,7 @@ public class ActiveMQPrefetchPolicy implements Serializable { private int queueBrowserPrefetch; private int topicPrefetch; private int durableTopicPrefetch; + private int optimizeDurableTopicPrefetch; private int inputStreamPrefetch; private int maximumPendingMessageLimit; @@ -43,6 +44,7 @@ public class ActiveMQPrefetchPolicy implements Serializable { this.queueBrowserPrefetch = 500; this.topicPrefetch = MAX_PREFETCH_SIZE; this.durableTopicPrefetch = 100; + this.optimizeDurableTopicPrefetch=1000; this.inputStreamPrefetch = 100; } @@ -102,6 +104,20 @@ public class ActiveMQPrefetchPolicy implements Serializable { this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); } + /** + * @return Returns the optimizeDurableTopicPrefetch. + */ + public int getOptimizeDurableTopicPrefetch(){ + return optimizeDurableTopicPrefetch; + } + + /** + * @param optimizeDurableTopicPrefetch The optimizeDurableTopicPrefetch to set. + */ + public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch){ + this.optimizeDurableTopicPrefetch=optimizeAcknowledgePrefetch; + } + public int getMaximumPendingMessageLimit() { return maximumPendingMessageLimit; } @@ -129,6 +145,7 @@ public class ActiveMQPrefetchPolicy implements Serializable { this.queuePrefetch=i; this.topicPrefetch=i; this.inputStreamPrefetch=1; + this.optimizeDurableTopicPrefetch=i; } public int getInputStreamPrefetch() { @@ -138,4 +155,6 @@ public class ActiveMQPrefetchPolicy implements Serializable { public void setInputStreamPrefetch(int inputStreamPrefetch) { this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch); } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 0a1d1da240..ebf0a2119b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1057,14 +1057,17 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * if the message selector is invalid. * @since 1.1 */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException { + public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) + throws JMSException{ checkClosed(); connection.checkClientIDWasManuallySpecified(); - ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); - return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(), - prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); + ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy(); + int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy + .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch(); + int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit(); + return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation + .transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false, + asyncDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java index c51b722280..1733e7d2cb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java @@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory; */ final class DataManager{ private static final Log log=LogFactory.getLog(DataManager.class); - protected static long MAX_FILE_LENGTH=1024*1024*16; + protected static long MAX_FILE_LENGTH=1024*1024*32; private final File dir; private final String prefix; private StoreDataReader reader; diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java index 0f70a0df50..08129e95a2 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java @@ -281,7 +281,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receive(1000) != null) { } session.commit(); @@ -306,7 +306,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M assertEquals(outbound[1], message); session.rollback(); - // Consume again.. the previous message should + // Consume again.. the prev message should // get redelivered. message = consumer.receive(5000); assertNotNull("Should have re-received the message again!", message); @@ -329,7 +329,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receive(1000) != null) { } session.commit(); @@ -351,7 +351,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M assertEquals(outbound[1], message); session.rollback(); - // Consume again.. the previous message should + // Consume again.. the prev message should // get redelivered. message = consumer.receive(5000); assertNotNull("Should have re-received the first message again!", message); @@ -445,7 +445,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M session.createTextMessage("Second Message") }; - // lets consume any outstanding messages from previous test runs + // lets consume any outstanding messages from prev test runs while (consumer.receiveNoWait() != null) { } @@ -529,7 +529,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M protected void reconnect() throws JMSException { if (connection != null) { - // Close the previous connection. + // Close the prev connection. connection.close(); } session = null; @@ -562,6 +562,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M prefetchPolicy.setQueuePrefetch(1); prefetchPolicy.setTopicPrefetch(1); prefetchPolicy.setDurableTopicPrefetch(1); + prefetchPolicy.setOptimizeDurableTopicPrefetch(1); } public void testMessageListener() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java index 5fed86165c..782354e425 100755 --- a/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java @@ -130,6 +130,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue); + activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue); } public void tearDown() throws Exception {