https://issues.apache.org/jira/browse/AMQ-4082 - fix regression with org.apache.activemq.usecases.NetworkOfTwentyBrokersTest nd org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest - statically included dests in duplex case being ignored and unregister without register

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442122 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-02-04 14:06:06 +00:00
parent 856e5061d7
commit c372448471
1 changed files with 50 additions and 38 deletions

View File

@ -261,6 +261,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
l.onStop(this);
}
try {
// local start complete
if (startedLatch.getCount() < 2) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " unregister bridge (" + this + ") to " + remoteBrokerName);
}
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
}
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
@ -306,12 +315,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
if (LOG.isInfoEnabled()) {
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
if (LOG.isInfoEnabled()) {
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
}
}
@ -381,6 +386,34 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
private void doStartLocalAndRemoteBridges() {
if (disposed.get()) {
return;
}
if (isCreatedByDuplex()) {
// apply remote (propagated) configuration to local duplex bridge before start
Properties props = null;
try {
props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote configuration: " + props, t);
}
}
try {
startLocalBridge();
} catch (Throwable e) {
@ -389,35 +422,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
try {
if (disposed.get()) {
return;
}
Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
try {
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote destinations", t);
}
// Let the local broker know the remote broker's ID.
localBroker.oneway(remoteBrokerInfo);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
startRemoteBridge();
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
@ -454,7 +459,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) {
// separate in-bound chamnel for forwards so we don't
// separate in-bound channel for forwards so we don't
// contend with out-bound dispatch on same connection
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
@ -483,10 +488,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
l.onStart(this);
}
// Let the local broker know the remote broker's ID.
localBroker.oneway(remoteBrokerInfo);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
if (LOG.isInfoEnabled()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " register bridge (" + this + ") to " + remoteBrokerName);
}
}
} else {
LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
}
@ -1137,7 +1149,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.error("Failed to add static destination " + dest, e);
}
if (LOG.isTraceEnabled()) {
LOG.trace("bridging messages for static destination: " + dest);
LOG.trace(configuration.getBrokerName() + ", bridging messages for static destination: " + dest);
}
}
}