Updated to support multiple destinations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@645881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-08 12:43:36 +00:00
parent 836ca817ef
commit 7f0583dd2c
11 changed files with 199 additions and 75 deletions

View File

@ -38,7 +38,7 @@ public class PerfConsumer implements MessageListener {
protected Connection connection;
protected MessageConsumer consumer;
protected long sleepDuration;
protected boolean enableAudit = true;
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20);
protected PerfRate rate = new PerfRate();
@ -82,7 +82,7 @@ public class PerfConsumer implements MessageListener {
if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) {
LOG.error("Message out of order!!" + msg);
}
if (this.audit.isDuplicate(msg)){
if (enableAudit && this.audit.isDuplicate(msg)){
LOG.error("Duplicate Message!" + msg);
}
} catch (JMSException e1) {

View File

@ -38,6 +38,7 @@ public class PerfProducer implements Runnable {
private Session session;
private final CountDownLatch stopped = new CountDownLatch(1);
private boolean running;
private int sleep = 0;
public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException {
connection = fac.createConnection();
@ -93,6 +94,9 @@ public class PerfProducer implements Runnable {
msg.writeBytes(payload);
producer.send(msg);
rate.increment();
if (sleep > 0) {
Thread.sleep(sleep);
}
}
} catch (Throwable e) {
e.printStackTrace();
@ -101,4 +105,12 @@ public class PerfProducer implements Runnable {
}
}
public int getSleep() {
return sleep;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
}

View File

@ -61,7 +61,7 @@ public class QueueConnectionMemoryTest extends SimpleQueueTest {
factory = createConnectionFactory(bindAddress);
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationName);
Destination destination = createDestination(session, destinationName);
con.close();
for (int i = 0; i < 3; i++) {
Connection connection = factory.createConnection();

View File

@ -28,8 +28,8 @@ import org.apache.activemq.network.NetworkConnector;
public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest {
protected void setUp() throws Exception {
numberofProducers=6;
numberOfConsumers=6;
numberofProducers=60;
numberOfConsumers=60;
samepleCount=1000;
playloadSize = 1024;
super.setUp();

View File

@ -25,6 +25,15 @@ import javax.jms.JMSException;
* @version $Revision: 1.3 $
*/
public class SimpleDurableTopicTest extends SimpleTopicTest {
protected void setUp() throws Exception {
numberOfDestinations=6;
numberOfConsumers = 1;
numberofProducers = 1;
samepleCount=1000;
playloadSize = 1024;
super.setUp();
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.PERSISTENT);

View File

@ -17,9 +17,11 @@
package org.apache.activemq.perf;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
@ -29,45 +31,58 @@ import org.apache.commons.logging.LogFactory;
public class SimpleNetworkTest extends SimpleTopicTest {
private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class);
protected String consumerBindAddress = "tcp://localhost:61616";
//protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000";
protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=2000&socket.tcpNoDelayEnabled=false";
protected String producerBindAddress = "tcp://localhost:61617";
protected static final String CONSUMER_BROKER_NAME = "Consumer";
protected static final String PRODUCER_BROKER_NAME = "Producer";
protected BrokerService consumerBroker;
protected BrokerService producerBroker;
protected ConnectionFactory consumerFactory;
protected ConnectionFactory producerFactory;
protected ActiveMQConnectionFactory consumerFactory;
protected ActiveMQConnectionFactory producerFactory;
protected void setUp() throws Exception {
if (consumerBroker == null) {
consumerBroker = createConsumerBroker(consumerBindAddress);
// consumerBroker = createConsumerBroker(consumerBindAddress);
}
if (producerBroker == null) {
producerBroker = createProducerBroker(producerBindAddress);
}
consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME);
//consumerFactory = createConnectionFactory(consumerBindAddress);
//producerFactory = createConnectionFactory(producerBindAddress);
//consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
//producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME);
consumerFactory = createConnectionFactory("failover://("+consumerBindAddress + "," + producerBindAddress +")?randomize=false&backup=false");
//consumerFactory = createConnectionFactory("failover://("+consumerBindAddress+")?backup=true");
consumerFactory.setDispatchAsync(true);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setQueuePrefetch(100);
consumerFactory.setPrefetchPolicy(policy);
producerFactory = createConnectionFactory(producerBindAddress);
Connection con = consumerFactory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationName);
LOG.info("Testing against destination: " + destination);
LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)");
con.close();
producers = new PerfProducer[numberofProducers];
consumers = new PerfConsumer[numberOfConsumers];
for (int i = 0; i < numberOfConsumers; i++) {
consumers[i] = createConsumer(consumerFactory, destination, i);
consumers[i].setSleepDuration(consumerSleepDuration);
}
for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize];
for (int j = i; j < array.length; j++) {
array[j] = (byte)j;
producers = new PerfProducer[numberofProducers*numberOfDestinations];
consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations];
int consumerCount = 0;
int producerCount = 0;
for (int k =0; k < numberOfDestinations;k++) {
Destination destination = createDestination(session, destinationName+":"+k);
LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) {
consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
consumers[consumerCount].setSleepDuration(consumerSleepDuration);
consumerCount++;
}
for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize];
for (int j = i; j < array.length; j++) {
array[j] = (byte)j;
}
producers[producerCount] = createProducer(factory, destination, i, array);
producerCount++;
}
producers[i] = createProducer(producerFactory, destination, i, array);
}
con.close();
}
protected void tearDown() throws Exception {
@ -96,6 +111,7 @@ public class SimpleNetworkTest extends SimpleTopicTest {
}
protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception {
configureBroker(answer);
answer.setPersistent(false);
answer.setBrokerName(CONSUMER_BROKER_NAME);
answer.setDeleteAllMessagesOnStartup(true);
@ -111,14 +127,23 @@ public class SimpleNetworkTest extends SimpleTopicTest {
}
protected void configureProducerBroker(BrokerService answer,String uri) throws Exception {
configureBroker(answer);
answer.setBrokerName(PRODUCER_BROKER_NAME);
answer.setMonitorConnectionSplits(false);
//answer.setSplitSystemUsageForProducersConsumers(true);
answer.setPersistent(false);
answer.setDeleteAllMessagesOnStartup(true);
NetworkConnector connector = answer.addNetworkConnector("static://"+consumerBindAddress);
connector.setDuplex(true);
NetworkConnector connector = answer.addNetworkConnector("static://tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=2000");
//connector.setNetworkTTL(3);
//connector.setDynamicOnly(true);
//connector.setDuplex(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
}
protected void configureBroker(BrokerService service) throws Exception{
}
}

View File

@ -17,23 +17,30 @@
package org.apache.activemq.perf;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest {
protected void setUp() throws Exception {
numberOfConsumers = 10;
numberofProducers = 10;
protected void setUp()throws Exception {
numberOfDestinations =20;
super.setUp();
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
pp.setTimeToLive(1000);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// pp.setTimeToLive(1000);
//pp.setSleep(1);
return pp;
}
@ -41,11 +48,36 @@ public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest {
PerfConsumer consumer = new PerfConsumer(fac, dest);
boolean enableAudit = numberOfConsumers <= 1;
System.out.println("Enable Audit = " + enableAudit);
consumer.setEnableAudit(enableAudit);
consumer.setEnableAudit(false);
return consumer;
}
public void testPerformance() throws JMSException, InterruptedException {
//Thread.sleep(5000);
super.testPerformance();
}
protected Destination createDestination(Session s, String destinationName) throws JMSException {
return s.createQueue(destinationName);
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
answer.setMonitorConnectionSplits(true);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024 * 1024 * 100); // Set to 1 MB
entry.setOptimizedDispatch(true);
entry.setProducerFlowControl(true);
entry.setMaxPageSize(10);
entry.setLazyDispatch(false);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
answer.setDestinationPolicy(policyMap);
super.configureBroker(answer);
}
}

View File

@ -16,20 +16,53 @@
*/
package org.apache.activemq.perf;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
/**
* @version $Revision: 1.3 $
*/
public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
protected void setUp() throws Exception {
numberOfConsumers = 10;
numberofProducers = 10;
//this.consumerSleepDuration=100;
super.setUp();
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
pp.setTimeToLive(100);
//pp.setTimeToLive(100);
return pp;
}
protected void configureBroker(BrokerService answer,String uri) throws Exception {
answer.setPersistent(false);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB
entry.setOptimizedDispatch(true);
entry.setLazyDispatch(true);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
answer.setDestinationPolicy(policyMap);
super.configureBroker(answer, uri);
}
}

View File

@ -31,9 +31,7 @@ public class SimpleQueueTest extends SimpleTopicTest {
}
protected void setUp() throws Exception {
numberOfConsumers = 1;
numberofProducers = 2;
this.consumerSleepDuration=0;
super.setUp();
}

View File

@ -35,8 +35,8 @@ public class SimpleTopicTest extends TestCase {
private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
protected BrokerService broker;
// protected String
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
//protected String clientURI="tcp://localhost:61616";
protected String bindAddress="tcp://localhost:61616";
//protected String bindAddress = "tcp://localhost:61616";
//protected String bindAddress="vm://localhost?marshal=true";
@ -46,12 +46,15 @@ public class SimpleTopicTest extends TestCase {
protected String destinationName = getClass().getName();
protected int samepleCount = 20;
protected long sampleInternal = 10000;
protected int numberOfConsumers = 1;
protected int numberofProducers = 0;
protected int numberOfDestinations=1;
protected int numberOfConsumers = 10;
protected int numberofProducers = 10;
protected int totalNumberOfProducers;
protected int totalNumberOfConsumers;
protected int playloadSize = 1024;
protected byte[] array;
protected ConnectionFactory factory;
protected Destination destination;
protected long consumerSleepDuration=0;
/**
@ -63,26 +66,37 @@ public class SimpleTopicTest extends TestCase {
if (broker == null) {
broker = createBroker(bindAddress);
}
factory = createConnectionFactory(bindAddress);
factory = createConnectionFactory(clientURI);
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationName);
LOG.info("Testing against destination: " + destination);
LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)");
con.close();
producers = new PerfProducer[numberofProducers];
consumers = new PerfConsumer[numberOfConsumers];
for (int i = 0; i < numberOfConsumers; i++) {
consumers[i] = createConsumer(factory, destination, i);
consumers[i].setSleepDuration(consumerSleepDuration);
}
for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize];
for (int j = i; j < array.length; j++) {
array[j] = (byte)j;
LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)");
totalNumberOfConsumers=numberOfConsumers*numberOfDestinations;
totalNumberOfProducers=numberofProducers*numberOfDestinations;
producers = new PerfProducer[totalNumberOfProducers];
consumers = new PerfConsumer[totalNumberOfConsumers];
int consumerCount = 0;
int producerCount = 0;
for (int k =0; k < numberOfDestinations;k++) {
Destination destination = createDestination(session, destinationName+":"+k);
LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) {
consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
consumers[consumerCount].setSleepDuration(consumerSleepDuration);
consumerCount++;
}
for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize];
for (int j = i; j < array.length; j++) {
array[j] = (byte)j;
}
producers[producerCount] = createProducer(factory, destination, i, array);
producerCount++;
}
producers[i] = createProducer(factory, destination, i, array);
}
con.close();
super.setUp();
}
@ -136,10 +150,10 @@ public class SimpleTopicTest extends TestCase {
}
public void testPerformance() throws JMSException, InterruptedException {
for (int i = 0; i < numberOfConsumers; i++) {
for (int i = 0; i < totalNumberOfConsumers; i++) {
consumers[i].start();
}
for (int i = 0; i < numberofProducers; i++) {
for (int i = 0; i < totalNumberOfProducers; i++) {
producers[i].start();
}
LOG.info("Sampling performance " + samepleCount + " times at a " + sampleInternal + " ms interval.");
@ -148,10 +162,10 @@ public class SimpleTopicTest extends TestCase {
dumpProducerRate();
dumpConsumerRate();
}
for (int i = 0; i < numberofProducers; i++) {
for (int i = 0; i < totalNumberOfProducers; i++) {
producers[i].stop();
}
for (int i = 0; i < numberOfConsumers; i++) {
for (int i = 0; i < totalNumberOfConsumers; i++) {
consumers[i].stop();
}
}
@ -159,30 +173,36 @@ public class SimpleTopicTest extends TestCase {
protected void dumpProducerRate() {
int totalRate = 0;
int totalCount = 0;
String producerString="Producers:";
for (int i = 0; i < producers.length; i++) {
PerfRate rate = producers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
}
if (producers != null && producers.length > 0) {
int avgRate = totalRate / producers.length;
System.out.println("Avg producer rate = " + avgRate
+ " msg/sec | Total rate = " + totalRate + ", sent = "
+ totalCount);
// System.out.println(producerString);
}
}
protected void dumpConsumerRate() {
int totalRate = 0;
int totalCount = 0;
String consumerString="Consumers:";
for (int i = 0; i < consumers.length; i++) {
PerfRate rate = consumers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
}
if (consumers != null && consumers.length > 0) {
int avgRate = totalRate / consumers.length;
System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount);
System.out.println(consumerString);
}
}
}

View File

@ -33,20 +33,15 @@ import org.springframework.core.io.Resource;
public class SlowConsumerTopicTest extends SimpleTopicTest {
protected PerfConsumer[] slowConsumers;
protected int numberOfSlowConsumers = 1;
protected void setUp() throws Exception {
numberOfConsumers = 0;
playloadSize = 10 * 1024;
super.setUp();
slowConsumers = new SlowConsumer[numberOfSlowConsumers];
for (int i = 0; i < numberOfSlowConsumers; i++) {
slowConsumers[i] = createSlowConsumer(factory, destination, i);
slowConsumers[i].start();
}
}
protected PerfConsumer createSlowConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
return new SlowConsumer(fac, dest);
}