Persistent Tasks: Remove unused stopped and removeOnCompletion flags (elastic/x-pack-elasticsearch#853)

The stopped and removeOnCompletion flags are not currently used, this commit removes them for now to simplify things.

Original commit: elastic/x-pack-elasticsearch@c636c2817e
This commit is contained in:
Igor Motov 2017-03-29 12:54:44 -04:00 committed by GitHub
parent 0d202d3d36
commit 4bb9e00dbf
14 changed files with 99 additions and 330 deletions

View File

@ -60,10 +60,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
private PersistentTaskRequest request;
private boolean stopped;
private boolean removeOnCompletion = true;
public Request() {
}
@ -71,8 +67,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
public Request(String action, PersistentTaskRequest request) {
this.action = action;
this.request = request;
this.stopped = false;
this.removeOnCompletion = true;
}
@Override
@ -80,8 +74,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.readFrom(in);
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
}
@Override
@ -89,8 +81,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.writeTo(out);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
}
@Override
@ -111,14 +101,12 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request) &&
removeOnCompletion == request1.removeOnCompletion &&
stopped == request1.stopped;
Objects.equals(request, request1.request);
}
@Override
public int hashCode() {
return Objects.hash(action, request, removeOnCompletion, stopped);
return Objects.hash(action, request);
}
public String getAction() {
@ -137,21 +125,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.request = request;
}
public boolean isStopped() {
return stopped;
}
public void setStopped(boolean stopped) {
this.stopped = stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
public void setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
@ -171,21 +144,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
return this;
}
/**
* Indicates if the persistent task should be created in the stopped state. Defaults to false.
*/
public RequestBuilder setStopped(boolean stopped) {
request.setStopped(stopped);
return this;
}
/**
* Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true.
*/
public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) {
request.setRemoveOnCompletion(removeOnCompletion);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
@ -226,7 +184,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
persistentTasksClusterService.createPersistentTask(request.action, request.request,
new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {

View File

@ -47,21 +47,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request, boolean stopped,
boolean removeOnCompletion,
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
ActionListener<Long> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
validate(action, clusterService.state(), request);
final Assignment assignment;
if (stopped) {
// the task is stopped no need to assign it anywhere
assignment = PersistentTasksCustomMetaData.FINISHED_TASK_ASSIGNMENT;
} else {
assignment = getAssignement(action, currentState, request);
}
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment));
assignment = getAssignement(action, currentState, request);
return update(currentState, builder(currentState).addTask(action, request, assignment));
}
@Override
@ -305,11 +299,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment);
}
} else {
if (task.isStopped()) {
logger.trace("ignoring task {} because it is stopped", task.getId());
} else {
logger.trace("ignoring task {} because it is still running", task.getId());
}
logger.trace("ignoring task {} because it is still running", task.getId());
}
}
}

View File

@ -84,8 +84,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<PersistentTaskRequest> objects) -> {
if (objects.size() != 1) {
@ -227,8 +225,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final long allocationId;
private final String taskName;
private final Request request;
private final boolean stopped;
private final boolean removeOnCompletion;
@Nullable
private final Status status;
private final Assignment assignment;
@ -236,29 +232,27 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(long id, String taskName, Request request, boolean stopped, boolean removeOnCompletion, Assignment assignment) {
this(id, 0L, taskName, request, stopped, removeOnCompletion, null, assignment, null);
public PersistentTask(long id, String taskName, Request request, Assignment assignment) {
this(id, 0L, taskName, request, null, assignment, null);
}
public PersistentTask(PersistentTask<Request> task, boolean stopped, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, stopped, task.removeOnCompletion, task.status,
public PersistentTask(PersistentTask<Request> task, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status,
assignment, task.allocationId);
}
public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.request, task.stopped, task.removeOnCompletion, status,
this(task.id, task.allocationId, task.taskName, task.request, status,
task.assignment, task.allocationId);
}
private PersistentTask(long id, long allocationId, String taskName, Request request, boolean stopped, boolean removeOnCompletion,
private PersistentTask(long id, long allocationId, String taskName, Request request,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.request = request;
this.status = status;
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
@ -271,8 +265,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
@ -284,8 +276,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
@ -301,8 +291,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
@ -310,7 +298,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, request, stopped, removeOnCompletion, status, assignment,
return Objects.hash(id, allocationId, taskName, request, status, assignment,
allocationIdOnLastStatusUpdate);
}
@ -352,7 +340,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return isStopped() == false && (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}
@Nullable
@ -360,14 +348,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return status;
}
public boolean isStopped() {
return stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
/**
* @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
* to a new executor node and the status hasn't been updated then the task status is stale.
@ -408,8 +388,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
}
builder.endObject();
return builder;
@ -426,8 +404,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private long allocationId;
private String taskName;
private Request request;
private boolean stopped = true;
private boolean removeOnCompletion;
private Status status;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
@ -458,16 +434,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public TaskBuilder<Request> setStopped(boolean stopped) {
this.stopped = stopped;
return this;
}
public TaskBuilder<Request> setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
return this;
}
public TaskBuilder<Request> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
@ -479,7 +445,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, taskName, request, stopped, removeOnCompletion, status,
return new PersistentTask<>(id, allocationId, taskName, request, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -565,11 +531,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, boolean stopped,
boolean removeOnCompletion, Assignment assignment) {
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, stopped, removeOnCompletion, assignment));
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment));
return this;
}
@ -580,7 +545,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
return this;
}
@ -596,9 +561,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
if (assignment.isAssigned() || taskInProgress.isStopped()) {
if (assignment.isAssigned()) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
}
return this;
@ -636,11 +601,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
}
tasks.remove(taskId);
}
return this;
}

View File

@ -29,27 +29,14 @@ public class PersistentTasksService extends AbstractComponent {
this.clusterService = clusterService;
}
/**
* Creates the specified persistent action and tries to start it immediately, upon completion the task is
* removed from the cluster state
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
PersistentTaskOperationListener listener) {
createPersistentActionTask(action, request, false, true, listener);
}
/**
* Creates the specified persistent action. The action is started unless the stopped parameter is equal to true.
* If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
* Otherwise it will remain there in the stopped state.
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
boolean stopped,
boolean removeOnCompletion,
PersistentTaskOperationListener listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request);
createPersistentActionRequest.setStopped(stopped);
createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse(o.getTaskId()), listener::onFailure));

View File

@ -270,7 +270,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
@ -332,7 +332,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));

View File

@ -104,7 +104,7 @@ public class CloseJobActionTests extends ESTestCase {
public static PersistentTask<StartDatafeedAction.Request> createDatafeedTask(long id, String datafeedId, long startTime,
String nodeId, DatafeedState datafeedState) {
PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), false, true,
new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, datafeedState);
return task;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -118,12 +119,9 @@ public class OpenJobActionTests extends ESTestCase {
.build();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true,
new Assignment("_node_id2", "test assignment")));
taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new Assignment("_node_id1", "test assignment")));
taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), new Assignment("_node_id2", "test assignment")));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
@ -180,7 +178,7 @@ public class OpenJobActionTests extends ESTestCase {
.build();
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"),
new Assignment("_node_id1", "test assignment"));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
@ -241,7 +239,7 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
taskMap.put(5L, new PersistentTask<>(lastTask, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs);
@ -251,7 +249,7 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, null));
taskMap.put(5L, new PersistentTask<>(lastTask, (Task.Status) null));
tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs);
@ -300,8 +298,7 @@ public class OpenJobActionTests extends ESTestCase {
public static PersistentTask<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true,
new Assignment(nodeId, "test assignment"));
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, jobState);
return task;
}

View File

@ -126,8 +126,7 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.build();
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true,
INITIAL_ASSIGNMENT);
new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
@ -153,7 +152,7 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, new Assignment("node_id", "test assignment"));
new Assignment("node_id", "test assignment"));
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);

View File

@ -48,7 +48,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testValidate() {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
@ -69,7 +69,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
PersistentTasksCustomMetaData tasks;
if (randomBoolean()) {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
} else {

View File

@ -78,7 +78,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(2, 40);
for (int i = 0; i < numberOfTasks; i++) {
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits");
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
@ -99,21 +99,17 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(0, 40);
for (int i = 0; i < numberOfTasks; i++) {
switch (randomInt(3)) {
switch (randomInt(2)) {
case 0:
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 1:
// add a task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 2:
// add a stopped task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "fail_me_if_called", null, true);
break;
case 3:
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits");
break;
}
@ -131,39 +127,34 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
int assignOneCount = 0;
for (PersistentTask<?> task : tasksInProgress.tasks()) {
if (task.isStopped()) {
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getTaskName()));
} else {
// explanation should correspond to the action name
switch (task.getTaskName()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
// explanation should correspond to the action name
switch (task.getTaskName()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
break;
case "assign_one":
if (task.isAssigned()) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
break;
case "assign_one":
if (task.isAssigned()) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
} else {
assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time"));
}
break;
default:
fail("Unknown action " + task.getTaskName());
}
} else {
assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time"));
}
break;
default:
fail("Unknown action " + task.getTaskName());
}
}
}
@ -204,8 +195,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasksInProgress.findTasks("assign_one",
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNodeAssignment(clusterState.nodes());
} else {
return new Assignment(null, "only one task can be assigned at a time");
@ -255,13 +245,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
// we don't have any unassigned tasks - add some
if (randomBoolean()) {
logger.info("added random task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null,
false);
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null);
tasksOrNodesChanged = true;
} else {
logger.info("added unassignable task with custom assignment message");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks),
new Assignment(null, "change me"), "never_assign", false);
new Assignment(null, "change me"), "never_assign");
tasksOrNodesChanged = true;
}
}
@ -318,23 +307,16 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
}
if (randomBoolean()) {
logger.info("added random unassignable task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false);
return builder.build();
}
if (randomBoolean()) {
// add unassigned task in stopped state
logger.info("added random stopped task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true);
return builder.build();
} else {
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign");
return builder.build();
}
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
return builder.build();
}
}
if (randomBoolean()) {
@ -384,9 +366,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
return false;
}
return tasks.tasks().stream().anyMatch(task -> {
if (task.isStopped()) {
return false;
}
if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) {
return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false;
}
@ -401,20 +380,20 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
String node, boolean stopped) {
String node) {
return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)),
randomAsciiOfLength(10), stopped);
randomAsciiOfLength(10));
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param, boolean stopped) {
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build()));
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), assignment).build()));
}
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node, boolean stopped) {
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action));
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -44,9 +44,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
int numberOfTasks = randomInt(10);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) {
boolean stopped = randomBoolean();
tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)),
stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment());
randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10)));
@ -136,9 +135,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
}
private Builder addRandomTask(Builder builder) {
boolean stopped = randomBoolean();
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(),
stopped ? new Assignment(null, "stopped") : randomAssignment());
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), randomAssignment());
return builder;
}
@ -183,7 +180,6 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getRequest(), newTask.getRequest());
assertEquals(testTask.isStopped(), newTask.isStopped());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());
@ -220,16 +216,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
// should change if the task was stopped AND unassigned
if (task.getExecutorNode() == null && task.isStopped()) {
changed = true;
}
} else {
// Trying to reassign to a different node
Assignment randomAssignment = randomAssignment();
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment);
// should change if the task was unassigned and was reassigned to a different node or started
if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) {
if ((task.isAssigned() == false && randomAssignment.isAssigned())) {
changed = true;
}
}

View File

@ -47,35 +47,26 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
long[] taskIds = new long[numberOfTasks];
List<PersistentTaskOperationFuture> futures = new ArrayList<>(numberOfTasks);
boolean[] stopped = new boolean[numberOfTasks];
int runningTasks = 0;
for (int i = 0; i < numberOfTasks; i++) {
stopped[i] = randomBoolean();
if (stopped[i] == false) {
runningTasks++;
}
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
futures.add(future);
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped[i], true, future);
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
}
for (int i = 0; i < numberOfTasks; i++) {
taskIds[i] = futures.get(i).get();
}
final int numberOfRunningTasks = runningTasks;
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
if (numberOfRunningTasks > 0) {
// Make sure that at least one of the tasks is running
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), greaterThan(0));
});
}
// Make sure that at least one of the tasks is running
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), greaterThan(0));
});
// Restart cluster
internalCluster().fullRestart();
@ -87,30 +78,6 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
for (int i = 0; i < numberOfTasks; i++) {
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
assertThat(task.isStopped(), equalTo(stopped[i]));
}
logger.info("Waiting for {} original tasks to start", numberOfRunningTasks);
assertBusy(() -> {
// Wait for the running task to start automatically
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(numberOfRunningTasks));
});
// Start all other tasks
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
service = internalCluster().getInstance(PersistentTasksService.class);
for (int i = 0; i < numberOfTasks; i++) {
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode());
assertThat(task.isStopped(), equalTo(stopped[i]));
assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue());
if (stopped[i]) {
PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture();
service.startTask(task.getId(), startFuture);
assertEquals(startFuture.get(), (Long) task.getId());
}
}
logger.info("Waiting for {} tasks to start", numberOfTasks);

View File

@ -112,67 +112,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
stopOrCancelTask(firstRunningTask.getTaskId());
}
public void testPersistentActionCompletionWithoutRemoval() throws Exception {
boolean stopped = randomBoolean();
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped, false,
future);
long taskId = future.get();
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped));
assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue());
assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it
logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped);
for (int i = 0; i < numberOfIters; i++) {
logger.info("iteration {}", i);
if (stopped) {
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks(), empty());
PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture();
persistentTasksService.startTask(taskId, startFuture);
assertEquals(startFuture.get(), (Long) taskId);
}
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
stopOrCancelTask(firstRunningTask.getTaskId());
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
});
stopped = true;
}
assertBusy(() -> {
// Wait for the task to be marked as stopped
PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.getTask(taskId).isStopped(), equalTo(true));
assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
});
logger.info("Removing action record from cluster state");
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture();
persistentTasksService.removeTask(taskId, removeFuture);
assertEquals(removeFuture.get(), (Long) taskId);
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();

View File

@ -76,11 +76,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask("test_action", new TestRequest("other_" + i), false, true,
tasks.addTask("test_action", new TestRequest("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask("test", new TestRequest("this_param"), false, true,
tasks.addTask("test", new TestRequest("this_param"),
new Assignment("this_node", "test assignment on this node"));
}
}
@ -303,7 +303,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build();
builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {