Persistent Tasks: Remove unused stopped and removeOnCompletion flags (#853)

The stopped and removeOnCompletion flags are not currently used, this commit removes them for now to simplify things.
This commit is contained in:
Igor Motov 2017-03-29 12:54:44 -04:00 committed by Martijn van Groningen
parent 37fad04879
commit b142d7e29c
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
9 changed files with 84 additions and 311 deletions

View File

@ -69,10 +69,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
private PersistentTaskRequest request;
private boolean stopped;
private boolean removeOnCompletion = true;
public Request() {
}
@ -80,8 +76,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
public Request(String action, PersistentTaskRequest request) {
this.action = action;
this.request = request;
this.stopped = false;
this.removeOnCompletion = true;
}
@Override
@ -89,8 +83,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.readFrom(in);
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
}
@Override
@ -98,8 +90,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.writeTo(out);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
}
@Override
@ -120,14 +110,12 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request) &&
removeOnCompletion == request1.removeOnCompletion &&
stopped == request1.stopped;
Objects.equals(request, request1.request);
}
@Override
public int hashCode() {
return Objects.hash(action, request, removeOnCompletion, stopped);
return Objects.hash(action, request);
}
public String getAction() {
@ -146,21 +134,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.request = request;
}
public boolean isStopped() {
return stopped;
}
public void setStopped(boolean stopped) {
this.stopped = stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
public void setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
@ -180,21 +153,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
return this;
}
/**
* Indicates if the persistent task should be created in the stopped state. Defaults to false.
*/
public RequestBuilder setStopped(boolean stopped) {
request.setStopped(stopped);
return this;
}
/**
* Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true.
*/
public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) {
request.setRemoveOnCompletion(removeOnCompletion);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
@ -235,7 +193,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
persistentTasksClusterService.createPersistentTask(request.action, request.request,
new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {

View File

@ -61,21 +61,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request, boolean stopped,
boolean removeOnCompletion,
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
ActionListener<Long> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
validate(action, clusterService.state(), request);
final Assignment assignment;
if (stopped) {
// the task is stopped no need to assign it anywhere
assignment = PersistentTasksCustomMetaData.FINISHED_TASK_ASSIGNMENT;
} else {
assignment = getAssignement(action, currentState, request);
}
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment));
assignment = getAssignement(action, currentState, request);
return update(currentState, builder(currentState).addTask(action, request, assignment));
}
@Override
@ -319,11 +313,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment);
}
} else {
if (task.isStopped()) {
logger.trace("ignoring task {} because it is stopped", task.getId());
} else {
logger.trace("ignoring task {} because it is still running", task.getId());
}
logger.trace("ignoring task {} because it is still running", task.getId());
}
}
}

View File

@ -97,8 +97,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<PersistentTaskRequest> objects) -> {
if (objects.size() != 1) {
@ -240,8 +238,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final long allocationId;
private final String taskName;
private final Request request;
private final boolean stopped;
private final boolean removeOnCompletion;
@Nullable
private final Status status;
private final Assignment assignment;
@ -249,29 +245,27 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(long id, String taskName, Request request, boolean stopped, boolean removeOnCompletion, Assignment assignment) {
this(id, 0L, taskName, request, stopped, removeOnCompletion, null, assignment, null);
public PersistentTask(long id, String taskName, Request request, Assignment assignment) {
this(id, 0L, taskName, request, null, assignment, null);
}
public PersistentTask(PersistentTask<Request> task, boolean stopped, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, stopped, task.removeOnCompletion, task.status,
public PersistentTask(PersistentTask<Request> task, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status,
assignment, task.allocationId);
}
public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.request, task.stopped, task.removeOnCompletion, status,
this(task.id, task.allocationId, task.taskName, task.request, status,
task.assignment, task.allocationId);
}
private PersistentTask(long id, long allocationId, String taskName, Request request, boolean stopped, boolean removeOnCompletion,
private PersistentTask(long id, long allocationId, String taskName, Request request,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.request = request;
this.status = status;
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
@ -284,8 +278,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
@ -297,8 +289,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
@ -314,8 +304,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
@ -323,7 +311,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, request, stopped, removeOnCompletion, status, assignment,
return Objects.hash(id, allocationId, taskName, request, status, assignment,
allocationIdOnLastStatusUpdate);
}
@ -365,7 +353,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return isStopped() == false && (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}
@Nullable
@ -373,14 +361,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return status;
}
public boolean isStopped() {
return stopped;
}
public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}
/**
* @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
* to a new executor node and the status hasn't been updated then the task status is stale.
@ -421,8 +401,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
}
builder.endObject();
return builder;
@ -439,8 +417,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private long allocationId;
private String taskName;
private Request request;
private boolean stopped = true;
private boolean removeOnCompletion;
private Status status;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
@ -471,16 +447,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public TaskBuilder<Request> setStopped(boolean stopped) {
this.stopped = stopped;
return this;
}
public TaskBuilder<Request> setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
return this;
}
public TaskBuilder<Request> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
@ -492,7 +458,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, taskName, request, stopped, removeOnCompletion, status,
return new PersistentTask<>(id, allocationId, taskName, request, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -578,11 +544,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, boolean stopped,
boolean removeOnCompletion, Assignment assignment) {
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, stopped, removeOnCompletion, assignment));
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment));
return this;
}
@ -593,7 +558,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
return this;
}
@ -609,9 +574,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
if (assignment.isAssigned() || taskInProgress.isStopped()) {
if (assignment.isAssigned()) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
}
return this;
@ -649,11 +614,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
}
tasks.remove(taskId);
}
return this;
}

View File

@ -42,27 +42,14 @@ public class PersistentTasksService extends AbstractComponent {
this.clusterService = clusterService;
}
/**
* Creates the specified persistent action and tries to start it immediately, upon completion the task is
* removed from the cluster state
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
PersistentTaskOperationListener listener) {
createPersistentActionTask(action, request, false, true, listener);
}
/**
* Creates the specified persistent action. The action is started unless the stopped parameter is equal to true.
* If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
* Otherwise it will remain there in the stopped state.
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
boolean stopped,
boolean removeOnCompletion,
PersistentTaskOperationListener listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request);
createPersistentActionRequest.setStopped(stopped);
createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse(o.getTaskId()), listener::onFailure));

View File

@ -92,7 +92,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(2, 40);
for (int i = 0; i < numberOfTasks; i++) {
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits");
}
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
@ -113,21 +113,17 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
addTestNodes(nodes, randomIntBetween(1, 10));
int numberOfTasks = randomIntBetween(0, 40);
for (int i = 0; i < numberOfTasks; i++) {
switch (randomInt(3)) {
switch (randomInt(2)) {
case 0:
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 1:
// add a task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
break;
case 2:
// add a stopped task assigned to non-existing node that should not get assigned
addTask(tasks, "should_not_assign", "fail_me_if_called", null, true);
break;
case 3:
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits");
break;
}
@ -145,39 +141,34 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
int assignOneCount = 0;
for (PersistentTask<?> task : tasksInProgress.tasks()) {
if (task.isStopped()) {
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getTaskName()));
} else {
// explanation should correspond to the action name
switch (task.getTaskName()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
// explanation should correspond to the action name
switch (task.getTaskName()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
break;
case "assign_one":
if (task.isAssigned()) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
break;
case "assign_one":
if (task.isAssigned()) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
} else {
assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time"));
}
break;
default:
fail("Unknown action " + task.getTaskName());
}
} else {
assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time"));
}
break;
default:
fail("Unknown action " + task.getTaskName());
}
}
}
@ -218,8 +209,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasksInProgress.findTasks("assign_one",
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNodeAssignment(clusterState.nodes());
} else {
return new Assignment(null, "only one task can be assigned at a time");
@ -269,13 +259,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
// we don't have any unassigned tasks - add some
if (randomBoolean()) {
logger.info("added random task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null,
false);
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null);
tasksOrNodesChanged = true;
} else {
logger.info("added unassignable task with custom assignment message");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks),
new Assignment(null, "change me"), "never_assign", false);
new Assignment(null, "change me"), "never_assign");
tasksOrNodesChanged = true;
}
}
@ -332,23 +321,16 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
}
if (randomBoolean()) {
logger.info("added random unassignable task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false);
return builder.build();
}
if (randomBoolean()) {
// add unassigned task in stopped state
logger.info("added random stopped task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true);
return builder.build();
} else {
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign");
return builder.build();
}
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
return builder.build();
}
}
if (randomBoolean()) {
@ -398,9 +380,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
return false;
}
return tasks.tasks().stream().anyMatch(task -> {
if (task.isStopped()) {
return false;
}
if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) {
return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false;
}
@ -415,20 +394,20 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
String node, boolean stopped) {
String node) {
return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)),
randomAsciiOfLength(10), stopped);
randomAsciiOfLength(10));
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param, boolean stopped) {
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build()));
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), assignment).build()));
}
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node, boolean stopped) {
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action));
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -57,9 +57,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
int numberOfTasks = randomInt(10);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) {
boolean stopped = randomBoolean();
tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)),
stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment());
randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10)));
@ -149,9 +148,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
}
private Builder addRandomTask(Builder builder) {
boolean stopped = randomBoolean();
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(),
stopped ? new Assignment(null, "stopped") : randomAssignment());
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), randomAssignment());
return builder;
}
@ -196,7 +193,6 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getRequest(), newTask.getRequest());
assertEquals(testTask.isStopped(), newTask.isStopped());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());
@ -233,16 +229,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
// should change if the task was stopped AND unassigned
if (task.getExecutorNode() == null && task.isStopped()) {
changed = true;
}
} else {
// Trying to reassign to a different node
Assignment randomAssignment = randomAssignment();
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment);
// should change if the task was unassigned and was reassigned to a different node or started
if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) {
if ((task.isAssigned() == false && randomAssignment.isAssigned())) {
changed = true;
}
}

View File

@ -60,35 +60,26 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
long[] taskIds = new long[numberOfTasks];
List<PersistentTaskOperationFuture> futures = new ArrayList<>(numberOfTasks);
boolean[] stopped = new boolean[numberOfTasks];
int runningTasks = 0;
for (int i = 0; i < numberOfTasks; i++) {
stopped[i] = randomBoolean();
if (stopped[i] == false) {
runningTasks++;
}
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
futures.add(future);
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped[i], true, future);
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
}
for (int i = 0; i < numberOfTasks; i++) {
taskIds[i] = futures.get(i).get();
}
final int numberOfRunningTasks = runningTasks;
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
if (numberOfRunningTasks > 0) {
// Make sure that at least one of the tasks is running
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), greaterThan(0));
});
}
// Make sure that at least one of the tasks is running
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), greaterThan(0));
});
// Restart cluster
internalCluster().fullRestart();
@ -100,30 +91,6 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
for (int i = 0; i < numberOfTasks; i++) {
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
assertThat(task.isStopped(), equalTo(stopped[i]));
}
logger.info("Waiting for {} original tasks to start", numberOfRunningTasks);
assertBusy(() -> {
// Wait for the running task to start automatically
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(numberOfRunningTasks));
});
// Start all other tasks
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
service = internalCluster().getInstance(PersistentTasksService.class);
for (int i = 0; i < numberOfTasks; i++) {
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
assertNotNull(task);
logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode());
assertThat(task.isStopped(), equalTo(stopped[i]));
assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue());
if (stopped[i]) {
PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture();
service.startTask(task.getId(), startFuture);
assertEquals(startFuture.get(), (Long) task.getId());
}
}
logger.info("Waiting for {} tasks to start", numberOfTasks);

View File

@ -126,67 +126,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
stopOrCancelTask(firstRunningTask.getTaskId());
}
public void testPersistentActionCompletionWithoutRemoval() throws Exception {
boolean stopped = randomBoolean();
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped, false,
future);
long taskId = future.get();
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped));
assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue());
assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it
logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped);
for (int i = 0; i < numberOfIters; i++) {
logger.info("iteration {}", i);
if (stopped) {
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks(), empty());
PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture();
persistentTasksService.startTask(taskId, startFuture);
assertEquals(startFuture.get(), (Long) taskId);
}
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
stopOrCancelTask(firstRunningTask.getTaskId());
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
});
stopped = true;
}
assertBusy(() -> {
// Wait for the task to be marked as stopped
PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(1));
assertThat(tasks.getTask(taskId).isStopped(), equalTo(true));
assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
});
logger.info("Removing action record from cluster state");
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture();
persistentTasksService.removeTask(taskId, removeFuture);
assertEquals(removeFuture.get(), (Long) taskId);
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();

View File

@ -90,11 +90,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask("test_action", new TestRequest("other_" + i), false, true,
tasks.addTask("test_action", new TestRequest("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask("test", new TestRequest("this_param"), false, true,
tasks.addTask("test", new TestRequest("this_param"),
new Assignment("this_node", "test assignment on this node"));
}
}
@ -317,7 +317,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build();
builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {