diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9d9b36ba072..013b170c00f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -107,7 +108,12 @@ public class TransportBulkAction extends HandledTransportAction listener) { + protected final void doExecute(final BulkRequest bulkRequest, final ActionListener listener) { + throw new UnsupportedOperationException("task parameter is required for this operation"); + } + + @Override + protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); @@ -143,7 +149,7 @@ public class TransportBulkAction extends HandledTransportAction listener) { final long startTimeNanos = relativeTime(); - executeBulk(bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size())); + executeBulk(null, bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size())); } private long buildTookInMillis(long startTimeNanos) { return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos); } - void executeBulk(final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses ) { + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses ) { final ClusterState clusterState = clusterService.state(); // TODO use timeout to wait here if its blocked... clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); @@ -333,12 +339,16 @@ public class TransportBulkAction extends HandledTransportAction> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List requests = entry.getValue(); BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId, bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()])); bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel()); bulkShardRequest.timeout(bulkRequest.timeout()); + if (task != null) { + bulkShardRequest.setParentTask(nodeId, task.getId()); + } shardBulkAction.execute(bulkShardRequest, new ActionListener() { @Override public void onResponse(BulkShardResponse bulkShardResponse) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 3c2d90b77ab..5d6baf3f9cc 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; +import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.fieldstats.FieldStatsAction; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -61,6 +62,7 @@ import java.util.function.Function; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -246,6 +248,55 @@ public class TasksIT extends ESIntegTestCase { } } + + public void testTransportBulkTasks() { + registerTaskManageListeners(BulkAction.NAME); // main task + registerTaskManageListeners(BulkAction.NAME + "[s]"); // shard task + registerTaskManageListeners(BulkAction.NAME + "[s][p]"); // shard task on primary + registerTaskManageListeners(BulkAction.NAME + "[s][r]"); // shard task on replica + createIndex("test"); + ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks + client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get(); + + // the bulk operation should produce one main task + assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1)); + + // we should also get 1 or 2 [s] operation with main operation as a parent + // in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2 + List shardTasks = findEvents(BulkAction.NAME + "[s]", Tuple::v1); + assertThat(shardTasks.size(), allOf(lessThanOrEqualTo(2), greaterThanOrEqualTo(1))); + + // Select the effective shard task + TaskInfo shardTask; + if (shardTasks.size() == 1) { + // we have only one task - it's going to be the parent task for all [s][p] and [s][r] tasks + shardTask = shardTasks.get(0); + // and it should have the main task as a parent + assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0)); + } else { + if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) { + // task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks + shardTask = shardTasks.get(0); + // in turn the parent of the task 1 should be the main task + assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + } else { + // otherwise task 1 will control [s][p] and [s][r] tasks + shardTask = shardTasks.get(1); + // in turn the parent of the task 0 should be the main task + assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + } + } + + // we should also get one [s][p] operation with shard operation as a parent + assertEquals(1, numberOfEvents(BulkAction.NAME + "[s][p]", Tuple::v1)); + assertParentTask(findEvents(BulkAction.NAME + "[s][p]", Tuple::v1), shardTask); + + // we should get as many [s][r] operations as we have replica shards + // they all should have the same shard task as a parent + assertEquals(getNumShards("test").numReplicas, numberOfEvents(BulkAction.NAME + "[s][r]", Tuple::v1)); + assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask); +} + /** * Very basic "is it plugged in" style test that indexes a document and * makes sure that you can fetch the status of the process. The goal here is @@ -487,10 +538,14 @@ public class TasksIT extends ESIntegTestCase { */ private void assertParentTask(List tasks, TaskInfo parentTask) { for (TaskInfo task : tasks) { - assertTrue(task.getParentTaskId().isSet()); - assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId()); - assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); - assertEquals(parentTask.getId(), task.getParentTaskId().getId()); + assertParentTask(task, parentTask); } } + + private void assertParentTask(TaskInfo task, TaskInfo parentTask) { + assertTrue(task.getParentTaskId().isSet()); + assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId()); + assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); + assertEquals(parentTask.getId(), task.getParentTaskId().getId()); + } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 219fa7274e1..39202fcc43a 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -122,12 +122,13 @@ public class TransportBulkActionTookTests extends ESTestCase { @Override void executeBulk( + Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses) { expected.set(1000000); - super.executeBulk(bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); } }; } else { @@ -151,13 +152,14 @@ public class TransportBulkActionTookTests extends ESTestCase { @Override void executeBulk( + Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses) { long elapsed = spinForAtLeastOneMillisecond(); expected.set(elapsed); - super.executeBulk(bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); } }; } @@ -183,7 +185,7 @@ public class TransportBulkActionTookTests extends ESTestCase { bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null); AtomicLong expected = new AtomicLong(); TransportBulkAction action = createAction(controlled, expected); - action.doExecute(bulkRequest, new ActionListener() { + action.doExecute(null, bulkRequest, new ActionListener() { @Override public void onResponse(BulkResponse bulkItemResponses) { if (controlled) {