From 3fad1eeaed45761d734ece7a1c717dc6c6e6ccd2 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 22 Jan 2019 12:44:45 +0000 Subject: [PATCH] Un-assign persistent tasks as nodes exit the cluster (#37656) PersistentTasksClusterService decides if a task should be reassigned by checking there is a node in the cluster with the same Id. If a node is restarted PersistentTasksClusterService may not observe the change and decide the task still has a valid assignment because the node's ephemeral Id is not used in that decision. This change un-assigns tasks as the nodes in the cluster change. --- .../coordination/JoinTaskExecutor.java | 2 + .../NodeRemovalClusterStateTaskExecutor.java | 4 +- .../routing/allocation/AllocationService.java | 2 +- .../PersistentTasksCustomMetaData.java | 39 ++++++++ .../PersistentTasksCustomMetaDataTests.java | 93 +++++++++++++++++++ 5 files changed, 138 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index c4c76d8a8fe..82eb26d98b9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.ArrayList; import java.util.Collection; @@ -187,6 +188,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor getTaskClusterTasksResult(ClusterState currentState, List tasks, ClusterState remainingNodesClusterState) { + ClusterState ptasksDeassociatedState = PersistentTasksCustomMetaData.deassociateDeadNodes(remainingNodesClusterState); final ClusterTasksResult.Builder resultBuilder = ClusterTasksResult.builder().successes(tasks); - return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks))); + return resultBuilder.build(allocationService.deassociateDeadNodes(ptasksDeassociatedState, true, describeTasks(tasks))); } // visible for testing diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 59f43a193dd..7acf20185ee 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -425,7 +425,7 @@ public class AllocationService { for (ShardRouting shardRouting : node.copyShards()) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index b7a179e41e3..01e3d7450a6 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -62,6 +62,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable> tasks; @@ -119,6 +120,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable task : tasks.tasks()) { + if (task.getAssignment().getExecutorNode() != null && + clusterState.nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) { + taskBuilder.reassignTask(task.getId(), LOST_NODE_ASSIGNMENT); + } + } + + if (taskBuilder.isChanged() == false) { + return clusterState; + } + + MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData()); + metaDataBuilder.putCustom(TYPE, taskBuilder.build()); + return ClusterState.builder(clusterState).metaData(metaDataBuilder).build(); + } + public static class Assignment { @Nullable private final String executorNode; diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 2a180cc12dd..c25ca3cb7db 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -21,10 +21,14 @@ package org.elasticsearch.persistent; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData.Custom; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -33,9 +37,11 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -65,6 +71,8 @@ import static org.elasticsearch.test.VersionUtils.getFirstVersion; import static org.elasticsearch.test.VersionUtils.getPreviousVersion; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase { @@ -307,6 +315,91 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible"))); } + public void testDisassociateDeadNodes_givenNoPersistentTasks() { + ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")).build(); + ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState); + assertThat(originalState, sameInstance(returnedState)); + } + + public void testDisassociateDeadNodes_givenAssignedPersistentTask() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + String taskName = "test/task"; + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder() + .addTask("task-id", taskName, emptyTaskParams(taskName), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")); + + ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); + ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState); + assertThat(originalState, sameInstance(returnedState)); + + PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState); + PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState); + assertEquals(originalTasks, returnedTasks); + } + + public void testDisassociateDeadNodes() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("node1") + .masterNodeId("node1") + .build(); + + String taskName = "test/task"; + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder() + .addTask("assigned-task", taskName, emptyTaskParams(taskName), + new PersistentTasksCustomMetaData.Assignment("node1", "test assignment")) + .addTask("task-on-deceased-node", taskName, emptyTaskParams(taskName), + new PersistentTasksCustomMetaData.Assignment("left-the-cluster", "test assignment")); + + ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); + ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState); + assertThat(originalState, not(sameInstance(returnedState))); + + PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState); + PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState); + assertNotEquals(originalTasks, returnedTasks); + + assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task")); + assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node")); + assertEquals(PersistentTasksCustomMetaData.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment()); + } + + private PersistentTaskParams emptyTaskParams(String taskName) { + return new PersistentTaskParams() { + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) { + return builder; + } + + @Override + public void writeTo(StreamOutput out) { + + } + + @Override + public String getWriteableName() { + return taskName; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + }; + } + private Assignment randomAssignment() { if (randomBoolean()) { if (randomBoolean()) {