Make persistent task persist full cluster restart

This commit moves persistent tasks from ClusterState.Custom to MetaData.Custom and adds ability for the task to remain in the metadata after completion.
This commit is contained in:
Igor Motov 2017-02-10 19:58:13 -05:00 committed by Martijn van Groningen
parent 243b7e4499
commit 16e661c34b
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
17 changed files with 1885 additions and 246 deletions

View File

@ -0,0 +1,254 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
*/
package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* This action can be used to add the record for the persistent action to the cluster state.
*/
public class CreatePersistentTaskAction extends Action<CreatePersistentTaskAction.Request,
PersistentActionResponse,
CreatePersistentTaskAction.RequestBuilder> {
public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/create";
private CreatePersistentTaskAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
public static class Request extends MasterNodeRequest<Request> {
private String action;
private PersistentActionRequest request;
private boolean stopped;
private boolean removeOnCompletion = true;
public Request() {
}
public Request(String action, PersistentActionRequest request) {
this.action = action;
this.request = request;
this.stopped = false;
this.removeOnCompletion = true;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
action = in.readString();
request = in.readNamedWriteable(PersistentActionRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (this.action == null) {
validationException = addValidationError("action must be specified", validationException);
}
if (this.request == null) {
validationException = addValidationError("request must be specified", validationException);
}
return validationException;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request) &&
removeOnCompletion == request1.removeOnCompletion &&
stopped == request1.stopped;
}
@Override
public int hashCode() {
return Objects.hash(action, request, removeOnCompletion, stopped);
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public PersistentActionRequest getRequest() {
return request;
}
public void setRequest(PersistentActionRequest request) {
this.request = request;
}
public boolean isStopped() {
return stopped;
}
public void setStopped(boolean stopped) {
this.stopped = stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
public void setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
PersistentActionResponse, CreatePersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) {
super(client, action, new Request());
}
public RequestBuilder setAction(String action) {
request.setAction(action);
return this;
}
public RequestBuilder setRequest(PersistentActionRequest persistentActionRequest) {
request.setRequest(persistentActionRequest);
return this;
}
/**
* Indicates if the persistent task should be created in the stopped state. Defaults to false.
*/
public RequestBuilder setStopped(boolean stopped) {
request.setStopped(stopped);
return this;
}
/**
* Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true.
*/
public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) {
request.setRemoveOnCompletion(removeOnCompletion);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentActionResponse> {
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
PersistentActionRegistry persistentActionRegistry,
PersistentActionService persistentActionService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
PersistentActionExecutor executor = new PersistentActionExecutor(threadPool);
clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry,
transportService.getTaskManager(), executor));
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentActionResponse> listener) {
persistentTaskClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {
listener.onResponse(new PersistentActionResponse(newTaskId));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -75,8 +75,8 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
@Override
public void clusterChanged(ClusterChangedEvent event) {
PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress previousTasks = event.previousState().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node
@ -402,6 +402,19 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
public boolean isFragment() {
return false;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return state == status.state;
}
@Override
public int hashCode() {
return Objects.hash(state);
}
}
}

View File

@ -49,9 +49,9 @@ public class PersistentActionService extends AbstractComponent {
public <Request extends PersistentActionRequest> void sendRequest(String action, Request request,
ActionListener<PersistentActionResponse> listener) {
StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(action, request);
CreatePersistentTaskAction.Request startRequest = new CreatePersistentTaskAction.Request(action, request);
try {
client.execute(StartPersistentTaskAction.INSTANCE, startRequest, listener);
client.execute(CreatePersistentTaskAction.INSTANCE, startRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -19,11 +19,14 @@
package org.elasticsearch.persistent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -33,9 +36,6 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -63,20 +63,19 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentActionRequest> void createPersistentTask(String action, Request request,
public <Request extends PersistentActionRequest> void createPersistentTask(String action, Request request, boolean stopped,
boolean removeOnCompletion,
ActionListener<Long> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final String executorNodeId = executorNode(action, currentState, request);
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
long nextId;
if (tasksInProgress != null) {
nextId = tasksInProgress.getCurrentId() + 1;
final String executorNodeId;
if (stopped) {
executorNodeId = null; // the task is stopped no need to assign it anywhere yet
} else {
nextId = 1;
executorNodeId = executorNode(action, currentState, request);
}
return createPersistentTask(currentState, new PersistentTaskInProgress<>(nextId, action, request, executorNodeId));
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, executorNodeId));
}
@Override
@ -86,7 +85,8 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(((PersistentTasksInProgress) newState.custom(PersistentTasksInProgress.TYPE)).getCurrentId());
listener.onResponse(
((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId());
}
});
}
@ -110,23 +110,81 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress == null) {
// Nothing to do, the task was already deleted
return currentState;
}
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
PersistentTaskInProgress<?> taskInProgress = tasksInProgress.getTask(id);
if (taskInProgress != null) {
String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
return updatePersistentTask(currentState, new PersistentTaskInProgress<>(taskInProgress, executorNode));
}
return currentState;
tasksInProgress.reassignTask(id, (action, request) -> executorNode(action, currentState, request));
} else {
return removePersistentTask(currentState, id);
tasksInProgress.finishTask(id);
}
return update(currentState, tasksInProgress);
} else {
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications
logger.warn("The task {} wasn't found, status is not updated", id);
return currentState;
}
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE);
}
});
}
/**
* Switches the persistent task from stopped to started mode
*
* @param id the id of a persistent task
* @param listener the listener that will be called when task is removed
*/
public void startPersistentTask(long id, ActionListener<Empty> listener) {
clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress
.assignTask(id, (action, request) -> executorNode(action, currentState, request)));
} else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
}
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE);
}
});
}
/**
* Removes the persistent task
*
* @param id the id of a persistent task
* @param listener the listener that will be called when task is removed
*/
public void removePersistentTask(long id, ActionListener<Empty> listener) {
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress.removeTask(id));
} else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
}
}
@Override
@ -152,16 +210,12 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress == null) {
// Nothing to do, the task no longer exists
return currentState;
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
} else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
}
PersistentTaskInProgress<?> task = tasksInProgress.getTask(id);
if (task != null) {
return updatePersistentTask(currentState, new PersistentTaskInProgress<>(task, status));
}
return currentState;
}
@Override
@ -176,44 +230,6 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
});
}
private ClusterState updatePersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.putAll(oldTasks.taskMap());
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState createPersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
if (oldTasks != null) {
taskMap.putAll(oldTasks.taskMap());
}
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(newTask.getId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState removePersistentTask(ClusterState oldState, long taskId) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
if (oldTasks != null) {
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
ClusterState.Builder builder = ClusterState.builder(oldState);
taskMap.putAll(oldTasks.taskMap());
taskMap.remove(taskId);
PersistentTasksInProgress newTasks =
new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
} else {
// no tasks - nothing to do
return oldState;
}
}
private <Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request) {
TransportPersistentAction<Request> persistentAction = registry.getPersistentActionSafe(action);
persistentAction.validate(request, currentState);
@ -232,28 +248,46 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE);
if (tasks != null && (event.nodesChanged() || event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
if (reassignmentRequired(event, this::executorNode)) {
logger.trace("task reassignment is needed");
reassignTasks();
} else {
logger.trace("task reassignment is not needed");
}
}
}
interface ExecutorNodeDecider {
<Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request);
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasks != null && (Objects.equals(tasks, prevTasks) == false ||
event.nodesChanged() ||
event.routingTableChanged() ||
event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.getExecutorNode() == null) {
// there is an unassigned task - we need to try assigning it
reassignmentRequired = true;
break;
}
if (removedNodes.contains(taskInProgress.getExecutorNode())) {
// The caller node disappeared, we need to assign a new caller node
if (taskInProgress.isStopped() == false) { // skipping stopped tasks
if (taskInProgress.getExecutorNode() == null || removedNodes.contains(taskInProgress.getExecutorNode())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getRequest(),
decider.executorNode(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
}
}
if (reassignmentRequired) {
reassignTasks();
}
}
return reassignmentRequired;
}
return false;
}
/**
@ -263,22 +297,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasks = currentState.custom(PersistentTasksInProgress.TYPE);
ClusterState newClusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) {
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) {
// there is an unassigned task - we need to try assigning it
String executorNode = executorNode(task.getAction(), currentState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
newClusterState = updatePersistentTask(newClusterState, new PersistentTaskInProgress<>(task, executorNode));
}
}
}
}
return newClusterState;
return reassignTasks(currentState, logger, PersistentTaskClusterService.this::executorNode);
}
@Override
@ -292,4 +311,49 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
}
});
}
static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) {
PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
ClusterState clusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) {
logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.isStopped() == false &&
(task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false)) {
// there is an unassigned task - we need to try assigning it
String executorNode = decider.executorNode(task.getAction(), clusterState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getExecutorNode(), executorNode);
clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), executorNode));
} else {
logger.trace("ignoring task {} because executor nodes are the same {}", task.getId(), executorNode);
}
} else {
if (task.isStopped()) {
logger.trace("ignoring task {} because it is stopped", task.getId());
} else {
logger.trace("ignoring task {} because it is still running", task.getId());
}
}
}
}
return clusterState;
}
private static PersistentTasksInProgress.Builder builder(ClusterState currentState) {
return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE));
}
private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) {
if (tasksInProgress.isChanged()) {
return ClusterState.builder(currentState).metaData(
MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build())
).build();
} else {
return currentState;
}
}
}

View File

@ -20,30 +20,44 @@ package org.elasticsearch.persistent;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.MetaData.ALL_CONTEXTS;
/**
* A cluster state record that contains a list of all running persistent tasks
*/
public final class PersistentTasksInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
public static final String TYPE = "persistent_tasks";
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
// TODO: Implement custom Diff for tasks
private final Map<Long, PersistentTaskInProgress<?>> tasks;
@ -54,6 +68,69 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
this.tasks = tasks;
}
public static final ObjectParser<Builder, Void> PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE,
Builder::new);
public static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER =
new ObjectParser<>("running_tasks", TaskBuilder::new);
public static final NamedObjectParser<ActionDescriptionBuilder<PersistentActionRequest>, Void> ACTION_PARSER;
static {
// Tasks parser initialization
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_IN_PROGRESS_PARSER,
new ParseField("running_tasks"));
// Action parser initialization
ObjectParser<ActionDescriptionBuilder<PersistentActionRequest>, String> parser = new ObjectParser<>("named");
parser.declareObject(ActionDescriptionBuilder::setRequest,
(p, c) -> p.namedObject(PersistentActionRequest.class, c, null), new ParseField("request"));
parser.declareObject(ActionDescriptionBuilder::setStatus,
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
ACTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new ActionDescriptionBuilder<>(name), name);
// Task parser initialization
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects(
(TaskBuilder<PersistentActionRequest> taskBuilder, List<ActionDescriptionBuilder<PersistentActionRequest>> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one action description per task is allowed");
}
ActionDescriptionBuilder<PersistentActionRequest> builder = objects.get(0);
taskBuilder.setAction(builder.action);
taskBuilder.setRequest(builder.request);
taskBuilder.setStatus(builder.status);
}, ACTION_PARSER, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node"));
}
/**
* Private builder used in XContent parser
*/
private static class ActionDescriptionBuilder<Request extends PersistentActionRequest> {
private final String action;
private Request request;
private Status status;
private ActionDescriptionBuilder(String action) {
this.action = action;
}
private ActionDescriptionBuilder setRequest(Request request) {
this.request = request;
return this;
}
private ActionDescriptionBuilder setStatus(Status status) {
this.status = status;
return this;
}
}
public Collection<PersistentTaskInProgress<?>> tasks() {
return this.tasks.values();
}
@ -93,6 +170,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return Objects.hash(tasks, currentId);
}
@Override
public String toString() {
return Strings.toString(this);
}
public long getNumberOfTasksOnNode(String nodeId, String action) {
return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
}
@ -102,6 +184,15 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return Version.V_5_3_0_UNRELEASED;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return ALL_CONTEXTS;
}
public static PersistentTasksInProgress fromXContent(XContentParser parser) throws IOException {
return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build();
}
/**
* A record that represents a single running persistent task
*/
@ -110,32 +201,37 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
private final long allocationId;
private final String action;
private final Request request;
private final boolean stopped;
private final boolean removeOnCompletion;
@Nullable
private final Status status;
@Nullable
private final String executorNode;
public PersistentTaskInProgress(long id, String action, Request request, String executorNode) {
this(id, 0L, action, request, null, executorNode);
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
String executorNode) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, String newExecutorNode) {
this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L,
persistentTaskInProgress.action, persistentTaskInProgress.request, persistentTaskInProgress.status, newExecutorNode);
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, String newExecutorNode) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
newExecutorNode);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, Status status) {
this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId,
persistentTaskInProgress.action, persistentTaskInProgress.request, status, persistentTaskInProgress.executorNode);
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request, Status status, String executorNode) {
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status, String executorNode) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
this.request = request;
this.status = status;
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.executorNode = executorNode;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
@ -147,6 +243,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
allocationId = in.readLong();
action = in.readString();
request = (Request) in.readNamedWriteable(PersistentActionRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
}
@ -157,6 +255,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
out.writeLong(allocationId);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
}
@ -170,13 +270,20 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
allocationId == that.allocationId &&
Objects.equals(action, that.action) &&
Objects.equals(request, that.request) &&
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, status, executorNode);
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
}
@Override
public String toString() {
return Strings.toString(this);
}
public long getId() {
@ -205,19 +312,40 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return status;
}
public boolean isStopped() {
return stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", id);
builder.field("action", action);
builder.field("id", id);
builder.startObject("action");
{
builder.startObject(action);
{
builder.field("request");
request.toXContent(builder, params);
if (status != null) {
builder.field("status", status, params);
}
}
builder.endObject();
}
builder.endObject();
if (API_CONTEXT.equals(params.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.field("executor_node", executorNode);
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
}
builder.endObject();
return builder;
}
@ -228,6 +356,62 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
}
}
private static class TaskBuilder<Request extends PersistentActionRequest> {
private long id;
private long allocationId;
private String action;
private Request request;
private boolean stopped = true;
private boolean removeOnCompletion;
private Status status;
private String executorNode;
public TaskBuilder<Request> setId(long id) {
this.id = id;
return this;
}
public TaskBuilder<Request> setAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public TaskBuilder<Request> setAction(String action) {
this.action = action;
return this;
}
public TaskBuilder<Request> setRequest(Request request) {
this.request = request;
return this;
}
public TaskBuilder<Request> setStatus(Status status) {
this.status = status;
return this;
}
public TaskBuilder<Request> setStopped(boolean stopped) {
this.stopped = stopped;
return this;
}
public TaskBuilder<Request> setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
return this;
}
public TaskBuilder<Request> setExecutorNode(String executorNode) {
this.executorNode = executorNode;
return this;
}
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
}
}
@Override
public String getWriteableName() {
return TYPE;
@ -246,8 +430,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
});
}
public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(ClusterState.Custom.class, TYPE, in);
public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(MetaData.Custom.class, TYPE, in);
}
public long getCurrentId() {
@ -266,4 +450,168 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return builder;
}
public static Builder builder() {
return new Builder();
}
public static Builder builder(PersistentTasksInProgress tasks) {
return new Builder(tasks);
}
public static class Builder {
private final Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
private long currentId;
private boolean changed;
public Builder() {
}
public Builder(PersistentTasksInProgress tasksInProgress) {
if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks);
currentId = tasksInProgress.currentId;
} else {
currentId = 0;
}
}
private Builder setCurrentId(long currentId) {
this.currentId = currentId;
return this;
}
private <Request extends PersistentActionRequest> Builder setTasks(List<TaskBuilder<Request>> tasks) {
for (TaskBuilder builder : tasks) {
PersistentTaskInProgress<?> task = builder.build();
this.tasks.put(task.getId(), task);
}
return this;
}
/**
* Adds a new task to the builder
* <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
*/
public <Request extends PersistentActionRequest> Builder addTask(String action, Request request, boolean stopped,
boolean removeOnCompletion, String executorNode) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion,
executorNode));
return this;
}
/**
* Reassigns the task to another node if the task exist
*/
public Builder reassignTask(long taskId, String executorNode) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
}
return this;
}
/**
* Assigns the task to another node if the task exist and not currently assigned
* <p>
* The operation is only performed if the task is not currently assigned to any nodes. To force assignment use
* {@link #reassignTask(long, BiFunction)} instead
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder assignTask(long taskId,
BiFunction<String, Request, String> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.getExecutorNode() == null) { // only assign unassigned tasks
String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
if (executorNode != null || taskInProgress.isStopped()) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
}
}
return this;
}
/**
* Reassigns the task to another node if the task exist
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder reassignTask(long taskId,
BiFunction<String, Request, String> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
}
return this;
}
/**
* Updates the task status if the task exist
*/
public Builder updateTaskStatus(long taskId, Status status) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status));
}
return this;
}
/**
* Removes the task if the task exist
*/
public Builder removeTask(long taskId) {
if (tasks.remove(taskId) != null) {
changed = true;
}
return this;
}
/**
* Finishes the task if the task exist.
*
* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped.
*/
public Builder finishTask(long taskId) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, null));
}
}
return this;
}
/**
* Checks if the task is currently present in the list
*/
public boolean hasTask(long taskId) {
return tasks.containsKey(taskId);
}
/**
* Returns the id of the last added task
*/
public long getCurrentId() {
return currentId;
}
/**
* Returns true if any the task list was changed since the builder was created
*/
public boolean isChanged() {
return changed;
}
public PersistentTasksInProgress build() {
return new PersistentTasksInProgress(currentId, Collections.unmodifiableMap(tasks));
}
}
}

View File

@ -191,7 +191,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, null, new ActionListener<Empty>() {
persistentTaskClusterService.removePersistentTask(request.taskId, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));

View File

@ -22,6 +22,7 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -36,16 +37,17 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* Internal action used by TransportPersistentAction to add the record for the persistent action to the cluster state.
* This action can be used to start persistent action previously created using {@link CreatePersistentTaskAction}
*/
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
PersistentActionResponse,
StartPersistentTaskAction.Response,
StartPersistentTaskAction.RequestBuilder> {
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
@ -61,37 +63,36 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
}
@Override
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private String action;
private PersistentActionRequest request;
private long taskId;
public Request() {
}
public Request(String action, PersistentActionRequest request) {
this.action = action;
this.request = request;
public Request(long taskId) {
this.taskId = taskId;
}
public void setTaskId(long taskId) {
this.taskId = taskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
action = in.readString();
request = in.readOptionalNamedWriteable(PersistentActionRequest.class);
taskId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeOptionalNamedWriteable(request);
out.writeLong(taskId);
}
@Override
@ -103,26 +104,65 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request);
Request request = (Request) o;
return taskId == request.taskId;
}
@Override
public int hashCode() {
return Objects.hash(action, request);
return Objects.hash(taskId);
}
}
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
PersistentActionResponse, StartPersistentTaskAction.RequestBuilder> {
StartPersistentTaskAction.Response, StartPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
request.setTaskId(taskId);
return this;
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentActionResponse> {
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final PersistentTaskClusterService persistentTaskClusterService;
@ -130,25 +170,20 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
PersistentActionRegistry persistentActionRegistry,
PersistentActionService persistentActionService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
PersistentActionExecutor executor = new PersistentActionExecutor(threadPool);
clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry,
transportService.getTaskManager(), executor));
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected PersistentActionResponse newResponse() {
return new PersistentActionResponse();
protected Response newResponse() {
return new Response();
}
@Override
@ -158,12 +193,11 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
}
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentActionResponse> listener) {
persistentTaskClusterService.createPersistentTask(request.action, request.request, new ActionListener<Long>() {
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.startPersistentTask(request.taskId, new ActionListener<Empty>() {
@Override
public void onResponse(Long newTaskId) {
listener.onResponse(new PersistentActionResponse(newTaskId));
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
}
@Override

View File

@ -71,7 +71,7 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate<DiscoveryNode> selector) {
long minLoad = Long.MAX_VALUE;
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
if (selector.test(node)) {
if (persistentTasksInProgress == null) {

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.persistent.PersistentActionCoordinator.State;
import org.elasticsearch.persistent.PersistentActionCoordinator.Status;
import static org.hamcrest.Matchers.containsString;
public class PersistentActionCoordinatorStatusTests extends AbstractWireSerializingTestCase<Status> {
@Override
protected Status createTestInstance() {
return new Status(randomFrom(State.values()));
}
@Override
protected Writeable.Reader<Status> instanceReader() {
return Status::new;
}
public void testToString() {
assertThat(createTestInstance().toString(), containsString("state"));
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -36,14 +37,11 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.CompletionPersistentTaskAction.Response;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -87,18 +85,14 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
long taskId = randomLong();
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i),
"other_node_" + randomInt(nonLocalNodesCount)));
taskId++;
tasks.addTask("test_action", new TestRequest("other_" + i), false, true, "other_node_" + randomInt(nonLocalNodesCount));
if (added == false && randomBoolean()) {
added = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node"));
taskId++;
tasks.addTask("test", new TestRequest("this_param"), false, true, "this_node");
}
}
}
@ -107,8 +101,9 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
logger.info("No local node action was added");
}
ClusterState newClusterState = ClusterState.builder(state)
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(taskId, tasks)).build();
MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
if (added) {
@ -304,34 +299,26 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> tasks = prevTasks == null ? new HashMap<>() : new HashMap<>(prevTasks.taskMap());
long id = prevTasks == null ? 0 : prevTasks.getCurrentId();
tasks.put(id, new PersistentTaskInProgress<>(id, action, request, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
builder.addTask(action, request, false, true, node).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
assertNotNull(prevTasks);
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
assertNotNull(prevTask);
tasks.put(prevTask.getId(), new PersistentTaskInProgress<>(prevTask, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
assertTrue(builder.hasTask(taskId));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
builder.reassignTask(taskId, node).build())).build();
}
private ClusterState removeTask(ClusterState state, long taskId) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
assertNotNull(prevTasks);
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
assertNotNull(prevTask);
tasks.remove(prevTask.getId());
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
assertTrue(builder.hasTask(taskId));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
builder.removeTask(taskId).build())).build();
}
private class Execution {

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
*/
package org.elasticsearch.persistent;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1)
public class PersistentActionFullRestartIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestPersistentActionPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
protected boolean ignoreExternalCluster() {
return true;
}
@TestLogging("org.elasticsearch.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG")
public void testFullClusterRestart() throws Exception {
int numberOfTasks = randomIntBetween(1, 10);
long[] taskIds = new long[numberOfTasks];
boolean[] stopped = new boolean[numberOfTasks];
int runningTasks = 0;
for (int i = 0; i < numberOfTasks; i++) {
if (randomBoolean()) {
runningTasks++;
taskIds[i] = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId();
stopped[i] = false;
} else {
taskIds[i] = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client())
.setAction(TestPersistentAction.NAME)
.setRequest(new TestRequest("Blah"))
.setStopped(true)
.get().getTaskId();
stopped[i] = true;
}
}
final int numberOfRunningTasks = runningTasks;
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
if (numberOfRunningTasks > 0) {
// Make sure that at least one of the tasks is running
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks().size(), greaterThan(0));
});
}
// Restart cluster
internalCluster().fullRestart();
ensureYellow();
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
// Check that cluster state is correct
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
assertThat(task.isStopped(), equalTo(stopped[i]));
}
logger.info("Waiting for {} original tasks to start", numberOfRunningTasks);
assertBusy(() -> {
// Wait for the running task to start automatically
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(numberOfRunningTasks));
});
// Start all other tasks
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode());
assertThat(task.isStopped(), equalTo(stopped[i]));
assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue());
if (stopped[i]) {
assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(task.getId()).get());
}
}
logger.info("Waiting for {} tasks to start", numberOfTasks);
assertBusy(() -> {
// Wait for all tasks to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(numberOfTasks));
});
logger.info("Complete all tasks");
// Complete the running task and make sure it finishes properly
assertThat(new TestPersistentActionPlugin.TestTasksRequestBuilder(client()).setOperation("finish").get().getTasks().size(),
equalTo(numberOfTasks));
assertBusy(() -> {
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
});
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
@ -51,22 +52,10 @@ public class PersistentActionIT extends ESIntegTestCase {
return nodePlugins();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return super.getMockPlugins();
}
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.build();
}
@After
public void cleanup() throws Exception {
assertNoRunningTasks();
@ -126,19 +115,64 @@ public class PersistentActionIT extends ESIntegTestCase {
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
if (randomBoolean()) {
logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
} else {
logger.info("Cancelling the running task");
// Cancel the running task and make sure it finishes properly
assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
stopOrCancelTask(firstRunningTask.getTaskId());
}
public void testPersistentActionCompletionWithoutRemoval() throws Exception {
boolean stopped = randomBoolean();
long taskId = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client())
.setAction(TestPersistentAction.NAME)
.setRequest(new TestPersistentActionPlugin.TestRequest("Blah"))
.setRemoveOnCompletion(false)
.setStopped(stopped).get().getTaskId();
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped));
assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue());
assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it
logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped);
for (int i = 0; i < numberOfIters; i++) {
logger.info("iteration {}", i);
if (stopped) {
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks(),
empty());
assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get());
}
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks()
.size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
stopOrCancelTask(firstRunningTask.getTaskId());
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
});
stopped = true;
}
assertBusy(() -> {
// Wait for the task to be marked as stopped
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.getTask(taskId).isStopped(), equalTo(true));
assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
});
logger.info("Removing action record from cluster state");
assertAcked(RemovePersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get());
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
@ -182,7 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase {
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
@ -195,7 +230,8 @@ public class PersistentActionIT extends ESIntegTestCase {
int finalI = i;
assertBusy(() -> {
PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
@ -209,6 +245,24 @@ public class PersistentActionIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1));
}
private void stopOrCancelTask(TaskId taskId) {
if (randomBoolean()) {
logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(taskId)
.get().getTasks().size(), equalTo(1));
} else {
logger.info("Cancelling the running task");
// Cancel the running task and make sure it finishes properly
assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId)
.get().getTasks().size(), equalTo(1));
}
}
private void assertNoRunningTasks() throws Exception {
assertBusy(() -> {
// Wait for the task to finish
@ -218,8 +272,8 @@ public class PersistentActionIT extends ESIntegTestCase {
assertThat(tasks.size(), equalTo(0));
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.tasks(), empty());
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
});
}

View File

@ -0,0 +1,401 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
*/
package org.elasticsearch.persistent;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class PersistentTaskClusterServiceTests extends ESTestCase {
public void testReassignmentRequired() {
int numberOfIterations = randomIntBetween(10, 100);
ClusterState clusterState = initialState();
for (int i = 0; i < numberOfIterations; i++) {
boolean significant = randomBoolean();
ClusterState previousState = clusterState;
logger.info("inter {} significant: {}", i, significant);
if (significant) {
clusterState = significantChange(clusterState);
} else {
clusterState = insignificantChange(clusterState);
}
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState);
assertThat(dumpEvent(event), significant, equalTo(PersistentTaskClusterService.reassignmentRequired(event,
new PersistentTaskClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentActionRequest> String executorNode(
String action, ClusterState currentState, Request request) {
return randomNode(currentState.nodes());
}
})));
}
}
public void testReassignTasksWithNoTasks() {
ClusterState clusterState = initialState();
assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue());
}
public void testReassignConsidersClusterStateUpdates() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(2, 40);
for (int i = 0; i < numberOfTasks; i++) {
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
ClusterState newClusterState = reassign(clusterState);
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress, notNullValue());
}
public void testReassignTasks() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(0, 40);
for (int i = 0; i < numberOfTasks; i++) {
switch (randomInt(3)) {
case 0:
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false);
break;
case 1:
// add a task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false);
break;
case 2:
// add a stopped task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "fail_me_if_called", null, true);
break;
case 3:
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
break;
}
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
ClusterState newClusterState = reassign(clusterState);
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress, notNullValue());
assertThat("number of tasks shouldn't change as a result or reassignment",
numberOfTasks, equalTo(tasksInProgress.tasks().size()));
int assignOneCount = 0;
for (PersistentTaskInProgress<?> task : tasksInProgress.tasks()) {
if (task.isStopped()) {
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
} else {
switch (task.getAction()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
break;
case "assign_one":
if (task.getExecutorNode() != null) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
}
break;
default:
fail("Unknown action " + task.getAction());
}
}
}
}
private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) {
for (int i = 0; i < nonLocalNodesCount; i++) {
nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
}
}
private ClusterState reassign(ClusterState clusterState) {
return PersistentTaskClusterService.reassignTasks(clusterState, logger,
new PersistentTaskClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentActionRequest> String executorNode(
String action, ClusterState currentState, Request request) {
TestRequest testRequest = (TestRequest) request;
switch (testRequest.getTestParam()) {
case "assign_me":
return randomNode(currentState.nodes());
case "dont_assign_me":
return null;
case "fail_me_if_called":
fail("the decision decider shouldn't be called on this task");
return null;
case "assign_one":
return assignOnlyOneTaskAtATime(currentState);
default:
fail("unknown param " + testRequest.getTestParam());
}
return null;
}
});
}
private String assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress.findTasks("assign_one",
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNode(clusterState.nodes());
} else {
return null;
}
}
private String randomNode(DiscoveryNodes nodes) {
if (nodes.getNodes().isEmpty()) {
return null;
}
List<String> nodeList = new ArrayList<>();
for (ObjectCursor<String> node : nodes.getNodes().keys()) {
nodeList.add(node.value);
}
return randomFrom(nodeList);
}
private String dumpEvent(ClusterChangedEvent event) {
return "nodes_changed: " + event.nodesChanged() +
" nodes_removed:" + event.nodesRemoved() +
" routing_table_changed:" + event.routingTableChanged() +
" tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE);
}
private ClusterState significantChange(ClusterState clusterState) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasks != null) {
if (randomBoolean()) {
//
boolean removedNode = false;
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() != null && clusterState.nodes().nodeExists(task.getExecutorNode())) {
logger.info("removed node {}", task.getExecutorNode());
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode()));
return builder.build();
}
}
}
}
boolean tasksOrNodesChanged = false;
// add a new unassigned task
if (hasUnassigned(tasks, clusterState.nodes()) == false) {
// we don't have any unassigned tasks - add some
logger.info("added random task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false);
tasksOrNodesChanged = true;
}
// add a node if there are unassigned tasks
if (clusterState.nodes().getNodes().isEmpty()) {
logger.info("added random node");
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10))));
tasksOrNodesChanged = true;
}
if (tasksOrNodesChanged == false) {
// change routing table to simulate a change
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
}
return builder.build();
}
private ClusterState insignificantChange(ClusterState clusterState) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (randomBoolean()) {
if (hasUnassigned(tasks, clusterState.nodes()) == false) {
// we don't have any unassigned tasks - adding a node or changing a routing table shouldn't affect anything
if (randomBoolean()) {
logger.info("added random node");
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10))));
}
if (randomBoolean()) {
// add unassigned task in stopped state
logger.info("added random stopped task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, true);
return builder.build();
} else {
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
}
return builder.build();
}
}
if (randomBoolean()) {
// remove a node that doesn't have any tasks assigned to it and it's not the master node
for (DiscoveryNode node : clusterState.nodes()) {
if (hasTasksAssignedTo(tasks, node.getId()) == false && "this_node".equals(node.getId()) == false) {
logger.info("removed unassigned node {}", node.getId());
return builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(node.getId())).build();
}
}
}
if (randomBoolean()) {
// clear the task
if (randomBoolean()) {
logger.info("removed all tasks");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE,
PersistentTasksInProgress.builder().build());
return builder.metaData(metaData).build();
} else {
logger.info("set task custom to null");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE);
return builder.metaData(metaData).build();
}
}
logger.info("removed all unassigned tasks and changed routing table");
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
if (tasks != null) {
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null) {
tasksBuilder.removeTask(task.getId());
}
}
}
// Just add a random index - that shouldn't change anything
IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10))
.settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random())))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false)
.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
return builder.metaData(metaData).build();
}
private boolean hasUnassigned(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) {
if (tasks == null || tasks.tasks().isEmpty()) {
return false;
}
return tasks.tasks().stream().anyMatch(task ->
task.isStopped() == false &&
(task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())));
}
private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) {
return tasks != null && tasks.tasks().stream().anyMatch(
task -> nodeId.equals(task.getExecutorNode())) == false;
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
String node,
boolean stopped) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE,
tasks.addTask(randomAsciiOfLength(10), new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), node).build()));
}
private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) {
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), node);
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(),
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))),
Version.CURRENT);
}
private ClusterState initialState() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
int randomIndices = randomIntBetween(0, 5);
for (int i = 0; i < randomIndices; i++) {
changeRoutingTable(metaData, routingTable);
}
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "this_node"));
nodes.localNodeId("this_node");
nodes.masterNodeId("this_node");
return ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable.build())
.build();
}
private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10))
.settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random())))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
metaData.put(indexMetaData, false);
routingTable.addAsNew(indexMetaData);
}
}

View File

@ -18,48 +18,263 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
import org.elasticsearch.persistent.PersistentTasksInProgress.Builder;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> {
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase<Custom> {
@Override
protected PersistentTasksInProgress createTestInstance() {
int numberOfTasks = randomInt(10);
Map<Long, PersistentTaskInProgress<?>> entries = new HashMap<>();
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>(
randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)),
randomAsciiOfLength(10));
tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
randomBoolean(), randomBoolean(), randomAsciiOfLength(10));
if (randomBoolean()) {
// From time to time update status
taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10)));
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10)));
}
entries.put(taskInProgress.getId(), taskInProgress);
}
return new PersistentTasksInProgress(randomLong(), entries);
return tasks.build();
}
@Override
protected Writeable.Reader<PersistentTasksInProgress> instanceReader() {
protected Writeable.Reader<Custom> instanceReader() {
return PersistentTasksInProgress::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Arrays.asList(
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new),
new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
new Entry(Task.Status.class, Status.NAME, Status::new)
));
}
@Override
protected Custom makeTestChanges(Custom testInstance) {
PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance;
Builder builder = new Builder();
switch (randomInt(3)) {
case 0:
addRandomTask(builder);
break;
case 1:
if (tasksInProgress.tasks().isEmpty()) {
addRandomTask(builder);
} else {
builder.reassignTask(pickRandomTask(tasksInProgress), randomAsciiOfLength(10));
}
break;
case 2:
if (tasksInProgress.tasks().isEmpty()) {
addRandomTask(builder);
} else {
builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAsciiOfLength(10)) : null);
}
break;
case 3:
if (tasksInProgress.tasks().isEmpty()) {
addRandomTask(builder);
} else {
builder.removeTask(pickRandomTask(tasksInProgress));
}
break;
}
return builder.build();
}
@Override
protected Writeable.Reader<Diff<Custom>> diffReader() {
return PersistentTasksInProgress::readDiffFrom;
}
@Override
protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException {
return PersistentTasksInProgress.fromXContent(parser);
}
@Override
protected XContentBuilder toXContent(Custom instance, XContentType contentType) throws IOException {
return toXContent(instance, contentType, new ToXContent.MapParams(
Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, MetaData.XContentContext.API.toString())));
}
protected XContentBuilder toXContent(Custom instance, XContentType contentType, ToXContent.MapParams params) throws IOException {
// We need all attribute to be serialized/de-serialized for testing
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
if (randomBoolean()) {
builder.prettyPrint();
}
if (instance.isFragment()) {
builder.startObject();
}
instance.toXContent(builder, params);
if (instance.isFragment()) {
builder.endObject();
}
return builder;
}
private Builder addRandomTask(Builder builder) {
builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
randomBoolean(), randomBoolean(), randomAsciiOfLength(10));
return builder;
}
private long pickRandomTask(PersistentTasksInProgress testInstance) {
return randomFrom(new ArrayList<>(testInstance.tasks())).getId();
}
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(Arrays.asList(
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentAction.NAME),
Status::fromXContent)
));
}
@SuppressWarnings("unchecked")
public void testSerializationContext() throws Exception {
PersistentTasksInProgress testInstance = createTestInstance();
for (int i = 0; i < randomInt(10); i++) {
testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance);
}
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY)));
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder builder = toXContent(testInstance, xContentType, params);
XContentBuilder shuffled = shuffleXContent(builder);
XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes());
PersistentTasksInProgress newInstance = doParseInstance(parser);
assertNotSame(newInstance, testInstance);
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
for (PersistentTaskInProgress<?> testTask : testInstance.tasks()) {
PersistentTaskInProgress<TestRequest> newTask = (PersistentTaskInProgress<TestRequest>) newInstance.getTask(testTask.getId());
assertNotNull(newTask);
// Things that should be serialized
assertEquals(testTask.getAction(), newTask.getAction());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getRequest(), newTask.getRequest());
assertEquals(testTask.isStopped(), newTask.isStopped());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());
assertNull(newTask.getExecutorNode());
}
}
public void testBuilder() {
PersistentTasksInProgress persistentTasksInProgress = null;
long lastKnownTask = -1;
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder;
if (randomBoolean()) {
builder = new Builder();
} else {
builder = new Builder(persistentTasksInProgress);
}
boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
switch (randomInt(5)) {
case 0:
lastKnownTask = addRandomTask(builder).getCurrentId();
changed = true;
break;
case 1:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
if (randomBoolean()) {
builder.reassignTask(lastKnownTask, randomAsciiOfLength(10));
} else {
builder.reassignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
}
break;
case 2:
if (builder.hasTask(lastKnownTask)) {
PersistentTaskInProgress<?> task = builder.build().getTask(lastKnownTask);
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getExecutorNode());
// should change if the task was stopped AND unassigned
if (task.getExecutorNode() == null && task.isStopped()) {
changed = true;
}
} else {
// Trying to reassign to a different node
builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
// should change if the task was unassigned
if (task.getExecutorNode() == null) {
changed = true;
}
}
} else {
// task doesn't exist - shouldn't change
builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
}
break;
case 3:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAsciiOfLength(10)) : null);
break;
case 4:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
builder.removeTask(lastKnownTask);
break;
case 5:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
builder.finishTask(lastKnownTask);
break;
}
}
assertEquals(changed, builder.isChanged());
persistentTasksInProgress = builder.build();
}
}
}

View File

@ -20,7 +20,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.persistent.StartPersistentTaskAction.Request;
import org.elasticsearch.persistent.CreatePersistentTaskAction.Request;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
import org.elasticsearch.test.AbstractStreamableTestCase;

View File

@ -37,8 +37,10 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
@ -47,8 +49,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
@ -72,6 +76,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.junit.Assert.assertTrue;
@ -87,6 +92,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(
new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class),
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
@ -114,14 +120,32 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
new NamedWriteableRegistry.Entry(Task.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
);
}
@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE),
PersistentTasksInProgress::fromXContent),
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
);
}
public static class TestRequest extends PersistentActionRequest {
public static final ConstructingObjectParser<TestRequest, Void> REQUEST_PARSER =
new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new TestRequest((String) args[0]));
static {
REQUEST_PARSER.declareString(constructorArg(), new ParseField("param"));
}
private String executorNodeAttr = null;
private String responseNode = null;
@ -185,10 +209,15 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("param", testParam);
builder.endObject();
return builder;
}
public static TestRequest fromXContent(XContentParser parser) throws IOException {
return REQUEST_PARSER.parse(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -255,6 +284,13 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
private final String phase;
public static final ConstructingObjectParser<Status, Void> STATUS_PARSER =
new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new Status((String) args[0]));
static {
STATUS_PARSER.declareString(constructorArg(), new ParseField("phase"));
}
public Status(String phase) {
this.phase = requireNonNull(phase, "Phase cannot be null");
}
@ -276,6 +312,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
return builder;
}
public static Task.Status fromXContent(XContentParser parser) throws IOException {
return STATUS_PARSER.parse(parser, null);
}
@Override
public boolean isFragment() {
return false;
@ -319,7 +360,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, TestPersistentAction.NAME, false, threadPool, transportService, persistentActionService,
persistentActionRegistry, actionFilters, indexNameExpressionResolver, TestRequest::new,
ThreadPool.Names.MANAGEMENT);
ThreadPool.Names.GENERIC);
this.transportService = transportService;
}
@ -343,7 +384,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
// wait for something to happen
assertTrue(awaitBusy(() -> testTask.isCancelled() ||
testTask.getOperation() != null ||
transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes
transportService.lifecycleState() != Lifecycle.State.STARTED, // speedup finishing on closed nodes
30, TimeUnit.SECONDS)); // This can take a while during large cluster restart
if (transportService.lifecycleState() != Lifecycle.State.STARTED) {
return;
}
@ -430,6 +472,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
public void setOperation(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "TestTask[" + this.getId() + ", " + this.getParentTaskId() + ", " + this.getOperation() + "]";
}
}
static class TestTaskResponse implements Writeable {

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request;
import java.util.Collections;
public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomLong(), new Status(randomAsciiOfLength(10)));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
));
}
}