fix sync on failover updated urls - intermittent failure on FailoverComplexClusterTest

This commit is contained in:
gtully 2015-05-19 11:41:49 +01:00
parent 27e11a388e
commit 2536c03125
2 changed files with 16 additions and 12 deletions

View File

@ -1284,21 +1284,22 @@ public class FailoverTransport implements CompositeTransport {
@Override
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) {
HashSet<URI> copy = new HashSet<URI>(this.updated);
updated.clear();
if (updatedURIs != null && updatedURIs.length > 0) {
for (URI uri : updatedURIs) {
if (uri != null && !updated.contains(uri)) {
updated.add(uri);
}
}
if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
buildBackups();
synchronized (reconnectMutex) {
reconnect(rebalance);
HashSet<URI> copy = new HashSet<URI>();
synchronized (reconnectMutex) {
copy.addAll(this.updated);
updated.clear();
if (updatedURIs != null && updatedURIs.length > 0) {
for (URI uri : updatedURIs) {
if (uri != null && !updated.contains(uri)) {
updated.add(uri);
}
}
}
}
if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
buildBackups();
reconnect(rebalance);
}
}
}

View File

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
* broker is removed and then show 3 after the 3rd broker is reintroduced.
*/
public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
protected final Logger LOG = LoggerFactory.getLogger(FailoverComplexClusterTest.class);
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
@ -258,6 +259,7 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
LOG.info("Stopping BrokerC in prep for restart");
getBroker(BROKER_C_NAME).stop();
getBroker(BROKER_C_NAME).waitUntilStopped();
removeBroker(BROKER_C_NAME);
@ -266,6 +268,7 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToTwoBrokers();
LOG.info("Recreating BrokerC after stop");
createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000);