From 4dd69951f346bd7ea4fa068f9d80ad89542db340 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 24 Oct 2017 10:20:33 +0200 Subject: [PATCH] Make the persistent task status available to PersistentTasksExecutor.nodeOperation(...) method --- .../NodePersistentTasksExecutor.java | 4 +- .../persistent/PersistentTasksExecutor.java | 5 +- .../PersistentTasksNodeService.java | 2 +- .../PersistentTasksNodeServiceTests.java | 47 +++++++++++++++++-- .../persistent/TestPersistentTasksPlugin.java | 8 ++-- 5 files changed, 55 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java index 7efbc92b278..efed0aef9b8 100644 --- a/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java @@ -20,6 +20,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; /** @@ -35,6 +36,7 @@ public class NodePersistentTasksExecutor { } public void executeTask(@Nullable Params params, + @Nullable Task.Status status, AllocatedPersistentTask task, PersistentTasksExecutor executor) { threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { @@ -47,7 +49,7 @@ public class NodePersistentTasksExecutor { @Override protected void doRun() throws Exception { try { - executor.nodeOperation(task, params); + executor.nodeOperation(task, params, status); } catch (Exception ex) { task.markAsFailed(ex); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index d8f5d93126a..0fa7f2dcba0 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -115,10 +116,10 @@ public abstract class PersistentTasksExecutor - * NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to + * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params); + protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status); public String getExecutor() { return executor; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 111041fcd8b..cfdf221e681 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -173,7 +173,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu task.getPersistentTaskId(), task.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor); + nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor); } catch (Exception e) { // Submit task failure task.markAsFailed(e); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 02d44d4eb1f..7f36d6c8902 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -49,12 +49,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class PersistentTasksNodeServiceTests extends ESTestCase { @@ -167,6 +169,41 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } + public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { + PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); + @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); + when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); + when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME); + TaskId parentId = new TaskId("cluster", 1); + AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId); + when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); + + MockExecutor executor = new MockExecutor(); + PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, + registry, new TaskManager(Settings.EMPTY), executor); + + ClusterState state = createInitialClusterState(1, Settings.EMPTY); + + Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase"); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + String taskId = UUIDs.base64UUID(); + TestParams taskParams = new TestParams("other_0"); + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, + new Assignment("this_node", "test assignment on other node")); + tasks.updateTaskStatus(taskId, status); + MetaData.Builder metaData = MetaData.builder(state.metaData()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); + + coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); + + assertThat(executor.size(), equalTo(1)); + assertThat(executor.get(0).params, sameInstance(taskParams)); + assertThat(executor.get(0).status, sameInstance(status)); + assertThat(executor.get(0).task, sameInstance(nodeTask)); + } + public void testTaskCancellation() { AtomicLong capturedTaskId = new AtomicLong(); AtomicReference> capturedListener = new AtomicReference<>(); @@ -271,11 +308,13 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { private class Execution { private final PersistentTaskParams params; private final AllocatedPersistentTask task; + private final Task.Status status; private final PersistentTasksExecutor holder; - Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor holder) { + Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor holder) { this.params = params; this.task = task; + this.status = status; this.holder = holder; } } @@ -288,9 +327,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void executeTask(Params params, AllocatedPersistentTask task, + public void executeTask(Params params, + Task.Status status, + AllocatedPersistentTask task, PersistentTasksExecutor executor) { - executions.add(new Execution(params, task, executor)); + executions.add(new Execution(params, task, status, executor)); } public Execution get(int i) { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 12773c4b6f7..b024359e209 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -323,7 +323,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestParams params) { + protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -346,9 +346,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } else if ("update_status".equals(testTask.getOperation())) { testTask.setOperation(null); CountDownLatch latch = new CountDownLatch(1); - Status status = new Status("phase " + phase.incrementAndGet()); - logger.info("updating the task status to {}", status); - task.updatePersistentStatus(status, new ActionListener>() { + Status newStatus = new Status("phase " + phase.incrementAndGet()); + logger.info("updating the task status to {}", newStatus); + task.updatePersistentStatus(newStatus, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("updating was successful");