Small code cleanups and refactorings in persistent tasks (#29109)

This commit consists of small code cleanups and refactorings in the
persistent tasks framework. Most changes are in
PersistentTasksClusterService where some methods have been renamed
or merged together, documentation has been added, unused code removed
in order to improve readability of the code.
This commit is contained in:
Tanguy Leroux 2018-03-19 09:26:17 +01:00 committed by GitHub
parent 7608480a62
commit b57bd695f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 242 additions and 135 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.persistent; package org.elasticsearch.persistent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -33,9 +32,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import java.util.Objects; import java.util.Objects;
@ -52,29 +51,31 @@ public class PersistentTasksClusterService extends AbstractComponent implements
this.clusterService = clusterService; this.clusterService = clusterService;
clusterService.addListener(this); clusterService.addListener(this);
this.registry = registry; this.registry = registry;
} }
/** /**
* Creates a new persistent task on master node * Creates a new persistent task on master node
* *
* @param action the action name * @param taskId the task's id
* @param params params * @param taskName the task's name
* @param listener the listener that will be called when task is started * @param taskParams the task's parameters
* @param listener the listener that will be called when task is started
*/ */
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params, public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams,
ActionListener<PersistentTask<?>> listener) { ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder builder = builder(currentState); PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) { if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist"); throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
} }
validate(action, currentState, params);
final Assignment assignment; PersistentTasksExecutor<Params> taskExecutor = registry.getPersistentTaskExecutorSafe(taskName);
assignment = getAssignement(action, currentState, params); taskExecutor.validate(taskParams, currentState);
return update(currentState, builder.addTask(taskId, action, params, assignment));
Assignment assignment = createAssignment(taskName, taskParams, currentState);
return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment));
} }
@Override @Override
@ -95,7 +96,6 @@ public class PersistentTasksClusterService extends AbstractComponent implements
}); });
} }
/** /**
* Restarts a record about a running persistent task from cluster state * Restarts a record about a running persistent task from cluster state
* *
@ -114,7 +114,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
} }
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id, allocationId)) { if (tasksInProgress.hasTask(id, allocationId)) {
tasksInProgress.removeTask(id); tasksInProgress.removeTask(id);
@ -185,7 +185,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) { public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id, allocationId)) { if (tasksInProgress.hasTask(id, allocationId)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status)); return update(currentState, tasksInProgress.updateTaskStatus(id, status));
@ -211,93 +211,85 @@ public class PersistentTasksClusterService extends AbstractComponent implements
}); });
} }
private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState, /**
@Nullable Params params) { * Creates a new {@link Assignment} for the given persistent task.
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); *
return persistentTasksExecutor.getAssignment(params, currentState); * @param taskName the task's name
} * @param taskParams the task's parameters
* @param currentState the current {@link ClusterState}
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) { * @return a new {@link Assignment}
*/
private <Params extends PersistentTaskParams> Assignment createAssignment(final String taskName,
final @Nullable Params taskParams,
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState); return persistentTasksExecutor.getAssignment(taskParams, currentState);
} }
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) { if (event.localNodeMaster()) {
logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); if (shouldReassignPersistentTasks(event)) {
if (reassignmentRequired(event, this::getAssignement)) { logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
logger.trace("task reassignment is needed"); clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
reassignTasks(); @Override
} else { public ClusterState execute(ClusterState currentState) {
logger.trace("task reassignment is not needed"); return reassignTasks(currentState);
}
@Override
public void onFailure(String source, Exception e) {
logger.warn("failed to reassign persistent tasks", e);
}
});
} }
} }
} }
interface ExecutorNodeDecider { /**
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params); * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
} * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
*/
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks == null) {
return false;
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false;
PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData prevTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) {
if (tasks != null && (Objects.equals(tasks, prevTasks) == false || for (PersistentTask<?> task : tasks.tasks()) {
event.nodesChanged() || if (needsReassignment(task.getAssignment(), event.state().nodes())) {
event.routingTableChanged() || Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
event.previousState().nodes().isLocalNodeElectedMaster() == false)) { if (Objects.equals(assignment, task.getAssignment()) == false) {
// We need to check if removed nodes were running any of the tasks and reassign them return true;
boolean reassignmentRequired = false;
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
} }
} }
} }
return reassignmentRequired;
} }
return false; return false;
} }
/** /**
* Evaluates the cluster state and tries to assign tasks to nodes * Evaluates the cluster state and tries to assign tasks to nodes.
*
* @param currentState the cluster state to analyze
* @return an updated version of the cluster state
*/ */
public void reassignTasks() { ClusterState reassignTasks(final ClusterState currentState) {
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return reassignTasks(currentState, logger, PersistentTasksClusterService.this::getAssignement);
}
@Override
public void onFailure(String source, Exception e) {
logger.warn("Unsuccessful persistent task reassignment", e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
ClusterState clusterState = currentState; ClusterState clusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();
final PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) { if (tasks != null) {
logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
final DiscoveryNodes nodes = currentState.nodes();
// We need to check if removed nodes were running any of the tasks and reassign them // We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTask<?> task : tasks.tasks()) { for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) { if (needsReassignment(task.getAssignment(), nodes)) {
// there is an unassigned task - we need to try assigning it Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (Objects.equals(assignment, task.getAssignment()) == false) { if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(), logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode()); task.getAssignment().getExecutorNode(), assignment.getExecutorNode());
@ -313,6 +305,17 @@ public class PersistentTasksClusterService extends AbstractComponent implements
return clusterState; return clusterState;
} }
/** Returns true if the persistent tasks are not equal between the previous and the current cluster state **/
static boolean persistentTasksChanged(final ClusterChangedEvent event) {
String type = PersistentTasksCustomMetaData.TYPE;
return Objects.equals(event.state().metaData().custom(type), event.previousState().metaData().custom(type)) == false;
}
/** Returns true if the task is not assigned or is assigned to a non-existing node */
static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) {
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}
private static PersistentTasksCustomMetaData.Builder builder(ClusterState currentState) { private static PersistentTasksCustomMetaData.Builder builder(ClusterState currentState) {
return PersistentTasksCustomMetaData.builder(currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); return PersistentTasksCustomMetaData.builder(currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
} }

View File

@ -145,7 +145,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
} }
public Collection<PersistentTask<?>> tasks() { public Collection<PersistentTask<?>> tasks() {
return this.tasks.values(); return this.tasks.values();
} }
@ -165,12 +164,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public boolean tasksExist(String taskName, Predicate<PersistentTask<?>> predicate) {
return this.tasks().stream()
.filter(p -> taskName.equals(p.getTaskName()))
.anyMatch(predicate);
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
@ -279,7 +272,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Nullable @Nullable
private final Long allocationIdOnLastStatusUpdate; private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) { public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, params, null, assignment, null); this(id, allocationId, taskName, params, null, assignment, null);
} }
@ -395,13 +387,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return assignment.isAssigned(); return assignment.isAssigned();
} }
/**
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}
@Nullable @Nullable
public Status getStatus() { public Status getStatus() {
return status; return status;
@ -522,16 +507,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return readDiffFrom(MetaData.Custom.class, TYPE, in); return readDiffFrom(MetaData.Custom.class, TYPE, in);
} }
public long getLastAllocationId() {
return lastAllocationId;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("last_allocation_id", lastAllocationId); builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks"); builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) { {
entry.toXContent(builder, params); for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params);
}
} }
builder.endArray(); builder.endArray();
return builder; return builder;

View File

@ -95,9 +95,7 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
* <p> * <p>
* Throws an exception if the supplied params cannot be executed on the cluster in the current state. * Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/ */
public void validate(Params params, ClusterState clusterState) { public void validate(Params params, ClusterState clusterState) {}
}
/** /**
* Creates a AllocatedPersistentTask for communicating with task manager * Creates a AllocatedPersistentTask for communicating with task manager

View File

@ -29,31 +29,42 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.function.BiFunction;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment;
import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class PersistentTasksClusterServiceTests extends ESTestCase { public class PersistentTasksClusterServiceTests extends ESTestCase {
public void testReassignmentRequired() { public void testReassignmentRequired() {
final PersistentTasksClusterService service = createService((params, clusterState) ->
"never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes())
);
int numberOfIterations = randomIntBetween(1, 30); int numberOfIterations = randomIntBetween(1, 30);
ClusterState clusterState = initialState(); ClusterState clusterState = initialState();
for (int i = 0; i < numberOfIterations; i++) { for (int i = 0; i < numberOfIterations; i++) {
@ -66,17 +77,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
clusterState = insignificantChange(clusterState); clusterState = insignificantChange(clusterState);
} }
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState);
assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event, assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(significant));
new PersistentTasksClusterService.ExecutorNodeDecider() {
@Override
public <Params extends PersistentTaskParams> Assignment getAssignment(
String action, ClusterState currentState, Params params) {
if ("never_assign".equals(((TestParams) params).getTestParam())) {
return NO_NODE_FOUND;
}
return randomNodeAssignment(currentState.nodes());
}
}), equalTo(significant));
} }
} }
@ -175,6 +176,115 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
} }
} }
public void testPersistentTasksChangedNoTasks() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.build();
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.build();
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.build();
assertFalse("persistent tasks unchanged (no tasks)",
persistentTasksChanged(new ClusterChangedEvent("test", current, previous)));
}
public void testPersistentTasksChangedTaskAdded() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.build();
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.build();
PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder()
.addTask("_task_1", "test", null, new Assignment(null, "_reason"))
.build();
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
.build();
assertTrue("persistent tasks changed (task added)",
persistentTasksChanged(new ClusterChangedEvent("test", current, previous)));
}
public void testPersistentTasksChangedTaskRemoved() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder()
.addTask("_task_1", "test", null, new Assignment("_node_1", "_reason"))
.addTask("_task_2", "test", null, new Assignment("_node_1", "_reason"))
.addTask("_task_3", "test", null, new Assignment("_node_2", "_reason"))
.build();
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks))
.build();
PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder()
.addTask("_task_1", "test", null, new Assignment("_node_1", "_reason"))
.addTask("_task_3", "test", null, new Assignment("_node_2", "_reason"))
.build();
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks))
.build();
assertTrue("persistent tasks changed (task removed)",
persistentTasksChanged(new ClusterChangedEvent("test", current, previous)));
}
public void testPersistentTasksAssigned() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder()
.addTask("_task_1", "test", null, new Assignment("_node_1", ""))
.addTask("_task_2", "test", null, new Assignment(null, "unassigned"))
.build();
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks))
.build();
PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder()
.addTask("_task_1", "test", null, new Assignment("_node_1", ""))
.addTask("_task_2", "test", null, new Assignment("_node_2", ""))
.build();
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks))
.build();
assertTrue("persistent tasks changed (task assigned)",
persistentTasksChanged(new ClusterChangedEvent("test", current, previous)));
}
public void testNeedsReassignment() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT))
.build();
assertTrue(needsReassignment(new Assignment(null, "unassigned"), nodes));
assertTrue(needsReassignment(new Assignment("_node_left", "assigned to a node that left"), nodes));
assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes));
}
private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) {
for (int i = 0; i < nonLocalNodesCount; i++) { for (int i = 0; i < nonLocalNodesCount; i++) {
@ -183,29 +293,25 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
} }
private ClusterState reassign(ClusterState clusterState) { private ClusterState reassign(ClusterState clusterState) {
return PersistentTasksClusterService.reassignTasks(clusterState, logger, PersistentTasksClusterService service = createService((params, currentState) -> {
new PersistentTasksClusterService.ExecutorNodeDecider() { TestParams testParams = (TestParams) params;
@Override switch (testParams.getTestParam()) {
public <Params extends PersistentTaskParams> Assignment getAssignment( case "assign_me":
String action, ClusterState currentState, Params params) { return randomNodeAssignment(currentState.nodes());
TestParams testParams = (TestParams) params; case "dont_assign_me":
switch (testParams.getTestParam()) { return NO_NODE_FOUND;
case "assign_me": case "fail_me_if_called":
return randomNodeAssignment(currentState.nodes()); fail("the decision decider shouldn't be called on this task");
case "dont_assign_me": return null;
return NO_NODE_FOUND; case "assign_one":
case "fail_me_if_called": return assignOnlyOneTaskAtATime(currentState);
fail("the decision decider shouldn't be called on this task"); default:
return null; fail("unknown param " + testParams.getTestParam());
case "assign_one": }
return assignOnlyOneTaskAtATime(currentState); return NO_NODE_FOUND;
default: });
fail("unknown param " + testParams.getTestParam());
}
return NO_NODE_FOUND;
}
});
return service.reassignTasks(clusterState);
} }
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
@ -450,4 +556,21 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
metaData.put(indexMetaData, false); metaData.put(indexMetaData, false);
routingTable.addAsNew(indexMetaData); routingTable.addAsNew(indexMetaData);
} }
/** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/
static <P extends PersistentTaskParams> PersistentTasksClusterService createService(final BiFunction<P, ClusterState, Assignment> fn) {
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
singleton(new PersistentTasksExecutor<P>(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) {
@Override
public Assignment getAssignment(P params, ClusterState clusterState) {
return fn.apply(params, clusterState);
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) {
throw new UnsupportedOperationException();
}
}));
return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class));
}
} }