diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index d6d38e19a1..0883e6b123 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -69,6 +69,7 @@ public class FailoverTransport implements CompositeTransport { private final Object reconnectMutex = new Object(); private final Object backupMutex = new Object(); private final Object sleepMutex = new Object(); + private final Object listenerMutex = new Object(); private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); private final Map requestMap = new LinkedHashMap(); @@ -564,7 +565,10 @@ public class FailoverTransport implements CompositeTransport { } public void setTransportListener(TransportListener commandListener) { - this.transportListener = commandListener; + synchronized(listenerMutex) { + this.transportListener = commandListener; + listenerMutex.notifyAll(); + } } public T narrow(Class target) { @@ -683,8 +687,21 @@ public class FailoverTransport implements CompositeTransport { connectedTransport.set(t); reconnectMutex.notifyAll(); connectFailures = 0; + // Make sure on initial startup, that the transportListener + // has been initialized for this instance. + synchronized(listenerMutex) { + if (transportListener==null) { + try { + //if it isn't set after 2secs - it + //probably never will be + listenerMutex.wait(2000); + }catch(InterruptedException ex) {} + } + } if (transportListener != null) { transportListener.transportResumed(); + }else { + LOG.debug("transport resumed by transport listener not set"); } if (firstConnection) { firstConnection=false;