Replace List with Map in PersistentTasksInProgress

Store currently running persistent tasks in a map instead of a list.

Original commit: elastic/x-pack-elasticsearch@f383b0bbed
This commit is contained in:
Igor Motov 2017-02-03 15:20:44 -05:00
parent aa86d57487
commit a0b37a2510
11 changed files with 152 additions and 134 deletions

View File

@ -269,7 +269,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
if (tasksInProgress != null) {
Predicate<PersistentTaskInProgress<?>> predicate = ALL.equals(request.getDatafeedId()) ? p -> true :
p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId());
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, predicate)) {
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findTasks(StartDatafeedAction.NAME, predicate)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
states.put(storedRequest.getDatafeedId(), DatafeedState.STARTED);
}

View File

@ -251,7 +251,7 @@ public class StartDatafeedAction
Request storedRequest = (Request) taskInProgress.getRequest();
return storedRequest.getDatafeedId().equals(request.getDatafeedId());
};
if (persistentTasksInProgress.entriesExist(NAME, predicate)) {
if (persistentTasksInProgress.tasksExist(NAME, predicate)) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
}

View File

@ -148,7 +148,7 @@ public class StopDatafeedAction
PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findEntries(StartDatafeedAction.NAME, p -> true)) {
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.findTasks(StartDatafeedAction.NAME, p -> true)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) taskInProgress.getRequest();
if (storedRequest.getDatafeedId().equals(datafeedId)) {
RemovePersistentTaskAction.Request cancelTasksRequest = new RemovePersistentTaskAction.Request();

View File

@ -327,7 +327,7 @@ public class MlMetadata implements MetaData.Custom {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasksInProgress.entriesExist(StartDatafeedAction.NAME, predicate)) {
if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) {
String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId,
DatafeedState.STARTED);
throw ExceptionsHelper.conflictStatusException(msg);

View File

@ -69,7 +69,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);

View File

@ -16,11 +16,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -55,17 +56,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
public ClusterState execute(ClusterState currentState) throws Exception {
final String executorNodeId = executorNode(action, currentState, request);
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
final long nextId;
long nextId;
if (tasksInProgress != null) {
nextId = tasksInProgress.getCurrentId() + 1;
currentTasks.addAll(tasksInProgress.entries());
} else {
nextId = 1;
}
currentTasks.add(new PersistentTaskInProgress<>(nextId, action, request, executorNodeId));
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(nextId, currentTasks)).build();
return createPersistentTask(currentState, new PersistentTaskInProgress<>(nextId, action, request, executorNodeId));
}
@Override
@ -104,23 +101,18 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// Nothing to do, the task was already deleted
return currentState;
}
boolean found = false;
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (taskInProgress.getId() == id) {
assert found == false;
found = true;
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, executorNode));
}
} else {
currentTasks.add(taskInProgress);
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
PersistentTaskInProgress<?> taskInProgress = tasksInProgress.getTask(id);
if (taskInProgress != null) {
String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
return updatePersistentTask(currentState, new PersistentTaskInProgress<>(taskInProgress, executorNode));
}
return currentState;
} else {
return removePersistentTask(currentState, id);
}
return rebuildClusterStateIfNeeded(found, currentState, currentTasks);
}
@Override
@ -151,19 +143,11 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// Nothing to do, the task no longer exists
return currentState;
}
boolean found = false;
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (taskInProgress.getId() == id) {
assert found == false;
found = true;
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, status));
} else {
currentTasks.add(taskInProgress);
}
PersistentTaskInProgress<?> task = tasksInProgress.getTask(id);
if (task != null) {
return updatePersistentTask(currentState, new PersistentTaskInProgress<>(task, status));
}
return rebuildClusterStateIfNeeded(found, currentState, currentTasks);
return currentState;
}
@Override
@ -178,14 +162,40 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
});
}
private ClusterState rebuildClusterStateIfNeeded(boolean rebuild, ClusterState oldState,
List<PersistentTaskInProgress<?>> currentTasks) {
if (rebuild) {
private ClusterState updatePersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.putAll(oldTasks.taskMap());
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState createPersistentTask(ClusterState oldState, PersistentTaskInProgress<?> newTask) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
if (oldTasks != null) {
taskMap.putAll(oldTasks.taskMap());
}
taskMap.put(newTask.getId(), newTask);
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(newTask.getId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
}
private ClusterState removePersistentTask(ClusterState oldState, long taskId) {
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
if (oldTasks != null) {
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), currentTasks);
return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build();
taskMap.putAll(oldTasks.taskMap());
taskMap.remove(taskId);
PersistentTasksInProgress newTasks =
new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap));
return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build();
} else {
// no tasks - nothing to do
return oldState;
}
}
@ -213,7 +223,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.getExecutorNode() == null) {
// there is an unassigned task - we need to try assigning it
reassignmentRequired = true;
@ -244,22 +254,12 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) {
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.entries()) {
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) {
// there is an unassigned task - we need to try assigning it
String executorNode = executorNode(task.getAction(), currentState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
PersistentTasksInProgress tasksInProgress = newClusterState.custom(PersistentTasksInProgress.TYPE);
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (task.getId() == taskInProgress.getId()) {
currentTasks.add(new PersistentTaskInProgress<>(task, executorNode));
} else {
currentTasks.add(taskInProgress);
}
}
newClusterState = ClusterState.builder(newClusterState).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks)).build();
newClusterState = updatePersistentTask(newClusterState, new PersistentTaskInProgress<>(task, executorNode));
}
}
}

View File

@ -20,7 +20,7 @@ import org.elasticsearch.tasks.Task.Status;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -31,29 +31,37 @@ import java.util.stream.Collectors;
public final class PersistentTasksInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final String TYPE = "persistent_tasks";
// TODO: Implement custom Diff for entries
private final List<PersistentTaskInProgress<?>> entries;
// TODO: Implement custom Diff for tasks
private final Map<Long, PersistentTaskInProgress<?>> tasks;
private final long currentId;
public PersistentTasksInProgress(long currentId, List<PersistentTaskInProgress<?>> entries) {
public PersistentTasksInProgress(long currentId, Map<Long, PersistentTaskInProgress<?>> tasks) {
this.currentId = currentId;
this.entries = entries;
this.tasks = tasks;
}
public List<PersistentTaskInProgress<?>> entries() {
return this.entries;
public Collection<PersistentTaskInProgress<?>> tasks() {
return this.tasks.values();
}
public Collection<PersistentTaskInProgress<?>> findEntries(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.entries().stream()
public Map<Long, PersistentTaskInProgress<?>> taskMap() {
return this.tasks;
}
public PersistentTaskInProgress<?> getTask(long id) {
return this.tasks.get(id);
}
public Collection<PersistentTaskInProgress<?>> findTasks(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction()))
.filter(predicate)
.collect(Collectors.toList());
}
public boolean entriesExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.entries().stream()
public boolean tasksExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
return this.tasks().stream()
.filter(p -> actionName.equals(p.getAction()))
.anyMatch(predicate);
}
@ -64,16 +72,16 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
if (o == null || getClass() != o.getClass()) return false;
PersistentTasksInProgress that = (PersistentTasksInProgress) o;
return currentId == that.currentId &&
Objects.equals(entries, that.entries);
Objects.equals(tasks, that.tasks);
}
@Override
public int hashCode() {
return Objects.hash(entries, currentId);
return Objects.hash(tasks, currentId);
}
public long getNumberOfTasksOnNode(String nodeId, String action) {
return entries.stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
}
@Override
@ -84,7 +92,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
/**
* A record that represents a single running persistent task
*/
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable {
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable, ToXContent {
private final long id;
private final long allocationId;
private final String action;
@ -183,6 +191,28 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public Status getStatus() {
return status;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", id);
builder.field("action", action);
builder.field("request");
request.toXContent(builder, params);
if (status != null) {
builder.field("status", status, params);
}
builder.field("executor_node", executorNode);
}
builder.endObject();
return builder;
}
@Override
public boolean isFragment() {
return false;
}
}
@Override
@ -192,13 +222,15 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public PersistentTasksInProgress(StreamInput in) throws IOException {
currentId = in.readLong();
entries = in.readList(PersistentTaskInProgress::new);
tasks = in.readMap(StreamInput::readLong, PersistentTaskInProgress::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(currentId);
out.writeList(entries);
out.writeMap(tasks, StreamOutput::writeLong, (stream, value) -> {
value.writeTo(stream);
});
}
public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
@ -214,25 +246,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId);
builder.startArray("running_tasks");
for (PersistentTaskInProgress<?> entry : entries) {
toXContent(entry, builder, params);
for (PersistentTaskInProgress<?> entry : tasks.values()) {
entry.toXContent(builder, params);
}
builder.endArray();
return builder;
}
public void toXContent(PersistentTaskInProgress<?> entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", entry.id);
builder.field("action", entry.action);
builder.field("request");
entry.request.toXContent(builder, params);
if (entry.status != null) {
builder.field("status", entry.status, params);
}
builder.field("executor_node", entry.executorNode);
}
builder.endObject();
}
}

View File

@ -196,7 +196,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
builder = new MlMetadata.Builder(result);
builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyList()));
builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyMap()));
result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
assertThat(result.getAllocations().get("foo").getState(), equalTo(JobState.CLOSED));
@ -262,7 +262,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTasksInProgress.PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTasksInProgress.PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null);
PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonList(taskInProgress));
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,

View File

@ -27,7 +27,9 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -71,17 +73,17 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
List<PersistentTaskInProgress<?>> tasks = new ArrayList<>();
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
long taskId = randomLong();
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.add(new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i),
tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i),
"other_node_" + randomInt(nonLocalNodesCount)));
taskId++;
if (added == false && randomBoolean()) {
added = true;
tasks.add(new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node"));
tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node"));
taskId++;
}
}
@ -288,38 +290,33 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
tasks.add(new PersistentTaskInProgress<>(prevTasks == null ? 0 : prevTasks.getCurrentId(), action, request, node));
Map<Long, PersistentTaskInProgress<?>> tasks = prevTasks == null ? new HashMap<>() : new HashMap<>(prevTasks.taskMap());
long id = prevTasks == null ? 0 : prevTasks.getCurrentId();
tasks.put(id, new PersistentTaskInProgress<>(id, action, request, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i).getId() == taskId) {
tasks.set(i, new PersistentTaskInProgress<>(tasks.get(i), node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
}
fail("didn't find task with id " + taskId);
return null;
assertNotNull(prevTasks);
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
assertNotNull(prevTask);
tasks.put(prevTask.getId(), new PersistentTaskInProgress<>(prevTask, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
}
private ClusterState removeTask(ClusterState state, long taskId) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i).getId() == taskId) {
tasks.remove(i);
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
}
fail("didn't find task with id " + taskId);
return null;
assertNotNull(prevTasks);
Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>(prevTasks.taskMap());
PersistentTaskInProgress<?> prevTask = tasks.get(taskId);
assertNotNull(prevTask);
tasks.remove(prevTask.getId());
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build();
}
private class Execution {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestTasksRequestBuilder;
import org.junit.After;
import java.util.Collection;
import java.util.Collections;
@ -52,6 +53,11 @@ public class PersistentActionIT extends ESIntegTestCase {
.build();
}
@After
public void cleanup() throws Exception {
assertNoRunningTasks();
}
public void testPersistentActionRestart() throws Exception {
long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId();
assertBusy(() -> {
@ -119,8 +125,6 @@ public class PersistentActionIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1));
}
assertNoRunningTasks();
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
@ -165,8 +169,8 @@ public class PersistentActionIT extends ESIntegTestCase {
.get().getTasks().get(0);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.entries().size(), equalTo(1));
assertThat(tasksInProgress.entries().get(0).getStatus(), nullValue());
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
int numberOfUpdates = randomIntBetween(1, 10);
for (int i = 0; i < numberOfUpdates; i++) {
@ -178,9 +182,9 @@ public class PersistentActionIT extends ESIntegTestCase {
int finalI = i;
assertBusy(() -> {
PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasks.entries().size(), equalTo(1));
assertThat(tasks.entries().get(0).getStatus(), notNullValue());
assertThat(tasks.entries().get(0).getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
});
}
@ -189,8 +193,6 @@ public class PersistentActionIT extends ESIntegTestCase {
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
assertNoRunningTasks();
}
private void assertNoRunningTasks() throws Exception {
@ -203,7 +205,7 @@ public class PersistentActionIT extends ESIntegTestCase {
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.entries(), empty());
.tasks(), empty());
});
}

View File

@ -14,16 +14,16 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> {
@Override
protected PersistentTasksInProgress createTestInstance() {
int numberOfTasks = randomInt(10);
List<PersistentTaskInProgress<?>> entries = new ArrayList<>();
Map<Long, PersistentTaskInProgress<?>> entries = new HashMap<>();
for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>(
randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)),
@ -32,7 +32,7 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC
// From time to time update status
taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10)));
}
entries.add(taskInProgress);
entries.put(taskInProgress.getId(), taskInProgress);
}
return new PersistentTasksInProgress(randomLong(), entries);
}