Make the persistent task status available to PersistentTasksExecutor.nodeOperation(...) method

This commit is contained in:
Martijn van Groningen 2017-10-24 10:20:33 +02:00
parent 1c489ee867
commit 4dd69951f3
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 55 additions and 11 deletions

View File

@ -20,6 +20,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
/** /**
@ -35,6 +36,7 @@ public class NodePersistentTasksExecutor {
} }
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params, public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
@Nullable Task.Status status,
AllocatedPersistentTask task, AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) { PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@ -47,7 +49,7 @@ public class NodePersistentTasksExecutor {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
try { try {
executor.nodeOperation(task, params); executor.nodeOperation(task, params, status);
} catch (Exception ex) { } catch (Exception ex) {
task.markAsFailed(ex); task.markAsFailed(ex);
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -115,10 +116,10 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
/** /**
* This operation will be executed on the executor node. * This operation will be executed on the executor node.
* <p> * <p>
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished. * indicate that the persistent task has finished.
*/ */
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params); protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
public String getExecutor() { public String getExecutor() {
return executor; return executor;

View File

@ -173,7 +173,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
task.getPersistentTaskId(), task.getAllocationId()); task.getPersistentTaskId(), task.getAllocationId());
try { try {
runningTasks.put(taskInProgress.getAllocationId(), task); runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor); nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor);
} catch (Exception e) { } catch (Exception e) {
// Submit task failure // Submit task failure
task.markAsFailed(e); task.markAsFailed(e);

View File

@ -49,12 +49,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class PersistentTasksNodeServiceTests extends ESTestCase { public class PersistentTasksNodeServiceTests extends ESTestCase {
@ -167,6 +169,41 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
} }
public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
TaskId parentId = new TaskId("cluster", 1);
AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId);
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask);
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
MockExecutor executor = new MockExecutor();
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, new TaskManager(Settings.EMPTY), executor);
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase");
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
String taskId = UUIDs.base64UUID();
TestParams taskParams = new TestParams("other_0");
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams,
new Assignment("this_node", "test assignment on other node"));
tasks.updateTaskStatus(taskId, status);
MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
assertThat(executor.size(), equalTo(1));
assertThat(executor.get(0).params, sameInstance(taskParams));
assertThat(executor.get(0).status, sameInstance(status));
assertThat(executor.get(0).task, sameInstance(nodeTask));
}
public void testTaskCancellation() { public void testTaskCancellation() {
AtomicLong capturedTaskId = new AtomicLong(); AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
@ -271,11 +308,13 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
private class Execution { private class Execution {
private final PersistentTaskParams params; private final PersistentTaskParams params;
private final AllocatedPersistentTask task; private final AllocatedPersistentTask task;
private final Task.Status status;
private final PersistentTasksExecutor<?> holder; private final PersistentTasksExecutor<?> holder;
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) { Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor<?> holder) {
this.params = params; this.params = params;
this.task = task; this.task = task;
this.status = status;
this.holder = holder; this.holder = holder;
} }
} }
@ -288,9 +327,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
} }
@Override @Override
public <Params extends PersistentTaskParams> void executeTask(Params params, AllocatedPersistentTask task, public <Params extends PersistentTaskParams> void executeTask(Params params,
Task.Status status,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) { PersistentTasksExecutor<Params> executor) {
executions.add(new Execution(params, task, executor)); executions.add(new Execution(params, task, status, executor));
} }
public Execution get(int i) { public Execution get(int i) {

View File

@ -323,7 +323,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
} }
@Override @Override
protected void nodeOperation(AllocatedPersistentTask task, TestParams params) { protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) {
logger.info("started node operation for the task {}", task); logger.info("started node operation for the task {}", task);
try { try {
TestTask testTask = (TestTask) task; TestTask testTask = (TestTask) task;
@ -346,9 +346,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
} else if ("update_status".equals(testTask.getOperation())) { } else if ("update_status".equals(testTask.getOperation())) {
testTask.setOperation(null); testTask.setOperation(null);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Status status = new Status("phase " + phase.incrementAndGet()); Status newStatus = new Status("phase " + phase.incrementAndGet());
logger.info("updating the task status to {}", status); logger.info("updating the task status to {}", newStatus);
task.updatePersistentStatus(status, new ActionListener<PersistentTask<?>>() { task.updatePersistentStatus(newStatus, new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTask<?> persistentTask) { public void onResponse(PersistentTask<?> persistentTask) {
logger.info("updating was successful"); logger.info("updating was successful");