From 7f0583dd2c21612acce7ad60d013c2b06a6640e3 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 8 Apr 2008 12:43:36 +0000 Subject: [PATCH] Updated to support multiple destinations git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@645881 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/perf/PerfConsumer.java | 4 +- .../apache/activemq/perf/PerfProducer.java | 12 +++ .../perf/QueueConnectionMemoryTest.java | 2 +- .../perf/SimpleDurableTopicNetworkTest.java | 4 +- .../activemq/perf/SimpleDurableTopicTest.java | 9 +++ .../activemq/perf/SimpleNetworkTest.java | 77 ++++++++++++------- .../SimpleNonPersistentQueueNetworkTest.java | 44 +++++++++-- .../perf/SimpleNonPersistentQueueTest.java | 35 ++++++++- .../apache/activemq/perf/SimpleQueueTest.java | 4 +- .../apache/activemq/perf/SimpleTopicTest.java | 70 +++++++++++------ .../activemq/perf/SlowConsumerTopicTest.java | 13 +--- 11 files changed, 199 insertions(+), 75 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java b/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java index 478eb9e0b7..d2aae7b231 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java @@ -38,7 +38,7 @@ public class PerfConsumer implements MessageListener { protected Connection connection; protected MessageConsumer consumer; protected long sleepDuration; - protected boolean enableAudit = true; + protected boolean enableAudit = false; protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20); protected PerfRate rate = new PerfRate(); @@ -82,7 +82,7 @@ public class PerfConsumer implements MessageListener { if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) { LOG.error("Message out of order!!" + msg); } - if (this.audit.isDuplicate(msg)){ + if (enableAudit && this.audit.isDuplicate(msg)){ LOG.error("Duplicate Message!" + msg); } } catch (JMSException e1) { diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java b/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java index d59f8c4557..070b42e98b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java @@ -38,6 +38,7 @@ public class PerfProducer implements Runnable { private Session session; private final CountDownLatch stopped = new CountDownLatch(1); private boolean running; + private int sleep = 0; public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException { connection = fac.createConnection(); @@ -93,6 +94,9 @@ public class PerfProducer implements Runnable { msg.writeBytes(payload); producer.send(msg); rate.increment(); + if (sleep > 0) { + Thread.sleep(sleep); + } } } catch (Throwable e) { e.printStackTrace(); @@ -101,4 +105,12 @@ public class PerfProducer implements Runnable { } } + public int getSleep() { + return sleep; + } + + public void setSleep(int sleep) { + this.sleep = sleep; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java index bb98607fc6..ee8df9e304 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java @@ -61,7 +61,7 @@ public class QueueConnectionMemoryTest extends SimpleQueueTest { factory = createConnectionFactory(bindAddress); Connection con = factory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); + Destination destination = createDestination(session, destinationName); con.close(); for (int i = 0; i < 3; i++) { Connection connection = factory.createConnection(); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java index 3fb2b16da0..3d94a615c6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java @@ -28,8 +28,8 @@ import org.apache.activemq.network.NetworkConnector; public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest { protected void setUp() throws Exception { - numberofProducers=6; - numberOfConsumers=6; + numberofProducers=60; + numberOfConsumers=60; samepleCount=1000; playloadSize = 1024; super.setUp(); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java index be7a5c42d3..db79a5aef5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java @@ -25,6 +25,15 @@ import javax.jms.JMSException; * @version $Revision: 1.3 $ */ public class SimpleDurableTopicTest extends SimpleTopicTest { + + protected void setUp() throws Exception { + numberOfDestinations=6; + numberOfConsumers = 1; + numberofProducers = 1; + samepleCount=1000; + playloadSize = 1024; + super.setUp(); + } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte payload[]) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.PERSISTENT); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java index e98fc382a6..300ae52557 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java @@ -17,9 +17,11 @@ package org.apache.activemq.perf; import javax.jms.Connection; -import javax.jms.ConnectionFactory; +import javax.jms.Destination; import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.network.NetworkConnector; import org.apache.commons.logging.Log; @@ -29,45 +31,58 @@ import org.apache.commons.logging.LogFactory; public class SimpleNetworkTest extends SimpleTopicTest { private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class); - protected String consumerBindAddress = "tcp://localhost:61616"; + //protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000"; + protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=2000&socket.tcpNoDelayEnabled=false"; protected String producerBindAddress = "tcp://localhost:61617"; protected static final String CONSUMER_BROKER_NAME = "Consumer"; protected static final String PRODUCER_BROKER_NAME = "Producer"; protected BrokerService consumerBroker; protected BrokerService producerBroker; - protected ConnectionFactory consumerFactory; - protected ConnectionFactory producerFactory; + protected ActiveMQConnectionFactory consumerFactory; + protected ActiveMQConnectionFactory producerFactory; + protected void setUp() throws Exception { if (consumerBroker == null) { - consumerBroker = createConsumerBroker(consumerBindAddress); + // consumerBroker = createConsumerBroker(consumerBindAddress); } if (producerBroker == null) { producerBroker = createProducerBroker(producerBindAddress); } - consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME); - producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME); - //consumerFactory = createConnectionFactory(consumerBindAddress); - //producerFactory = createConnectionFactory(producerBindAddress); + //consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME); + //producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME); + consumerFactory = createConnectionFactory("failover://("+consumerBindAddress + "," + producerBindAddress +")?randomize=false&backup=false"); + //consumerFactory = createConnectionFactory("failover://("+consumerBindAddress+")?backup=true"); + consumerFactory.setDispatchAsync(true); + ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); + policy.setQueuePrefetch(100); + consumerFactory.setPrefetchPolicy(policy); + producerFactory = createConnectionFactory(producerBindAddress); Connection con = consumerFactory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); - LOG.info("Testing against destination: " + destination); - LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)"); - con.close(); - producers = new PerfProducer[numberofProducers]; - consumers = new PerfConsumer[numberOfConsumers]; - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i] = createConsumer(consumerFactory, destination, i); - consumers[i].setSleepDuration(consumerSleepDuration); - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; + + producers = new PerfProducer[numberofProducers*numberOfDestinations]; + consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations]; + int consumerCount = 0; + int producerCount = 0; + for (int k =0; k < numberOfDestinations;k++) { + Destination destination = createDestination(session, destinationName+":"+k); + LOG.info("Testing against destination: " + destination); + for (int i = 0; i < numberOfConsumers; i++) { + consumers[consumerCount] = createConsumer(factory, destination, consumerCount); + consumers[consumerCount].setSleepDuration(consumerSleepDuration); + consumerCount++; + } + for (int i = 0; i < numberofProducers; i++) { + array = new byte[playloadSize]; + for (int j = i; j < array.length; j++) { + array[j] = (byte)j; + } + producers[producerCount] = createProducer(factory, destination, i, array); + producerCount++; } - producers[i] = createProducer(producerFactory, destination, i, array); } + con.close(); } protected void tearDown() throws Exception { @@ -96,6 +111,7 @@ public class SimpleNetworkTest extends SimpleTopicTest { } protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception { + configureBroker(answer); answer.setPersistent(false); answer.setBrokerName(CONSUMER_BROKER_NAME); answer.setDeleteAllMessagesOnStartup(true); @@ -111,14 +127,23 @@ public class SimpleNetworkTest extends SimpleTopicTest { } protected void configureProducerBroker(BrokerService answer,String uri) throws Exception { + configureBroker(answer); answer.setBrokerName(PRODUCER_BROKER_NAME); + answer.setMonitorConnectionSplits(false); + //answer.setSplitSystemUsageForProducersConsumers(true); answer.setPersistent(false); answer.setDeleteAllMessagesOnStartup(true); - NetworkConnector connector = answer.addNetworkConnector("static://"+consumerBindAddress); - connector.setDuplex(true); + NetworkConnector connector = answer.addNetworkConnector("static://tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=2000"); + //connector.setNetworkTTL(3); + //connector.setDynamicOnly(true); + //connector.setDuplex(true); answer.addConnector(uri); answer.setUseShutdownHook(false); } + + protected void configureBroker(BrokerService service) throws Exception{ + + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java index 37de574093..7011a35020 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java @@ -17,23 +17,30 @@ package org.apache.activemq.perf; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest { - protected void setUp() throws Exception { - numberOfConsumers = 10; - numberofProducers = 10; + protected void setUp()throws Exception { + numberOfDestinations =20; super.setUp(); } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); - pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - pp.setTimeToLive(1000); + pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // pp.setTimeToLive(1000); + //pp.setSleep(1); return pp; } @@ -41,11 +48,36 @@ public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest { PerfConsumer consumer = new PerfConsumer(fac, dest); boolean enableAudit = numberOfConsumers <= 1; System.out.println("Enable Audit = " + enableAudit); - consumer.setEnableAudit(enableAudit); + consumer.setEnableAudit(false); return consumer; } + public void testPerformance() throws JMSException, InterruptedException { + //Thread.sleep(5000); + super.testPerformance(); + } + protected Destination createDestination(Session s, String destinationName) throws JMSException { return s.createQueue(destinationName); + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.setMonitorConnectionSplits(true); + final List policyEntries = new ArrayList(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024 * 1024 * 100); // Set to 1 MB + entry.setOptimizedDispatch(true); + entry.setProducerFlowControl(true); + entry.setMaxPageSize(10); + entry.setLazyDispatch(false); + policyEntries.add(entry); + + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + answer.setDestinationPolicy(policyMap); + super.configureBroker(answer); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java index 13d489d95b..9836e95e46 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java @@ -16,20 +16,53 @@ */ package org.apache.activemq.perf; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; + /** * @version $Revision: 1.3 $ */ public class SimpleNonPersistentQueueTest extends SimpleQueueTest { + protected void setUp() throws Exception { + numberOfConsumers = 10; + numberofProducers = 10; + //this.consumerSleepDuration=100; + super.setUp(); + } protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { PerfProducer pp = new PerfProducer(fac, dest, payload); pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - pp.setTimeToLive(100); + //pp.setTimeToLive(100); return pp; } + + protected void configureBroker(BrokerService answer,String uri) throws Exception { + answer.setPersistent(false); + final List policyEntries = new ArrayList(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB + entry.setOptimizedDispatch(true); + entry.setLazyDispatch(true); + policyEntries.add(entry); + + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + answer.setDestinationPolicy(policyMap); + super.configureBroker(answer, uri); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java index 3ae95d5a58..0a9278585b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java @@ -31,9 +31,7 @@ public class SimpleQueueTest extends SimpleTopicTest { } protected void setUp() throws Exception { - numberOfConsumers = 1; - numberofProducers = 2; - this.consumerSleepDuration=0; + super.setUp(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java index ec1470e714..166b75c35d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java @@ -35,8 +35,8 @@ public class SimpleTopicTest extends TestCase { private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class); protected BrokerService broker; - // protected String - // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; + protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000"; + //protected String clientURI="tcp://localhost:61616"; protected String bindAddress="tcp://localhost:61616"; //protected String bindAddress = "tcp://localhost:61616"; //protected String bindAddress="vm://localhost?marshal=true"; @@ -46,12 +46,15 @@ public class SimpleTopicTest extends TestCase { protected String destinationName = getClass().getName(); protected int samepleCount = 20; protected long sampleInternal = 10000; - protected int numberOfConsumers = 1; - protected int numberofProducers = 0; + protected int numberOfDestinations=1; + protected int numberOfConsumers = 10; + protected int numberofProducers = 10; + protected int totalNumberOfProducers; + protected int totalNumberOfConsumers; protected int playloadSize = 1024; protected byte[] array; protected ConnectionFactory factory; - protected Destination destination; + protected long consumerSleepDuration=0; /** @@ -63,26 +66,37 @@ public class SimpleTopicTest extends TestCase { if (broker == null) { broker = createBroker(bindAddress); } - factory = createConnectionFactory(bindAddress); + factory = createConnectionFactory(clientURI); Connection con = factory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationName); - LOG.info("Testing against destination: " + destination); - LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s)"); - con.close(); - producers = new PerfProducer[numberofProducers]; - consumers = new PerfConsumer[numberOfConsumers]; - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i] = createConsumer(factory, destination, i); - consumers[i].setSleepDuration(consumerSleepDuration); - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; + + + LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)"); + + totalNumberOfConsumers=numberOfConsumers*numberOfDestinations; + totalNumberOfProducers=numberofProducers*numberOfDestinations; + producers = new PerfProducer[totalNumberOfProducers]; + consumers = new PerfConsumer[totalNumberOfConsumers]; + int consumerCount = 0; + int producerCount = 0; + for (int k =0; k < numberOfDestinations;k++) { + Destination destination = createDestination(session, destinationName+":"+k); + LOG.info("Testing against destination: " + destination); + for (int i = 0; i < numberOfConsumers; i++) { + consumers[consumerCount] = createConsumer(factory, destination, consumerCount); + consumers[consumerCount].setSleepDuration(consumerSleepDuration); + consumerCount++; + } + for (int i = 0; i < numberofProducers; i++) { + array = new byte[playloadSize]; + for (int j = i; j < array.length; j++) { + array[j] = (byte)j; + } + producers[producerCount] = createProducer(factory, destination, i, array); + producerCount++; } - producers[i] = createProducer(factory, destination, i, array); } + con.close(); super.setUp(); } @@ -136,10 +150,10 @@ public class SimpleTopicTest extends TestCase { } public void testPerformance() throws JMSException, InterruptedException { - for (int i = 0; i < numberOfConsumers; i++) { + for (int i = 0; i < totalNumberOfConsumers; i++) { consumers[i].start(); } - for (int i = 0; i < numberofProducers; i++) { + for (int i = 0; i < totalNumberOfProducers; i++) { producers[i].start(); } LOG.info("Sampling performance " + samepleCount + " times at a " + sampleInternal + " ms interval."); @@ -148,10 +162,10 @@ public class SimpleTopicTest extends TestCase { dumpProducerRate(); dumpConsumerRate(); } - for (int i = 0; i < numberofProducers; i++) { + for (int i = 0; i < totalNumberOfProducers; i++) { producers[i].stop(); } - for (int i = 0; i < numberOfConsumers; i++) { + for (int i = 0; i < totalNumberOfConsumers; i++) { consumers[i].stop(); } } @@ -159,30 +173,36 @@ public class SimpleTopicTest extends TestCase { protected void dumpProducerRate() { int totalRate = 0; int totalCount = 0; + String producerString="Producers:"; for (int i = 0; i < producers.length; i++) { PerfRate rate = producers[i].getRate().cloneAndReset(); totalRate += rate.getRate(); totalCount += rate.getTotalCount(); + producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; } if (producers != null && producers.length > 0) { int avgRate = totalRate / producers.length; System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); + // System.out.println(producerString); } } protected void dumpConsumerRate() { int totalRate = 0; int totalCount = 0; + String consumerString="Consumers:"; for (int i = 0; i < consumers.length; i++) { PerfRate rate = consumers[i].getRate().cloneAndReset(); totalRate += rate.getRate(); totalCount += rate.getTotalCount(); + consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; } if (consumers != null && consumers.length > 0) { int avgRate = totalRate / consumers.length; System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount); + System.out.println(consumerString); } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java index c3b99662bd..353fef80d5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java @@ -33,20 +33,15 @@ import org.springframework.core.io.Resource; public class SlowConsumerTopicTest extends SimpleTopicTest { protected PerfConsumer[] slowConsumers; - protected int numberOfSlowConsumers = 1; - + protected void setUp() throws Exception { - numberOfConsumers = 0; + playloadSize = 10 * 1024; super.setUp(); - slowConsumers = new SlowConsumer[numberOfSlowConsumers]; - for (int i = 0; i < numberOfSlowConsumers; i++) { - slowConsumers[i] = createSlowConsumer(factory, destination, i); - slowConsumers[i].start(); - } } + - protected PerfConsumer createSlowConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { + protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { return new SlowConsumer(fac, dest); }