git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1092753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-04-15 16:10:30 +00:00
parent 2f5821b425
commit 459be2d0fb
4 changed files with 62 additions and 0 deletions

View File

@ -37,6 +37,8 @@ public abstract class AbstractJmsClient {
protected int destCount = 1; protected int destCount = 1;
protected int destIndex; protected int destIndex;
protected String clientName = ""; protected String clientName = "";
private int internalTxCounter = 0;
public AbstractJmsClient(ConnectionFactory factory) { public AbstractJmsClient(ConnectionFactory factory) {
this.factory = factory; this.factory = factory;
@ -159,4 +161,25 @@ public abstract class AbstractJmsClient {
} }
} }
/**
* Helper method that checks if session is
* transacted and whether to commit the tx based on commitAfterXMsgs
* property.
*
* @return true if transaction was committed.
* @throws JMSException in case the call to JMS Session.commit() fails.
*/
public boolean commitTxIfNecessary() throws JMSException {
internalTxCounter++;
if (getClient().isSessTransacted()) {
if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
LOG.debug("Committing transaction.");
internalTxCounter = 0;
getSession().commit();
return true;
}
}
return false;
}
} }

View File

@ -82,10 +82,14 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
LOG.info("Starting to synchronously receive messages for " + duration + " ms..."); LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
long endTime = System.currentTimeMillis() + duration; long endTime = System.currentTimeMillis() + duration;
int counter = 0;
while (System.currentTimeMillis() < endTime) { while (System.currentTimeMillis() < endTime) {
getJmsConsumer().receive(); getJmsConsumer().receive();
incThroughput(); incThroughput();
counter++;
sleep(); sleep();
commitTxIfNecessary();
} }
} finally { } finally {
if (client.isDurable() && client.isUnsubscribe()) { if (client.isDurable() && client.isUnsubscribe()) {
@ -112,6 +116,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
incThroughput(); incThroughput();
recvCount++; recvCount++;
sleep(); sleep();
commitTxIfNecessary();
} }
} finally { } finally {
if (client.isDurable() && client.isUnsubscribe()) { if (client.isDurable() && client.isUnsubscribe()) {
@ -132,6 +137,11 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
public void onMessage(Message msg) { public void onMessage(Message msg) {
incThroughput(); incThroughput();
sleep(); sleep();
try {
commitTxIfNecessary();
} catch (JMSException ex) {
LOG.error("Error committing transaction: " + ex.getMessage());
}
} }
}); });
@ -165,6 +175,12 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
recvCount.incrementAndGet(); recvCount.incrementAndGet();
synchronized (recvCount) { synchronized (recvCount) {
recvCount.notify(); recvCount.notify();
}
try {
commitTxIfNecessary();
} catch (JMSException ex) {
LOG.error("Error committing transaction: " + ex.getMessage());
} }
} }
}); });
@ -244,6 +260,10 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
client = (JmsConsumerProperties)clientProps; client = (JmsConsumerProperties)clientProps;
} }
/**
* A way to throttle the consumer. Time to sleep is
* configured via recvDelay property.
*/
protected void sleep() { protected void sleep() {
if (client.getRecvDelay() > 0) { if (client.getRecvDelay() > 0) {
try { try {

View File

@ -97,6 +97,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(dest[j], getJmsTextMessage()); getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
// Send to only one actual destination // Send to only one actual destination
@ -105,6 +106,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(getJmsTextMessage()); getJmsProducer().send(getJmsTextMessage());
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
@ -119,6 +121,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]")); getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
@ -128,6 +131,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]")); getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
} }
@ -168,6 +172,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(dest[j], getJmsTextMessage()); getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
// Send to only one actual destination // Send to only one actual destination
@ -176,6 +181,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(getJmsTextMessage()); getJmsProducer().send(getJmsTextMessage());
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
@ -191,6 +197,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]")); getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
@ -201,6 +208,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]")); getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput(); incThroughput();
sleep(); sleep();
commitTxIfNecessary();
} }
} }
} }

View File

@ -27,6 +27,9 @@ public class JmsClientProperties extends AbstractObjectProperties {
protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE; protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
protected boolean sessTransacted; protected boolean sessTransacted;
// commit transaction after X msgs only.
protected int commitAfterXMsgs = 1;
protected String jmsProvider; protected String jmsProvider;
protected String jmsVersion; protected String jmsVersion;
@ -63,6 +66,14 @@ public class JmsClientProperties extends AbstractObjectProperties {
public void setSessTransacted(boolean sessTransacted) { public void setSessTransacted(boolean sessTransacted) {
this.sessTransacted = sessTransacted; this.sessTransacted = sessTransacted;
} }
public void setCommitAfterXMsgs(int commitAfterXMsg) {
this.commitAfterXMsgs = commitAfterXMsg;
}
public int getCommitAfterXMsgs() {
return this.commitAfterXMsgs;
}
public String getJmsProvider() { public String getJmsProvider() {
return jmsProvider; return jmsProvider;