Add search task descriptions

Since we added ability to cancel searches it would be nice to see which searches we are actually cancelling.
This commit is contained in:
Igor Motov 2016-11-21 18:49:22 -05:00
parent 6940b2b8c7
commit c7b69a0133
8 changed files with 126 additions and 3 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
@ -36,6 +37,7 @@ import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
/**
@ -45,12 +47,15 @@ import java.util.Objects;
* Note, the search {@link #source(org.elasticsearch.search.builder.SearchSourceBuilder)}
* is required. The search source is the different search options, including aggregations and such.
* </p>
*
* @see org.elasticsearch.client.Requests#searchRequest(String...)
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
private SearchType searchType = SearchType.DEFAULT;
private String[] indices = Strings.EMPTY_ARRAY;
@ -279,7 +284,26 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId) {
@Override
public String getDescription() {
StringBuilder sb = new StringBuilder();
sb.append("indices[");
Strings.arrayToDelimitedString(indices, ",", sb);
sb.append("], ");
sb.append("types[");
Strings.arrayToDelimitedString(types, ",", sb);
sb.append("], ");
sb.append("search_type[").append(searchType).append("], ");
if (source != null) {
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
}
return sb.toString();
}
};
}
@Override

View File

@ -139,4 +139,10 @@ public class SearchScrollRequest extends ActionRequest {
", scroll=" + scroll +
'}';
}
@Override
public String getDescription() {
return "scrollId[" + scrollId + "], scroll[" + scroll + "]";
}
}

View File

@ -69,10 +69,16 @@ public abstract class ToXContentToBytes implements ToXContent {
@Override
public final String toString() {
return toString(EMPTY_PARAMS);
}
public final String toString(Params params) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
toXContent(builder, EMPTY_PARAMS);
if (params.paramAsBoolean("pretty", true)) {
builder.prettyPrint();
}
toXContent(builder, params);
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere

View File

@ -114,4 +114,10 @@ public class ShardFetchRequest extends TransportRequest {
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public String getDescription() {
return "id[" + id + "], size[" + size + "], lastEmittedDoc[" + lastEmittedDoc + "]";
}
}

View File

@ -75,4 +75,10 @@ public class InternalScrollSearchRequest extends TransportRequest {
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public String getDescription() {
return "id[" + id + "], scroll[" + scroll + "]";
}
}

View File

@ -166,4 +166,10 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public String getDescription() {
// Shard id is enough here, the request itself can be found by looking at the parent task description
return "shardId[" + shardSearchLocalRequest.shardId() + "]";
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.dfs.AggregatedDfs;
@ -90,4 +91,16 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}
public String getDescription() {
StringBuilder sb = new StringBuilder();
sb.append("id[");
sb.append(id);
sb.append("], ");
sb.append("indices[");
Strings.arrayToDelimitedString(originalIndices.indices(), ",", sb);
sb.append("]");
return sb.toString();
}
}

View File

@ -38,13 +38,17 @@ import org.elasticsearch.action.fieldstats.FieldStatsAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationActionTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
@ -82,14 +86,22 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
/**
* Integration tests for task management API
@ -329,6 +341,50 @@ public class TasksIT extends ESIntegTestCase {
assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
}
public void testSearchTaskDescriptions() {
registerTaskManageListeners(SearchAction.NAME); // main task
registerTaskManageListeners(SearchAction.NAME + "[*]"); // shard task
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertSearchResponse(client().prepareSearch("test").setTypes("doc").setQuery(QueryBuilders.matchAllQuery()).get());
// the search operation should produce one main task
List<TaskInfo> mainTask = findEvents(SearchAction.NAME, Tuple::v1);
assertEquals(1, mainTask.size());
assertThat(mainTask.get(0).getDescription(), startsWith("indices[test], types[doc], search_type["));
assertThat(mainTask.get(0).getDescription(), containsString("\"query\":{\"match_all\""));
// check that if we have any shard-level requests they all have non-zero length description
List<TaskInfo> shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1);
for (TaskInfo taskInfo : shardTasks) {
assertThat(taskInfo.getParentTaskId(), notNullValue());
assertEquals(mainTask.get(0).getTaskId(), taskInfo.getParentTaskId());
switch (taskInfo.getAction()) {
case SearchTransportService.QUERY_ACTION_NAME:
case SearchTransportService.QUERY_FETCH_ACTION_NAME:
case SearchTransportService.DFS_ACTION_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("shardId[[test][*]]", taskInfo.getDescription()));
break;
case SearchTransportService.QUERY_ID_ACTION_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("id[*], indices[test]", taskInfo.getDescription()));
break;
case SearchTransportService.FETCH_ID_ACTION_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("id[*], size[1], lastEmittedDoc[null]",
taskInfo.getDescription()));
break;
default:
fail("Unexpected action [" + taskInfo.getAction() + "] with description [" + taskInfo.getDescription() + "]");
}
// assert that all task descriptions have non-zero length
assertThat(taskInfo.getDescription().length(), greaterThan(0));
}
}
/**
* Very basic "is it plugged in" style test that indexes a document and makes sure that you can fetch the status of the process. The
* goal here is to verify that the large moving parts that make fetching task status work fit together rather than to verify any