diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index 21d3fa7e957..180e8f11104 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -30,6 +30,7 @@ import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** @@ -48,8 +49,9 @@ public class AllocatedPersistentTask extends CancellableTask { private volatile TaskManager taskManager; - public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) { - super(id, type, action, description, parentTask); + public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask, + Map headers) { + super(id, type, action, description, parentTask, headers); this.state = new AtomicReference<>(State.STARTED); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 0fa7f2dcba0..ed61ad58053 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -29,6 +29,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.util.Map; import java.util.function.Predicate; /** @@ -102,8 +103,8 @@ public abstract class PersistentTasksExecutor taskInProgress) { - return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId); + PersistentTask taskInProgress, Map headers) { + return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); } /** diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index cfdf221e681..e53834d6f46 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -160,8 +160,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return executor.createTask(id, type, action, parentTaskId, taskInProgress); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers); } }; AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 7f36d6c8902..f9a70637e50 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -35,11 +35,14 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -61,6 +64,23 @@ import static org.mockito.Mockito.when; public class PersistentTasksNodeServiceTests extends ESTestCase { + private ThreadPool threadPool; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + } + + + @Override + @After + public void tearDown() throws Exception { + terminate(threadPool); + super.tearDown(); + } + private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) { ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests")); state.metaData(MetaData.builder().generateClusterUuidIfNeeded()); @@ -84,14 +104,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // need to account for 5 original tasks on each node and their relocations for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) { TaskId parentId = new TaskId("cluster", i); - when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn( - new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId)); + when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any(), any())).thenReturn( + new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId, Collections.emptyMap())); } PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); MockExecutor executor = new MockExecutor(); PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, - registry, new TaskManager(Settings.EMPTY), executor); + registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor); ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY); @@ -175,13 +195,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME); TaskId parentId = new TaskId("cluster", 1); - AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId); - when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask); + AllocatedPersistentTask nodeTask = + new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId, Collections.emptyMap()); + when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any(), any())).thenReturn(nodeTask); PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); MockExecutor executor = new MockExecutor(); PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, - registry, new TaskManager(Settings.EMPTY), executor); + registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor); ClusterState state = createInitialClusterState(1, Settings.EMPTY); @@ -223,13 +244,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getTaskName()).thenReturn("test"); - when(action.createTask(anyLong(), anyString(), anyString(), any(), any())) - .thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1))); + when(action.createTask(anyLong(), anyString(), anyString(), any(), any(), any())) + .thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1), + Collections.emptyMap())); PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); int nonLocalNodesCount = randomInt(10); MockExecutor executor = new MockExecutor(); - TaskManager taskManager = new TaskManager(Settings.EMPTY); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, registry, taskManager, executor); diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 5204a64eacb..ba8e2337fd7 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -71,6 +71,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -384,8 +385,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { @Override protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, - PersistentTask task) { - return new TestTask(id, type, action, getDescription(task), parentTaskId); + PersistentTask task, Map headers) { + return new TestTask(id, type, action, getDescription(task), parentTaskId, headers); } } @@ -413,8 +414,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public static class TestTask extends AllocatedPersistentTask { private volatile String operation; - public TestTask(long id, String type, String action, String description, TaskId parentTask) { - super(id, type, action, description, parentTask); + public TestTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + super(id, type, action, description, parentTask, headers); } public String getOperation() {