diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java index 44fa7cdb3f..7852695dfa 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java @@ -40,7 +40,7 @@ import org.apache.commons.logging.LogFactory; public class InactiveDurableTopicTest extends TestCase { private static final transient Log LOG = LogFactory.getLog(InactiveDurableTopicTest.class); - private static final int MESSAGE_COUNT = 1000000; + private static final int MESSAGE_COUNT = 2000; private static final String DEFAULT_PASSWORD = ""; private static final String USERNAME = "testuser"; private static final String CLIENTID = "mytestclient"; diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java b/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java index 457580cc58..3c5a7729b7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java @@ -26,13 +26,20 @@ import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * @version $Revision: 1.3 $ */ public class PerfConsumer implements MessageListener { + private static final Log LOG = LogFactory.getLog(PerfConsumer.class); protected Connection connection; protected MessageConsumer consumer; protected long sleepDuration; + protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20); protected PerfRate rate = new PerfRate(); @@ -71,6 +78,14 @@ public class PerfConsumer implements MessageListener { public void onMessage(Message msg) { rate.increment(); + try { + if (this.audit.isDuplicateMessage(msg)){ + LOG.error("Duplicate Message!" + msg); + } + } catch (JMSException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } try { if (sleepDuration != 0) { Thread.sleep(sleepDuration); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java index b0df95feab..36a74b6721 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java @@ -37,22 +37,22 @@ public class SimpleTopicTest extends TestCase { protected BrokerService broker; // protected String // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false"; - // protected String bindAddress="tcp://localhost:61616"; - protected String bindAddress = "tcp://localhost:61616"; - // protected String bindAddress="vm://localhost?marshal=true"; - // protected String bindAddress="vm://localhost"; + //protected String bindAddress="tcp://localhost:61616"; + //protected String bindAddress = "tcp://localhost:61616"; + //protected String bindAddress="vm://localhost?marshal=true"; + protected String bindAddress="vm://localhost"; protected PerfProducer[] producers; protected PerfConsumer[] consumers; protected String destinationName = getClass().getName(); protected int samepleCount = 10; - protected long sampleInternal = 1000; - protected int numberOfConsumers = 10; - protected int numberofProducers = 1; + protected long sampleInternal = 10000; + protected int numberOfConsumers = 1; + protected int numberofProducers = 2; protected int playloadSize = 1024; protected byte[] array; protected ConnectionFactory factory; protected Destination destination; - protected long consumerSleepDuration; + protected long consumerSleepDuration=0; /** * Sets up a test where the producer and consumer have their own connection. @@ -127,6 +127,7 @@ public class SimpleTopicTest extends TestCase { protected void configureBroker(BrokerService answer) throws Exception { answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); + answer.setUseShutdownHook(false); } protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { @@ -163,7 +164,7 @@ public class SimpleTopicTest extends TestCase { totalCount += rate.getTotalCount(); } int avgRate = totalRate / producers.length; - LOG.info("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); + System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount); } protected void dumpConsumerRate() { @@ -176,7 +177,7 @@ public class SimpleTopicTest extends TestCase { } if (consumers != null && consumers.length > 0) { int avgRate = totalRate / consumers.length; - LOG.info("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount); + System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount); } } }