added ability to have the consumer slow down. also better rate info.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-17 19:48:47 +00:00
parent 87f2f502d7
commit d47855ace3
2 changed files with 28 additions and 13 deletions

View File

@ -32,6 +32,8 @@ import javax.jms.Topic;
public class PerfConsumer implements MessageListener{
protected Connection connection;
protected MessageConsumer consumer;
protected long sleepDuration;
protected PerfRate rate=new PerfRate();
public PerfConsumer(ConnectionFactory fac,Destination dest,String consumerName) throws JMSException{
connection=fac.createConnection();
@ -62,5 +64,18 @@ public class PerfConsumer implements MessageListener{
}
public void onMessage(Message msg){
rate.increment();
try {
if( sleepDuration!=0 ) {
Thread.sleep(sleepDuration);
}
} catch (InterruptedException e) {
}
}
public synchronized long getSleepDuration() {
return sleepDuration;
}
public synchronized void setSleepDuration(long sleepDuration) {
this.sleepDuration = sleepDuration;
}
}

View File

@ -52,6 +52,7 @@ public class SimpleTopicTest extends TestCase{
protected byte[] array=null;
protected ConnectionFactory factory;
protected Destination destination;
protected long CONSUMER_SLEEP_DURATION = 0;
/**
* Sets up a test where the producer and consumer have their own connection.
@ -67,11 +68,14 @@ public class SimpleTopicTest extends TestCase{
Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination=createDestination(session,DESTINATION_NAME);
log.info("Testing against destination: "+destination);
con.close();
producers=new PerfProducer[NUMBER_OF_PRODUCERS];
consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i]=createConsumer(factory,destination,i);
consumers[i].setSleepDuration(CONSUMER_SLEEP_DURATION);
}
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
array=new byte[PAYLOAD_SIZE];
@ -128,11 +132,7 @@ public class SimpleTopicTest extends TestCase{
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true");
// ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true&wireFormat.cacheEnabled=false");
// cf.setAsyncDispatch(false);
return cf;
return new ActiveMQConnectionFactory(bindAddress);
}
public void testPerformance() throws JMSException, InterruptedException{
@ -160,28 +160,28 @@ public class SimpleTopicTest extends TestCase{
}
protected void dumpProducerRate(){
int count=0;
int totalRate=0;
int totalCount=0;
for(int i=0;i<producers.length;i++){
count+=producers[i].getRate().getRate();
totalRate+=producers[i].getRate().getRate();
totalCount+=consumers[i].getRate().getTotalCount();
}
count=count/producers.length;
log.info("Producer rate = "+count+" msg/sec total count = "+totalCount);
int avgRate = totalRate/producers.length;
log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount);
for(int i=0;i<producers.length;i++){
producers[i].getRate().reset();
}
}
protected void dumpConsumerRate(){
int rate=0;
int totalRate=0;
int totalCount=0;
for(int i=0;i<consumers.length;i++){
rate+=consumers[i].getRate().getRate();
totalRate+=consumers[i].getRate().getRate();
totalCount+=consumers[i].getRate().getTotalCount();
}
rate=rate/consumers.length;
log.info("Consumer rate = "+rate+" msg/sec total count = "+totalCount);
int avgRate = totalRate/consumers.length;
log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
for(int i=0;i<consumers.length;i++){
consumers[i].getRate().reset();
}