start consumers/producers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@651619 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-25 14:49:58 +00:00
parent 803abe48a4
commit 8bf2d78cb2
1 changed files with 8 additions and 12 deletions

View File

@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
public class SimpleNetworkTest extends SimpleTopicTest { public class SimpleNetworkTest extends SimpleTopicTest {
private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class); private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class);
//protected String consumerBindAddress = "tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000";
protected String consumerBindAddress = "tcp://localhost:61616"; protected String consumerBindAddress = "tcp://localhost:61616";
protected String producerBindAddress = "tcp://localhost:61617"; protected String producerBindAddress = "tcp://localhost:61617";
protected static final String CONSUMER_BROKER_NAME = "Consumer"; protected static final String CONSUMER_BROKER_NAME = "Consumer";
@ -49,10 +48,7 @@ public class SimpleNetworkTest extends SimpleTopicTest {
if (producerBroker == null) { if (producerBroker == null) {
producerBroker = createProducerBroker(producerBindAddress); producerBroker = createProducerBroker(producerBindAddress);
} }
//consumerFactory = createConnectionFactory("vm://"+CONSUMER_BROKER_NAME); consumerFactory = createConnectionFactory(consumerBindAddress);
//producerFactory = createConnectionFactory("vm://"+ PRODUCER_BROKER_NAME);
consumerFactory = createConnectionFactory("failover://("+consumerBindAddress + "," + producerBindAddress +")?randomize=false&backup=false");
//consumerFactory = createConnectionFactory("failover://("+consumerBindAddress+")?backup=true");
consumerFactory.setDispatchAsync(true); consumerFactory.setDispatchAsync(true);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setQueuePrefetch(100); policy.setQueuePrefetch(100);
@ -63,23 +59,23 @@ public class SimpleNetworkTest extends SimpleTopicTest {
producers = new PerfProducer[numberofProducers*numberOfDestinations]; producers = new PerfProducer[numberofProducers*numberOfDestinations];
consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations]; consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations];
int consumerCount = 0;
int producerCount = 0;
for (int k =0; k < numberOfDestinations;k++) { for (int k =0; k < numberOfDestinations;k++) {
Destination destination = createDestination(session, destinationName+":"+k); Destination destination = createDestination(session, destinationName+":"+k);
LOG.info("Testing against destination: " + destination); LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) { for (int i = 0; i < numberOfConsumers; i++) {
consumers[consumerCount] = createConsumer(consumerFactory, destination, consumerCount); consumers[i] = createConsumer(consumerFactory, destination, i);
consumers[consumerCount].setSleepDuration(consumerSleepDuration); consumers[i].setSleepDuration(consumerSleepDuration);
consumerCount++; consumers[i].start();
} }
for (int i = 0; i < numberofProducers; i++) { for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize]; array = new byte[playloadSize];
for (int j = i; j < array.length; j++) { for (int j = i; j < array.length; j++) {
array[j] = (byte)j; array[j] = (byte)j;
} }
producers[producerCount] = createProducer(producerFactory, destination, i, array); producers[i] = createProducer(producerFactory, destination, i, array);
producerCount++; producers[i].start();
} }
} }
con.close(); con.close();