mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@657147 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b609bdb95b
commit
cdd4efabaf
|
@ -388,7 +388,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void serviceRemoteCommand(Command command) {
|
protected void serviceRemoteCommand(Command command) {
|
||||||
if (!disposed) {
|
if (!disposed) {
|
||||||
try {
|
try {
|
||||||
if (command.isMessageDispatch()) {
|
if (command.isMessageDispatch()) {
|
||||||
|
@ -580,9 +580,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
final MessageDispatch md = (MessageDispatch)command;
|
final MessageDispatch md = (MessageDispatch)command;
|
||||||
DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||||
if (sub != null && md.getMessage()!=null) {
|
if (sub != null && md.getMessage()!=null) {
|
||||||
|
|
||||||
|
// See if this consumer's brokerPath tells us it came from the broker at the other end
|
||||||
|
// of the bridge. I think we should be making this decision based on the message's
|
||||||
|
// broker bread crumbs and not the consumer's? However, the message's broker bread
|
||||||
|
// crumbs are null, which is another matter.
|
||||||
|
boolean cameFromRemote = false;
|
||||||
|
Object consumerInfo = md.getMessage().getDataStructure();
|
||||||
|
if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) )
|
||||||
|
cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
|
||||||
|
|
||||||
Message message = configureMessage(md);
|
Message message = configureMessage(md);
|
||||||
if (trace) {
|
if (trace) {
|
||||||
LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
|
LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
|
||||||
|
LOG.trace("cameFromRemote = "+cameFromRemote);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!message.isResponseRequired() || isDuplex()) {
|
if (!message.isResponseRequired() || isDuplex()) {
|
||||||
|
@ -591,9 +602,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
// send, we will preserve that QOS
|
// send, we will preserve that QOS
|
||||||
// by bridging it using an async send (small chance
|
// by bridging it using an async send (small chance
|
||||||
// of message loss).
|
// of message loss).
|
||||||
remoteBroker.oneway(message);
|
|
||||||
|
// Don't send it off to the remote if it originally came from the remote.
|
||||||
|
if( !cameFromRemote ) {
|
||||||
|
remoteBroker.oneway(message);
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
LOG.info("Message not forwarded on to remote, because message came from remote");
|
||||||
|
}
|
||||||
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
|
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
|
||||||
dequeueCounter.incrementAndGet();
|
dequeueCounter.incrementAndGet();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue