mirror of https://github.com/apache/activemq.git
Add an optional initial delay for the consumer
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@656379 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c2a5b54c6d
commit
f12c100805
|
@ -38,15 +38,17 @@ public class PerfConsumer implements MessageListener {
|
|||
protected Connection connection;
|
||||
protected MessageConsumer consumer;
|
||||
protected long sleepDuration;
|
||||
protected long initialDelay;
|
||||
protected boolean enableAudit = false;
|
||||
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20);
|
||||
protected boolean firstMessage =true;
|
||||
|
||||
protected PerfRate rate = new PerfRate();
|
||||
|
||||
public PerfConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException {
|
||||
connection = fac.createConnection();
|
||||
connection.setClientID(consumerName);
|
||||
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session s = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
|
||||
if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) {
|
||||
consumer = s.createDurableSubscriber((Topic)dest, consumerName);
|
||||
} else {
|
||||
|
@ -77,6 +79,15 @@ public class PerfConsumer implements MessageListener {
|
|||
}
|
||||
|
||||
public void onMessage(Message msg) {
|
||||
if (firstMessage) {
|
||||
firstMessage=false;
|
||||
if (getInitialDelay() > 0) {
|
||||
try {
|
||||
Thread.sleep(getInitialDelay());
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
rate.increment();
|
||||
try {
|
||||
if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) {
|
||||
|
@ -112,4 +123,18 @@ public class PerfConsumer implements MessageListener {
|
|||
public void setEnableAudit(boolean doAudit) {
|
||||
this.enableAudit = doAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the initialDelay
|
||||
*/
|
||||
public long getInitialDelay() {
|
||||
return initialDelay;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param initialDelay the initialDelay to set
|
||||
*/
|
||||
public void setInitialDelay(long initialDelay) {
|
||||
this.initialDelay = initialDelay;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
|
|||
|
||||
protected void setUp() throws Exception {
|
||||
numberOfDestinations=1;
|
||||
numberOfConsumers = 4;
|
||||
numberOfConsumers = 1;
|
||||
numberofProducers = 1;
|
||||
sampleCount=1000;
|
||||
playloadSize = 1024;
|
||||
|
@ -41,7 +41,9 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
|
|||
}
|
||||
|
||||
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
|
||||
return new PerfConsumer(fac, dest, "subs:" + number);
|
||||
PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
|
||||
result.setInitialDelay(20000);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue