git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1352137 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-20 14:56:38 +00:00
parent 32d3fd51b4
commit 7fbbcf3565
1 changed files with 44 additions and 1 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -41,7 +42,32 @@ import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.DefaultThreadPools;
@ -245,6 +271,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
startedLatch.countDown(); startedLatch.countDown();
localStartedLatch.countDown(); localStartedLatch.countDown();
safeWaitUntilStarted();
if (!disposed.get()) { if (!disposed.get()) {
setupStaticDestinations(); setupStaticDestinations();
} else { } else {
@ -1183,6 +1212,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startedLatch.await(); startedLatch.await();
} }
/**
* Performs a timed wait on the started latch and then checks for disposed before performing
* another wait each time the the started wait times out.
*
* @throws InterruptedException
*/
protected void safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
if (startedLatch.await(1, TimeUnit.SECONDS)) {
return;
}
}
}
protected void clearDownSubscriptions() { protected void clearDownSubscriptions() {
subscriptionMapByLocalId.clear(); subscriptionMapByLocalId.clear();
subscriptionMapByRemoteId.clear(); subscriptionMapByRemoteId.clear();