Make the persistent task status available to PersistentTasksExecutor.nodeOperation(...) method
Original commit: elastic/x-pack-elasticsearch@ce6d788e77
This commit is contained in:
parent
439830890f
commit
56e7512500
|
@ -607,7 +607,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, JobParams params) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, JobParams params, Task.Status status) {
|
||||
JobTask jobTask = (JobTask) task;
|
||||
jobTask.autodetectProcessManager = autodetectProcessManager;
|
||||
autodetectProcessManager.openJob(jobTask, e2 -> {
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -545,7 +546,7 @@ public class StartDatafeedAction
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, DatafeedParams params) {
|
||||
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, DatafeedParams params, Task.Status status) {
|
||||
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
|
||||
datafeedTask.datafeedManager = datafeedManager;
|
||||
datafeedManager.run(datafeedTask,
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent;
|
|||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
/**
|
||||
|
@ -22,6 +23,7 @@ public class NodePersistentTasksExecutor {
|
|||
}
|
||||
|
||||
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
|
||||
@Nullable Task.Status status,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Params> executor) {
|
||||
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
|
||||
|
@ -34,7 +36,7 @@ public class NodePersistentTasksExecutor {
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
try {
|
||||
executor.nodeOperation(task, params);
|
||||
executor.nodeOperation(task, params, status);
|
||||
} catch (Exception ex) {
|
||||
task.markAsFailed(ex);
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
@ -101,10 +102,10 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
|
|||
/**
|
||||
* This operation will be executed on the executor node.
|
||||
* <p>
|
||||
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
|
||||
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
|
||||
* indicate that the persistent task has finished.
|
||||
*/
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params);
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
|
||||
|
||||
public String getExecutor() {
|
||||
return executor;
|
||||
|
|
|
@ -160,7 +160,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
task.getPersistentTaskId(), task.getAllocationId());
|
||||
try {
|
||||
runningTasks.put(taskInProgress.getAllocationId(), task);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor);
|
||||
} catch (Exception e) {
|
||||
// Submit task failure
|
||||
task.markAsFailed(e);
|
||||
|
|
|
@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||
|
@ -153,6 +155,41 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
|
||||
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
|
||||
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
||||
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
||||
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
|
||||
TaskId parentId = new TaskId("cluster", 1);
|
||||
AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId);
|
||||
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask);
|
||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
||||
|
||||
MockExecutor executor = new MockExecutor();
|
||||
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
||||
registry, new TaskManager(Settings.EMPTY), executor);
|
||||
|
||||
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
|
||||
|
||||
Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase");
|
||||
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
|
||||
String taskId = UUIDs.base64UUID();
|
||||
TestParams taskParams = new TestParams("other_0");
|
||||
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams,
|
||||
new Assignment("this_node", "test assignment on other node"));
|
||||
tasks.updateTaskStatus(taskId, status);
|
||||
MetaData.Builder metaData = MetaData.builder(state.metaData());
|
||||
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
|
||||
|
||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||
|
||||
assertThat(executor.size(), equalTo(1));
|
||||
assertThat(executor.get(0).params, sameInstance(taskParams));
|
||||
assertThat(executor.get(0).status, sameInstance(status));
|
||||
assertThat(executor.get(0).task, sameInstance(nodeTask));
|
||||
}
|
||||
|
||||
public void testTaskCancellation() {
|
||||
AtomicLong capturedTaskId = new AtomicLong();
|
||||
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
|
||||
|
@ -257,11 +294,13 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
private class Execution {
|
||||
private final PersistentTaskParams params;
|
||||
private final AllocatedPersistentTask task;
|
||||
private final Task.Status status;
|
||||
private final PersistentTasksExecutor<?> holder;
|
||||
|
||||
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
|
||||
Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor<?> holder) {
|
||||
this.params = params;
|
||||
this.task = task;
|
||||
this.status = status;
|
||||
this.holder = holder;
|
||||
}
|
||||
}
|
||||
|
@ -274,9 +313,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <Params extends PersistentTaskParams> void executeTask(Params params, AllocatedPersistentTask task,
|
||||
public <Params extends PersistentTaskParams> void executeTask(Params params,
|
||||
Task.Status status,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Params> executor) {
|
||||
executions.add(new Execution(params, task, executor));
|
||||
executions.add(new Execution(params, task, status, executor));
|
||||
}
|
||||
|
||||
public Execution get(int i) {
|
||||
|
|
|
@ -309,7 +309,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestParams params) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) {
|
||||
logger.info("started node operation for the task {}", task);
|
||||
try {
|
||||
TestTask testTask = (TestTask) task;
|
||||
|
@ -332,9 +332,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
} else if ("update_status".equals(testTask.getOperation())) {
|
||||
testTask.setOperation(null);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Status status = new Status("phase " + phase.incrementAndGet());
|
||||
logger.info("updating the task status to {}", status);
|
||||
task.updatePersistentStatus(status, new ActionListener<PersistentTask<?>>() {
|
||||
Status newStatus = new Status("phase " + phase.incrementAndGet());
|
||||
logger.info("updating the task status to {}", newStatus);
|
||||
task.updatePersistentStatus(newStatus, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("updating was successful");
|
||||
|
|
Loading…
Reference in New Issue