Ehanced these performance tests so that the producers are running in their own threads.

Also change the dumping of stats to be time based and not based on the number of messages received.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-17 16:04:57 +00:00
parent 629bc81fb2
commit e0fdce4f01
6 changed files with 108 additions and 60 deletions

View File

@ -17,44 +17,79 @@
*/
package org.apache.activemq.perf;
import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
* @version $Revision: 1.3 $
*/
public class PerfProducer{
public class PerfProducer implements Runnable {
protected Connection connection;
protected MessageProducer producer;
protected PerfRate rate=new PerfRate();
public PerfProducer(ConnectionFactory fac,Destination dest) throws JMSException{
private byte[] payload;
private Session session;
private final CountDownLatch stopped = new CountDownLatch(1);
private boolean running;
public PerfProducer(ConnectionFactory fac,Destination dest, byte[] palyload) throws JMSException{
connection=fac.createConnection();
Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producer=s.createProducer(dest);
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producer=session.createProducer(dest);
this.payload = palyload;
}
public void setDeliveryMode(int mode) throws JMSException{
producer.setDeliveryMode(mode);
}
public void start() throws JMSException{
connection.start();
rate.getRate();
}
public void stop() throws JMSException{
connection.stop();
}
public void shutDown() throws JMSException{
connection.close();
}
public void sendMessage(Message msg) throws JMSException{
producer.send(msg);
rate.increment();
}
public PerfRate getRate(){
return rate;
}
synchronized public void start() throws JMSException{
if( !running ) {
running = true;
connection.start();
new Thread(this).start();
rate.reset();
}
}
public void stop() throws JMSException, InterruptedException{
synchronized(this) {
running=false;
}
stopped.await();
connection.stop();
}
synchronized public boolean isRunning() {
return running;
}
public void run() {
try {
while(isRunning()){
BytesMessage msg;
msg=session.createBytesMessage();
msg.writeBytes(payload);
producer.send(msg);
rate.increment();
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
stopped.countDown();
}
}
}

View File

@ -30,26 +30,34 @@ public class PerfRate{
public int getCount(){
return totalCount;
}
public void increment(){
totalCount++;
count++;
}
public void start(){
count=0;
startTime=System.currentTimeMillis();
}
public int getRate(){
long endTime=System.currentTimeMillis();
long totalTime=endTime-startTime;
int result=(int) ((count*1000)/totalTime);
return result;
}
/**
* Resets the rate sampling.
*/
public void reset() {
count=0;
startTime=System.currentTimeMillis();
}
/**
* @return Returns the totalCount.
*/
public int getTotalCount(){
return totalCount;
}
/**
* @param totalCount
* The totalCount to set.

View File

@ -21,14 +21,12 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @version $Revision: 1.3 $
*/
public class SimpleDurableTopicTest extends SimpleTopicTest{
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
PerfProducer pp=new PerfProducer(fac,dest);
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte payload[]) throws JMSException{
PerfProducer pp=new PerfProducer(fac,dest, payload);
pp.setDeliveryMode(DeliveryMode.PERSISTENT);
return pp;
}

View File

@ -26,8 +26,8 @@ import javax.jms.JMSException;
*/
public class SimpleNonPersistentQueueTest extends SimpleQueueTest{
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
PerfProducer pp=new PerfProducer(fac,dest);
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte[] payload) throws JMSException{
PerfProducer pp=new PerfProducer(fac,dest,payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return pp;
}

View File

@ -26,8 +26,8 @@ import javax.jms.JMSException;
*/
public class SimpleNonPersistentTopicTest extends SimpleTopicTest{
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
PerfProducer pp = new PerfProducer(fac,dest);
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte[] payload) throws JMSException{
PerfProducer pp = new PerfProducer(fac,dest,payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return pp;
}

View File

@ -17,12 +17,12 @@
*/
package org.apache.activemq.perf;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -42,11 +42,11 @@ public class SimpleTopicTest extends TestCase{
protected PerfProducer[] producers;
protected PerfConsumer[] consumers;
protected String DESTINATION_NAME=getClass().toString();
protected int SAMPLE_COUNT = 10;
protected long SAMPLE_INTERVAL = 2000;
protected int NUMBER_OF_CONSUMERS=1;
protected int NUMBER_OF_PRODUCERS=1;
protected BytesMessage payload;
protected int PAYLOAD_SIZE=1024;
protected int MESSAGE_COUNT=100000;
protected byte[] array=null;
protected ConnectionFactory factory;
protected Destination destination;
@ -60,26 +60,23 @@ public class SimpleTopicTest extends TestCase{
if(broker==null){
broker=createBroker();
}
array=new byte[PAYLOAD_SIZE];
for(int i=0;i<array.length;i++){
array[i]=(byte) i;
}
factory=createConnectionFactory();
Connection con=factory.createConnection();
Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
payload=session.createBytesMessage();
payload.writeBytes(array);
destination=createDestination(session,DESTINATION_NAME);
con.close();
producers=new PerfProducer[NUMBER_OF_PRODUCERS];
consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i]=createConsumer(factory,destination,i);
consumers[i].start();
}
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i]=createProducer(factory,destination,i);
producers[i].start();
array=new byte[PAYLOAD_SIZE];
for(int j=i;j<array.length;j++){
array[j]=(byte) j;
}
producers[i]=createProducer(factory,destination,i,array);
}
super.setUp();
}
@ -114,8 +111,8 @@ public class SimpleTopicTest extends TestCase{
return answer;
}
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
return new PerfProducer(fac,dest);
protected PerfProducer createProducer(ConnectionFactory fac,Destination dest,int number, byte[] payload) throws JMSException{
return new PerfProducer(fac,dest,payload);
}
protected PerfConsumer createConsumer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
@ -136,17 +133,27 @@ public class SimpleTopicTest extends TestCase{
return cf;
}
public void testPerformance() throws JMSException{
for(int i=0;i<MESSAGE_COUNT;i++){
if(i%10000==0){
public void testPerformance() throws JMSException, InterruptedException{
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i].start();
}
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i].start();
}
log.info("Sampling performance "+SAMPLE_COUNT+" times at a "+SAMPLE_INTERVAL+" ms interval.");
for(int i=0; i < SAMPLE_COUNT; i++){
Thread.sleep(SAMPLE_INTERVAL);
dumpProducerRate();
dumpConsumerRate();
}
payload.clearBody();
payload.writeBytes(array);
for(int k=0;k<producers.length;k++){
producers[k].sendMessage(payload);
for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
producers[i].stop();
}
for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
consumers[i].stop();
}
}
@ -160,21 +167,21 @@ public class SimpleTopicTest extends TestCase{
count=count/producers.length;
log.info("Producer rate = "+count+" msg/sec total count = "+totalCount);
for(int i=0;i<producers.length;i++){
producers[i].getRate().start();
producers[i].getRate().reset();
}
}
protected void dumpConsumerRate(){
int count=0;
int rate=0;
int totalCount=0;
for(int i=0;i<consumers.length;i++){
count+=consumers[i].getRate().getRate();
rate+=consumers[i].getRate().getRate();
totalCount+=consumers[i].getRate().getTotalCount();
}
count=count/consumers.length;
log.info("Consumer rate = "+count+" msg/sec total count = "+totalCount);
rate=rate/consumers.length;
log.info("Consumer rate = "+rate+" msg/sec total count = "+totalCount);
for(int i=0;i<consumers.length;i++){
consumers[i].getRate().start();
consumers[i].getRate().reset();
}
}
}