mirror of https://github.com/apache/activemq.git
Apply patch for: https://issues.apache.org/jira/browse/AMQ-3186
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1074325 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3e895c6e95
commit
576230be65
|
@ -85,6 +85,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
||||||
while (System.currentTimeMillis() < endTime) {
|
while (System.currentTimeMillis() < endTime) {
|
||||||
getJmsConsumer().receive();
|
getJmsConsumer().receive();
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (client.isDurable() && client.isUnsubscribe()) {
|
if (client.isDurable() && client.isUnsubscribe()) {
|
||||||
|
@ -110,6 +111,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
||||||
getJmsConsumer().receive();
|
getJmsConsumer().receive();
|
||||||
incThroughput();
|
incThroughput();
|
||||||
recvCount++;
|
recvCount++;
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (client.isDurable() && client.isUnsubscribe()) {
|
if (client.isDurable() && client.isUnsubscribe()) {
|
||||||
|
@ -129,6 +131,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
||||||
getJmsConsumer().setMessageListener(new MessageListener() {
|
getJmsConsumer().setMessageListener(new MessageListener() {
|
||||||
public void onMessage(Message msg) {
|
public void onMessage(Message msg) {
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -236,4 +239,15 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
||||||
public void setClient(JmsClientProperties clientProps) {
|
public void setClient(JmsClientProperties clientProps) {
|
||||||
client = (JmsConsumerProperties)clientProps;
|
client = (JmsConsumerProperties)clientProps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void sleep() {
|
||||||
|
if (client.getRecvDelay() > 0) {
|
||||||
|
try {
|
||||||
|
LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
|
||||||
|
Thread.sleep(client.getRecvDelay());
|
||||||
|
} catch (java.lang.InterruptedException ex) {
|
||||||
|
LOG.warn(ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,6 +95,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int j = 0; j < dest.length; j++) {
|
for (int j = 0; j < dest.length; j++) {
|
||||||
getJmsProducer().send(dest[j], getJmsTextMessage());
|
getJmsProducer().send(dest[j], getJmsTextMessage());
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Send to only one actual destination
|
// Send to only one actual destination
|
||||||
|
@ -102,6 +103,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int i = 0; i < messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
getJmsProducer().send(getJmsTextMessage());
|
getJmsProducer().send(getJmsTextMessage());
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,6 +117,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int j = 0; j < dest.length; j++) {
|
for (int j = 0; j < dest.length; j++) {
|
||||||
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
|
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +126,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int i = 0; i < messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
|
getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,6 +166,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int j = 0; j < dest.length; j++) {
|
for (int j = 0; j < dest.length; j++) {
|
||||||
getJmsProducer().send(dest[j], getJmsTextMessage());
|
getJmsProducer().send(dest[j], getJmsTextMessage());
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Send to only one actual destination
|
// Send to only one actual destination
|
||||||
|
@ -169,6 +174,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
while (System.currentTimeMillis() < endTime) {
|
while (System.currentTimeMillis() < endTime) {
|
||||||
getJmsProducer().send(getJmsTextMessage());
|
getJmsProducer().send(getJmsTextMessage());
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,6 +189,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
for (int j = 0; j < dest.length; j++) {
|
for (int j = 0; j < dest.length; j++) {
|
||||||
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
|
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,6 +199,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
|
|
||||||
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
|
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
|
||||||
incThroughput();
|
incThroughput();
|
||||||
|
sleep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -265,4 +273,15 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
||||||
Arrays.fill(data, (byte) 0);
|
Arrays.fill(data, (byte) 0);
|
||||||
return text + new String(data);
|
return text + new String(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void sleep() {
|
||||||
|
if (client.getSendDelay() > 0) {
|
||||||
|
try {
|
||||||
|
LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
|
||||||
|
Thread.sleep(client.getSendDelay());
|
||||||
|
} catch (java.lang.InterruptedException ex) {
|
||||||
|
LOG.warn(ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
||||||
|
|
||||||
protected long recvCount = 1000000; // Receive a million messages by default
|
protected long recvCount = 1000000; // Receive a million messages by default
|
||||||
protected long recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default
|
protected long recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default
|
||||||
|
protected long recvDelay = 0; // delay in milliseconds for processing received msg
|
||||||
protected String recvType = TIME_BASED_RECEIVING;
|
protected String recvType = TIME_BASED_RECEIVING;
|
||||||
|
|
||||||
public boolean isDurable() {
|
public boolean isDurable() {
|
||||||
|
@ -75,4 +76,12 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
||||||
public void setRecvType(String recvType) {
|
public void setRecvType(String recvType) {
|
||||||
this.recvType = recvType;
|
this.recvType = recvType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setRecvDelay(long delay) {
|
||||||
|
this.recvDelay = delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRecvDelay() {
|
||||||
|
return this.recvDelay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ public class JmsProducerProperties extends JmsClientProperties {
|
||||||
protected long sendCount = 1000000; // Send a million messages by default
|
protected long sendCount = 1000000; // Send a million messages by default
|
||||||
protected long sendDuration = 5 * 60 * 1000; // Send for 5 mins by default
|
protected long sendDuration = 5 * 60 * 1000; // Send for 5 mins by default
|
||||||
protected String sendType = TIME_BASED_SENDING;
|
protected String sendType = TIME_BASED_SENDING;
|
||||||
|
protected long sendDelay = 0; // delay in milliseconds between each producer send
|
||||||
|
|
||||||
// If true, create a different message on each send, otherwise reuse.
|
// If true, create a different message on each send, otherwise reuse.
|
||||||
protected boolean createNewMsg;
|
protected boolean createNewMsg;
|
||||||
|
|
||||||
|
@ -77,4 +79,12 @@ public class JmsProducerProperties extends JmsClientProperties {
|
||||||
public void setCreateNewMsg(boolean createNewMsg) {
|
public void setCreateNewMsg(boolean createNewMsg) {
|
||||||
this.createNewMsg = createNewMsg;
|
this.createNewMsg = createNewMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSendDelay(long delay) {
|
||||||
|
this.sendDelay = delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSendDelay() {
|
||||||
|
return this.sendDelay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue