From 7fbbcf3565d024b4753a19d8ec2a9a9d963002de Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 20 Jun 2012 14:56:38 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3887 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1352137 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 0e6a0da24b..cb81aa540a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.management.ObjectName; + import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; @@ -41,7 +42,32 @@ import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.NetworkBridgeFilter; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.thread.DefaultThreadPools; @@ -245,6 +271,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } startedLatch.countDown(); localStartedLatch.countDown(); + + safeWaitUntilStarted(); + if (!disposed.get()) { setupStaticDestinations(); } else { @@ -1183,6 +1212,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br startedLatch.await(); } + /** + * Performs a timed wait on the started latch and then checks for disposed before performing + * another wait each time the the started wait times out. + * + * @throws InterruptedException + */ + protected void safeWaitUntilStarted() throws InterruptedException { + while (!disposed.get()) { + if (startedLatch.await(1, TimeUnit.SECONDS)) { + return; + } + } + } + protected void clearDownSubscriptions() { subscriptionMapByLocalId.clear(); subscriptionMapByRemoteId.clear();