Disconnect from newly added nodes if cluster state publishing fails (#21197)

Before publishing a cluster state the master connects to the nodes that are added in the cluster state. When publishing fails, however, it does not disconnect from these nodes, leaving NodeConnectionsService out of sync with the currently applied cluster state.
This commit is contained in:
Yannick Welsch 2016-10-31 15:09:43 +01:00 committed by GitHub
parent 37228f924a
commit d7d5909e69
5 changed files with 85 additions and 15 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
@ -75,10 +76,10 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}
public void connectToAddedNodes(ClusterChangedEvent event) {
public void connectToNodes(List<DiscoveryNode> addedNodes) {
// TODO: do this in parallel (and wait)
for (final DiscoveryNode node : event.nodesDelta().addedNodes()) {
for (final DiscoveryNode node : addedNodes) {
try (Releasable ignored = nodeLocks.acquire(node)) {
Integer current = nodes.put(node, 0);
assert current == null : "node " + node + " was added in event but already in internal nodes";
@ -87,8 +88,8 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
}
}
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
for (final DiscoveryNode node : event.nodesDelta().removedNodes()) {
public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
for (final DiscoveryNode node : removedNodes) {
try (Releasable ignored = nodeLocks.acquire(node)) {
Integer current = nodes.remove(node);
assert current != null : "node " + node + " was removed in event but not in internal nodes";

View File

@ -671,7 +671,7 @@ public class ClusterService extends AbstractLifecycleComponent {
}
}
nodeConnectionsService.connectToAddedNodes(clusterChangedEvent);
nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes());
// if we are the master, publish the new state to all nodes
// we publish here before we send a notification to all the listeners, since if it fails
@ -686,6 +686,8 @@ public class ClusterService extends AbstractLifecycleComponent {
(Supplier<?>) () -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", tasksSummary, version),
t);
// ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes());
proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
return;
}
@ -711,7 +713,7 @@ public class ClusterService extends AbstractLifecycleComponent {
}
}
nodeConnectionsService.disconnectFromRemovedNodes(clusterChangedEvent);
nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);

View File

@ -84,19 +84,19 @@ public class NodeConnectionsServiceTests extends ESTestCase {
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
service.connectToAddedNodes(event);
service.connectToNodes(event.nodesDelta().addedNodes());
assertConnected(event.nodesDelta().addedNodes());
service.disconnectFromRemovedNodes(event);
service.disconnectFromNodes(event.nodesDelta().removedNodes());
assertConnectedExactlyToNodes(event.state());
current = event.state();
event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
service.connectToAddedNodes(event);
service.connectToNodes(event.nodesDelta().addedNodes());
assertConnected(event.nodesDelta().addedNodes());
service.disconnectFromRemovedNodes(event);
service.disconnectFromNodes(event.nodesDelta().removedNodes());
assertConnectedExactlyToNodes(event.state());
}
@ -110,7 +110,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
transport.randomConnectionExceptions = true;
service.connectToAddedNodes(event);
service.connectToNodes(event.nodesDelta().addedNodes());
for (int i = 0; i < 3; i++) {
// simulate disconnects

View File

@ -41,6 +41,8 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -119,12 +121,12 @@ public class ClusterServiceTests extends ESTestCase {
emptySet(), Version.CURRENT));
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToAddedNodes(ClusterChangedEvent event) {
public void connectToNodes(List<DiscoveryNode> addedNodes) {
// skip
}
@Override
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
// skip
}
});
@ -970,6 +972,70 @@ public class ClusterServiceTests extends ESTestCase {
mockAppender.assertAllExpectationsMatched();
}
public void testDisconnectFromNewlyAddedNodesIfClusterStatePublishingFails() throws InterruptedException {
TimedClusterService timedClusterService = new TimedClusterService(Settings.builder().put("cluster.name",
"ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT));
Set<DiscoveryNode> currentNodes = Collections.synchronizedSet(new HashSet<>());
currentNodes.add(timedClusterService.localNode());
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(List<DiscoveryNode> addedNodes) {
currentNodes.addAll(addedNodes);
}
@Override
public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
currentNodes.removeAll(removedNodes);
}
});
AtomicBoolean failToCommit = new AtomicBoolean();
timedClusterService.setClusterStatePublisher((event, ackListener) -> {
if (failToCommit.get()) {
throw new Discovery.FailedToCommitClusterStateException("just to test this");
}
});
timedClusterService.start();
ClusterState state = timedClusterService.state();
final DiscoveryNodes nodes = state.nodes();
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes)
.masterNodeId(nodes.getLocalNodeId());
state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.nodes(nodesBuilder).build();
setState(timedClusterService, state);
assertThat(currentNodes, equalTo(Sets.newHashSet(timedClusterService.state().getNodes())));
final CountDownLatch latch = new CountDownLatch(1);
// try to add node when cluster state publishing fails
failToCommit.set(true);
timedClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
DiscoveryNode newNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT);
return ClusterState.builder(currentState).nodes(DiscoveryNodes.builder(currentState.nodes()).add(newNode)).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}
});
latch.await();
assertThat(currentNodes, equalTo(Sets.newHashSet(timedClusterService.state().getNodes())));
timedClusterService.close();
}
private static class SimpleTask {
private final int id;

View File

@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static junit.framework.TestCase.fail;
@ -53,12 +54,12 @@ public class ClusterServiceUtils {
clusterService.setLocalNode(localNode);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToAddedNodes(ClusterChangedEvent event) {
public void connectToNodes(List<DiscoveryNode> addedNodes) {
// skip
}
@Override
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
// skip
}
});