resolve AMQ-2199|https://issues.apache.org/activemq/browse/AMQ-2199 add a latch for the event that initialises the localbrokerid and path

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@763565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-04-09 09:05:17 +00:00
parent 5b8a886891
commit 966658fdad
3 changed files with 27 additions and 16 deletions

View File

@ -63,10 +63,15 @@ public class ConduitBridge extends DemandForwardingBridge {
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
if (filter.matches(ds.getLocalInfo().getDestination())) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched exsting sub (add interest) for : " + ds.getRemoteInfo()
+ " with sub: " + info);
}
// add the interest in the subscription
// ds.add(ds.getRemoteInfo().getConsumerId());
ds.add(info.getConsumerId());
matched = true;
// continue - we want interest to any existing
// DemandSubscriptions
}
@ -82,6 +87,10 @@ public class ConduitBridge extends DemandForwardingBridge {
ds.remove(id);
if (ds.isEmpty()) {
tmpList.add(ds);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
}
}
}
for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {

View File

@ -76,6 +76,7 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
synchronized (brokerInfoMutex) {
localBrokerId = ((BrokerInfo)command).getBrokerId();
localBrokerPath[0] = localBrokerId;
localBrokerIdKnownLatch.countDown();
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {

View File

@ -115,6 +115,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
@ -621,10 +622,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected DemandSubscription getDemandSubscription(MessageDispatch md) {
return subscriptionMapByLocalId.get(md.getConsumerId());
}
protected Message configureMessage(MessageDispatch md) {
Message message = md.getMessage().copy();
// Update the packet to show where it came from.
@ -643,16 +640,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
try {
if (command.isMessageDispatch()) {
enqueueCounter.incrementAndGet();
//localStartedLatch.await();
final MessageDispatch md = (MessageDispatch)command;
final MessageDispatch md = (MessageDispatch)command;
DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage()!=null) {
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's
// broker bread crumbs and not the consumer's? However, the message's broker bread
// crumbs are null, which is another matter.
boolean cameFromRemote = false;
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's
// broker bread crumbs and not the consumer's? However, the message's broker bread
// crumbs are null, which is another matter.
boolean cameFromRemote = false;
Object consumerInfo = md.getMessage().getDataStructure();
if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) )
cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
@ -661,7 +657,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (LOG.isDebugEnabled()) {
LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
}
if (!message.isResponseRequired()) {
// If the message was originally sent using async
@ -672,9 +668,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// Don't send it off to the remote if it originally came from the remote.
if( !cameFromRemote ) {
remoteBroker.oneway(message);
}
else{
LOG.info("Message not forwarded on to remote, because message came from remote");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Message not forwarded on to remote, because message came from remote");
}
}
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
@ -1060,16 +1057,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
}
if (sub != null) {
removeSubscription(sub);
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
}
}
}
protected void waitStarted() throws InterruptedException {
startedLatch.await();
localBrokerIdKnownLatch.await();
}
protected void clearDownSubscriptions() {