From d7d5909e69982287a550b5427747ea610d9d6805 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 31 Oct 2016 15:09:43 +0100 Subject: [PATCH] Disconnect from newly added nodes if cluster state publishing fails (#21197) Before publishing a cluster state the master connects to the nodes that are added in the cluster state. When publishing fails, however, it does not disconnect from these nodes, leaving NodeConnectionsService out of sync with the currently applied cluster state. --- .../cluster/NodeConnectionsService.java | 9 +-- .../cluster/service/ClusterService.java | 6 +- .../cluster/NodeConnectionsServiceTests.java | 10 +-- .../cluster/service/ClusterServiceTests.java | 70 ++++++++++++++++++- .../test/ClusterServiceUtils.java | 5 +- 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 6a9d0ae160f..94333c10dde 100644 --- a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -36,6 +36,7 @@ import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; @@ -75,10 +76,10 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); } - public void connectToAddedNodes(ClusterChangedEvent event) { + public void connectToNodes(List<DiscoveryNode> addedNodes) { // TODO: do this in parallel (and wait) - for (final DiscoveryNode node : event.nodesDelta().addedNodes()) { + for (final DiscoveryNode node : addedNodes) { try (Releasable ignored = nodeLocks.acquire(node)) { Integer current = 
nodes.put(node, 0); assert current == null : "node " + node + " was added in event but already in internal nodes"; @@ -87,8 +88,8 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { } } - public void disconnectFromRemovedNodes(ClusterChangedEvent event) { - for (final DiscoveryNode node : event.nodesDelta().removedNodes()) { + public void disconnectFromNodes(List<DiscoveryNode> removedNodes) { + for (final DiscoveryNode node : removedNodes) { try (Releasable ignored = nodeLocks.acquire(node)) { Integer current = nodes.remove(node); assert current != null : "node " + node + " was removed in event but not in internal nodes"; diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 38d42a0b0f9..fe2adf1f4c6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -671,7 +671,7 @@ public class ClusterService extends AbstractLifecycleComponent { } } - nodeConnectionsService.connectToAddedNodes(clusterChangedEvent); + nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes()); // if we are the master, publish the new state to all nodes // we publish here before we send a notification to all the listeners, since if it fails @@ -686,6 +686,8 @@ public class ClusterService extends AbstractLifecycleComponent { (Supplier) () -> new ParameterizedMessage( "failing [{}]: failed to commit cluster state version [{}]", tasksSummary, version), t); + // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state + nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes()); proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t)); return; } @@ -711,7 +713,7 @@ public class ClusterService extends AbstractLifecycleComponent { } } 
- nodeConnectionsService.disconnectFromRemovedNodes(clusterChangedEvent); + nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes()); newClusterState.status(ClusterState.ClusterStateStatus.APPLIED); diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 599b62b1ee2..863349e897a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -84,19 +84,19 @@ public class NodeConnectionsServiceTests extends ESTestCase { ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); - service.connectToAddedNodes(event); + service.connectToNodes(event.nodesDelta().addedNodes()); assertConnected(event.nodesDelta().addedNodes()); - service.disconnectFromRemovedNodes(event); + service.disconnectFromNodes(event.nodesDelta().removedNodes()); assertConnectedExactlyToNodes(event.state()); current = event.state(); event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); - service.connectToAddedNodes(event); + service.connectToNodes(event.nodesDelta().addedNodes()); assertConnected(event.nodesDelta().addedNodes()); - service.disconnectFromRemovedNodes(event); + service.disconnectFromNodes(event.nodesDelta().removedNodes()); assertConnectedExactlyToNodes(event.state()); } @@ -110,7 +110,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { transport.randomConnectionExceptions = true; - service.connectToAddedNodes(event); + service.connectToNodes(event.nodesDelta().addedNodes()); for (int i = 0; i < 3; i++) { // simulate disconnects diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java 
b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index c12b54e71ef..a39bcf38391 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -41,6 +41,8 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -119,12 +121,12 @@ public class ClusterServiceTests extends ESTestCase { emptySet(), Version.CURRENT)); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToAddedNodes(ClusterChangedEvent event) { + public void connectToNodes(List<DiscoveryNode> addedNodes) { // skip } @Override - public void disconnectFromRemovedNodes(ClusterChangedEvent event) { + public void disconnectFromNodes(List<DiscoveryNode> removedNodes) { // skip } }); @@ -970,6 +972,70 @@ public class ClusterServiceTests extends ESTestCase { mockAppender.assertAllExpectationsMatched(); } + public void testDisconnectFromNewlyAddedNodesIfClusterStatePublishingFails() throws InterruptedException { + TimedClusterService timedClusterService = new TimedClusterService(Settings.builder().put("cluster.name", + "ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool); + timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), + emptySet(), Version.CURRENT)); + Set<DiscoveryNode> currentNodes = Collections.synchronizedSet(new HashSet<>()); + currentNodes.add(timedClusterService.localNode()); + 
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { + @Override + public void connectToNodes(List<DiscoveryNode> addedNodes) { + currentNodes.addAll(addedNodes); + } + + @Override + public void disconnectFromNodes(List<DiscoveryNode> removedNodes) { + currentNodes.removeAll(removedNodes); + } + }); + AtomicBoolean failToCommit = new AtomicBoolean(); + timedClusterService.setClusterStatePublisher((event, ackListener) -> { + if (failToCommit.get()) { + throw new Discovery.FailedToCommitClusterStateException("just to test this"); + } + }); + timedClusterService.start(); + ClusterState state = timedClusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes) + .masterNodeId(nodes.getLocalNodeId()); + state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .nodes(nodesBuilder).build(); + setState(timedClusterService, state); + + assertThat(currentNodes, equalTo(Sets.newHashSet(timedClusterService.state().getNodes()))); + + final CountDownLatch latch = new CountDownLatch(1); + + // try to add node when cluster state publishing fails + failToCommit.set(true); + timedClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + DiscoveryNode newNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), + emptySet(), Version.CURRENT); + return ClusterState.builder(currentState).nodes(DiscoveryNodes.builder(currentState.nodes()).add(newNode)).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + latch.countDown(); + } + }); + + latch.await(); + assertThat(currentNodes, equalTo(Sets.newHashSet(timedClusterService.state().getNodes()))); + 
timedClusterService.close(); + } + private static class SimpleTask { private final int id; diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index f247c56636c..3e3896dfc2c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.concurrent.CountDownLatch; import static junit.framework.TestCase.fail; @@ -53,12 +54,12 @@ public class ClusterServiceUtils { clusterService.setLocalNode(localNode); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToAddedNodes(ClusterChangedEvent event) { + public void connectToNodes(List<DiscoveryNode> addedNodes) { // skip } @Override - public void disconnectFromRemovedNodes(ClusterChangedEvent event) { + public void disconnectFromNodes(List<DiscoveryNode> removedNodes) { // skip } });