Add PersistentTasksClusterService::unassignPersistentTask method (#37576)
* Add PersistentTasksClusterService::unassignPersistentTask method * adding cancellation test * Adding integration test for unallocating tasks from a node * Addressing review comments * adressing minor PR comments
This commit is contained in:
parent
e3672aa551
commit
1c2ae9185c
|
@ -247,6 +247,45 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This unassigns a task from any node, i.e. it is assigned to a {@code null} node with the provided reason.
|
||||
*
|
||||
* Since the assignment executor node is null, the {@link PersistentTasksClusterService} will attempt to reassign it to a valid
|
||||
* node quickly.
|
||||
*
|
||||
* @param taskId the id of a persistent task
|
||||
* @param taskAllocationId the expected allocation id of the persistent task
|
||||
* @param reason the reason for unassigning the task from any node
|
||||
* @param listener the listener that will be called when task is unassigned
|
||||
*/
|
||||
public void unassignPersistentTask(final String taskId,
|
||||
final long taskAllocationId,
|
||||
final String reason,
|
||||
final ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
|
||||
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
|
||||
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
|
||||
} else {
|
||||
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link Assignment} for the given persistent task.
|
||||
*
|
||||
|
@ -263,7 +302,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
|
|||
|
||||
AssignmentDecision decision = decider.canAssign();
|
||||
if (decision.getType() == AssignmentDecision.Type.NO) {
|
||||
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
|
||||
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
|
||||
}
|
||||
|
||||
return persistentTasksExecutor.getAssignment(taskParams, currentState);
|
||||
|
@ -404,6 +443,10 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
|
|||
}
|
||||
}
|
||||
|
||||
private static Assignment unassignedAssignment(String reason) {
|
||||
return new Assignment(null, reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to periodically try to reassign unassigned persistent tasks.
|
||||
*/
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
package org.elasticsearch.persistent;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -63,10 +65,13 @@ import static org.elasticsearch.persistent.PersistentTasksClusterService.needsRe
|
|||
import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
|
||||
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
@ -464,6 +469,56 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testUnassignTask() {
|
||||
ClusterState clusterState = initialState();
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
|
||||
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
|
||||
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
|
||||
.localNodeId("_node_1")
|
||||
.masterNodeId("_node_1")
|
||||
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));
|
||||
|
||||
String unassignedId = addTask(tasks, "unassign", "_node_2");
|
||||
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||
clusterState = builder.metaData(metaData).nodes(nodes).build();
|
||||
setState(clusterService, clusterState);
|
||||
PersistentTasksClusterService service = createService((params, currentState) ->
|
||||
new Assignment("_node_2", "test"));
|
||||
service.unassignPersistentTask(unassignedId, tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
|
||||
task -> {
|
||||
assertThat(task.getAssignment().getExecutorNode(), is(nullValue()));
|
||||
assertThat(task.getId(), equalTo(unassignedId));
|
||||
assertThat(task.getAssignment().getExplanation(), equalTo("unassignment test"));
|
||||
},
|
||||
e -> fail()
|
||||
));
|
||||
}
|
||||
|
||||
public void testUnassignNonExistentTask() {
|
||||
ClusterState clusterState = initialState();
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
|
||||
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
|
||||
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
|
||||
.localNodeId("_node_1")
|
||||
.masterNodeId("_node_1")
|
||||
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));
|
||||
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||
clusterState = builder.metaData(metaData).nodes(nodes).build();
|
||||
setState(clusterService, clusterState);
|
||||
PersistentTasksClusterService service = createService((params, currentState) ->
|
||||
new Assignment("_node_2", "test"));
|
||||
service.unassignPersistentTask("missing-task", tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
|
||||
task -> fail(),
|
||||
e -> assertThat(e, instanceOf(ResourceNotFoundException.class))
|
||||
));
|
||||
}
|
||||
|
||||
private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
|
||||
AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
|
||||
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
|
||||
|
@ -728,9 +783,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
|||
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build()));
|
||||
}
|
||||
|
||||
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
|
||||
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param),
|
||||
private String addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
|
||||
String id = UUIDs.base64UUID();
|
||||
tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param),
|
||||
new Assignment(node, "explanation: " + param));
|
||||
return id;
|
||||
}
|
||||
|
||||
private DiscoveryNode newNode(String nodeId) {
|
||||
|
|
|
@ -46,8 +46,10 @@ import java.util.Objects;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
|
||||
public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
||||
|
@ -155,11 +157,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
|
||||
String newNode = internalCluster().startNode(nodeSettings);
|
||||
String newNodeId = internalCluster().clusterService(newNode).localNode().getId();
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
waitForTaskToStart();
|
||||
|
||||
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
||||
|
@ -199,11 +198,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
|
||||
TestPersistentTasksExecutor.setNonClusterStateCondition(true);
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
waitForTaskToStart();
|
||||
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
||||
|
@ -221,12 +216,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
|
||||
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
|
||||
String taskId = future.get().getId();
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
waitForTaskToStart();
|
||||
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
||||
|
@ -307,6 +297,62 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testUnassignRunningPersistentTask() throws Exception {
|
||||
PersistentTasksClusterService persistentTasksClusterService =
|
||||
internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName());
|
||||
// Speed up rechecks to a rate that is quicker than what settings would allow
|
||||
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
|
||||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
|
||||
TestParams testParams = new TestParams("Blah");
|
||||
testParams.setExecutorNodeAttr("test");
|
||||
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
|
||||
PersistentTask<TestParams> task = future.get();
|
||||
String taskId = task.getId();
|
||||
|
||||
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
|
||||
internalCluster().startNode(nodeSettings);
|
||||
|
||||
waitForTaskToStart();
|
||||
|
||||
PlainActionFuture<PersistentTask<?>> unassignmentFuture = new PlainActionFuture<>();
|
||||
|
||||
// Disallow re-assignment after it is unallocated to verify master and node state
|
||||
TestPersistentTasksExecutor.setNonClusterStateCondition(false);
|
||||
|
||||
persistentTasksClusterService.unassignPersistentTask(taskId,
|
||||
task.getAllocationId() + 1,
|
||||
"unassignment test",
|
||||
unassignmentFuture);
|
||||
PersistentTask<?> unassignedTask = unassignmentFuture.get();
|
||||
assertThat(unassignedTask.getId(), equalTo(taskId));
|
||||
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
|
||||
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));
|
||||
|
||||
assertBusy(() -> {
|
||||
// Verify that the task is NOT running on the node
|
||||
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
|
||||
.getTasks();
|
||||
assertThat(tasks.size(), equalTo(0));
|
||||
|
||||
// Verify that the task is STILL in internal cluster state
|
||||
assertClusterStateHasTask(taskId);
|
||||
});
|
||||
|
||||
// Allow it to be reassigned again to the same node
|
||||
TestPersistentTasksExecutor.setNonClusterStateCondition(true);
|
||||
|
||||
// Verify it starts again
|
||||
waitForTaskToStart();
|
||||
|
||||
assertClusterStateHasTask(taskId);
|
||||
|
||||
// Complete or cancel the running task
|
||||
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
stopOrCancelTask(taskInfo.getTaskId());
|
||||
}
|
||||
|
||||
private void stopOrCancelTask(TaskId taskId) {
|
||||
if (randomBoolean()) {
|
||||
logger.info("Completing the running task");
|
||||
|
@ -322,6 +368,25 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static void waitForTaskToStart() throws Exception {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
|
||||
.size(), equalTo(1));
|
||||
});
|
||||
}
|
||||
|
||||
private static void assertClusterStateHasTask(String taskId) {
|
||||
Collection<PersistentTask<?>> clusterTasks = ((PersistentTasksCustomMetaData) internalCluster()
|
||||
.clusterService()
|
||||
.state()
|
||||
.getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE))
|
||||
.tasks();
|
||||
assertThat(clusterTasks, hasSize(1));
|
||||
assertThat(clusterTasks.iterator().next().getId(), equalTo(taskId));
|
||||
}
|
||||
|
||||
private void assertNoRunningTasks() throws Exception {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to finish
|
||||
|
|
Loading…
Reference in New Issue