Fix .tasks index strict mapping: parent_id should be parent_task_id (#48393)

* Fix .tasks index strict mapping: parent_id should be parent_task_id

The .tasks index has mappings that's strictly defined. `parent_task_id`
was defined as `parent_id` though which would cause an exception in case
a task is persisted that has a parent task id set.

While at it, a couple of compiler warnings were addressed and a test
request builder was removed in favour of using its corresponding request.

* increment version
This commit is contained in:
Luca Cavanna 2019-10-25 15:14:30 +02:00
parent 9c48ed12bc
commit d6d2edf324
4 changed files with 62 additions and 77 deletions

View File

@ -55,8 +55,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* Service that can store task results.
@ -73,7 +73,7 @@ public class TaskResultsService {
public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";
public static final int TASK_RESULT_MAPPING_VERSION = 2;
public static final int TASK_RESULT_MAPPING_VERSION = 3;
/**
* The backoff policy to use when saving a task result fails. The total wait

View File

@ -19,7 +19,7 @@
"id": {
"type": "long"
},
"parent_id": {
"parent_task_id": {
"type": "keyword"
},
"node": {

View File

@ -143,14 +143,14 @@ public class TasksIT extends ESIntegTestCase {
}
public void testMasterNodeOperationTasks() {
registerTaskManageListeners(ClusterHealthAction.NAME);
registerTaskManagerListeners(ClusterHealthAction.NAME);
// First run the health on the master node - should produce only one task on the master node
internalCluster().masterClient().admin().cluster().prepareHealth().get();
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, Tuple::v1)); // counting only registration events
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, event -> event.v1() == false)); // counting only unregistration events
resetTaskManageListeners(ClusterHealthAction.NAME);
resetTaskManagerListeners(ClusterHealthAction.NAME);
// Now run the health on a non-master node - should produce one task on master and one task on another node
internalCluster().nonMasterClient().admin().cluster().prepareHealth().get();
@ -167,8 +167,8 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTransportReplicationAllShardsTasks() {
registerTaskManageListeners(ValidateQueryAction.NAME); // main task
registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard
registerTaskManagerListeners(ValidateQueryAction.NAME); // main task
registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard
// level
// tasks
createIndex("test");
@ -186,8 +186,8 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTransportBroadcastByNodeTasks() {
registerTaskManageListeners(UpgradeAction.NAME); // main task
registerTaskManageListeners(UpgradeAction.NAME + "[n]"); // node level tasks
registerTaskManagerListeners(UpgradeAction.NAME); // main task
registerTaskManagerListeners(UpgradeAction.NAME + "[n]"); // node level tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareUpgrade("test").get();
@ -202,8 +202,8 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTransportReplicationSingleShardTasks() {
registerTaskManageListeners(ValidateQueryAction.NAME); // main task
registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks
registerTaskManagerListeners(ValidateQueryAction.NAME); // main task
registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareValidateQuery("test").get();
@ -218,9 +218,9 @@ public class TasksIT extends ESIntegTestCase {
public void testTransportBroadcastReplicationTasks() {
registerTaskManageListeners(RefreshAction.NAME); // main task
registerTaskManageListeners(RefreshAction.NAME + "[s]"); // shard level tasks
registerTaskManageListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks
registerTaskManagerListeners(RefreshAction.NAME); // main task
registerTaskManagerListeners(RefreshAction.NAME + "[s]"); // shard level tasks
registerTaskManagerListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareRefresh("test").get();
@ -292,10 +292,10 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTransportBulkTasks() {
registerTaskManageListeners(BulkAction.NAME); // main task
registerTaskManageListeners(BulkAction.NAME + "[s]"); // shard task
registerTaskManageListeners(BulkAction.NAME + "[s][p]"); // shard task on primary
registerTaskManageListeners(BulkAction.NAME + "[s][r]"); // shard task on replica
registerTaskManagerListeners(BulkAction.NAME); // main task
registerTaskManagerListeners(BulkAction.NAME + "[s]"); // shard task
registerTaskManagerListeners(BulkAction.NAME + "[s][p]"); // shard task on primary
registerTaskManagerListeners(BulkAction.NAME + "[s][r]"); // shard task on replica
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
// ensures the mapping is available on all nodes so we won't retry the request (in case replicas don't have the right mapping).
@ -345,10 +345,9 @@ public class TasksIT extends ESIntegTestCase {
assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
}
public void testSearchTaskDescriptions() {
registerTaskManageListeners(SearchAction.NAME); // main task
registerTaskManageListeners(SearchAction.NAME + "[*]"); // shard task
registerTaskManagerListeners(SearchAction.NAME); // main task
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
@ -494,8 +493,9 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
logger.info("--> started test tasks");
// Wait for the task to start on all nodes
@ -516,8 +516,8 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksUnblocking() throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future =
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
@ -580,8 +580,9 @@ public class TasksIT extends ESIntegTestCase {
private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).setShouldStoreResult(storeResult).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldStoreResult(storeResult);
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
ActionFuture<T> waitResponseFuture;
TaskId taskId;
@ -654,8 +655,8 @@ public class TasksIT extends ESIntegTestCase {
*/
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
try {
TaskId taskId = waitForTestTaskStartOnAllNodes();
@ -722,12 +723,17 @@ public class TasksIT extends ESIntegTestCase {
assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
}
public void testTaskStoringSuccesfulResult() throws Exception {
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
public void testTaskStoringSuccessfulResult() throws Exception {
registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
// Start non-blocking test task
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE)
.setShouldStoreResult(true).setShouldBlock(false).get();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldStoreResult(true);
request.setShouldBlock(false);
TaskId parentTaskId = new TaskId("parent_node", randomLong());
request.setParentTask(parentTaskId);
client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get();
List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
@ -741,6 +747,7 @@ public class TasksIT extends ESIntegTestCase {
assertNull(taskResult.getError());
assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
assertEquals(taskInfo.getParentTaskId(), taskResult.getTask().getParentTaskId());
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
@ -770,14 +777,16 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTaskStoringFailureResult() throws Exception {
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldFail(true);
request.setShouldStoreResult(true);
request.setShouldBlock(false);
// Start non-blocking test task that should fail
assertThrows(
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE)
.setShouldFail(true)
.setShouldStoreResult(true)
.setShouldBlock(false),
client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request),
IllegalStateException.class
);
@ -858,7 +867,7 @@ public class TasksIT extends ESIntegTestCase {
/**
* Registers recording task event listeners with the given action mask on all nodes
*/
private void registerTaskManageListeners(String actionMasks) {
private void registerTaskManagerListeners(String actionMasks) {
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(","));
@ -871,7 +880,7 @@ public class TasksIT extends ESIntegTestCase {
/**
* Resets all recording task event listeners with the given action mask on all nodes
*/
private void resetTaskManageListeners(String actionMasks) {
private void resetTaskManagerListeners(String actionMasks) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
entry.getValue().reset();
@ -925,11 +934,12 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}
private ResourceNotFoundException expectNotFound(ThrowingRunnable r) {
private void expectNotFound(ThrowingRunnable r) {
Exception e = expectThrows(Exception.class, r);
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class);
if (notFound == null) throw new RuntimeException("Expected ResourceNotFoundException", e);
return notFound;
if (notFound == null) {
throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e);
}
}
/**

View File

@ -31,7 +31,6 @@ import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@ -138,11 +137,11 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
public static class NodesResponse extends BaseNodesResponse<NodeResponse> implements ToXContentFragment {
public NodesResponse(StreamInput in) throws IOException {
NodesResponse(StreamInput in) throws IOException {
super(in);
}
public NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@ -168,8 +167,8 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
}
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
protected boolean shouldBlock;
protected final String requestName;
protected final boolean shouldBlock;
public NodeRequest(StreamInput in) throws IOException {
super(in);
@ -214,7 +213,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
shouldFail = in.readBoolean();
}
public NodesRequest(String requestName, String... nodesIds) {
NodesRequest(String requestName, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
}
@ -336,37 +335,13 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
}
}
public static class NodesRequestBuilder extends NodesOperationRequestBuilder<NodesRequest, NodesResponse, NodesRequestBuilder> {
protected NodesRequestBuilder(ElasticsearchClient client, ActionType<NodesResponse> action) {
super(client, action, new NodesRequest("test"));
}
public NodesRequestBuilder setShouldStoreResult(boolean shouldStoreResult) {
request().setShouldStoreResult(shouldStoreResult);
return this;
}
public NodesRequestBuilder setShouldBlock(boolean shouldBlock) {
request().setShouldBlock(shouldBlock);
return this;
}
public NodesRequestBuilder setShouldFail(boolean shouldFail) {
request().setShouldFail(shouldFail);
return this;
}
}
public static class UnblockTestTaskResponse implements Writeable {
public UnblockTestTaskResponse() {
UnblockTestTaskResponse() {
}
public UnblockTestTaskResponse(StreamInput in) {
UnblockTestTaskResponse(StreamInput in) {
}
@Override
@ -393,13 +368,13 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
private List<UnblockTestTaskResponse> tasks;
public UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
public UnblockTestTasksResponse(StreamInput in) throws IOException {
UnblockTestTasksResponse(StreamInput in) throws IOException {
super(in);
int taskCount = in.readVInt();
List<UnblockTestTaskResponse> builder = new ArrayList<>();