From 7f79c99e9a8f7cb79e9a0ad99e351e631784e555 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 7 Dec 2016 12:09:09 -0500 Subject: [PATCH] Add descriptions to bulk tasks Related to #21768 --- .../org/elasticsearch/action/bulk/BulkRequest.java | 14 +++++++++++++- .../action/bulk/BulkShardRequest.java | 5 +++++ .../action/admin/cluster/node/tasks/TasksIT.java | 7 ++++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 5351579278d..843e718a94e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -48,9 +48,10 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Set; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -73,6 +74,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques * the one with the least casts. */ final List requests = new ArrayList<>(); + private final Set indices = new HashSet<>(); List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; @@ -114,6 +116,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques } else { throw new IllegalArgumentException("No support for request [" + request + "]"); } + indices.add(request.index()); return this; } @@ -145,6 +148,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques addPayload(payload); // lack of source is validated in validate() method sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; + indices.add(request.index()); return this; } @@ -172,6 +176,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques if (request.script() != null) { sizeInBytes += request.script().getIdOrCode().length() * 2; } + indices.add(request.index()); return this; } @@ -187,6 +192,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques requests.add(request); addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; + indices.add(request.index()); return this; } @@ -548,4 +554,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques refreshPolicy.writeTo(out); timeout.writeTo(out); } + + @Override + public String getDescription() { + return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]"; + } + } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 25366d034ca..d53e9f8997e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -100,6 +100,11 @@ public class BulkShardRequest extends ReplicatedWriteRequest { return b.toString(); } + @Override + public String getDescription() { + return "requests[" + items.length + "], index[" + index + "]"; + } + @Override public void onRetry() { for (BulkItemRequest item : items) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 994cb3b4308..7f490ebab90 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -303,7 +303,9 @@ public class TasksIT extends ESIntegTestCase { client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get(); // the bulk operation should produce one main task - assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1)); + List topTask = findEvents(BulkAction.NAME, Tuple::v1); + assertEquals(1, topTask.size()); + assertEquals("requests[1], indices[test]", topTask.get(0).getDescription()); // we should also get 1 or 2 [s] operation with main operation as a parent // in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2 @@ -317,17 +319,20 @@ public class TasksIT extends ESIntegTestCase { shardTask = shardTasks.get(0); // and it should have the main task as a parent assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } else { if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) { // task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks shardTask = shardTasks.get(0); // in turn the parent of the task 1 should be the main task assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } else { // otherwise task 1 will control [s][p] and [s][r] tasks shardTask = shardTasks.get(1); // in turn the parent of the task 0 should be the main task assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } }