Add adding ability to associate an ID with tasks.
Persistent tasks portion of elastic/elasticsearch#23250
This commit is contained in:
parent
8521b2d11e
commit
41071e4711
|
@ -30,6 +30,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,8 +49,9 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
private volatile TaskManager taskManager;
|
private volatile TaskManager taskManager;
|
||||||
|
|
||||||
|
|
||||||
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
|
||||||
super(id, type, action, description, parentTask);
|
Map<String, String> headers) {
|
||||||
|
super(id, type, action, description, parentTask, headers);
|
||||||
this.state = new AtomicReference<>(State.STARTED);
|
this.state = new AtomicReference<>(State.STARTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,8 +103,8 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
|
||||||
* Creates a AllocatedPersistentTask for communicating with task manager
|
* Creates a AllocatedPersistentTask for communicating with task manager
|
||||||
*/
|
*/
|
||||||
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
|
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
|
||||||
PersistentTask<Params> taskInProgress) {
|
PersistentTask<Params> taskInProgress, Map<String, String> headers) {
|
||||||
return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId);
|
return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -160,8 +160,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
|
||||||
return executor.createTask(id, type, action, parentTaskId, taskInProgress);
|
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||||
|
|
|
@ -35,11 +35,14 @@ import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -61,6 +64,23 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class PersistentTasksNodeServiceTests extends ESTestCase {
|
public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
|
private ThreadPool threadPool;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
threadPool = new TestThreadPool(getClass().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
terminate(threadPool);
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) {
|
private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) {
|
||||||
ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests"));
|
ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests"));
|
||||||
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
|
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
|
||||||
|
@ -84,14 +104,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
// need to account for 5 original tasks on each node and their relocations
|
// need to account for 5 original tasks on each node and their relocations
|
||||||
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
|
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
|
||||||
TaskId parentId = new TaskId("cluster", i);
|
TaskId parentId = new TaskId("cluster", i);
|
||||||
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(
|
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any(), any())).thenReturn(
|
||||||
new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId));
|
new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId, Collections.emptyMap()));
|
||||||
}
|
}
|
||||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
||||||
|
|
||||||
MockExecutor executor = new MockExecutor();
|
MockExecutor executor = new MockExecutor();
|
||||||
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
||||||
registry, new TaskManager(Settings.EMPTY), executor);
|
registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor);
|
||||||
|
|
||||||
ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
|
ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
|
||||||
|
|
||||||
|
@ -175,13 +195,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
||||||
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
|
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
|
||||||
TaskId parentId = new TaskId("cluster", 1);
|
TaskId parentId = new TaskId("cluster", 1);
|
||||||
AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId);
|
AllocatedPersistentTask nodeTask =
|
||||||
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask);
|
new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId, Collections.emptyMap());
|
||||||
|
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any(), any())).thenReturn(nodeTask);
|
||||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
||||||
|
|
||||||
MockExecutor executor = new MockExecutor();
|
MockExecutor executor = new MockExecutor();
|
||||||
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
||||||
registry, new TaskManager(Settings.EMPTY), executor);
|
registry, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), executor);
|
||||||
|
|
||||||
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
|
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
|
||||||
|
|
||||||
|
@ -223,13 +244,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
||||||
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
||||||
when(action.getTaskName()).thenReturn("test");
|
when(action.getTaskName()).thenReturn("test");
|
||||||
when(action.createTask(anyLong(), anyString(), anyString(), any(), any()))
|
when(action.createTask(anyLong(), anyString(), anyString(), any(), any(), any()))
|
||||||
.thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1)));
|
.thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1),
|
||||||
|
Collections.emptyMap()));
|
||||||
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
|
||||||
|
|
||||||
int nonLocalNodesCount = randomInt(10);
|
int nonLocalNodesCount = randomInt(10);
|
||||||
MockExecutor executor = new MockExecutor();
|
MockExecutor executor = new MockExecutor();
|
||||||
TaskManager taskManager = new TaskManager(Settings.EMPTY);
|
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
|
||||||
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
|
||||||
registry, taskManager, executor);
|
registry, taskManager, executor);
|
||||||
|
|
||||||
|
|
|
@ -71,6 +71,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -384,8 +385,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
|
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
|
||||||
PersistentTask<TestParams> task) {
|
PersistentTask<TestParams> task, Map<String, String> headers) {
|
||||||
return new TestTask(id, type, action, getDescription(task), parentTaskId);
|
return new TestTask(id, type, action, getDescription(task), parentTaskId, headers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,8 +414,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
public static class TestTask extends AllocatedPersistentTask {
|
public static class TestTask extends AllocatedPersistentTask {
|
||||||
private volatile String operation;
|
private volatile String operation;
|
||||||
|
|
||||||
public TestTask(long id, String type, String action, String description, TaskId parentTask) {
|
public TestTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
|
||||||
super(id, type, action, description, parentTask);
|
super(id, type, action, description, parentTask, headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getOperation() {
|
public String getOperation() {
|
||||||
|
|
Loading…
Reference in New Issue