diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 329405849e9..fb6f98ea3b1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -99,6 +99,24 @@ public class DiscoveryNode implements Writeable, ToXContent { private final Version version; private final Set roles; + + /** + * Creates a new {@link DiscoveryNode} + *

+ * Note: if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current + * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used + * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered + * and updated. + *

+ * + * @param id the nodes unique (persistent) node id. This constructor will auto generate a random ephemeral id. + * @param address the nodes transport address + * @param version the version of the node + */ + public DiscoveryNode(final String id, TransportAddress address, Version version) { + this(id, address, Collections.emptyMap(), Collections.emptySet(), version); + } + /** * Creates a new {@link DiscoveryNode} *

diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index f8eee7071f6..12f2bc3dcd3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -84,9 +84,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; -/** - * - */ public class ClusterService extends AbstractLifecycleComponent { public static final Setting CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = @@ -348,6 +345,7 @@ public class ClusterService extends AbstractLifecycleComponent { * @param source the source of the cluster state update task * @param updateTask the full context for the cluster state update * task + * */ public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) { submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask); @@ -371,6 +369,7 @@ public class ClusterService extends AbstractLifecycleComponent { * @param listener callback after the cluster state update task * completes * @param the type of the cluster state update task state + * */ public void submitStateUpdateTask(final String source, final T task, final ClusterStateTaskConfig config, @@ -390,6 +389,7 @@ public class ClusterService extends AbstractLifecycleComponent { * that share the same executor will be executed * batches on this executor * @param the type of the cluster state update task state + * */ public void submitStateUpdateTasks(final String source, final Map tasks, final ClusterStateTaskConfig config, @@ -411,7 +411,7 @@ public class ClusterService extends AbstractLifecycleComponent { List existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()); for (@SuppressWarnings("unchecked") UpdateTask existing : existingTasks) { if (tasksIdentity.containsKey(existing.task)) { - throw new IllegalArgumentException("task [" + existing.task + "] is already queued"); + throw new IllegalStateException("task [" + existing.task + "] with source [" + source + "] is already queued"); } } existingTasks.addAll(updateTasks); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 3388347d6c2..3b176f7eff9 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -25,6 +25,9 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -79,13 +82,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -/** - * - */ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider { public static final Setting PING_TIMEOUT_SETTING = @@ -148,6 +149,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover // must initialized in doStart(), when we have the allocationService set private volatile NodeJoinController nodeJoinController; + private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @Inject public ZenDiscovery(Settings settings, ThreadPool threadPool, @@ -216,6 +218,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover joinThreadControl.start(); pingService.start(); this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings); + this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::rejoin, logger); } @Override @@ -500,43 +503,119 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } } + // visible for testing + static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + + private final AllocationService allocationService; + private final ElectMasterService electMasterService; + private final BiFunction rejoin; + private final ESLogger logger; + + static class Task { + + private final DiscoveryNode node; + private final String reason; + + public Task(final DiscoveryNode node, final String reason) { + this.node = node; + this.reason = reason; + } + + public DiscoveryNode node() { + return node; + } + + public String reason() { + return reason; + } + + @Override + public String toString() { + return node + " " + reason; + } + } + + NodeRemovalClusterStateTaskExecutor( + final AllocationService allocationService, + final ElectMasterService electMasterService, + final BiFunction rejoin, + final ESLogger logger) { + this.allocationService = allocationService; + this.electMasterService = electMasterService; + this.rejoin = rejoin; + this.logger = logger; + } + + @Override + public BatchResult execute(final ClusterState currentState, final List tasks) throws Exception { + final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes()); + boolean removed = false; + for (final Task task : tasks) { + if (currentState.nodes().nodeExists(task.node())) { + remainingNodesBuilder.remove(task.node()); + removed = true; + } else { + logger.debug("node [{}] does not exist in cluster state, ignoring", task); + } + } + + if (!removed) { + // no nodes to remove, keep the current cluster state + return BatchResult.builder().successes(tasks).build(currentState); + } + + final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder); + + final BatchResult.Builder resultBuilder = BatchResult.builder().successes(tasks); + if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) { + return resultBuilder.build(rejoin.apply(remainingNodesClusterState, "not enough master nodes")); + } else { + final RoutingAllocation.Result routingResult = allocationService.reroute(remainingNodesClusterState, describeTasks(tasks)); + return resultBuilder.build(ClusterState.builder(remainingNodesClusterState).routingResult(routingResult).build()); + } + } + + // visible for testing + // hook is used in testing to ensure that correct cluster state is used to test whether a + // rejoin or reroute is needed + ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) { + return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build(); + } + + @Override + public void onFailure(final String source, final Exception e) { + logger.error("unexpected failure during [{}]", e, source); + } + + @Override + public void onNoLongerMaster(String source) { + logger.debug("no longer master while processing node removal [{}]", source); + } + + } + + private void removeNode(final DiscoveryNode node, final String source, final String reason) { + clusterService.submitStateUpdateTask( + source + "(" + node + "), reason(" + reason + ")", + new NodeRemovalClusterStateTaskExecutor.Task(node, reason), + ClusterStateTaskConfig.build(Priority.IMMEDIATE), + nodeRemovalExecutor, + nodeRemovalExecutor); + } + private void handleLeaveRequest(final DiscoveryNode node) { if (lifecycleState() != Lifecycle.State.STARTED) { // not started, ignore a node failure return; } if (localNodeMaster()) { - clusterService.submitStateUpdateTask("zen-disco-node-left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) { - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.getId()); - currentState = ClusterState.builder(currentState).nodes(builder).build(); - // check if we have enough master nodes, if not, we need to move into joining the cluster again - if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { - return rejoin(currentState, "not enough master nodes"); - } - // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(), - "[" + node + "] left"); - return ClusterState.builder(currentState).routingResult(routingResult).build(); - } - - @Override - public void onNoLongerMaster(String source) { - // ignoring (already logged) - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("unexpected failure during [{}]", e, source); - } - }); + removeNode(node, "zen-disco-node-left", "left"); } else if (node.equals(nodes().getMasterNode())) { handleMasterGone(node, null, "shut_down"); } } - private void handleNodeFailure(final DiscoveryNode node, String reason) { + private void handleNodeFailure(final DiscoveryNode node, final String reason) { if (lifecycleState() != Lifecycle.State.STARTED) { // not started, ignore a node failure return; @@ -545,41 +624,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover // nothing to do here... return; } - clusterService.submitStateUpdateTask("zen-disco-node-failed(" + node + "), reason " + reason, - new ClusterStateUpdateTask(Priority.IMMEDIATE) { - @Override - public ClusterState execute(ClusterState currentState) { - if (currentState.nodes().nodeExists(node) == false) { - logger.debug("node [{}] already removed from cluster state. ignoring.", node); - return currentState; - } - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node); - currentState = ClusterState.builder(currentState).nodes(builder).build(); - // check if we have enough master nodes, if not, we need to move into joining the cluster again - if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { - return rejoin(currentState, "not enough master nodes"); - } - // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = allocationService.reroute( - ClusterState.builder(currentState).build(), - "[" + node + "] failed"); - return ClusterState.builder(currentState).routingResult(routingResult).build(); - } - - @Override - public void onNoLongerMaster(String source) { - // already logged - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("unexpected failure during [{}]", e, source); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } - }); + removeNode(node, "zen-disco-node-failed", reason); } private void handleMinimumMasterNodesChanged(final int minimumMasterNodes) { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index a41ad124a46..095bf65b189 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -57,9 +57,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -/** - * - */ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) @ESIntegTestCase.SuppressLocalMode public class ClusterServiceIT extends ESIntegTestCase { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index 54f6233631b..b0b2d1fc153 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -69,10 +69,14 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.is; public class ClusterServiceTests extends ESTestCase { @@ -654,8 +658,15 @@ public class ClusterServiceTests extends ESTestCase { clusterService.submitStateUpdateTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener); - expectThrows(IllegalArgumentException.class, () -> clusterService.submitStateUpdateTask("second time", task, - ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener)); + final IllegalStateException e = + expectThrows( + IllegalStateException.class, + () -> clusterService.submitStateUpdateTask( + "second time", + task, + ClusterStateTaskConfig.build(Priority.NORMAL), + executor, listener)); + assertThat(e, hasToString(containsString("task [1] with source [second time] is already queued"))); clusterService.submitStateUpdateTask("third time a charm", new SimpleTask(1), ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener); @@ -886,6 +897,11 @@ public class ClusterServiceTests extends ESTestCase { public boolean equals(Object obj) { return super.equals(obj); } + + @Override + public String toString() { + return Integer.toString(id); + } } private static class BlockingTask extends ClusterStateUpdateTask implements Releasable { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java new file mode 100644 index 00000000000..667ca6fbccb --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java @@ -0,0 +1,185 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class NodeRemovalClusterStateTaskExecutorTests extends ESTestCase { + + public void testRemovingNonExistentNodes() throws Exception { + final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor = + new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(null, null, null, logger); + final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + final int nodes = randomIntBetween(2, 16); + for (int i = 0; i < nodes; i++) { + builder.put(node(i)); + } + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build(); + + final DiscoveryNodes.Builder removeBuilder = DiscoveryNodes.builder(); + for (int i = nodes; i < nodes + randomIntBetween(1, 16); i++) { + removeBuilder.put(node(i)); + } + final List tasks = + StreamSupport + .stream(removeBuilder.build().spliterator(), false) + .map(node -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed")) + .collect(Collectors.toList()); + + final ClusterStateTaskExecutor.BatchResult result + = executor.execute(clusterState, tasks); + assertThat(result.resultingState, equalTo(clusterState)); + } + + public void testNotEnoughMasterNodesAfterRemove() throws Exception { + final ElectMasterService electMasterService = mock(ElectMasterService.class); + when(electMasterService.hasEnoughMasterNodes(any(Iterable.class))).thenReturn(false); + + final AllocationService allocationService = mock(AllocationService.class); + + final AtomicBoolean rejoined = new AtomicBoolean(); + final AtomicReference rejoinedClusterState = new AtomicReference<>(); + final BiFunction rejoin = (cs, r) -> { + rejoined.set(true); + rejoinedClusterState.set(ClusterState.builder(cs).build()); + return rejoinedClusterState.get(); + }; + + final AtomicReference remainingNodesClusterState = new AtomicReference<>(); + final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor = + new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, rejoin, logger) { + @Override + ClusterState remainingNodesClusterState(ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) { + remainingNodesClusterState.set(super.remainingNodesClusterState(currentState, remainingNodesBuilder)); + return remainingNodesClusterState.get(); + } + }; + + final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + final int nodes = randomIntBetween(2, 16); + final List tasks = new ArrayList<>(); + // to ensure there is at least one removal + boolean first = true; + for (int i = 0; i < nodes; i++) { + final DiscoveryNode node = node(i); + builder.put(node); + if (first || randomBoolean()) { + tasks.add(new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed")); + } + first = false; + } + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build(); + + final ClusterStateTaskExecutor.BatchResult result = + executor.execute(clusterState, tasks); + verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes())); + verifyNoMoreInteractions(electMasterService); + + // ensure that we did not reroute + verifyNoMoreInteractions(allocationService); + assertTrue(rejoined.get()); + assertThat(result.resultingState, equalTo(rejoinedClusterState.get())); + + for (final ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task task : tasks) { + assertNull(result.resultingState.nodes().get(task.node().getId())); + } + } + + public void testRerouteAfterRemovingNodes() throws Exception { + final ElectMasterService electMasterService = mock(ElectMasterService.class); + when(electMasterService.hasEnoughMasterNodes(any(Iterable.class))).thenReturn(true); + + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.reroute(any(ClusterState.class), any(String.class))).thenReturn(mock(RoutingAllocation.Result.class)); + + final BiFunction rejoin = (cs, r) -> { + fail("rejoin should not be invoked"); + return cs; + }; + + final AtomicReference remainingNodesClusterState = new AtomicReference<>(); + final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor = + new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, rejoin, logger) { + @Override + ClusterState remainingNodesClusterState(ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) { + remainingNodesClusterState.set(super.remainingNodesClusterState(currentState, remainingNodesBuilder)); + return remainingNodesClusterState.get(); + } + }; + + final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + final int nodes = randomIntBetween(2, 16); + final List tasks = new ArrayList<>(); + // to ensure that there is at least one removal + boolean first = true; + for (int i = 0; i < nodes; i++) { + final DiscoveryNode node = node(i); + builder.put(node); + if (first || randomBoolean()) { + tasks.add(new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed")); + } + first = false; + } + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build(); + + final ClusterStateTaskExecutor.BatchResult result = + executor.execute(clusterState, tasks); + verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes())); + verifyNoMoreInteractions(electMasterService); + + verify(allocationService).reroute(eq(remainingNodesClusterState.get()), any(String.class)); + + for (final ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task task : tasks) { + assertNull(result.resultingState.nodes().get(task.node().getId())); + } + } + + private DiscoveryNode node(final int id) { + return new DiscoveryNode(Integer.toString(id), LocalTransportAddress.buildUnique(), Version.CURRENT); + } + +}