mirror of https://github.com/apache/activemq.git
Check for duplicates
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@586025 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2cb2119814
commit
49325b60cd
|
@ -40,7 +40,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
public class InactiveDurableTopicTest extends TestCase {
|
public class InactiveDurableTopicTest extends TestCase {
|
||||||
private static final transient Log LOG = LogFactory.getLog(InactiveDurableTopicTest.class);
|
private static final transient Log LOG = LogFactory.getLog(InactiveDurableTopicTest.class);
|
||||||
|
|
||||||
private static final int MESSAGE_COUNT = 1000000;
|
private static final int MESSAGE_COUNT = 2000;
|
||||||
private static final String DEFAULT_PASSWORD = "";
|
private static final String DEFAULT_PASSWORD = "";
|
||||||
private static final String USERNAME = "testuser";
|
private static final String USERNAME = "testuser";
|
||||||
private static final String CLIENTID = "mytestclient";
|
private static final String CLIENTID = "mytestclient";
|
||||||
|
|
|
@ -26,13 +26,20 @@ import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQMessageAudit;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision: 1.3 $
|
* @version $Revision: 1.3 $
|
||||||
*/
|
*/
|
||||||
public class PerfConsumer implements MessageListener {
|
public class PerfConsumer implements MessageListener {
|
||||||
|
private static final Log LOG = LogFactory.getLog(PerfConsumer.class);
|
||||||
protected Connection connection;
|
protected Connection connection;
|
||||||
protected MessageConsumer consumer;
|
protected MessageConsumer consumer;
|
||||||
protected long sleepDuration;
|
protected long sleepDuration;
|
||||||
|
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20);
|
||||||
|
|
||||||
protected PerfRate rate = new PerfRate();
|
protected PerfRate rate = new PerfRate();
|
||||||
|
|
||||||
|
@ -71,6 +78,14 @@ public class PerfConsumer implements MessageListener {
|
||||||
|
|
||||||
public void onMessage(Message msg) {
|
public void onMessage(Message msg) {
|
||||||
rate.increment();
|
rate.increment();
|
||||||
|
try {
|
||||||
|
if (this.audit.isDuplicateMessage(msg)){
|
||||||
|
LOG.error("Duplicate Message!" + msg);
|
||||||
|
}
|
||||||
|
} catch (JMSException e1) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e1.printStackTrace();
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (sleepDuration != 0) {
|
if (sleepDuration != 0) {
|
||||||
Thread.sleep(sleepDuration);
|
Thread.sleep(sleepDuration);
|
||||||
|
|
|
@ -37,22 +37,22 @@ public class SimpleTopicTest extends TestCase {
|
||||||
protected BrokerService broker;
|
protected BrokerService broker;
|
||||||
// protected String
|
// protected String
|
||||||
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
|
// bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
|
||||||
// protected String bindAddress="tcp://localhost:61616";
|
//protected String bindAddress="tcp://localhost:61616";
|
||||||
protected String bindAddress = "tcp://localhost:61616";
|
//protected String bindAddress = "tcp://localhost:61616";
|
||||||
// protected String bindAddress="vm://localhost?marshal=true";
|
//protected String bindAddress="vm://localhost?marshal=true";
|
||||||
// protected String bindAddress="vm://localhost";
|
protected String bindAddress="vm://localhost";
|
||||||
protected PerfProducer[] producers;
|
protected PerfProducer[] producers;
|
||||||
protected PerfConsumer[] consumers;
|
protected PerfConsumer[] consumers;
|
||||||
protected String destinationName = getClass().getName();
|
protected String destinationName = getClass().getName();
|
||||||
protected int samepleCount = 10;
|
protected int samepleCount = 10;
|
||||||
protected long sampleInternal = 1000;
|
protected long sampleInternal = 10000;
|
||||||
protected int numberOfConsumers = 10;
|
protected int numberOfConsumers = 1;
|
||||||
protected int numberofProducers = 1;
|
protected int numberofProducers = 2;
|
||||||
protected int playloadSize = 1024;
|
protected int playloadSize = 1024;
|
||||||
protected byte[] array;
|
protected byte[] array;
|
||||||
protected ConnectionFactory factory;
|
protected ConnectionFactory factory;
|
||||||
protected Destination destination;
|
protected Destination destination;
|
||||||
protected long consumerSleepDuration;
|
protected long consumerSleepDuration=0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up a test where the producer and consumer have their own connection.
|
* Sets up a test where the producer and consumer have their own connection.
|
||||||
|
@ -127,6 +127,7 @@ public class SimpleTopicTest extends TestCase {
|
||||||
protected void configureBroker(BrokerService answer) throws Exception {
|
protected void configureBroker(BrokerService answer) throws Exception {
|
||||||
answer.addConnector(bindAddress);
|
answer.addConnector(bindAddress);
|
||||||
answer.setDeleteAllMessagesOnStartup(true);
|
answer.setDeleteAllMessagesOnStartup(true);
|
||||||
|
answer.setUseShutdownHook(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||||
|
@ -163,7 +164,7 @@ public class SimpleTopicTest extends TestCase {
|
||||||
totalCount += rate.getTotalCount();
|
totalCount += rate.getTotalCount();
|
||||||
}
|
}
|
||||||
int avgRate = totalRate / producers.length;
|
int avgRate = totalRate / producers.length;
|
||||||
LOG.info("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount);
|
System.out.println("Avg producer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", sent = " + totalCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void dumpConsumerRate() {
|
protected void dumpConsumerRate() {
|
||||||
|
@ -176,7 +177,7 @@ public class SimpleTopicTest extends TestCase {
|
||||||
}
|
}
|
||||||
if (consumers != null && consumers.length > 0) {
|
if (consumers != null && consumers.length > 0) {
|
||||||
int avgRate = totalRate / consumers.length;
|
int avgRate = totalRate / consumers.length;
|
||||||
LOG.info("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount);
|
System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue