Replace List with Map in PersistentTasksInProgress

Store currently running persistent tasks in a map instead of a list.

Original commit: elastic/x-pack@f88c9adef5
This commit is contained in:
Igor Motov 2017-02-03 15:20:44 -05:00 committed by Martijn van Groningen
parent 32e406181e
commit d340c190b2
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
6 changed files with 145 additions and 128 deletions

View File

@ -82,7 +82,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
String localNodeId = event.state().getNodes().getLocalNodeId(); String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet()); Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) { if (tasks != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) { for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) { if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()); PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId); RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);

View File

@ -30,11 +30,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.ArrayList; import java.util.Collections;
import java.util.List; import java.util.HashMap;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -69,17 +70,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
final String executorNodeId = executorNode(action, currentState, request); final String executorNodeId = executorNode(action, currentState, request);
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>(); long nextId;
final long nextId;
if (tasksInProgress != null) { if (tasksInProgress != null) {
nextId = tasksInProgress.getCurrentId() + 1; nextId = tasksInProgress.getCurrentId() + 1;
currentTasks.addAll(tasksInProgress.entries());
} else { } else {
nextId = 1; nextId = 1;
} }
currentTasks.add(new PersistentTaskInProgress<>(nextId, action, request, executorNodeId)); return createPersistentTask(currentState, new PersistentTaskInProgress<>(nextId, action, request, executorNodeId));
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(nextId, currentTasks)).build();
} }
@Override @Override
@ -118,23 +115,18 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// Nothing to do, the task was already deleted // Nothing to do, the task was already deleted
return currentState; return currentState;
} }
if (failure != null) {
boolean found = false; // If the task failed - we need to restart it on another node, otherwise we just remove it
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>(); PersistentTaskInProgress<?> taskInProgress = tasksInProgress.getTask(id);
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) { if (taskInProgress != null) {
if (taskInProgress.getId() == id) { String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
assert found == false; return updatePersistentTask(currentState, new PersistentTaskInProgress<>(taskInProgress, executorNode));
found = true;
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, executorNode));
}
} else {
currentTasks.add(taskInProgress);
} }
return currentState;
} else {
return removePersistentTask(currentState, id);
} }
return rebuildClusterStateIfNeeded(found, currentState, currentTasks);
} }
@Override @Override
@ -165,19 +157,11 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// Nothing to do, the task no longer exists // Nothing to do, the task no longer exists
return currentState; return currentState;
} }
PersistentTaskInProgress<?> task = tasksInProgress.getTask(id);
boolean found = false; if (task != null) {
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>(); return updatePersistentTask(currentState, new PersistentTaskInProgress<>(task, status));
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (taskInProgress.getId() == id) {
assert found == false;
found = true;
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, status));
} else {
currentTasks.add(taskInProgress);
}
} }
return rebuildClusterStateIfNeeded(found, currentState, currentTasks); return currentState;
} }
@Override @Override
@ -192,14 +176,40 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
}); });
} }
private ClusterState rebuildClusterStateIfNeeded(boolean rebuild, ClusterState oldState, private ClusterState updatePersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
List<PersistentTaskInProgress<?>> currentTasks) { PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
if (rebuild) { Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.putAll(oldTasks.taskMap());
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState createPersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
if (oldTasks != null) {
taskMap.putAll(oldTasks.taskMap());
}
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(newTask.getId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState removePersistentTask(ClusterState oldState, long taskId) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
if (oldTasks != null) {
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
ClusterState.Builder builder = ClusterState.builder(oldState); ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); taskMap.putAll(oldTasks.taskMap());
PersistentTasksInProgress tasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), currentTasks); taskMap.remove(taskId);
return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build(); PersistentTasksInProgress newTasks =
new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
} else { } else {
// no tasks - nothing to do
return oldState; return oldState;
} }
} }
@ -227,7 +237,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// We need to check if removed nodes were running any of the tasks and reassign them // We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false; boolean reassignmentRequired = false;
Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) { for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.getExecutorNode() == null) { if (taskInProgress.getExecutorNode() == null) {
// there is an unassigned task - we need to try assigning it // there is an unassigned task - we need to try assigning it
reassignmentRequired = true; reassignmentRequired = true;
@ -258,22 +268,12 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
DiscoveryNodes nodes = currentState.nodes(); DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) { if (tasks != null) {
// We need to check if removed nodes were running any of the tasks and reassign them // We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.entries()) { for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) { if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) {
// there is an unassigned task - we need to try assigning it // there is an unassigned task - we need to try assigning it
String executorNode = executorNode(task.getAction(), currentState, task.getRequest()); String executorNode = executorNode(task.getAction(), currentState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) { if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
PersistentTasksInProgress tasksInProgress = newClusterState.custom(PersistentTasksInProgress.TYPE); newClusterState = updatePersistentTask(newClusterState, new PersistentTaskInProgress<>(task, executorNode));
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (task.getId() == taskInProgress.getId()) {
currentTasks.add(new PersistentTaskInProgress<>(task, executorNode));
} else {
currentTasks.add(taskInProgress);
}
}
newClusterState = ClusterState.builder(newClusterState).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks)).build();
} }
} }
} }

View File

@ -33,7 +33,7 @@ import org.elasticsearch.tasks.Task.Status;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -44,29 +44,37 @@ import java.util.stream.Collectors;
public final class PersistentTasksInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom { public final class PersistentTasksInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final String TYPE = "persistent_tasks"; public static final String TYPE = "persistent_tasks";
// TODO: Implement custom Diff for entries // TODO: Implement custom Diff for tasks
private final List<PersistentTaskInProgress<?>> entries; private final Map<Long, PersistentTaskInProgress<?>> tasks;
private final long currentId; private final long currentId;
public PersistentTasksInProgress(long currentId, List<PersistentTaskInProgress<?>> entries) { public PersistentTasksInProgress(long currentId, Map<Long, PersistentTaskInProgress<?>> tasks) {
this.currentId = currentId; this.currentId = currentId;
this.entries = entries; this.tasks = tasks;
} }
public List<PersistentTaskInProgress<?>> entries() { public Collection<PersistentTaskInProgress<?>> tasks() {
return this.entries; return this.tasks.values();
} }
public Collection<PersistentTaskInProgress<?>> findEntries(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) { public Map<Long, PersistentTaskInProgress<?>> taskMap() {
return this.entries().stream() return this.tasks;
}
public PersistentTaskInProgress<?> getTask(long id) {
return this.tasks.get(id);
}
public Collection<PersistentTaskInProgress<?>> findTasks(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction())) .filter(p -> actionName.equals(p.getAction()))
.filter(predicate) .filter(predicate)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public boolean entriesExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) { public boolean tasksExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.entries().stream() return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction())) .filter(p -> actionName.equals(p.getAction()))
.anyMatch(predicate); .anyMatch(predicate);
} }
@ -77,16 +85,16 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
PersistentTasksInProgress that = (PersistentTasksInProgress) o; PersistentTasksInProgress that = (PersistentTasksInProgress) o;
return currentId == that.currentId && return currentId == that.currentId &&
Objects.equals(entries, that.entries); Objects.equals(tasks, that.tasks);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(entries, currentId); return Objects.hash(tasks, currentId);
} }
public long getNumberOfTasksOnNode(String nodeId, String action) { public long getNumberOfTasksOnNode(String nodeId, String action) {
return entries.stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count(); return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
} }
@Override @Override
@ -97,7 +105,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
/** /**
* A record that represents a single running persistent task * A record that represents a single running persistent task
*/ */
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable { public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable, ToXContent {
private final long id; private final long id;
private final long allocationId; private final long allocationId;
private final String action; private final String action;
@ -196,6 +204,28 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public Status getStatus() { public Status getStatus() {
return status; return status;
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", id);
builder.field("action", action);
builder.field("request");
request.toXContent(builder, params);
if (status != null) {
builder.field("status", status, params);
}
builder.field("executor_node", executorNode);
}
builder.endObject();
return builder;
}
@Override
public boolean isFragment() {
return false;
}
} }
@Override @Override
@ -205,13 +235,15 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public PersistentTasksInProgress(StreamInput in) throws IOException { public PersistentTasksInProgress(StreamInput in) throws IOException {
currentId = in.readLong(); currentId = in.readLong();
entries = in.readList(PersistentTaskInProgress::new); tasks = in.readMap(StreamInput::readLong, PersistentTaskInProgress::new);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(currentId); out.writeLong(currentId);
out.writeList(entries); out.writeMap(tasks, StreamOutput::writeLong, (stream, value) -> {
value.writeTo(stream);
});
} }
public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException { public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
@ -227,25 +259,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId); builder.field("current_id", currentId);
builder.startArray("running_tasks"); builder.startArray("running_tasks");
for (PersistentTaskInProgress<?> entry : entries) { for (PersistentTaskInProgress<?> entry : tasks.values()) {
toXContent(entry, builder, params); entry.toXContent(builder, params);
} }
builder.endArray(); builder.endArray();
return builder; return builder;
} }
public void toXContent(PersistentTaskInProgress<?> entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", entry.id);
builder.field("action", entry.action);
builder.field("request");
entry.request.toXContent(builder, params);
if (entry.status != null) {
builder.field("status", entry.status, params);
}
builder.field("executor_node", entry.executorNode);
}
builder.endObject();
}
} }

View File

@ -41,7 +41,9 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -85,17 +87,17 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build(); .build();
List<PersistentTaskInProgress<?>> tasks = new ArrayList<>(); Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
long taskId = randomLong(); long taskId = randomLong();
boolean added = false; boolean added = false;
if (nonLocalNodesCount > 0) { if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) { for (int i = 0; i < randomInt(5); i++) {
tasks.add(new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i), tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i),
"other_node_" + randomInt(nonLocalNodesCount))); "other_node_" + randomInt(nonLocalNodesCount)));
taskId++; taskId++;
if (added == false && randomBoolean()) { if (added == false && randomBoolean()) {
added = true; added = true;
tasks.add(new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node")); tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node"));
taskId++; taskId++;
} }
} }
@ -302,38 +304,33 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request, private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) { String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries()); Map<Long, PersistentTaskInProgress<?>> tasks = prevTasks == null ? new HashMap<>() : new HashMap<>(prevTasks.taskMap());
tasks.add(new PersistentTaskInProgress<>(prevTasks == null ? 0 : prevTasks.getCurrentId(), action, request, node)); long id = prevTasks == null ? 0 : prevTasks.getCurrentId();
tasks.put(id, new PersistentTaskInProgress<>(id, action, request, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build(); new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
} }
private ClusterState reallocateTask(ClusterState state, long taskId, String node) { private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries()); assertNotNull(prevTasks);
for (int i = 0; i < tasks.size(); i++) { Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
if (tasks.get(i).getId() == taskId) { PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
tasks.set(i, new PersistentTaskInProgress<>(tasks.get(i), node)); assertNotNull(prevTask);
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, tasks.put(prevTask.getId(), new PersistentTaskInProgress<>(prevTask, node));
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build(); return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
} new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
}
fail("didn't find task with id " + taskId);
return null;
} }
private ClusterState removeTask(ClusterState state, long taskId) { private ClusterState removeTask(ClusterState state, long taskId) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries()); assertNotNull(prevTasks);
for (int i = 0; i < tasks.size(); i++) { Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
if (tasks.get(i).getId() == taskId) { PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
tasks.remove(i); assertNotNull(prevTask);
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, tasks.remove(prevTask.getId());
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build(); return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
} new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
}
fail("didn't find task with id " + taskId);
return null;
} }
private class Execution { private class Execution {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestTasksRequestBuilder; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestTasksRequestBuilder;
import org.junit.After;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -66,6 +67,11 @@ public class PersistentActionIT extends ESIntegTestCase {
.build(); .build();
} }
@After
public void cleanup() throws Exception {
assertNoRunningTasks();
}
public void testPersistentActionRestart() throws Exception { public void testPersistentActionRestart() throws Exception {
long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId(); long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId();
assertBusy(() -> { assertBusy(() -> {
@ -133,8 +139,6 @@ public class PersistentActionIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
} }
assertNoRunningTasks();
} }
public void testPersistentActionWithNoAvailableNode() throws Exception { public void testPersistentActionWithNoAvailableNode() throws Exception {
@ -179,8 +183,8 @@ public class PersistentActionIT extends ESIntegTestCase {
.get().getTasks().get(0); .get().getTasks().get(0);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.entries().size(), equalTo(1)); assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.entries().get(0).getStatus(), nullValue()); assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
int numberOfUpdates = randomIntBetween(1, 10); int numberOfUpdates = randomIntBetween(1, 10);
for (int i = 0; i < numberOfUpdates; i++) { for (int i = 0; i < numberOfUpdates; i++) {
@ -192,9 +196,9 @@ public class PersistentActionIT extends ESIntegTestCase {
int finalI = i; int finalI = i;
assertBusy(() -> { assertBusy(() -> {
PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasks.entries().size(), equalTo(1)); assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.entries().get(0).getStatus(), notNullValue()); assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
assertThat(tasks.entries().get(0).getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
}); });
} }
@ -203,8 +207,6 @@ public class PersistentActionIT extends ESIntegTestCase {
// Complete the running task and make sure it finishes properly // Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
assertNoRunningTasks();
} }
private void assertNoRunningTasks() throws Exception { private void assertNoRunningTasks() throws Exception {
@ -217,7 +219,7 @@ public class PersistentActionIT extends ESIntegTestCase {
// Make sure the task is removed from the cluster state // Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE)) assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.entries(), empty()); .tasks(), empty());
}); });
} }

View File

@ -27,16 +27,16 @@ import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInPr
import org.elasticsearch.persistent.TestPersistentActionPlugin.Status; import org.elasticsearch.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.HashMap;
import java.util.Map;
public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> { public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> {
@Override @Override
protected PersistentTasksInProgress createTestInstance() { protected PersistentTasksInProgress createTestInstance() {
int numberOfTasks = randomInt(10); int numberOfTasks = randomInt(10);
List<PersistentTaskInProgress<?>> entries = new ArrayList<>(); Map<Long, PersistentTaskInProgress<?>> entries = new HashMap<>();
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>( PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>(
randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)), randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)),
@ -45,7 +45,7 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC
// From time to time update status // From time to time update status
taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10))); taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10)));
} }
entries.add(taskInProgress); entries.put(taskInProgress.getId(), taskInProgress);
} }
return new PersistentTasksInProgress(randomLong(), entries); return new PersistentTasksInProgress(randomLong(), entries);
} }