Close nodes properly in Coordinator tests (#44967)

Today closing a `ClusterNode` in an `AbstractCoordinatorTestCase` uses
`onNode()` so has no effect if the node is not in the current list of nodes.
It also discards the `Runnable` it creates without having run it, so has no
effect anyway.

This commit makes these tests much stricter about properly closing the nodes
started during `Coordinator` tests, by tracking the persisted states that are
opened, and adds an assertion to catch the trappy requirement that the closing
node still belongs to the cluster.
This commit is contained in:
David Turner 2019-07-30 11:46:14 +01:00
parent cf78ca58e3
commit 55f1dd8da6
5 changed files with 972 additions and 866 deletions

View File

@ -444,6 +444,10 @@ public class CoordinationState {
assert publishVotes.isEmpty() || electionWon();
}
public void close() {
persistedState.close();
}
/**
* Pluggable persistence layer for {@link CoordinationState}.
*
@ -506,6 +510,8 @@ public class CoordinationState {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build());
}
}
default void close() {}
}
/**

View File

@ -725,6 +725,15 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void doClose() {
final CoordinationState coordinationState = this.coordinationState.get();
if (coordinationState != null) {
// This looks like a race that might leak an unclosed CoordinationState if it's created while execution is here, but this method
// is synchronized on AbstractLifecycleComponent#lifestyle, as is the doStart() method that creates the CoordinationState, so
// it's all ok.
synchronized (mutex) {
coordinationState.close();
}
}
}
public void invariant() {

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -128,6 +129,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -137,6 +139,7 @@ import static org.hamcrest.Matchers.sameInstance;
public class AbstractCoordinatorTestCase extends ESTestCase {
protected final List<NodeEnvironment> nodeEnvironments = new ArrayList<>();
protected final Set<Cluster.MockPersistedState> openPersistedStates = new HashSet<>();
protected final AtomicInteger nextNodeIndex = new AtomicInteger();
@ -153,6 +156,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
nodeEnvironments.clear();
}
@After
public void assertAllPersistedStatesClosed() {
assertThat(openPersistedStates, empty());
}
@Before
public void resetPortCounterBeforeEachTest() {
resetPortCounter();
@ -162,12 +170,13 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
public void testRepeatableTests() throws Exception {
final Callable<Long> test = () -> {
resetNodeIndexBeforeEachTest();
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
final long afterRunRandomly = value(cluster.getAnyNode().getLastAppliedClusterState());
cluster.stabilise();
final long afterStabilisation = value(cluster.getAnyNode().getLastAppliedClusterState());
return afterRunRandomly ^ afterStabilisation;
try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) {
cluster.runRandomly();
final long afterRunRandomly = value(cluster.getAnyNode().getLastAppliedClusterState());
cluster.stabilise();
final long afterStabilisation = value(cluster.getAnyNode().getLastAppliedClusterState());
return afterRunRandomly ^ afterStabilisation;
}
};
final long seed = randomLong();
logger.info("First run with seed [{}]", seed);
@ -224,7 +233,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;
class Cluster {
class Cluster implements Releasable {
static final long EXTREME_DELAY_VARIABILITY = 10000L;
static final long DEFAULT_DELAY_VARIABILITY = 100L;
@ -680,6 +689,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
blackholedConnections.clear();
}
@Override
public void close() {
clusterNodes.forEach(ClusterNode::close);
}
class MockPersistedState implements CoordinationState.PersistedState {
private final CoordinationState.PersistedState delegate;
private final NodeEnvironment nodeEnvironment;
@ -816,6 +830,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
delegate.setLastAcceptedState(clusterState);
}
@Override
public void close() {
assertTrue(openPersistedStates.remove(this));
}
}
class ClusterNode {
@ -843,7 +862,16 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
this.localNode = localNode;
this.nodeSettings = nodeSettings;
persistedState = persistedStateSupplier.apply(localNode);
onNodeLog(localNode, this::setUp).run();
assertTrue("must use a fresh PersistedState", openPersistedStates.add(persistedState));
boolean success = false;
try {
onNodeLog(localNode, this::setUp).run();
success = true;
} finally {
if (success == false) {
persistedState.close(); // removes it from openPersistedStates
}
}
}
private void setUp() {
@ -903,15 +931,16 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
}
void close() {
assertThat("must add nodes to a cluster before closing them", clusterNodes, hasItem(ClusterNode.this));
onNode(() -> {
logger.trace("taking down [{}]", localNode);
logger.trace("closing");
coordinator.stop();
clusterService.stop();
//transportService.stop(); // does blocking stuff :/
clusterService.close();
coordinator.close();
//transportService.close(); // does blocking stuff :/
});
}).run();
}
ClusterNode restartedNode() {

View File

@ -48,13 +48,14 @@ public class VotingOnlyNodeCoordinatorTests extends AbstractCoordinatorTestCase
}
public void testDoesNotElectVotingOnlyMasterNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY);
cluster.runRandomly();
cluster.stabilise();
try (Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY)) {
cluster.runRandomly();
cluster.stabilise();
final Cluster.ClusterNode leader = cluster.getAnyLeader();
assertTrue(leader.getLocalNode().isMasterNode());
assertFalse(leader.getLocalNode().toString(), VotingOnlyNodePlugin.isVotingOnlyNode(leader.getLocalNode()));
final Cluster.ClusterNode leader = cluster.getAnyLeader();
assertTrue(leader.getLocalNode().isMasterNode());
assertFalse(leader.getLocalNode().toString(), VotingOnlyNodePlugin.isVotingOnlyNode(leader.getLocalNode()));
}
}
@Override