finese tuning

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@395611 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-20 14:58:50 +00:00
parent 6575f2d22d
commit fa8b889906
6 changed files with 38 additions and 14 deletions

View File

@ -625,7 +625,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if(optimizeAcknowledge){ if(optimizeAcknowledge){
if(deliveryingAcknowledgements.compareAndSet(false,true)){ if(deliveryingAcknowledgements.compareAndSet(false,true)){
ackCounter++; ackCounter++;
if(ackCounter>=(info.getCurrentPrefetchSize()*.50)){ if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack); session.asyncSendPacket(ack);
ackCounter=0; ackCounter=0;

View File

@ -32,6 +32,7 @@ public class ActiveMQPrefetchPolicy implements Serializable {
private int queueBrowserPrefetch; private int queueBrowserPrefetch;
private int topicPrefetch; private int topicPrefetch;
private int durableTopicPrefetch; private int durableTopicPrefetch;
private int optimizeDurableTopicPrefetch;
private int inputStreamPrefetch; private int inputStreamPrefetch;
private int maximumPendingMessageLimit; private int maximumPendingMessageLimit;
@ -43,6 +44,7 @@ public class ActiveMQPrefetchPolicy implements Serializable {
this.queueBrowserPrefetch = 500; this.queueBrowserPrefetch = 500;
this.topicPrefetch = MAX_PREFETCH_SIZE; this.topicPrefetch = MAX_PREFETCH_SIZE;
this.durableTopicPrefetch = 100; this.durableTopicPrefetch = 100;
this.optimizeDurableTopicPrefetch=1000;
this.inputStreamPrefetch = 100; this.inputStreamPrefetch = 100;
} }
@ -102,6 +104,20 @@ public class ActiveMQPrefetchPolicy implements Serializable {
this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
} }
/**
* @return Returns the optimizeDurableTopicPrefetch.
*/
public int getOptimizeDurableTopicPrefetch(){
return optimizeDurableTopicPrefetch;
}
/**
* @param optimizeDurableTopicPrefetch The optimizeDurableTopicPrefetch to set.
*/
public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch){
this.optimizeDurableTopicPrefetch=optimizeAcknowledgePrefetch;
}
public int getMaximumPendingMessageLimit() { public int getMaximumPendingMessageLimit() {
return maximumPendingMessageLimit; return maximumPendingMessageLimit;
} }
@ -129,6 +145,7 @@ public class ActiveMQPrefetchPolicy implements Serializable {
this.queuePrefetch=i; this.queuePrefetch=i;
this.topicPrefetch=i; this.topicPrefetch=i;
this.inputStreamPrefetch=1; this.inputStreamPrefetch=1;
this.optimizeDurableTopicPrefetch=i;
} }
public int getInputStreamPrefetch() { public int getInputStreamPrefetch() {
@ -138,4 +155,6 @@ public class ActiveMQPrefetchPolicy implements Serializable {
public void setInputStreamPrefetch(int inputStreamPrefetch) { public void setInputStreamPrefetch(int inputStreamPrefetch) {
this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch); this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
} }
} }

View File

@ -1057,14 +1057,17 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* if the message selector is invalid. * if the message selector is invalid.
* @since 1.1 * @since 1.1
*/ */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal)
throws JMSException { throws JMSException{
checkClosed(); checkClosed();
connection.checkClientIDWasManuallySpecified(); connection.checkClientIDWasManuallySpecified();
ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy();
return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy
.transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(), .getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch();
prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit();
return new ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation
.transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false,
asyncDispatch);
} }
/** /**

View File

@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
final class DataManager{ final class DataManager{
private static final Log log=LogFactory.getLog(DataManager.class); private static final Log log=LogFactory.getLog(DataManager.class);
protected static long MAX_FILE_LENGTH=1024*1024*16; protected static long MAX_FILE_LENGTH=1024*1024*32;
private final File dir; private final File dir;
private final String prefix; private final String prefix;
private StoreDataReader reader; private StoreDataReader reader;

View File

@ -281,7 +281,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
session.createTextMessage("Second Message") session.createTextMessage("Second Message")
}; };
// lets consume any outstanding messages from previous test runs // lets consume any outstanding messages from prev test runs
while (consumer.receive(1000) != null) { while (consumer.receive(1000) != null) {
} }
session.commit(); session.commit();
@ -306,7 +306,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
assertEquals(outbound[1], message); assertEquals(outbound[1], message);
session.rollback(); session.rollback();
// Consume again.. the previous message should // Consume again.. the prev message should
// get redelivered. // get redelivered.
message = consumer.receive(5000); message = consumer.receive(5000);
assertNotNull("Should have re-received the message again!", message); assertNotNull("Should have re-received the message again!", message);
@ -329,7 +329,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
session.createTextMessage("Second Message") session.createTextMessage("Second Message")
}; };
// lets consume any outstanding messages from previous test runs // lets consume any outstanding messages from prev test runs
while (consumer.receive(1000) != null) { while (consumer.receive(1000) != null) {
} }
session.commit(); session.commit();
@ -351,7 +351,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
assertEquals(outbound[1], message); assertEquals(outbound[1], message);
session.rollback(); session.rollback();
// Consume again.. the previous message should // Consume again.. the prev message should
// get redelivered. // get redelivered.
message = consumer.receive(5000); message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!", message); assertNotNull("Should have re-received the first message again!", message);
@ -445,7 +445,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
session.createTextMessage("Second Message") session.createTextMessage("Second Message")
}; };
// lets consume any outstanding messages from previous test runs // lets consume any outstanding messages from prev test runs
while (consumer.receiveNoWait() != null) { while (consumer.receiveNoWait() != null) {
} }
@ -529,7 +529,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
protected void reconnect() throws JMSException { protected void reconnect() throws JMSException {
if (connection != null) { if (connection != null) {
// Close the previous connection. // Close the prev connection.
connection.close(); connection.close();
} }
session = null; session = null;
@ -562,6 +562,7 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
prefetchPolicy.setQueuePrefetch(1); prefetchPolicy.setQueuePrefetch(1);
prefetchPolicy.setTopicPrefetch(1); prefetchPolicy.setTopicPrefetch(1);
prefetchPolicy.setDurableTopicPrefetch(1); prefetchPolicy.setDurableTopicPrefetch(1);
prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
} }
public void testMessageListener() throws Exception { public void testMessageListener() throws Exception {

View File

@ -130,6 +130,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag
activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue); activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
} }
public void tearDown() throws Exception { public void tearDown() throws Exception {