Batch process node left and node failure

Today when a node is removed from the cluster (it leaves or it fails), we
submit a cluster state update task. These cluster state update tasks are
processed serially on the master. When nodes are removed en masse (e.g.,
a rack is taken down or otherwise becomes unavailable), the master will
be slow to process these failures because of the resulting reroutes and
the publishing of each subsequent cluster state. We improve this in this
commit by processing the node removals using the cluster state update
task batch processing framework.

Relates #19289
Jason Tedor 2016-07-11 08:30:09 -04:00 committed by GitHub
parent 3f3c93ec65
commit df7ad9970b
6 changed files with 334 additions and 73 deletions
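
The batch processing framework referenced in the commit message drains every pending task that was submitted with the same executor instance and hands them to a single execute call, so one resulting cluster state is computed, rerouted, and published per batch. A minimal sketch of that contract (hypothetical executor for illustration; the real implementation is NodeRemovalClusterStateTaskExecutor in ZenDiscovery below):

import java.util.List;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

// Sketch: all queued removals that share this executor instance arrive
// in one call, producing a single new cluster state instead of one
// state (and one publish) per removed node.
class RemoveNodesExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
    @Override
    public BatchResult<DiscoveryNode> execute(final ClusterState currentState, final List<DiscoveryNode> tasks) throws Exception {
        final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes());
        for (final DiscoveryNode node : tasks) {
            builder.remove(node); // apply every queued removal at once
        }
        final ClusterState newState = ClusterState.builder(currentState).nodes(builder).build();
        return BatchResult.<DiscoveryNode>builder().successes(tasks).build(newState);
    }
}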

View File: DiscoveryNode.java

@@ -99,6 +99,24 @@ public class DiscoveryNode implements Writeable, ToXContent {
private final Version version;
private final Set<Role> roles;
/**
* Creates a new {@link DiscoveryNode}
* <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* version. It corresponds to the minimum version this Elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param id the node's unique (persistent) node id. This constructor will auto-generate a random ephemeral id.
* @param address the node's transport address
* @param version the version of the node
*/
public DiscoveryNode(final String id, TransportAddress address, Version version) {
this(id, address, Collections.emptyMap(), Collections.emptySet(), version);
}
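Following the note above, a caller that does not yet know the remote node's version would construct the node like this (a sketch; the address helper is the one used in the tests below):

// The remote node's true version is only discovered after the initial
// handshake; until then the minimum compatibility version is the safe choice.
DiscoveryNode node = new DiscoveryNode(
    "node-id", // persistent node id; a random ephemeral id is auto-generated
    LocalTransportAddress.buildUnique(),
    Version.CURRENT.minimumCompatibilityVersion());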
/**
* Creates a new {@link DiscoveryNode}
* <p>

View File: ClusterService.java

@@ -84,9 +84,6 @@ import java.util.stream.Collectors;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
*
*/
public class ClusterService extends AbstractLifecycleComponent {
public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
@@ -348,6 +345,7 @@ public class ClusterService extends AbstractLifecycleComponent {
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* task
*
*/
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
@@ -371,6 +369,7 @@ public class ClusterService extends AbstractLifecycleComponent {
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
@@ -390,6 +389,7 @@ public class ClusterService extends AbstractLifecycleComponent {
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
@@ -411,7 +411,7 @@ public class ClusterService extends AbstractLifecycleComponent {
List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
if (tasksIdentity.containsKey(existing.task)) {
throw new IllegalArgumentException("task [" + existing.task + "] is already queued");
throw new IllegalStateException("task [" + existing.task + "] with source [" + source + "] is already queued");
}
}
existingTasks.addAll(updateTasks);

View File: ZenDiscovery.java

@@ -25,6 +25,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -79,13 +82,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
*
*/
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
@@ -148,6 +149,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
// must initialized in doStart(), when we have the allocationService set
private volatile NodeJoinController nodeJoinController;
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
@Inject
public ZenDiscovery(Settings settings, ThreadPool threadPool,
@@ -216,6 +218,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::rejoin, logger);
}
@Override
@@ -500,43 +503,119 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
}
}
// visible for testing
static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
private final AllocationService allocationService;
private final ElectMasterService electMasterService;
private final BiFunction<ClusterState, String, ClusterState> rejoin;
private final ESLogger logger;
static class Task {
private final DiscoveryNode node;
private final String reason;
public Task(final DiscoveryNode node, final String reason) {
this.node = node;
this.reason = reason;
}
public DiscoveryNode node() {
return node;
}
public String reason() {
return reason;
}
@Override
public String toString() {
return node + " " + reason;
}
}
NodeRemovalClusterStateTaskExecutor(
final AllocationService allocationService,
final ElectMasterService electMasterService,
final BiFunction<ClusterState, String, ClusterState> rejoin,
final ESLogger logger) {
this.allocationService = allocationService;
this.electMasterService = electMasterService;
this.rejoin = rejoin;
this.logger = logger;
}
@Override
public BatchResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
boolean removed = false;
for (final Task task : tasks) {
if (currentState.nodes().nodeExists(task.node())) {
remainingNodesBuilder.remove(task.node());
removed = true;
} else {
logger.debug("node [{}] does not exist in cluster state, ignoring", task);
}
}
if (!removed) {
// no nodes to remove, keep the current cluster state
return BatchResult.<Task>builder().successes(tasks).build(currentState);
}
final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
final BatchResult.Builder<Task> resultBuilder = BatchResult.<Task>builder().successes(tasks);
if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) {
return resultBuilder.build(rejoin.apply(remainingNodesClusterState, "not enough master nodes"));
} else {
final RoutingAllocation.Result routingResult = allocationService.reroute(remainingNodesClusterState, describeTasks(tasks));
return resultBuilder.build(ClusterState.builder(remainingNodesClusterState).routingResult(routingResult).build());
}
}
// visible for testing
// hook is used in testing to ensure that correct cluster state is used to test whether a
// rejoin or reroute is needed
ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
}
@Override
public void onFailure(final String source, final Exception e) {
logger.error("unexpected failure during [{}]", e, source);
}
@Override
public void onNoLongerMaster(String source) {
logger.debug("no longer master while processing node removal [{}]", source);
}
}
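Since Task#toString combines the node and the reason, the single reroute issued for a batch (via describeTasks in execute above) carries a source that names every removed node. Roughly the following joining behavior (a sketch; not the exact upstream describeTasks implementation):

// For two failed nodes in one batch the reroute reason would read
// something like "{node-1} failed, {node-2} failed".
static String describe(final List<NodeRemovalClusterStateTaskExecutor.Task> tasks) {
    return tasks.stream()
        .map(task -> task.toString()) // Task#toString -> "<node> <reason>"
        .collect(java.util.stream.Collectors.joining(", "));
}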
private void removeNode(final DiscoveryNode node, final String source, final String reason) {
clusterService.submitStateUpdateTask(
source + "(" + node + "), reason(" + reason + ")",
new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
nodeRemovalExecutor);
}
private void handleLeaveRequest(final DiscoveryNode node) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node leave request
return;
}
if (localNodeMaster()) {
clusterService.submitStateUpdateTask("zen-disco-node-left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.getId());
currentState = ClusterState.builder(currentState).nodes(builder).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
"[" + node + "] left");
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@Override
public void onNoLongerMaster(String source) {
// ignoring (already logged)
}
@Override
public void onFailure(String source, Exception e) {
logger.error("unexpected failure during [{}]", e, source);
}
});
removeNode(node, "zen-disco-node-left", "left");
} else if (node.equals(nodes().getMasterNode())) {
handleMasterGone(node, null, "shut_down");
}
}
private void handleNodeFailure(final DiscoveryNode node, String reason) {
private void handleNodeFailure(final DiscoveryNode node, final String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
return;
@@ -545,41 +624,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node-failed(" + node + "), reason " + reason,
new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node) == false) {
logger.debug("node [{}] already removed from cluster state. ignoring.", node);
return currentState;
}
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node);
currentState = ClusterState.builder(currentState).nodes(builder).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).build(),
"[" + node + "] failed");
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@Override
public void onNoLongerMaster(String source) {
// already logged
}
@Override
public void onFailure(String source, Exception e) {
logger.error("unexpected failure during [{}]", e, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
removeNode(node, "zen-disco-node-failed", reason);
}
private void handleMinimumMasterNodesChanged(final int minimumMasterNodes) {

View File: ClusterServiceIT.java

@@ -57,9 +57,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class ClusterServiceIT extends ESIntegTestCase {

View File: ClusterServiceTests.java

@@ -69,10 +69,14 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
public class ClusterServiceTests extends ESTestCase {
@@ -654,8 +658,15 @@ public class ClusterServiceTests extends ESTestCase {
clusterService.submitStateUpdateTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
expectThrows(IllegalArgumentException.class, () -> clusterService.submitStateUpdateTask("second time", task,
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener));
final IllegalStateException e =
expectThrows(
IllegalStateException.class,
() -> clusterService.submitStateUpdateTask(
"second time",
task,
ClusterStateTaskConfig.build(Priority.NORMAL),
executor, listener));
assertThat(e, hasToString(containsString("task [1] with source [second time] is already queued")));
clusterService.submitStateUpdateTask("third time a charm", new SimpleTask(1),
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
@@ -886,6 +897,11 @@ public boolean equals(Object obj) {
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public String toString() {
return Integer.toString(id);
}
}
private static class BlockingTask extends ClusterStateUpdateTask implements Releasable {

View File: NodeRemovalClusterStateTaskExecutorTests.java (new file)

@@ -0,0 +1,185 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class NodeRemovalClusterStateTaskExecutorTests extends ESTestCase {
public void testRemovingNonExistentNodes() throws Exception {
final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor =
new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(null, null, null, logger);
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
final int nodes = randomIntBetween(2, 16);
for (int i = 0; i < nodes; i++) {
builder.put(node(i));
}
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build();
final DiscoveryNodes.Builder removeBuilder = DiscoveryNodes.builder();
for (int i = nodes; i < nodes + randomIntBetween(1, 16); i++) {
removeBuilder.put(node(i));
}
final List<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> tasks =
StreamSupport
.stream(removeBuilder.build().spliterator(), false)
.map(node -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed"))
.collect(Collectors.toList());
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result
= executor.execute(clusterState, tasks);
assertThat(result.resultingState, equalTo(clusterState));
}
public void testNotEnoughMasterNodesAfterRemove() throws Exception {
final ElectMasterService electMasterService = mock(ElectMasterService.class);
when(electMasterService.hasEnoughMasterNodes(any(Iterable.class))).thenReturn(false);
final AllocationService allocationService = mock(AllocationService.class);
final AtomicBoolean rejoined = new AtomicBoolean();
final AtomicReference<ClusterState> rejoinedClusterState = new AtomicReference<>();
final BiFunction<ClusterState, String, ClusterState> rejoin = (cs, r) -> {
rejoined.set(true);
rejoinedClusterState.set(ClusterState.builder(cs).build());
return rejoinedClusterState.get();
};
final AtomicReference<ClusterState> remainingNodesClusterState = new AtomicReference<>();
final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor =
new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, rejoin, logger) {
@Override
ClusterState remainingNodesClusterState(ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
remainingNodesClusterState.set(super.remainingNodesClusterState(currentState, remainingNodesBuilder));
return remainingNodesClusterState.get();
}
};
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
final int nodes = randomIntBetween(2, 16);
final List<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> tasks = new ArrayList<>();
// to ensure there is at least one removal
boolean first = true;
for (int i = 0; i < nodes; i++) {
final DiscoveryNode node = node(i);
builder.put(node);
if (first || randomBoolean()) {
tasks.add(new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed"));
}
first = false;
}
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build();
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
executor.execute(clusterState, tasks);
verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes()));
verifyNoMoreInteractions(electMasterService);
// ensure that we did not reroute
verifyNoMoreInteractions(allocationService);
assertTrue(rejoined.get());
assertThat(result.resultingState, equalTo(rejoinedClusterState.get()));
for (final ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task task : tasks) {
assertNull(result.resultingState.nodes().get(task.node().getId()));
}
}
public void testRerouteAfterRemovingNodes() throws Exception {
final ElectMasterService electMasterService = mock(ElectMasterService.class);
when(electMasterService.hasEnoughMasterNodes(any(Iterable.class))).thenReturn(true);
final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).thenReturn(mock(RoutingAllocation.Result.class));
final BiFunction<ClusterState, String, ClusterState> rejoin = (cs, r) -> {
fail("rejoin should not be invoked");
return cs;
};
final AtomicReference<ClusterState> remainingNodesClusterState = new AtomicReference<>();
final ZenDiscovery.NodeRemovalClusterStateTaskExecutor executor =
new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, rejoin, logger) {
@Override
ClusterState remainingNodesClusterState(ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
remainingNodesClusterState.set(super.remainingNodesClusterState(currentState, remainingNodesBuilder));
return remainingNodesClusterState.get();
}
};
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
final int nodes = randomIntBetween(2, 16);
final List<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> tasks = new ArrayList<>();
// to ensure that there is at least one removal
boolean first = true;
for (int i = 0; i < nodes; i++) {
final DiscoveryNode node = node(i);
builder.put(node);
if (first || randomBoolean()) {
tasks.add(new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed"));
}
first = false;
}
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build();
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
executor.execute(clusterState, tasks);
verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes()));
verifyNoMoreInteractions(electMasterService);
verify(allocationService).reroute(eq(remainingNodesClusterState.get()), any(String.class));
for (final ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task task : tasks) {
assertNull(result.resultingState.nodes().get(task.node().getId()));
}
}
private DiscoveryNode node(final int id) {
return new DiscoveryNode(Integer.toString(id), LocalTransportAddress.buildUnique(), Version.CURRENT);
}
}