mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4160 - fix up multicast discovery regression - it was giving uniqueue events to add/remove. MulticastDiscoveryOnFaultyNetworkTest - added static variant to that test also to validate existing changes
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1428901 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
603dc66b44
commit
e5d616da50
|
@ -91,17 +91,15 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
private boolean reportAdvertizeFailed = true;
|
||||
private ExecutorService executor = null;
|
||||
|
||||
class RemoteBrokerData {
|
||||
final String brokerName;
|
||||
final String service;
|
||||
class RemoteBrokerData extends DiscoveryEvent {
|
||||
long lastHeartBeat;
|
||||
long recoveryTime;
|
||||
int failureCount;
|
||||
boolean failed;
|
||||
|
||||
public RemoteBrokerData(String brokerName, String service) {
|
||||
this.brokerName = brokerName;
|
||||
this.service = service;
|
||||
super(service);
|
||||
setBrokerName(brokerName);
|
||||
this.lastHeartBeat = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
@ -112,7 +110,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
// failed in 60 seconds.
|
||||
if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("I now think that the " + service + " service has recovered.");
|
||||
LOG.debug("I now think that the " + serviceName + " service has recovered.");
|
||||
}
|
||||
failureCount = 0;
|
||||
recoveryTime = 0;
|
||||
|
@ -139,7 +137,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay
|
||||
LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay
|
||||
+ " ms, the current failure count is: " + failureCount);
|
||||
}
|
||||
|
||||
|
@ -161,7 +159,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
// Are we done trying to recover this guy?
|
||||
if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
|
||||
LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached.");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -172,7 +170,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resuming event advertisement of the " + service + " service.");
|
||||
LOG.debug("Resuming event advertisement of the " + serviceName + " service.");
|
||||
}
|
||||
failed = false;
|
||||
return true;
|
||||
|
@ -466,7 +464,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
|
||||
RemoteBrokerData data = i.next();
|
||||
if (data.getLastHeartBeat() < expireTime) {
|
||||
processDead(data.service);
|
||||
processDead(data.getServiceName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -488,11 +486,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
private void fireServiceRemovedEvent(RemoteBrokerData data) {
|
||||
private void fireServiceRemovedEvent(final RemoteBrokerData data) {
|
||||
if (discoveryListener != null && started.get()) {
|
||||
final DiscoveryEvent event = new DiscoveryEvent(data.service);
|
||||
event.setBrokerName(data.brokerName);
|
||||
|
||||
// Have the listener process the event async so that
|
||||
// he does not block this thread since we are doing time sensitive
|
||||
// processing of events.
|
||||
|
@ -500,17 +495,15 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
public void run() {
|
||||
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
|
||||
if (discoveryListener != null) {
|
||||
discoveryListener.onServiceRemove(event);
|
||||
discoveryListener.onServiceRemove(data);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void fireServiceAddEvent(RemoteBrokerData data) {
|
||||
private void fireServiceAddEvent(final RemoteBrokerData data) {
|
||||
if (discoveryListener != null && started.get()) {
|
||||
final DiscoveryEvent event = new DiscoveryEvent(data.service);
|
||||
event.setBrokerName(data.brokerName);
|
||||
|
||||
// Have the listener process the event async so that
|
||||
// he does not block this thread since we are doing time sensitive
|
||||
|
@ -519,7 +512,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
|
|||
public void run() {
|
||||
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
|
||||
if (discoveryListener != null) {
|
||||
discoveryListener.onServiceAdd(event);
|
||||
discoveryListener.onServiceAdd(data);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -37,13 +37,12 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes
|
|||
protected static final int MESSAGE_COUNT = 200;
|
||||
private static final String HUB = "HubBroker";
|
||||
private static final String SPOKE = "SpokeBroker";
|
||||
public boolean useDuplexNetworkBridge;
|
||||
|
||||
private TransportConnector mCastTrpConnector;
|
||||
public boolean useDuplexNetworkBridge = true;
|
||||
public boolean useStaticDiscovery = false;
|
||||
|
||||
public void initCombosForTestSendOnAFaultyTransport() {
|
||||
addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE , Boolean.FALSE } );
|
||||
addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
|
||||
addCombinationValues( "useStaticDiscovery", new Object[]{ Boolean.TRUE , Boolean.FALSE } );
|
||||
}
|
||||
|
||||
public void testSendOnAFaultyTransport() throws Exception {
|
||||
|
@ -109,20 +108,23 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes
|
|||
|
||||
@Override
|
||||
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
|
||||
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC&useLocalHost=false"));
|
||||
String networkDisoveryUrlString = useStaticDiscovery ?
|
||||
"static:(" + remoteBroker.getTransportConnectors().get(0).getPublishableConnectString() + ")?useExponentialBackOff=false" :
|
||||
"multicast://default?group=TESTERIC&useLocalHost=false";
|
||||
|
||||
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(networkDisoveryUrlString));
|
||||
connector.setDynamicOnly(dynamicOnly);
|
||||
connector.setNetworkTTL(networkTTL);
|
||||
localBroker.addNetworkConnector(connector);
|
||||
connector.setDuplex(useDuplexNetworkBridge);
|
||||
maxSetupTime = 2000;
|
||||
if (useDuplexNetworkBridge) {
|
||||
connector.setDuplex(true);
|
||||
}
|
||||
|
||||
if (!useStaticDiscovery) {
|
||||
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
|
||||
if (!transportConnectors.isEmpty()) {
|
||||
mCastTrpConnector = ((TransportConnector)transportConnectors.get(0));
|
||||
TransportConnector mCastTrpConnector = ((TransportConnector)transportConnectors.get(0));
|
||||
mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC"));
|
||||
}
|
||||
}
|
||||
localBroker.addNetworkConnector(connector);
|
||||
return connector;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue