diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 5070266b97..879ab39fa5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -93,6 +93,7 @@ import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.failover.FailoverTransport; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IdGenerator; @@ -323,6 +324,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } finally { ServiceStopper ss = new ServiceStopper(); + stopFailoverTransport(remoteBroker); ss.stop(remoteBroker); ss.stop(localBroker); ss.stop(duplexInboundLocalBroker); @@ -341,6 +343,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + private void stopFailoverTransport(Transport transport) { + FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); + if (failoverTransport != null) { + // may be blocked on write, in which case stop will block + try { + failoverTransport.handleTransportFailure(new IOException("Bridge stopped")); + } catch (InterruptedException ignored) {} + } + } + protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { brokerService.getTaskRunnerFactory().execute(new Runnable() { @Override