This closes #571

This commit is contained in:
Clebert Suconic 2016-06-09 14:15:43 -04:00
commit 7c2e58606f
2 changed files with 23 additions and 0 deletions

View File

@ -90,6 +90,10 @@ public interface JMSBridge extends ActiveMQComponent {
boolean isFailed(); boolean isFailed();
long getMessageCount();
long getAbortedMessageCount();
void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff); void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff);
void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff); void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff);

View File

@ -181,6 +181,10 @@ public final class JMSBridgeImpl implements JMSBridge {
private ClassLoader moduleTccl; private ClassLoader moduleTccl;
private long messageCount = 0;
private long abortedMessageCount = 0;
/* /*
* Constructor for MBean * Constructor for MBean
*/ */
@ -488,6 +492,7 @@ public final class JMSBridgeImpl implements JMSBridge {
try { try {
tx.rollback(); tx.rollback();
abortedMessageCount += messages.size();
} }
catch (Exception ignore) { catch (Exception ignore) {
if (JMSBridgeImpl.trace) { if (JMSBridgeImpl.trace) {
@ -774,6 +779,16 @@ public final class JMSBridgeImpl implements JMSBridge {
return failed; return failed;
} }
@Override
public synchronized long getMessageCount() {
return messageCount;
}
@Override
public synchronized long getAbortedMessageCount() {
return abortedMessageCount;
}
@Override @Override
public synchronized void setSourceConnectionFactoryFactory(final ConnectionFactoryFactory cff) { public synchronized void setSourceConnectionFactoryFactory(final ConnectionFactoryFactory cff) {
checkBridgeNotStarted(); checkBridgeNotStarted();
@ -1199,6 +1214,7 @@ public final class JMSBridgeImpl implements JMSBridge {
try { try {
// Terminate the tx // Terminate the tx
tx.rollback(); tx.rollback();
abortedMessageCount += messages.size();
} }
catch (Throwable ignore) { catch (Throwable ignore) {
if (JMSBridgeImpl.trace) { if (JMSBridgeImpl.trace) {
@ -1398,6 +1414,7 @@ public final class JMSBridgeImpl implements JMSBridge {
try { try {
// we call this just in case there is a failure other than failover // we call this just in case there is a failure other than failover
tx.rollback(); tx.rollback();
abortedMessageCount += messages.size();
} }
catch (Throwable ignored) { catch (Throwable ignored) {
} }
@ -1493,6 +1510,8 @@ public final class JMSBridgeImpl implements JMSBridge {
targetProducer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive); targetProducer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
if (msg != null)
messageCount++;
if (JMSBridgeImpl.trace) { if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Sent message " + msg); ActiveMQJMSBridgeLogger.LOGGER.trace("Sent message " + msg);
} }