From cdd4efabaf38c7727e42ae7e2127633d4dd1b13c Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 16 May 2008 17:44:21 +0000 Subject: [PATCH] patch for https://issues.apache.org/activemq/browse/AMQ-1661 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@657147 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 6ea47d3a46..b6bde2c0b9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -388,7 +388,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { } } - protected void serviceRemoteCommand(Command command) { + protected void serviceRemoteCommand(Command command) { if (!disposed) { try { if (command.isMessageDispatch()) { @@ -580,9 +580,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { final MessageDispatch md = (MessageDispatch)command; DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage()!=null) { + + // See if this consumer's brokerPath tells us it came from the broker at the other end + // of the bridge. I think we should be making this decision based on the message's + // broker bread crumbs and not the consumer's? However, the message's broker bread + // crumbs are null, which is another matter. + boolean cameFromRemote = false; + Object consumerInfo = md.getMessage().getDataStructure(); + if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) ) + cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId()); + Message message = configureMessage(md); if (trace) { LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); + LOG.trace("cameFromRemote = "+cameFromRemote); } if (!message.isResponseRequired() || isDuplex()) { @@ -591,9 +602,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { // send, we will preserve that QOS // by bridging it using an async send (small chance // of message loss). - remoteBroker.oneway(message); + + // Don't send it off to the remote if it originally came from the remote. + if( !cameFromRemote ) { + remoteBroker.oneway(message); + } + else{ + LOG.info("Message not forwarded on to remote, because message came from remote"); + } localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); + dequeueCounter.incrementAndGet(); } else {