diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java index 058130dc7c..98d3dd27c6 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java @@ -91,17 +91,15 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { private boolean reportAdvertizeFailed = true; private ExecutorService executor = null; - class RemoteBrokerData { - final String brokerName; - final String service; + class RemoteBrokerData extends DiscoveryEvent { long lastHeartBeat; long recoveryTime; int failureCount; boolean failed; public RemoteBrokerData(String brokerName, String service) { - this.brokerName = brokerName; - this.service = service; + super(service); + setBrokerName(brokerName); this.lastHeartBeat = System.currentTimeMillis(); } @@ -112,7 +110,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { // failed in 60 seconds. if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) { if (LOG.isDebugEnabled()) { - LOG.debug("I now think that the " + service + " service has recovered."); + LOG.debug("I now think that the " + serviceName + " service has recovered."); } failureCount = 0; recoveryTime = 0; @@ -139,7 +137,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } if (LOG.isDebugEnabled()) { - LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay + LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay + " ms, the current failure count is: " + failureCount); } @@ -161,7 +159,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { // Are we done trying to recover this guy? if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) { if (LOG.isDebugEnabled()) { - LOG.debug("Max reconnect attempts of the " + service + " service has been reached."); + LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached."); } return false; } @@ -172,7 +170,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } if (LOG.isDebugEnabled()) { - LOG.debug("Resuming event advertisement of the " + service + " service."); + LOG.debug("Resuming event advertisement of the " + serviceName + " service."); } failed = false; return true; @@ -466,7 +464,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { for (Iterator i = brokersByService.values().iterator(); i.hasNext();) { RemoteBrokerData data = i.next(); if (data.getLastHeartBeat() < expireTime) { - processDead(data.service); + processDead(data.getServiceName()); } } } @@ -488,11 +486,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } } - private void fireServiceRemovedEvent(RemoteBrokerData data) { + private void fireServiceRemovedEvent(final RemoteBrokerData data) { if (discoveryListener != null && started.get()) { - final DiscoveryEvent event = new DiscoveryEvent(data.service); - event.setBrokerName(data.brokerName); - // Have the listener process the event async so that // he does not block this thread since we are doing time sensitive // processing of events. @@ -500,18 +495,16 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { public void run() { DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; if (discoveryListener != null) { - discoveryListener.onServiceRemove(event); + discoveryListener.onServiceRemove(data); } } }); } } - private void fireServiceAddEvent(RemoteBrokerData data) { + private void fireServiceAddEvent(final RemoteBrokerData data) { if (discoveryListener != null && started.get()) { - final DiscoveryEvent event = new DiscoveryEvent(data.service); - event.setBrokerName(data.brokerName); - + // Have the listener process the event async so that // he does not block this thread since we are doing time sensitive // processing of events. @@ -519,7 +512,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { public void run() { DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; if (discoveryListener != null) { - discoveryListener.onServiceAdd(event); + discoveryListener.onServiceAdd(data); } } }); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java index dce25b00fe..e911d0c348 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java @@ -37,13 +37,12 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes protected static final int MESSAGE_COUNT = 200; private static final String HUB = "HubBroker"; private static final String SPOKE = "SpokeBroker"; - public boolean useDuplexNetworkBridge; + public boolean useDuplexNetworkBridge = true; + public boolean useStaticDiscovery = false; - private TransportConnector mCastTrpConnector; - public void initCombosForTestSendOnAFaultyTransport() { addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE , Boolean.FALSE } ); - addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } ); + addCombinationValues( "useStaticDiscovery", new Object[]{ Boolean.TRUE , Boolean.FALSE } ); } public void testSendOnAFaultyTransport() throws Exception { @@ -109,20 +108,23 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes @Override protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception { - DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC&useLocalHost=false")); + String networkDisoveryUrlString = useStaticDiscovery ? + "static:(" + remoteBroker.getTransportConnectors().get(0).getPublishableConnectString() + ")?useExponentialBackOff=false" : + "multicast://default?group=TESTERIC&useLocalHost=false"; + + DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(networkDisoveryUrlString)); connector.setDynamicOnly(dynamicOnly); connector.setNetworkTTL(networkTTL); - localBroker.addNetworkConnector(connector); + connector.setDuplex(useDuplexNetworkBridge); maxSetupTime = 2000; - if (useDuplexNetworkBridge) { - connector.setDuplex(true); + if (!useStaticDiscovery) { + List transportConnectors = remoteBroker.getTransportConnectors(); + if (!transportConnectors.isEmpty()) { + TransportConnector mCastTrpConnector = ((TransportConnector)transportConnectors.get(0)); + mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC")); + } } - - List transportConnectors = remoteBroker.getTransportConnectors(); - if (!transportConnectors.isEmpty()) { - mCastTrpConnector = ((TransportConnector)transportConnectors.get(0)); - mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC")); - } + localBroker.addNetworkConnector(connector); return connector; } }