diff --git a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java index ff887fe4b2..9036be342b 100644 --- a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java +++ b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java @@ -85,6 +85,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { while (System.currentTimeMillis() < endTime) { getJmsConsumer().receive(); incThroughput(); + sleep(); } } finally { if (client.isDurable() && client.isUnsubscribe()) { @@ -110,6 +111,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { getJmsConsumer().receive(); incThroughput(); recvCount++; + sleep(); } } finally { if (client.isDurable() && client.isUnsubscribe()) { @@ -129,6 +131,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { getJmsConsumer().setMessageListener(new MessageListener() { public void onMessage(Message msg) { incThroughput(); + sleep(); } }); @@ -236,4 +239,15 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { public void setClient(JmsClientProperties clientProps) { client = (JmsConsumerProperties)clientProps; } + + protected void sleep() { + if (client.getRecvDelay() > 0) { + try { + LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds"); + Thread.sleep(client.getRecvDelay()); + } catch (java.lang.InterruptedException ex) { + LOG.warn(ex.getMessage()); + } + } + } } diff --git a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java index 9abec9951b..9694a26950 100644 --- a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java +++ b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java @@ -95,6 +95,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int j = 0; j < dest.length; j++) { getJmsProducer().send(dest[j], getJmsTextMessage()); incThroughput(); + sleep(); } } // Send to only one actual destination @@ -102,6 +103,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int i = 0; i < messageCount; i++) { getJmsProducer().send(getJmsTextMessage()); incThroughput(); + sleep(); } } @@ -115,6 +117,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int j = 0; j < dest.length; j++) { getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]")); incThroughput(); + sleep(); } } @@ -123,6 +126,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int i = 0; i < messageCount; i++) { getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]")); incThroughput(); + sleep(); } } } @@ -162,6 +166,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int j = 0; j < dest.length; j++) { getJmsProducer().send(dest[j], getJmsTextMessage()); incThroughput(); + sleep(); } } // Send to only one actual destination @@ -169,6 +174,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { while (System.currentTimeMillis() < endTime) { getJmsProducer().send(getJmsTextMessage()); incThroughput(); + sleep(); } } @@ -183,6 +189,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { for (int j = 0; j < dest.length; j++) { getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]")); incThroughput(); + sleep(); } } @@ -192,6 +199,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]")); incThroughput(); + sleep(); } } } @@ -265,4 +273,15 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { Arrays.fill(data, (byte) 0); return text + new String(data); } + + protected void sleep() { + if (client.getSendDelay() > 0) { + try { + LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds"); + Thread.sleep(client.getSendDelay()); + } catch (java.lang.InterruptedException ex) { + LOG.warn(ex.getMessage()); + } + } + } } diff --git a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java index 6873e5d7b3..b51e252786 100644 --- a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java +++ b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsConsumerProperties.java @@ -26,6 +26,7 @@ public class JmsConsumerProperties extends JmsClientProperties { protected long recvCount = 1000000; // Receive a million messages by default protected long recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default + protected long recvDelay = 0; // delay in milliseconds for processing received msg protected String recvType = TIME_BASED_RECEIVING; public boolean isDurable() { @@ -75,4 +76,12 @@ public class JmsConsumerProperties extends JmsClientProperties { public void setRecvType(String recvType) { this.recvType = recvType; } + + public void setRecvDelay(long delay) { + this.recvDelay = delay; + } + + public long getRecvDelay() { + return this.recvDelay; + } } diff --git a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java index 338e70d4b7..fcb0aea32c 100644 --- a/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java +++ b/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsProducerProperties.java @@ -27,6 +27,8 @@ public class JmsProducerProperties extends JmsClientProperties { protected long sendCount = 1000000; // Send a million messages by default protected long sendDuration = 5 * 60 * 1000; // Send for 5 mins by default protected String sendType = TIME_BASED_SENDING; + protected long sendDelay = 0; // delay in milliseconds between each producer send + // If true, create a different message on each send, otherwise reuse. protected boolean createNewMsg; @@ -77,4 +79,12 @@ public class JmsProducerProperties extends JmsClientProperties { public void setCreateNewMsg(boolean createNewMsg) { this.createNewMsg = createNewMsg; } + + public void setSendDelay(long delay) { + this.sendDelay = delay; + } + + public long getSendDelay() { + return this.sendDelay; + } }