From 56e75125000e5b766ccf39bbeb21b13a4a98e84c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 24 Oct 2017 10:20:33 +0200 Subject: [PATCH] Make the persistent task status available to PersistentTasksExecutor.nodeOperation(...) method Original commit: elastic/x-pack-elasticsearch@ce6d788e7759db2a61f775e332de0d795b0943f3 --- .../xpack/ml/action/OpenJobAction.java | 2 +- .../xpack/ml/action/StartDatafeedAction.java | 3 +- .../NodePersistentTasksExecutor.java | 4 +- .../persistent/PersistentTasksExecutor.java | 5 +- .../PersistentTasksNodeService.java | 2 +- .../PersistentTasksNodeServiceTests.java | 47 +++++++++++++++++-- .../persistent/TestPersistentTasksPlugin.java | 8 ++-- 7 files changed, 58 insertions(+), 13 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 127aaa17b4f..8ca53d8fe9e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -607,7 +607,7 @@ public class OpenJobAction extends Action { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 06cc24c5755..37d2d6581c8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -545,7 +546,7 @@ public class StartDatafeedAction } @Override - protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, DatafeedParams params) { + protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, DatafeedParams params, Task.Status status) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; datafeedManager.run(datafeedTask, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java index 8133fb4724d..9595883ffb5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; /** @@ -22,6 +23,7 @@ public class NodePersistentTasksExecutor { } public void executeTask(@Nullable Params params, + @Nullable Task.Status status, AllocatedPersistentTask task, PersistentTasksExecutor executor) { threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { @@ -34,7 +36,7 @@ public class NodePersistentTasksExecutor { @Override protected void doRun() throws Exception { try { - executor.nodeOperation(task, params); + executor.nodeOperation(task, params, status); } catch (Exception ex) { task.markAsFailed(ex); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java index c39cde85425..4fe04febb07 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -101,10 +102,10 @@ public abstract class PersistentTasksExecutor - * NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to + * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params); + protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status); public String getExecutor() { return executor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java index 03f53e0f05d..49246a8be20 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java @@ -160,7 +160,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu task.getPersistentTaskId(), task.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor); + nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor); } catch (Exception e) { // Submit task failure task.markAsFailed(e); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 8dcd23f193f..ec049e088a4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class PersistentTasksNodeServiceTests extends ESTestCase { @@ -153,6 +155,41 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } + public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { + PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); + @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); + when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); + when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME); + TaskId parentId = new TaskId("cluster", 1); + AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId); + when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); + + MockExecutor executor = new MockExecutor(); + PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, + registry, new TaskManager(Settings.EMPTY), executor); + + ClusterState state = createInitialClusterState(1, Settings.EMPTY); + + Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase"); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + String taskId = UUIDs.base64UUID(); + TestParams taskParams = new TestParams("other_0"); + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, + new Assignment("this_node", "test assignment on other node")); + tasks.updateTaskStatus(taskId, status); + MetaData.Builder metaData = MetaData.builder(state.metaData()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); + + coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); + + assertThat(executor.size(), equalTo(1)); + assertThat(executor.get(0).params, sameInstance(taskParams)); + assertThat(executor.get(0).status, sameInstance(status)); + assertThat(executor.get(0).task, sameInstance(nodeTask)); + } + public void testTaskCancellation() { AtomicLong capturedTaskId = new AtomicLong(); AtomicReference> capturedListener = new AtomicReference<>(); @@ -257,11 +294,13 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { private class Execution { private final PersistentTaskParams params; private final AllocatedPersistentTask task; + private final Task.Status status; private final PersistentTasksExecutor holder; - Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor holder) { + Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor holder) { this.params = params; this.task = task; + this.status = status; this.holder = holder; } } @@ -274,9 +313,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void executeTask(Params params, AllocatedPersistentTask task, + public void executeTask(Params params, + Task.Status status, + AllocatedPersistentTask task, PersistentTasksExecutor executor) { - executions.add(new Execution(params, task, executor)); + executions.add(new Execution(params, task, status, executor)); } public Execution get(int i) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index 218daa6f7f3..980ac6484fc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -309,7 +309,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestParams params) { + protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -332,9 +332,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } else if ("update_status".equals(testTask.getOperation())) { testTask.setOperation(null); CountDownLatch latch = new CountDownLatch(1); - Status status = new Status("phase " + phase.incrementAndGet()); - logger.info("updating the task status to {}", status); - task.updatePersistentStatus(status, new ActionListener>() { + Status newStatus = new Status("phase " + phase.incrementAndGet()); + logger.info("updating the task status to {}", newStatus); + task.updatePersistentStatus(newStatus, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("updating was successful");