diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index cde1442cb2..93e2da9037 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -261,6 +261,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br l.onStop(this); } try { + // local start complete + if (startedLatch.getCount() < 2) { + if (LOG.isTraceEnabled()) { + LOG.trace(configuration.getBrokerName() + " unregister bridge (" + this + ") to " + remoteBrokerName); + } + brokerService.getBroker().removeBroker(null, remoteBrokerInfo); + brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); + } + remoteBridgeStarted.set(false); final CountDownLatch sendShutdown = new CountDownLatch(1); @@ -306,12 +315,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - if (remoteBrokerInfo != null) { - brokerService.getBroker().removeBroker(null, remoteBrokerInfo); - brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); - if (LOG.isInfoEnabled()) { - LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); - } + if (LOG.isInfoEnabled()) { + LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); } } } @@ -381,6 +386,34 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } private void doStartLocalAndRemoteBridges() { + + if (disposed.get()) { + return; + } + + if (isCreatedByDuplex()) { + // apply remote (propagated) configuration to local duplex bridge before start + Properties props = null; + try { + props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); + IntrospectionSupport.getProperties(configuration, props, null); + if (configuration.getExcludedDestinations() != null) { + excludedDestinations = configuration.getExcludedDestinations().toArray( + new ActiveMQDestination[configuration.getExcludedDestinations().size()]); + } + if (configuration.getStaticallyIncludedDestinations() != null) { + staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); + } + if (configuration.getDynamicallyIncludedDestinations() != null) { + dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); + } + } catch (Throwable t) { + LOG.error("Error mapping remote configuration: " + props, t); + } + } + try { startLocalBridge(); } catch (Throwable e) { @@ -389,35 +422,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } try { - - if (disposed.get()) { - return; - } - - Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); - try { - IntrospectionSupport.getProperties(configuration, props, null); - if (configuration.getExcludedDestinations() != null) { - excludedDestinations = configuration.getExcludedDestinations().toArray( - new ActiveMQDestination[configuration.getExcludedDestinations().size()]); - } - if (configuration.getStaticallyIncludedDestinations() != null) { - staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( - new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); - } - if (configuration.getDynamicallyIncludedDestinations() != null) { - dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( - new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); - } - } catch (Throwable t) { - LOG.error("Error mapping remote destinations", t); - } - - // Let the local broker know the remote broker's ID. - localBroker.oneway(remoteBrokerInfo); - // new peer broker (a consumer can work with remote broker also) - brokerService.getBroker().addBroker(null, remoteBrokerInfo); - + startRemoteBridge(); startRemoteBridge(); } catch (Throwable e) { serviceRemoteException(e); @@ -454,7 +459,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.oneway(localSessionInfo); if (configuration.isDuplex()) { - // separate in-bound chamnel for forwards so we don't + // separate in-bound channel for forwards so we don't // contend with out-bound dispatch on same connection ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); @@ -483,10 +488,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br l.onStart(this); } + // Let the local broker know the remote broker's ID. + localBroker.oneway(remoteBrokerInfo); + // new peer broker (a consumer can work with remote broker also) + brokerService.getBroker().addBroker(null, remoteBrokerInfo); + if (LOG.isInfoEnabled()) { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); + if (LOG.isTraceEnabled()) { + LOG.trace(configuration.getBrokerName() + " register bridge (" + this + ") to " + remoteBrokerName); + } } - } else { LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); } @@ -1137,7 +1149,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.error("Failed to add static destination " + dest, e); } if (LOG.isTraceEnabled()) { - LOG.trace("bridging messages for static destination: " + dest); + LOG.trace(configuration.getBrokerName() + ", bridging messages for static destination: " + dest); } } }