From 16e661c34ba85d539fd69cdf6933173a1c19331c Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 10 Feb 2017 19:58:13 -0500 Subject: [PATCH] Make persistent task persist full cluster restart This commit moves persistent tasks from ClusterState.Custom to MetaData.Custom and adds ability for the task to remain in the metadata after completion. --- .../CreatePersistentTaskAction.java | 254 +++++++++++ .../PersistentActionCoordinator.java | 17 +- .../persistent/PersistentActionService.java | 4 +- .../PersistentTaskClusterService.java | 280 +++++++----- .../persistent/PersistentTasksInProgress.java | 390 ++++++++++++++++- .../RemovePersistentTaskAction.java | 2 +- .../persistent/StartPersistentTaskAction.java | 100 +++-- .../persistent/TransportPersistentAction.java | 2 +- ...ersistentActionCoordinatorStatusTests.java | 39 ++ .../PersistentActionCoordinatorTests.java | 55 +-- .../PersistentActionFullRestartIT.java | 136 ++++++ .../persistent/PersistentActionIT.java | 106 +++-- .../PersistentTaskClusterServiceTests.java | 401 ++++++++++++++++++ .../PersistentTasksInProgressTests.java | 241 ++++++++++- .../StartPersistentActionRequestTests.java | 2 +- .../TestPersistentActionPlugin.java | 55 ++- .../UpdatePersistentTaskRequestTests.java | 47 ++ 17 files changed, 1885 insertions(+), 246 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorStatusTests.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/PersistentActionFullRestartIT.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/PersistentTaskClusterServiceTests.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java diff --git a/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java new file mode 100644 index 00000000000..d6a56ae928c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java @@ -0,0 +1,254 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * This action can be used to add the record for the persistent action to the cluster state. + */ +public class CreatePersistentTaskAction extends Action { + + public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction(); + public static final String NAME = "cluster:admin/persistent/create"; + + private CreatePersistentTaskAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public PersistentActionResponse newResponse() { + return new PersistentActionResponse(); + } + + public static class Request extends MasterNodeRequest { + + private String action; + + private PersistentActionRequest request; + + private boolean stopped; + + private boolean removeOnCompletion = true; + + public Request() { + + } + + public Request(String action, PersistentActionRequest request) { + this.action = action; + this.request = request; + this.stopped = false; + this.removeOnCompletion = true; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + action = in.readString(); + request = in.readNamedWriteable(PersistentActionRequest.class); + stopped = in.readBoolean(); + removeOnCompletion = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeNamedWriteable(request); + out.writeBoolean(stopped); + out.writeBoolean(removeOnCompletion); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (this.action == null) { + validationException = addValidationError("action must be specified", validationException); + } + if (this.request == null) { + validationException = addValidationError("request must be specified", validationException); + } + return validationException; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request1 = (Request) o; + return Objects.equals(action, request1.action) && + Objects.equals(request, request1.request) && + removeOnCompletion == request1.removeOnCompletion && + stopped == request1.stopped; + } + + @Override + public int hashCode() { + return Objects.hash(action, request, removeOnCompletion, stopped); + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public PersistentActionRequest getRequest() { + return request; + } + + public void setRequest(PersistentActionRequest request) { + this.request = request; + } + + public boolean isStopped() { + return stopped; + } + + public void setStopped(boolean stopped) { + this.stopped = stopped; + } + + public boolean shouldRemoveOnCompletion() { + return removeOnCompletion; + } + + public void setRemoveOnCompletion(boolean removeOnCompletion) { + this.removeOnCompletion = removeOnCompletion; + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) { + super(client, action, new Request()); + } + + public RequestBuilder setAction(String action) { + request.setAction(action); + return this; + } + + public RequestBuilder setRequest(PersistentActionRequest persistentActionRequest) { + request.setRequest(persistentActionRequest); + return this; + } + + /** + * Indicates if the persistent task should be created in the stopped state. Defaults to false. + */ + public RequestBuilder setStopped(boolean stopped) { + request.setStopped(stopped); + return this; + } + + /** + * Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true. + */ + public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) { + request.setRemoveOnCompletion(removeOnCompletion); + return this; + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final PersistentTaskClusterService persistentTaskClusterService; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + PersistentTaskClusterService persistentTaskClusterService, + PersistentActionRegistry persistentActionRegistry, + PersistentActionService persistentActionService, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.persistentTaskClusterService = persistentTaskClusterService; + PersistentActionExecutor executor = new PersistentActionExecutor(threadPool); + clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry, + transportService.getTaskManager(), executor)); + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected PersistentActionResponse newResponse() { + return new PersistentActionResponse(); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected final void masterOperation(final Request request, ClusterState state, + final ActionListener listener) { + persistentTaskClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion, + new ActionListener() { + @Override + public void onResponse(Long newTaskId) { + listener.onResponse(new PersistentActionResponse(newTaskId)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + } +} + + diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java b/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java index 0825f246c36..ccf3a9dc35d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java @@ -75,8 +75,8 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl @Override public void clusterChanged(ClusterChangedEvent event) { - PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress previousTasks = event.previousState().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) { // We have some changes let's check if they are related to our node @@ -402,6 +402,19 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl public boolean isFragment() { return false; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return state == status.state; + } + + @Override + public int hashCode() { + return Objects.hash(state); + } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java index f8a063640fd..4cd553b84c7 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java @@ -49,9 +49,9 @@ public class PersistentActionService extends AbstractComponent { public void sendRequest(String action, Request request, ActionListener listener) { - StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(action, request); + CreatePersistentTaskAction.Request startRequest = new CreatePersistentTaskAction.Request(action, request); try { - client.execute(StartPersistentTaskAction.INSTANCE, startRequest, listener); + client.execute(CreatePersistentTaskAction.INSTANCE, startRequest, listener); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java index c99b934eca5..a9a75863a1b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java @@ -19,11 +19,14 @@ package org.elasticsearch.persistent; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -33,9 +36,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -63,20 +63,19 @@ public class PersistentTaskClusterService extends AbstractComponent implements C * @param request request * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String action, Request request, + public void createPersistentTask(String action, Request request, boolean stopped, + boolean removeOnCompletion, ActionListener listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - final String executorNodeId = executorNode(action, currentState, request); - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - long nextId; - if (tasksInProgress != null) { - nextId = tasksInProgress.getCurrentId() + 1; + final String executorNodeId; + if (stopped) { + executorNodeId = null; // the task is stopped no need to assign it anywhere yet } else { - nextId = 1; + executorNodeId = executorNode(action, currentState, request); } - return createPersistentTask(currentState, new PersistentTaskInProgress<>(nextId, action, request, executorNodeId)); + return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, executorNodeId)); } @Override @@ -86,7 +85,8 @@ public class PersistentTaskClusterService extends AbstractComponent implements C @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(((PersistentTasksInProgress) newState.custom(PersistentTasksInProgress.TYPE)).getCurrentId()); + listener.onResponse( + ((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId()); } }); } @@ -110,23 +110,81 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - if (tasksInProgress == null) { - // Nothing to do, the task was already deleted - return currentState; - } - if (failure != null) { - // If the task failed - we need to restart it on another node, otherwise we just remove it - PersistentTaskInProgress taskInProgress = tasksInProgress.getTask(id); - if (taskInProgress != null) { - String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest()); - return updatePersistentTask(currentState, new PersistentTaskInProgress<>(taskInProgress, executorNode)); + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + if (failure != null) { + // If the task failed - we need to restart it on another node, otherwise we just remove it + tasksInProgress.reassignTask(id, (action, request) -> executorNode(action, currentState, request)); + } else { + tasksInProgress.finishTask(id); } - return currentState; + return update(currentState, tasksInProgress); } else { - return removePersistentTask(currentState, id); + // we don't send the error message back to the caller becase that would cause an infinite loop of notifications + logger.warn("The task {} wasn't found, status is not updated", id); + return currentState; } + } + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(Empty.INSTANCE); + } + }); + } + + /** + * Switches the persistent task from stopped to started mode + * + * @param id the id of a persistent task + * @param listener the listener that will be called when task is removed + */ + public void startPersistentTask(long id, ActionListener listener) { + clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress + .assignTask(id, (action, request) -> executorNode(action, currentState, request))); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); + } + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(Empty.INSTANCE); + } + }); + } + + /** + * Removes the persistent task + * + * @param id the id of a persistent task + * @param listener the listener that will be called when task is removed + */ + public void removePersistentTask(long id, ActionListener listener) { + clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress.removeTask(id)); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); + } } @Override @@ -152,16 +210,12 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - if (tasksInProgress == null) { - // Nothing to do, the task no longer exists - return currentState; + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress.updateTaskStatus(id, status)); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); } - PersistentTaskInProgress task = tasksInProgress.getTask(id); - if (task != null) { - return updatePersistentTask(currentState, new PersistentTaskInProgress<>(task, status)); - } - return currentState; } @Override @@ -176,44 +230,6 @@ public class PersistentTaskClusterService extends AbstractComponent implements C }); } - private ClusterState updatePersistentTask(ClusterState oldState, PersistentTaskInProgress newTask) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - Map> taskMap = new HashMap<>(); - taskMap.putAll(oldTasks.taskMap()); - taskMap.put(newTask.getId(), newTask); - ClusterState.Builder builder = ClusterState.builder(oldState); - PersistentTasksInProgress newTasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } - - private ClusterState createPersistentTask(ClusterState oldState, PersistentTaskInProgress newTask) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - Map> taskMap = new HashMap<>(); - if (oldTasks != null) { - taskMap.putAll(oldTasks.taskMap()); - } - taskMap.put(newTask.getId(), newTask); - ClusterState.Builder builder = ClusterState.builder(oldState); - PersistentTasksInProgress newTasks = new PersistentTasksInProgress(newTask.getId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } - - private ClusterState removePersistentTask(ClusterState oldState, long taskId) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - if (oldTasks != null) { - Map> taskMap = new HashMap<>(); - ClusterState.Builder builder = ClusterState.builder(oldState); - taskMap.putAll(oldTasks.taskMap()); - taskMap.remove(taskId); - PersistentTasksInProgress newTasks = - new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } else { - // no tasks - nothing to do - return oldState; - } - } - private String executorNode(String action, ClusterState currentState, Request request) { TransportPersistentAction persistentAction = registry.getPersistentActionSafe(action); persistentAction.validate(request, currentState); @@ -232,30 +248,48 @@ public class PersistentTaskClusterService extends AbstractComponent implements C @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { - PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE); - if (tasks != null && (event.nodesChanged() || event.previousState().nodes().isLocalNodeElectedMaster() == false)) { - // We need to check if removed nodes were running any of the tasks and reassign them - boolean reassignmentRequired = false; - Set removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { - if (taskInProgress.getExecutorNode() == null) { - // there is an unassigned task - we need to try assigning it - reassignmentRequired = true; - break; - } - if (removedNodes.contains(taskInProgress.getExecutorNode())) { - // The caller node disappeared, we need to assign a new caller node - reassignmentRequired = true; - break; - } - } - if (reassignmentRequired) { - reassignTasks(); - } + logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); + if (reassignmentRequired(event, this::executorNode)) { + logger.trace("task reassignment is needed"); + reassignTasks(); + } else { + logger.trace("task reassignment is not needed"); } } } + interface ExecutorNodeDecider { + String executorNode(String action, ClusterState currentState, Request request); + } + + static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { + PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasks != null && (Objects.equals(tasks, prevTasks) == false || + event.nodesChanged() || + event.routingTableChanged() || + event.previousState().nodes().isLocalNodeElectedMaster() == false)) { + // We need to check if removed nodes were running any of the tasks and reassign them + boolean reassignmentRequired = false; + Set removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { + if (taskInProgress.isStopped() == false) { // skipping stopped tasks + if (taskInProgress.getExecutorNode() == null || removedNodes.contains(taskInProgress.getExecutorNode())) { + // there is an unassigned task or task with a disappeared node - we need to try assigning it + if (Objects.equals(taskInProgress.getRequest(), + decider.executorNode(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) { + // it looks like a assignment for at least one task is possible - let's trigger reassignment + reassignmentRequired = true; + break; + } + } + } + } + return reassignmentRequired; + } + return false; + } + /** * Evaluates the cluster state and tries to assign tasks to nodes */ @@ -263,22 +297,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasks = currentState.custom(PersistentTasksInProgress.TYPE); - ClusterState newClusterState = currentState; - DiscoveryNodes nodes = currentState.nodes(); - if (tasks != null) { - // We need to check if removed nodes were running any of the tasks and reassign them - for (PersistentTaskInProgress task : tasks.tasks()) { - if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) { - // there is an unassigned task - we need to try assigning it - String executorNode = executorNode(task.getAction(), currentState, task.getRequest()); - if (Objects.equals(executorNode, task.getExecutorNode()) == false) { - newClusterState = updatePersistentTask(newClusterState, new PersistentTaskInProgress<>(task, executorNode)); - } - } - } - } - return newClusterState; + return reassignTasks(currentState, logger, PersistentTaskClusterService.this::executorNode); } @Override @@ -292,4 +311,49 @@ public class PersistentTaskClusterService extends AbstractComponent implements C } }); } + + static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) { + PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); + ClusterState clusterState = currentState; + DiscoveryNodes nodes = currentState.nodes(); + if (tasks != null) { + logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); + // We need to check if removed nodes were running any of the tasks and reassign them + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.isStopped() == false && + (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false)) { + // there is an unassigned task - we need to try assigning it + String executorNode = decider.executorNode(task.getAction(), clusterState, task.getRequest()); + if (Objects.equals(executorNode, task.getExecutorNode()) == false) { + logger.trace("reassigning task {} from node {} to node {}", task.getId(), + task.getExecutorNode(), executorNode); + clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), executorNode)); + } else { + logger.trace("ignoring task {} because executor nodes are the same {}", task.getId(), executorNode); + } + } else { + if (task.isStopped()) { + logger.trace("ignoring task {} because it is stopped", task.getId()); + } else { + logger.trace("ignoring task {} because it is still running", task.getId()); + } + } + } + } + return clusterState; + } + + private static PersistentTasksInProgress.Builder builder(ClusterState currentState) { + return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE)); + } + + private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) { + if (tasksInProgress.isChanged()) { + return ClusterState.builder(currentState).metaData( + MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build()) + ).build(); + } else { + return currentState; + } + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java index 62c89fe793e..13e1a6f07e6 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java @@ -20,30 +20,44 @@ package org.elasticsearch.persistent; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task.Status; import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.MetaData.ALL_CONTEXTS; + /** * A cluster state record that contains a list of all running persistent tasks */ -public final class PersistentTasksInProgress extends AbstractNamedDiffable implements ClusterState.Custom { +public final class PersistentTasksInProgress extends AbstractNamedDiffable implements MetaData.Custom { public static final String TYPE = "persistent_tasks"; + private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); + // TODO: Implement custom Diff for tasks private final Map> tasks; @@ -54,6 +68,69 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE, + Builder::new); + + public static final ObjectParser, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER = + new ObjectParser<>("running_tasks", TaskBuilder::new); + + public static final NamedObjectParser, Void> ACTION_PARSER; + + static { + // Tasks parser initialization + PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id")); + PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_IN_PROGRESS_PARSER, + new ParseField("running_tasks")); + + // Action parser initialization + ObjectParser, String> parser = new ObjectParser<>("named"); + parser.declareObject(ActionDescriptionBuilder::setRequest, + (p, c) -> p.namedObject(PersistentActionRequest.class, c, null), new ParseField("request")); + parser.declareObject(ActionDescriptionBuilder::setStatus, + (p, c) -> p.namedObject(Status.class, c, null), new ParseField("status")); + ACTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new ActionDescriptionBuilder<>(name), name); + + // Task parser initialization + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects( + (TaskBuilder taskBuilder, List> objects) -> { + if (objects.size() != 1) { + throw new IllegalArgumentException("only one action description per task is allowed"); + } + ActionDescriptionBuilder builder = objects.get(0); + taskBuilder.setAction(builder.action); + taskBuilder.setRequest(builder.request); + taskBuilder.setStatus(builder.status); + }, ACTION_PARSER, new ParseField("action")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node")); + } + + /** + * Private builder used in XContent parser + */ + private static class ActionDescriptionBuilder { + private final String action; + private Request request; + private Status status; + + private ActionDescriptionBuilder(String action) { + this.action = action; + } + + private ActionDescriptionBuilder setRequest(Request request) { + this.request = request; + return this; + } + + private ActionDescriptionBuilder setStatus(Status status) { + this.status = status; + return this; + } + } + public Collection> tasks() { return this.tasks.values(); } @@ -93,6 +170,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable action.equals(task.action) && nodeId.equals(task.executorNode)).count(); } @@ -102,6 +184,15 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable context() { + return ALL_CONTEXTS; + } + + public static PersistentTasksInProgress fromXContent(XContentParser parser) throws IOException { + return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build(); + } + /** * A record that represents a single running persistent task */ @@ -110,32 +201,37 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable persistentTaskInProgress, String newExecutorNode) { - this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L, - persistentTaskInProgress.action, persistentTaskInProgress.request, persistentTaskInProgress.status, newExecutorNode); + public PersistentTaskInProgress(PersistentTaskInProgress task, boolean stopped, String newExecutorNode) { + this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, + newExecutorNode); } - public PersistentTaskInProgress(PersistentTaskInProgress persistentTaskInProgress, Status status) { - this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId, - persistentTaskInProgress.action, persistentTaskInProgress.request, status, persistentTaskInProgress.executorNode); + public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { + this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode); } - private PersistentTaskInProgress(long id, long allocationId, String action, Request request, Status status, String executorNode) { + private PersistentTaskInProgress(long id, long allocationId, String action, Request request, + boolean stopped, boolean removeOnCompletion, Status status, String executorNode) { this.id = id; this.allocationId = allocationId; this.action = action; this.request = request; this.status = status; + this.stopped = stopped; + this.removeOnCompletion = removeOnCompletion; this.executorNode = executorNode; // Update parent request for starting tasks with correct parent task ID request.setParentTask("cluster", id); @@ -147,6 +243,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable { + private long id; + private long allocationId; + private String action; + private Request request; + private boolean stopped = true; + private boolean removeOnCompletion; + private Status status; + private String executorNode; + + public TaskBuilder setId(long id) { + this.id = id; + return this; + } + + public TaskBuilder setAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public TaskBuilder setAction(String action) { + this.action = action; + return this; + } + + public TaskBuilder setRequest(Request request) { + this.request = request; + return this; + } + + public TaskBuilder setStatus(Status status) { + this.status = status; + return this; + } + + + public TaskBuilder setStopped(boolean stopped) { + this.stopped = stopped; + return this; + } + + public TaskBuilder setRemoveOnCompletion(boolean removeOnCompletion) { + this.removeOnCompletion = removeOnCompletion; + return this; + } + + public TaskBuilder setExecutorNode(String executorNode) { + this.executorNode = executorNode; + return this; + } + + public PersistentTaskInProgress build() { + return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode); + } + } + @Override public String getWriteableName() { return TYPE; @@ -246,8 +430,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(ClusterState.Custom.class, TYPE, in); + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(MetaData.Custom.class, TYPE, in); } public long getCurrentId() { @@ -266,4 +450,168 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable> tasks = new HashMap<>(); + private long currentId; + private boolean changed; + + public Builder() { + } + + public Builder(PersistentTasksInProgress tasksInProgress) { + if (tasksInProgress != null) { + tasks.putAll(tasksInProgress.tasks); + currentId = tasksInProgress.currentId; + } else { + currentId = 0; + } + } + + private Builder setCurrentId(long currentId) { + this.currentId = currentId; + return this; + } + + private Builder setTasks(List> tasks) { + for (TaskBuilder builder : tasks) { + PersistentTaskInProgress task = builder.build(); + this.tasks.put(task.getId(), task); + } + return this; + } + + /** + * Adds a new task to the builder + *

+ * After the task is added its id can be found by calling {{@link #getCurrentId()}} method. + */ + public Builder addTask(String action, Request request, boolean stopped, + boolean removeOnCompletion, String executorNode) { + changed = true; + currentId++; + tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, + executorNode)); + return this; + } + + /** + * Reassigns the task to another node if the task exist + */ + public Builder reassignTask(long taskId, String executorNode) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + return this; + } + + /** + * Assigns the task to another node if the task exist and not currently assigned + *

+ * The operation is only performed if the task is not currently assigned to any nodes. To force assignment use + * {@link #reassignTask(long, BiFunction)} instead + */ + @SuppressWarnings("unchecked") + public Builder assignTask(long taskId, + BiFunction executorNodeFunc) { + PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + if (taskInProgress != null && taskInProgress.getExecutorNode() == null) { // only assign unassigned tasks + String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + if (executorNode != null || taskInProgress.isStopped()) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + } + return this; + } + + /** + * Reassigns the task to another node if the task exist + */ + @SuppressWarnings("unchecked") + public Builder reassignTask(long taskId, + BiFunction executorNodeFunc) { + PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + return this; + } + + /** + * Updates the task status if the task exist + */ + public Builder updateTaskStatus(long taskId, Status status) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status)); + } + return this; + } + + /** + * Removes the task if the task exist + */ + public Builder removeTask(long taskId) { + if (tasks.remove(taskId) != null) { + changed = true; + } + return this; + } + + /** + * Finishes the task if the task exist. + * + * If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped. + */ + public Builder finishTask(long taskId) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + if (taskInProgress.removeOnCompletion) { + tasks.remove(taskId); + } else { + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, null)); + } + } + return this; + } + + /** + * Checks if the task is currently present in the list + */ + public boolean hasTask(long taskId) { + return tasks.containsKey(taskId); + } + + /** + * Returns the id of the last added task + */ + public long getCurrentId() { + return currentId; + } + + /** + * Returns true if any the task list was changed since the builder was created + */ + public boolean isChanged() { + return changed; + } + + public PersistentTasksInProgress build() { + return new PersistentTasksInProgress(currentId, Collections.unmodifiableMap(tasks)); + } + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 3cbd1f58527..08dbd8a5e7a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -191,7 +191,7 @@ public class RemovePersistentTaskAction extends Action listener) { - persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, null, new ActionListener() { + persistentTaskClusterService.removePersistentTask(request.taskId, new ActionListener() { @Override public void onResponse(Empty empty) { listener.onResponse(new Response(true)); diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 1ee89b097ba..8c302ab170e 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -36,16 +37,17 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Objects; /** - * Internal action used by TransportPersistentAction to add the record for the persistent action to the cluster state. + * This action can be used to start persistent action previously created using {@link CreatePersistentTaskAction} */ public class StartPersistentTaskAction extends Action { public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); @@ -61,37 +63,36 @@ public class StartPersistentTaskAction extends Action { - private String action; - - private PersistentActionRequest request; + private long taskId; public Request() { } - public Request(String action, PersistentActionRequest request) { - this.action = action; - this.request = request; + public Request(long taskId) { + this.taskId = taskId; + } + + public void setTaskId(long taskId) { + this.taskId = taskId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - action = in.readString(); - request = in.readOptionalNamedWriteable(PersistentActionRequest.class); + taskId = in.readLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(action); - out.writeOptionalNamedWriteable(request); + out.writeLong(taskId); } @Override @@ -103,26 +104,65 @@ public class StartPersistentTaskAction extends Action { + StartPersistentTaskAction.Response, StartPersistentTaskAction.RequestBuilder> { protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) { super(client, action, new Request()); } + + public final RequestBuilder setTaskId(long taskId) { + request.setTaskId(taskId); + return this; + } + } - public static class TransportAction extends TransportMasterNodeAction { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTaskClusterService persistentTaskClusterService; @@ -130,25 +170,20 @@ public class StartPersistentTaskAction extends Action listener) { - persistentTaskClusterService.createPersistentTask(request.action, request.request, new ActionListener() { + protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { + persistentTaskClusterService.startPersistentTask(request.taskId, new ActionListener() { @Override - public void onResponse(Long newTaskId) { - listener.onResponse(new PersistentActionResponse(newTaskId)); + public void onResponse(Empty empty) { + listener.onResponse(new Response(true)); } @Override diff --git a/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java b/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java index 7e98262a6d6..9e58ee0896d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java @@ -71,7 +71,7 @@ public abstract class TransportPersistentAction selector) { long minLoad = Long.MAX_VALUE; DiscoveryNode minLoadedNode = null; - PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { if (selector.test(node)) { if (persistentTasksInProgress == null) { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorStatusTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorStatusTests.java new file mode 100644 index 00000000000..fbb649d17fc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorStatusTests.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.persistent.PersistentActionCoordinator.State; +import org.elasticsearch.persistent.PersistentActionCoordinator.Status; + +import static org.hamcrest.Matchers.containsString; + +public class PersistentActionCoordinatorStatusTests extends AbstractWireSerializingTestCase { + + @Override + protected Status createTestInstance() { + return new Status(randomFrom(State.values())); + } + + @Override + protected Writeable.Reader instanceReader() { + return Status::new; + } + + public void testToString() { + assertThat(createTestInstance().toString(), containsString("state")); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorTests.java index 253f665d602..7de67a22cae 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentActionCoordinatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -36,14 +37,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.persistent.CompletionPersistentTaskAction.Response; -import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -87,18 +85,14 @@ public class PersistentActionCoordinatorTests extends ESTestCase { ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) .build(); - Map> tasks = new HashMap<>(); - long taskId = randomLong(); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i), - "other_node_" + randomInt(nonLocalNodesCount))); - taskId++; + tasks.addTask("test_action", new TestRequest("other_" + i), false, true, "other_node_" + randomInt(nonLocalNodesCount)); if (added == false && randomBoolean()) { added = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node")); - taskId++; + tasks.addTask("test", new TestRequest("this_param"), false, true, "this_node"); } } } @@ -107,8 +101,9 @@ public class PersistentActionCoordinatorTests extends ESTestCase { logger.info("No local node action was added"); } - ClusterState newClusterState = ClusterState.builder(state) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(taskId, tasks)).build(); + MetaData.Builder metaData = MetaData.builder(state.metaData()); + metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); if (added) { @@ -304,34 +299,26 @@ public class PersistentActionCoordinatorTests extends ESTestCase { private ClusterState addTask(ClusterState state, String action, Request request, String node) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - Map> tasks = prevTasks == null ? new HashMap<>() : new HashMap<>(prevTasks.taskMap()); - long id = prevTasks == null ? 0 : prevTasks.getCurrentId(); - tasks.put(id, new PersistentTaskInProgress<>(id, action, request, node)); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.addTask(action, request, false, true, node).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - assertNotNull(prevTasks); - Map> tasks = new HashMap<>(prevTasks.taskMap()); - PersistentTaskInProgress prevTask = tasks.get(taskId); - assertNotNull(prevTask); - tasks.put(prevTask.getId(), new PersistentTaskInProgress<>(prevTask, node)); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + assertTrue(builder.hasTask(taskId)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.reassignTask(taskId, node).build())).build(); } private ClusterState removeTask(ClusterState state, long taskId) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - assertNotNull(prevTasks); - Map> tasks = new HashMap<>(prevTasks.taskMap()); - PersistentTaskInProgress prevTask = tasks.get(taskId); - assertNotNull(prevTask); - tasks.remove(prevTask.getId()); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + assertTrue(builder.hasTask(taskId)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.removeTask(taskId).build())).build(); } private class Execution { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentActionFullRestartIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentActionFullRestartIT.java new file mode 100644 index 00000000000..6c9e7e7e3e7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentActionFullRestartIT.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; +import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest; + +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1) +public class PersistentActionFullRestartIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(TestPersistentActionPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + protected boolean ignoreExternalCluster() { + return true; + } + + @TestLogging("org.elasticsearch.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG") + public void testFullClusterRestart() throws Exception { + int numberOfTasks = randomIntBetween(1, 10); + long[] taskIds = new long[numberOfTasks]; + boolean[] stopped = new boolean[numberOfTasks]; + int runningTasks = 0; + for (int i = 0; i < numberOfTasks; i++) { + if (randomBoolean()) { + runningTasks++; + taskIds[i] = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId(); + stopped[i] = false; + } else { + taskIds[i] = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client()) + .setAction(TestPersistentAction.NAME) + .setRequest(new TestRequest("Blah")) + .setStopped(true) + .get().getTaskId(); + stopped[i] = true; + } + } + final int numberOfRunningTasks = runningTasks; + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); + + if (numberOfRunningTasks > 0) { + // Make sure that at least one of the tasks is running + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() + .getTasks().size(), greaterThan(0)); + }); + } + + // Restart cluster + internalCluster().fullRestart(); + ensureYellow(); + + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); + // Check that cluster state is correct + for (int i = 0; i < numberOfTasks; i++) { + PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + assertNotNull(task); + assertThat(task.isStopped(), equalTo(stopped[i])); + } + + logger.info("Waiting for {} original tasks to start", numberOfRunningTasks); + assertBusy(() -> { + // Wait for the running task to start automatically + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(), + equalTo(numberOfRunningTasks)); + }); + + // Start all other tasks + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + for (int i = 0; i < numberOfTasks; i++) { + PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + assertNotNull(task); + logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode()); + assertThat(task.isStopped(), equalTo(stopped[i])); + assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue()); + if (stopped[i]) { + assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(task.getId()).get()); + } + } + + logger.info("Waiting for {} tasks to start", numberOfTasks); + assertBusy(() -> { + // Wait for all tasks to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(), + equalTo(numberOfTasks)); + }); + + logger.info("Complete all tasks"); + // Complete the running task and make sure it finishes properly + assertThat(new TestPersistentActionPlugin.TestTasksRequestBuilder(client()).setOperation("finish").get().getTasks().size(), + equalTo(numberOfTasks)); + + assertBusy(() -> { + // Make sure the task is removed from the cluster state + assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); + }); + + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java index 4d0163321b6..ec288c15a1c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; @@ -51,22 +52,10 @@ public class PersistentActionIT extends ESIntegTestCase { return nodePlugins(); } - @Override - protected Collection> getMockPlugins() { - return super.getMockPlugins(); - } - protected boolean ignoreExternalCluster() { return true; } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .build(); - } - @After public void cleanup() throws Exception { assertNoRunningTasks(); @@ -126,19 +115,64 @@ public class PersistentActionIT extends ESIntegTestCase { // Verifying parent assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId)); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); + stopOrCancelTask(firstRunningTask.getTaskId()); + } - if (randomBoolean()) { - logger.info("Completing the running task"); - // Complete the running task and make sure it finishes properly - assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) - .get().getTasks().size(), equalTo(1)); - } else { - logger.info("Cancelling the running task"); - // Cancel the running task and make sure it finishes properly - assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(firstRunningTask.getTaskId()) - .get().getTasks().size(), equalTo(1)); + public void testPersistentActionCompletionWithoutRemoval() throws Exception { + boolean stopped = randomBoolean(); + long taskId = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client()) + .setAction(TestPersistentAction.NAME) + .setRequest(new TestPersistentActionPlugin.TestRequest("Blah")) + .setRemoveOnCompletion(false) + .setStopped(stopped).get().getTaskId(); + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped)); + assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue()); + assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); + + int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it + logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped); + for (int i = 0; i < numberOfIters; i++) { + logger.info("iteration {}", i); + if (stopped) { + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks(), + empty()); + assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get()); + } + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks() + .size(), equalTo(1)); + }); + TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") + .get().getTasks().get(0); + + stopOrCancelTask(firstRunningTask.getTaskId()); + + assertBusy(() -> { + // Wait for the task to finish + List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() + .getTasks(); + logger.info("Found {} tasks", tasks.size()); + assertThat(tasks.size(), equalTo(0)); + }); + stopped = true; } + + assertBusy(() -> { + // Wait for the task to be marked as stopped + PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasks.tasks().size(), equalTo(1)); + assertThat(tasks.getTask(taskId).isStopped(), equalTo(true)); + assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); + }); + + logger.info("Removing action record from cluster state"); + assertAcked(RemovePersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get()); } public void testPersistentActionWithNoAvailableNode() throws Exception { @@ -182,7 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase { TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") .get().getTasks().get(0); - PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue()); @@ -195,7 +230,8 @@ public class PersistentActionIT extends ESIntegTestCase { int finalI = i; assertBusy(() -> { - PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); assertThat(tasks.tasks().size(), equalTo(1)); assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue()); assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); @@ -209,6 +245,24 @@ public class PersistentActionIT extends ESIntegTestCase { .get().getTasks().size(), equalTo(1)); } + + private void stopOrCancelTask(TaskId taskId) { + if (randomBoolean()) { + logger.info("Completing the running task"); + // Complete the running task and make sure it finishes properly + assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(taskId) + .get().getTasks().size(), equalTo(1)); + + } else { + logger.info("Cancelling the running task"); + // Cancel the running task and make sure it finishes properly + assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId) + .get().getTasks().size(), equalTo(1)); + } + + + } + private void assertNoRunningTasks() throws Exception { assertBusy(() -> { // Wait for the task to finish @@ -218,8 +272,8 @@ public class PersistentActionIT extends ESIntegTestCase { assertThat(tasks.size(), equalTo(0)); // Make sure the task is removed from the cluster state - assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE)) - .tasks(), empty()); + assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); }); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTaskClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTaskClusterServiceTests.java new file mode 100644 index 00000000000..fb6e884a1bf --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTaskClusterServiceTests.java @@ -0,0 +1,401 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + */ + +package org.elasticsearch.persistent; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class PersistentTaskClusterServiceTests extends ESTestCase { + + public void testReassignmentRequired() { + int numberOfIterations = randomIntBetween(10, 100); + ClusterState clusterState = initialState(); + for (int i = 0; i < numberOfIterations; i++) { + boolean significant = randomBoolean(); + ClusterState previousState = clusterState; + logger.info("inter {} significant: {}", i, significant); + if (significant) { + clusterState = significantChange(clusterState); + } else { + clusterState = insignificantChange(clusterState); + } + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); + assertThat(dumpEvent(event), significant, equalTo(PersistentTaskClusterService.reassignmentRequired(event, + new PersistentTaskClusterService.ExecutorNodeDecider() { + @Override + public String executorNode( + String action, ClusterState currentState, Request request) { + return randomNode(currentState.nodes()); + } + }))); + } + } + + public void testReassignTasksWithNoTasks() { + ClusterState clusterState = initialState(); + assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue()); + } + + public void testReassignConsidersClusterStateUpdates() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( + clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 10)); + int numberOfTasks = randomIntBetween(2, 40); + for (int i = 0; i < numberOfTasks; i++) { + addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + } + + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress, notNullValue()); + + } + + public void testReassignTasks() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( + clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 10)); + int numberOfTasks = randomIntBetween(0, 40); + for (int i = 0; i < numberOfTasks; i++) { + switch (randomInt(3)) { + case 0: + // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned + addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false); + break; + case 1: + // add a task assigned to non-existing node that should not get assigned + addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false); + break; + case 2: + // add a stopped task assigned to non-existing node that should not get assigned + addTask(tasks, "should_not_assign", "fail_me_if_called", null, true); + break; + case 3: + addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + break; + + } + } + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress, notNullValue()); + + assertThat("number of tasks shouldn't change as a result or reassignment", + numberOfTasks, equalTo(tasksInProgress.tasks().size())); + + int assignOneCount = 0; + + for (PersistentTaskInProgress task : tasksInProgress.tasks()) { + if (task.isStopped()) { + assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); + } else { + switch (task.getAction()) { + case "should_assign": + assertThat(task.getExecutorNode(), notNullValue()); + if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { + logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString()); + } + assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), + clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + break; + case "should_not_assign": + assertThat(task.getExecutorNode(), nullValue()); + break; + case "assign_one": + if (task.getExecutorNode() != null) { + assignOneCount++; + assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); + } + break; + default: + fail("Unknown action " + task.getAction()); + } + } + } + } + + + private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { + for (int i = 0; i < nonLocalNodesCount; i++) { + nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); + } + } + + private ClusterState reassign(ClusterState clusterState) { + return PersistentTaskClusterService.reassignTasks(clusterState, logger, + new PersistentTaskClusterService.ExecutorNodeDecider() { + @Override + public String executorNode( + String action, ClusterState currentState, Request request) { + TestRequest testRequest = (TestRequest) request; + switch (testRequest.getTestParam()) { + case "assign_me": + return randomNode(currentState.nodes()); + case "dont_assign_me": + return null; + case "fail_me_if_called": + fail("the decision decider shouldn't be called on this task"); + return null; + case "assign_one": + return assignOnlyOneTaskAtATime(currentState); + default: + fail("unknown param " + testRequest.getTestParam()); + } + return null; + } + }); + + } + + private String assignOnlyOneTaskAtATime(ClusterState clusterState) { + DiscoveryNodes nodes = clusterState.nodes(); + PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasksInProgress.findTasks("assign_one", + task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { + return randomNode(clusterState.nodes()); + } else { + return null; + } + } + + private String randomNode(DiscoveryNodes nodes) { + if (nodes.getNodes().isEmpty()) { + return null; + } + List nodeList = new ArrayList<>(); + for (ObjectCursor node : nodes.getNodes().keys()) { + nodeList.add(node.value); + } + return randomFrom(nodeList); + + } + + private String dumpEvent(ClusterChangedEvent event) { + return "nodes_changed: " + event.nodesChanged() + + " nodes_removed:" + event.nodesRemoved() + + " routing_table_changed:" + event.routingTableChanged() + + " tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE); + } + + private ClusterState significantChange(ClusterState clusterState) { + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasks != null) { + if (randomBoolean()) { + // + boolean removedNode = false; + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.getExecutorNode() != null && clusterState.nodes().nodeExists(task.getExecutorNode())) { + logger.info("removed node {}", task.getExecutorNode()); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode())); + return builder.build(); + } + } + } + } + boolean tasksOrNodesChanged = false; + // add a new unassigned task + if (hasUnassigned(tasks, clusterState.nodes()) == false) { + // we don't have any unassigned tasks - add some + logger.info("added random task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false); + tasksOrNodesChanged = true; + } + // add a node if there are unassigned tasks + if (clusterState.nodes().getNodes().isEmpty()) { + logger.info("added random node"); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10)))); + tasksOrNodesChanged = true; + } + + if (tasksOrNodesChanged == false) { + // change routing table to simulate a change + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + } + return builder.build(); + } + + private ClusterState insignificantChange(ClusterState clusterState) { + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (randomBoolean()) { + if (hasUnassigned(tasks, clusterState.nodes()) == false) { + // we don't have any unassigned tasks - adding a node or changing a routing table shouldn't affect anything + if (randomBoolean()) { + logger.info("added random node"); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10)))); + } + if (randomBoolean()) { + // add unassigned task in stopped state + logger.info("added random stopped task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, true); + return builder.build(); + } else { + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + } + return builder.build(); + } + } + if (randomBoolean()) { + // remove a node that doesn't have any tasks assigned to it and it's not the master node + for (DiscoveryNode node : clusterState.nodes()) { + if (hasTasksAssignedTo(tasks, node.getId()) == false && "this_node".equals(node.getId()) == false) { + logger.info("removed unassigned node {}", node.getId()); + return builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(node.getId())).build(); + } + } + } + + if (randomBoolean()) { + // clear the task + if (randomBoolean()) { + logger.info("removed all tasks"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, + PersistentTasksInProgress.builder().build()); + return builder.metaData(metaData).build(); + } else { + logger.info("set task custom to null"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE); + return builder.metaData(metaData).build(); + } + } + logger.info("removed all unassigned tasks and changed routing table"); + PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); + if (tasks != null) { + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.getExecutorNode() == null) { + tasksBuilder.removeTask(task.getId()); + } + } + } + // Just add a random index - that shouldn't change anything + IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false) + .putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build()); + return builder.metaData(metaData).build(); + } + + private boolean hasUnassigned(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) { + if (tasks == null || tasks.tasks().isEmpty()) { + return false; + } + return tasks.tasks().stream().anyMatch(task -> + task.isStopped() == false && + (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode()))); + } + + private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) { + return tasks != null && tasks.tasks().stream().anyMatch( + task -> nodeId.equals(task.getExecutorNode())) == false; + } + + private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, + MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, + String node, + boolean stopped) { + return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE, + tasks.addTask(randomAsciiOfLength(10), new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), node).build())); + } + + private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) { + tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), node); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), + Version.CURRENT); + } + + + private ClusterState initialState() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + int randomIndices = randomIntBetween(0, 5); + for (int i = 0; i < randomIndices; i++) { + changeRoutingTable(metaData, routingTable); + } + + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + nodes.add(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "this_node")); + nodes.localNodeId("this_node"); + nodes.masterNodeId("this_node"); + + return ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable.build()) + .build(); + } + + private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + metaData.put(indexMetaData, false); + routingTable.addAsNew(indexMetaData); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java index 582eac2d884..896d749772a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java @@ -18,48 +18,263 @@ */ package org.elasticsearch.persistent; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaData.Custom; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.AbstractDiffableSerializationTestCase; +import org.elasticsearch.persistent.PersistentTasksInProgress.Builder; import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.persistent.TestPersistentActionPlugin.Status; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; +import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; -public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase { +import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; +import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; + +public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase { @Override protected PersistentTasksInProgress createTestInstance() { int numberOfTasks = randomInt(10); - Map> entries = new HashMap<>(); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); for (int i = 0; i < numberOfTasks; i++) { - PersistentTaskInProgress taskInProgress = new PersistentTaskInProgress<>( - randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)), - randomAsciiOfLength(10)); + tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), + randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); if (randomBoolean()) { // From time to time update status - taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10))); + tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10))); } - entries.put(taskInProgress.getId(), taskInProgress); } - return new PersistentTasksInProgress(randomLong(), entries); + return tasks.build(); } @Override - protected Writeable.Reader instanceReader() { + protected Writeable.Reader instanceReader() { return PersistentTasksInProgress::new; } @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Arrays.asList( - new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new), + new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), + new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new Entry(Task.Status.class, Status.NAME, Status::new) )); } + + @Override + protected Custom makeTestChanges(Custom testInstance) { + PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance; + Builder builder = new Builder(); + switch (randomInt(3)) { + case 0: + addRandomTask(builder); + break; + case 1: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.reassignTask(pickRandomTask(tasksInProgress), randomAsciiOfLength(10)); + } + break; + case 2: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAsciiOfLength(10)) : null); + } + break; + case 3: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.removeTask(pickRandomTask(tasksInProgress)); + } + break; + } + return builder.build(); + } + + @Override + protected Writeable.Reader> diffReader() { + return PersistentTasksInProgress::readDiffFrom; + } + + @Override + protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException { + return PersistentTasksInProgress.fromXContent(parser); + } + + @Override + protected XContentBuilder toXContent(Custom instance, XContentType contentType) throws IOException { + return toXContent(instance, contentType, new ToXContent.MapParams( + Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, MetaData.XContentContext.API.toString()))); + } + + protected XContentBuilder toXContent(Custom instance, XContentType contentType, ToXContent.MapParams params) throws IOException { + // We need all attribute to be serialized/de-serialized for testing + XContentBuilder builder = XContentFactory.contentBuilder(contentType); + if (randomBoolean()) { + builder.prettyPrint(); + } + if (instance.isFragment()) { + builder.startObject(); + } + instance.toXContent(builder, params); + if (instance.isFragment()) { + builder.endObject(); + } + return builder; + } + + private Builder addRandomTask(Builder builder) { + builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), + randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); + return builder; + } + + private long pickRandomTask(PersistentTasksInProgress testInstance) { + return randomFrom(new ArrayList<>(testInstance.tasks())).getId(); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return new NamedXContentRegistry(Arrays.asList( + new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME), + TestRequest::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentAction.NAME), + Status::fromXContent) + )); + } + + @SuppressWarnings("unchecked") + public void testSerializationContext() throws Exception { + PersistentTasksInProgress testInstance = createTestInstance(); + for (int i = 0; i < randomInt(10); i++) { + testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance); + } + + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY))); + + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = toXContent(testInstance, xContentType, params); + XContentBuilder shuffled = shuffleXContent(builder); + + XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes()); + PersistentTasksInProgress newInstance = doParseInstance(parser); + assertNotSame(newInstance, testInstance); + + assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); + for (PersistentTaskInProgress testTask : testInstance.tasks()) { + PersistentTaskInProgress newTask = (PersistentTaskInProgress) newInstance.getTask(testTask.getId()); + assertNotNull(newTask); + + // Things that should be serialized + assertEquals(testTask.getAction(), newTask.getAction()); + assertEquals(testTask.getId(), newTask.getId()); + assertEquals(testTask.getStatus(), newTask.getStatus()); + assertEquals(testTask.getRequest(), newTask.getRequest()); + assertEquals(testTask.isStopped(), newTask.isStopped()); + + // Things that shouldn't be serialized + assertEquals(0, newTask.getAllocationId()); + assertNull(newTask.getExecutorNode()); + } + } + + public void testBuilder() { + PersistentTasksInProgress persistentTasksInProgress = null; + long lastKnownTask = -1; + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final Builder builder; + if (randomBoolean()) { + builder = new Builder(); + } else { + builder = new Builder(persistentTasksInProgress); + } + boolean changed = false; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + switch (randomInt(5)) { + case 0: + lastKnownTask = addRandomTask(builder).getCurrentId(); + changed = true; + break; + case 1: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + if (randomBoolean()) { + builder.reassignTask(lastKnownTask, randomAsciiOfLength(10)); + } else { + builder.reassignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + } + break; + case 2: + if (builder.hasTask(lastKnownTask)) { + PersistentTaskInProgress task = builder.build().getTask(lastKnownTask); + if (randomBoolean()) { + // Trying to reassign to the same node + builder.assignTask(lastKnownTask, (s, request) -> task.getExecutorNode()); + // should change if the task was stopped AND unassigned + if (task.getExecutorNode() == null && task.isStopped()) { + changed = true; + } + } else { + // Trying to reassign to a different node + builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + // should change if the task was unassigned + if (task.getExecutorNode() == null) { + changed = true; + } + } + } else { + // task doesn't exist - shouldn't change + builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + } + break; + case 3: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAsciiOfLength(10)) : null); + break; + case 4: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.removeTask(lastKnownTask); + break; + case 5: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.finishTask(lastKnownTask); + break; + } + } + assertEquals(changed, builder.isChanged()); + persistentTasksInProgress = builder.build(); + } + + } + } diff --git a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java index 52be69cbeb8..17d8ca69543 100644 --- a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; -import org.elasticsearch.persistent.StartPersistentTaskAction.Request; +import org.elasticsearch.persistent.CreatePersistentTaskAction.Request; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest; import org.elasticsearch.test.AbstractStreamableTestCase; diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java index c97b451ae85..bc8651dad24 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java @@ -37,8 +37,10 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -47,8 +49,10 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -72,6 +76,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Objects.requireNonNull; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.junit.Assert.assertTrue; @@ -87,6 +92,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { return Arrays.asList( new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class), new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), + new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), @@ -114,14 +120,32 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new), - new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), + new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) ); } + @Override + public List getNamedXContent() { + return Arrays.asList( + new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE), + PersistentTasksInProgress::fromXContent), + new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME), + TestRequest::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) + ); + } + public static class TestRequest extends PersistentActionRequest { + public static final ConstructingObjectParser REQUEST_PARSER = + new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new TestRequest((String) args[0])); + + static { + REQUEST_PARSER.declareString(constructorArg(), new ParseField("param")); + } + private String executorNodeAttr = null; private String responseNode = null; @@ -185,10 +209,15 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("param", testParam); builder.endObject(); return builder; } + public static TestRequest fromXContent(XContentParser parser) throws IOException { + return REQUEST_PARSER.parse(parser, null); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -255,6 +284,13 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { private final String phase; + public static final ConstructingObjectParser STATUS_PARSER = + new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new Status((String) args[0])); + + static { + STATUS_PARSER.declareString(constructorArg(), new ParseField("phase")); + } + public Status(String phase) { this.phase = requireNonNull(phase, "Phase cannot be null"); } @@ -276,6 +312,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { return builder; } + public static Task.Status fromXContent(XContentParser parser) throws IOException { + return STATUS_PARSER.parse(parser, null); + } + + @Override public boolean isFragment() { return false; @@ -319,7 +360,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, TestPersistentAction.NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry, actionFilters, indexNameExpressionResolver, TestRequest::new, - ThreadPool.Names.MANAGEMENT); + ThreadPool.Names.GENERIC); this.transportService = transportService; } @@ -342,8 +383,9 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { while (true) { // wait for something to happen assertTrue(awaitBusy(() -> testTask.isCancelled() || - testTask.getOperation() != null || - transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes + testTask.getOperation() != null || + transportService.lifecycleState() != Lifecycle.State.STARTED, // speedup finishing on closed nodes + 30, TimeUnit.SECONDS)); // This can take a while during large cluster restart if (transportService.lifecycleState() != Lifecycle.State.STARTED) { return; } @@ -430,6 +472,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { public void setOperation(String operation) { this.operation = operation; } + + @Override + public String toString() { + return "TestTask[" + this.getId() + ", " + this.getParentTaskId() + ", " + this.getOperation() + "]"; + } } static class TestTaskResponse implements Writeable { diff --git a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java new file mode 100644 index 00000000000..751286eafa0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.persistent.TestPersistentActionPlugin.Status; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request; + +import java.util.Collections; + +public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomLong(), new Status(randomAsciiOfLength(10))); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Collections.singletonList( + new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) + )); + } +}