Add descriptions to bulk tasks

Related to #21768
This commit is contained in:
Igor Motov 2016-12-07 12:09:09 -05:00
parent ef64d230e7
commit 7f79c99e9a
3 changed files with 24 additions and 2 deletions

View File

@ -48,9 +48,10 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -73,6 +74,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* the one with the least casts. * the one with the least casts.
*/ */
final List<DocWriteRequest> requests = new ArrayList<>(); final List<DocWriteRequest> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
List<Object> payloads = null; List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
@ -114,6 +116,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
} else { } else {
throw new IllegalArgumentException("No support for request [" + request + "]"); throw new IllegalArgumentException("No support for request [" + request + "]");
} }
indices.add(request.index());
return this; return this;
} }
@ -145,6 +148,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
addPayload(payload); addPayload(payload);
// lack of source is validated in validate() method // lack of source is validated in validate() method
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
indices.add(request.index());
return this; return this;
} }
@ -172,6 +176,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
if (request.script() != null) { if (request.script() != null) {
sizeInBytes += request.script().getIdOrCode().length() * 2; sizeInBytes += request.script().getIdOrCode().length() * 2;
} }
indices.add(request.index());
return this; return this;
} }
@ -187,6 +192,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
requests.add(request); requests.add(request);
addPayload(payload); addPayload(payload);
sizeInBytes += REQUEST_OVERHEAD; sizeInBytes += REQUEST_OVERHEAD;
indices.add(request.index());
return this; return this;
} }
@ -548,4 +554,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
refreshPolicy.writeTo(out); refreshPolicy.writeTo(out);
timeout.writeTo(out); timeout.writeTo(out);
} }
@Override
public String getDescription() {
return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]";
}
} }

View File

@ -100,6 +100,11 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
return b.toString(); return b.toString();
} }
@Override
public String getDescription() {
return "requests[" + items.length + "], index[" + index + "]";
}
@Override @Override
public void onRetry() { public void onRetry() {
for (BulkItemRequest item : items) { for (BulkItemRequest item : items) {

View File

@ -303,7 +303,9 @@ public class TasksIT extends ESIntegTestCase {
client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get(); client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get();
// the bulk operation should produce one main task // the bulk operation should produce one main task
assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1)); List<TaskInfo> topTask = findEvents(BulkAction.NAME, Tuple::v1);
assertEquals(1, topTask.size());
assertEquals("requests[1], indices[test]", topTask.get(0).getDescription());
// we should also get 1 or 2 [s] operation with main operation as a parent // we should also get 1 or 2 [s] operation with main operation as a parent
// in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2 // in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2
@ -317,17 +319,20 @@ public class TasksIT extends ESIntegTestCase {
shardTask = shardTasks.get(0); shardTask = shardTasks.get(0);
// and it should have the main task as a parent // and it should have the main task as a parent
assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0)); assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
} else { } else {
if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) { if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) {
// task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks // task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(0); shardTask = shardTasks.get(0);
// in turn the parent of the task 1 should be the main task // in turn the parent of the task 1 should be the main task
assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0)); assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
} else { } else {
// otherwise task 1 will control [s][p] and [s][r] tasks // otherwise task 1 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(1); shardTask = shardTasks.get(1);
// in turn the parent of the task 0 should be the main task // in turn the parent of the task 0 should be the main task
assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0)); assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
} }
} }