Fix TransportAddVotingConfigExclusionsActionTests Leaking CS Observers (#55549) (#55584)

There is no guarantee the observer and subsequent CS update will execute
before we move on to the next test here and we ahve to wait for the observer + CS update cycle to complete
before moving on to the next test.

closes #55481
This commit is contained in:
Armin Braun 2020-04-22 13:49:24 +02:00 committed by GitHub
parent 2dc5586afe
commit 250a51bca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 19 deletions

View File

@ -150,9 +150,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testWithdrawsVoteFromANode() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("other1"),
expectSuccess(r -> {
assertNotNull(r);
@ -165,9 +165,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest("other1", "other2"),
expectSuccess(r -> {
@ -182,9 +182,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("other*"),
expectSuccess(r -> {
assertNotNull(r);
@ -199,9 +199,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("_all"),
expectSuccess(r -> {
assertNotNull(r);
@ -216,9 +216,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testWithdrawsVoteFromLocalNode() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("_local"),
expectSuccess(r -> {
assertNotNull(r);
@ -296,9 +296,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testExcludeAbsentNodesByNodeIds() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(Strings.EMPTY_ARRAY, new String[]{"absent_id"},
Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
@ -313,9 +313,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testExcludeExistingNodesByNodeIds() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(Strings.EMPTY_ARRAY, new String[]{"other1", "other2"},
Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
@ -331,9 +331,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testExcludeAbsentNodesByNodeNames() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("absent_node"),
expectSuccess(e -> {
countDownLatch.countDown();
@ -346,9 +346,9 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
}
public void testExcludeExistingNodesByNodeNames() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(2);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch));
transportService.sendRequest(localNode, AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest("other1", "other2"),
expectSuccess(r -> {
@ -549,7 +549,14 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
};
}
private class AdjustConfigurationForExclusions implements Listener {
private static class AdjustConfigurationForExclusions implements Listener {
final CountDownLatch doneLatch;
AdjustConfigurationForExclusions(CountDownLatch latch) {
this.doneLatch = latch;
}
@Override
public void onNewClusterState(ClusterState state) {
clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() {
@ -573,6 +580,11 @@ public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase {
public void onFailure(String source, Exception e) {
throw new AssertionError("unexpected failure", e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
doneLatch.countDown();
}
});
}