Un-assign persistent tasks as nodes exit the cluster (#37656)
PersistentTasksClusterService decides if a task should be reassigned by checking there is a node in the cluster with the same Id. If a node is restarted PersistentTasksClusterService may not observe the change and decide the task still has a valid assignment because the node's ephemeral Id is not used in that decision. This change un-assigns tasks as the nodes in the cluster change.
This commit is contained in:
parent
228611843c
commit
3fad1eeaed
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -187,6 +188,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
|
|||
.blocks(currentState.blocks())
|
||||
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
|
||||
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
|
||||
tmpState = PersistentTasksCustomMetaData.deassociateDeadNodes(tmpState);
|
||||
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -91,8 +92,9 @@ public class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExec
|
|||
|
||||
protected ClusterTasksResult<Task> getTaskClusterTasksResult(ClusterState currentState, List<Task> tasks,
|
||||
ClusterState remainingNodesClusterState) {
|
||||
ClusterState ptasksDeassociatedState = PersistentTasksCustomMetaData.deassociateDeadNodes(remainingNodesClusterState);
|
||||
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
|
||||
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
|
||||
return resultBuilder.build(allocationService.deassociateDeadNodes(ptasksDeassociatedState, true, describeTasks(tasks)));
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
|
|
|
@ -425,7 +425,7 @@ public class AllocationService {
|
|||
for (ShardRouting shardRouting : node.copyShards()) {
|
||||
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
|
||||
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
|
||||
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
|
||||
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
|
||||
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
|
||||
public static final String TYPE = "persistent_tasks";
|
||||
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
|
||||
static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss");
|
||||
|
||||
// TODO: Implement custom Diff for tasks
|
||||
private final Map<String, PersistentTask<?>> tasks;
|
||||
|
@ -119,6 +120,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
new ParseField("allocation_id_on_last_status_update"));
|
||||
}
|
||||
|
||||
|
||||
public static PersistentTasksCustomMetaData getPersistentTasksCustomMetaData(ClusterState clusterState) {
|
||||
return clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Private builder used in XContent parser to build task-specific portion (params and state)
|
||||
*/
|
||||
|
@ -209,6 +215,39 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unassign any persistent tasks executing on nodes that are no longer in
|
||||
* the cluster. If the task's assigment has a non-null executor node and that
|
||||
* node is no longer in the cluster then the assignment is set to
|
||||
* {@link #LOST_NODE_ASSIGNMENT}
|
||||
*
|
||||
* @param clusterState The clusterstate
|
||||
* @return If no changes the argument {@code clusterState} is returned else
|
||||
* a copy with the modified tasks
|
||||
*/
|
||||
public static ClusterState deassociateDeadNodes(ClusterState clusterState) {
|
||||
PersistentTasksCustomMetaData tasks = getPersistentTasksCustomMetaData(clusterState);
|
||||
if (tasks == null) {
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(tasks);
|
||||
for (PersistentTask<?> task : tasks.tasks()) {
|
||||
if (task.getAssignment().getExecutorNode() != null &&
|
||||
clusterState.nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) {
|
||||
taskBuilder.reassignTask(task.getId(), LOST_NODE_ASSIGNMENT);
|
||||
}
|
||||
}
|
||||
|
||||
if (taskBuilder.isChanged() == false) {
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
|
||||
metaDataBuilder.putCustom(TYPE, taskBuilder.build());
|
||||
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
|
||||
}
|
||||
|
||||
public static class Assignment {
|
||||
@Nullable
|
||||
private final String executorNode;
|
||||
|
|
|
@ -21,10 +21,14 @@ package org.elasticsearch.persistent;
|
|||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData.Custom;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -33,9 +37,11 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -65,6 +71,8 @@ import static org.elasticsearch.test.VersionUtils.getFirstVersion;
|
|||
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase<Custom> {
|
||||
|
||||
|
@ -307,6 +315,91 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible")));
|
||||
}
|
||||
|
||||
public void testDisassociateDeadNodes_givenNoPersistentTasks() {
|
||||
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")).build();
|
||||
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
|
||||
assertThat(originalState, sameInstance(returnedState));
|
||||
}
|
||||
|
||||
public void testDisassociateDeadNodes_givenAssignedPersistentTask() {
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
|
||||
.localNodeId("node1")
|
||||
.masterNodeId("node1")
|
||||
.build();
|
||||
|
||||
String taskName = "test/task";
|
||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
|
||||
.addTask("task-id", taskName, emptyTaskParams(taskName),
|
||||
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
|
||||
|
||||
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
|
||||
.nodes(nodes)
|
||||
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
|
||||
.build();
|
||||
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
|
||||
assertThat(originalState, sameInstance(returnedState));
|
||||
|
||||
PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
|
||||
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
|
||||
assertEquals(originalTasks, returnedTasks);
|
||||
}
|
||||
|
||||
public void testDisassociateDeadNodes() {
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
|
||||
.localNodeId("node1")
|
||||
.masterNodeId("node1")
|
||||
.build();
|
||||
|
||||
String taskName = "test/task";
|
||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
|
||||
.addTask("assigned-task", taskName, emptyTaskParams(taskName),
|
||||
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"))
|
||||
.addTask("task-on-deceased-node", taskName, emptyTaskParams(taskName),
|
||||
new PersistentTasksCustomMetaData.Assignment("left-the-cluster", "test assignment"));
|
||||
|
||||
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
|
||||
.nodes(nodes)
|
||||
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
|
||||
.build();
|
||||
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
|
||||
assertThat(originalState, not(sameInstance(returnedState)));
|
||||
|
||||
PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
|
||||
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
|
||||
assertNotEquals(originalTasks, returnedTasks);
|
||||
|
||||
assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task"));
|
||||
assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node"));
|
||||
assertEquals(PersistentTasksCustomMetaData.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment());
|
||||
}
|
||||
|
||||
private PersistentTaskParams emptyTaskParams(String taskName) {
|
||||
return new PersistentTaskParams() {
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return taskName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getMinimalSupportedVersion() {
|
||||
return Version.CURRENT;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Assignment randomAssignment() {
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean()) {
|
||||
|
|
Loading…
Reference in New Issue