diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 6ea47d3a46..b6bde2c0b9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -388,7 +388,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { } } - protected void serviceRemoteCommand(Command command) { + protected void serviceRemoteCommand(Command command) { if (!disposed) { try { if (command.isMessageDispatch()) { @@ -580,9 +580,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { final MessageDispatch md = (MessageDispatch)command; DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage()!=null) { + + // See if this consumer's brokerPath tells us it came from the broker at the other end + // of the bridge. I think we should be making this decision based on the message's + // broker bread crumbs and not the consumer's? However, the message's broker bread + // crumbs are null, which is another matter. + boolean cameFromRemote = false; + Object consumerInfo = md.getMessage().getDataStructure(); + if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) ) + cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId()); + Message message = configureMessage(md); if (trace) { LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); + LOG.trace("cameFromRemote = "+cameFromRemote); } if (!message.isResponseRequired() || isDuplex()) { @@ -591,9 +602,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { // send, we will preserve that QOS // by bridging it using an async send (small chance // of message loss). - remoteBroker.oneway(message); + + // Don't send it off to the remote if it originally came from the remote. + if( !cameFromRemote ) { + remoteBroker.oneway(message); + } + else{ + LOG.info("Message not forwarded on to remote, because message came from remote"); + } localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); + dequeueCounter.incrementAndGet(); } else {