[ML][Data Frame] make response.count be total count of hits (#43241) (#43389)

* [ML][Data Frame] make response.count be total count of hits

* addressing line length check

* changing response count for filters

* adjusting serialization, variable name, and total count logic

* making count mandatory for creation
Benjamin Trent 2019-06-19 16:19:06 -05:00 committed by GitHub
parent b333ced5a7
commit 77ce3260dd
14 changed files with 108 additions and 59 deletions
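
The heart of the change, repeated across the files below: the search request asks Elasticsearch to track the exact total hit count, and the response's count is taken from that total rather than from the size of the page of results that happened to be returned. A minimal sketch of the pattern, assuming the 7.x SearchSourceBuilder/SearchResponse APIs; the class TotalCountSketch and its method names are illustrative and not part of this commit.

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class TotalCountSketch {

    static SearchRequest pagedRequest(String index, int from, int size) {
        // Ask for an exact total hit count, independent of the requested page.
        SearchSourceBuilder source = new SearchSourceBuilder()
                .from(from)
                .size(size)
                .trackTotalHits(true);
        return new SearchRequest(index).source(source);
    }

    static long totalCount(SearchResponse response) {
        // count is now the number of documents that matched the query ...
        return response.getHits().getTotalHits().value;
    }

    static int returnedOnThisPage(SearchResponse response) {
        // ... while the number of results actually returned stays bounded by size.
        return response.getHits().getHits().length;
    }
}

The serialization, transport, and REST test changes below all follow from this: the total travels alongside the results, so count can be larger than the number of entries in the returned page.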


@@ -1286,7 +1286,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
GetFiltersResponse getFiltersResponse = execute(getFiltersRequest,
machineLearningClient::getFilter,
machineLearningClient::getFilterAsync);
assertThat(getFiltersResponse.count(), equalTo(2L));
assertThat(getFiltersResponse.count(), equalTo(3L));
assertThat(getFiltersResponse.filters().size(), equalTo(2));
assertThat(getFiltersResponse.filters().stream().map(MlFilter::getId).collect(Collectors.toList()),
containsInAnyOrder("get-filter-test-2", "get-filter-test-3"));


@@ -88,7 +88,7 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(),
indicesOptions))
.source(sourceBuilder);
.source(sourceBuilder.trackTotalHits(true));
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin(),
@@ -98,6 +98,7 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
public void onResponse(SearchResponse response) {
List<Resource> docs = new ArrayList<>();
Set<String> foundResourceIds = new HashSet<>();
long totalHitCount = response.getHits().getTotalHits().value;
for (SearchHit hit : response.getHits().getHits()) {
BytesReference docSource = hit.getSourceRef();
try (InputStream stream = docSource.streamInput();
@@ -115,7 +116,7 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
if (requiredMatches.hasUnmatchedIds()) {
listener.onFailure(notFoundException(requiredMatches.unmatchedIdsString()));
} else {
listener.onResponse(new QueryPage<>(docs, docs.size(), getResultsField()));
listener.onResponse(new QueryPage<>(docs, totalHitCount, getResultsField()));
}
}


@@ -91,8 +91,8 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms";
private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms");
public Response(List<DataFrameTransformConfig> transformConfigs) {
super(new QueryPage<>(transformConfigs, transformConfigs.size(), DataFrameField.TRANSFORMS));
public Response(List<DataFrameTransformConfig> transformConfigs, long count) {
super(new QueryPage<>(transformConfigs, count, DataFrameField.TRANSFORMS));
}
public Response() {


@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.TaskOperationFailure;
@@ -21,8 +22,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
@@ -138,32 +141,52 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
}
public static class Response extends BaseTasksResponse implements ToXContentObject {
private List<DataFrameTransformStateAndStats> transformsStateAndStats;
private final QueryPage<DataFrameTransformStateAndStats> transformsStateAndStats;
public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats) {
super(Collections.emptyList(), Collections.emptyList());
this.transformsStateAndStats = transformsStateAndStats;
public Response(List<DataFrameTransformStateAndStats> transformStateAndStats, long count) {
this(new QueryPage<>(transformStateAndStats, count, DataFrameField.TRANSFORMS));
}
public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats, List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
public Response(List<DataFrameTransformStateAndStats> transformStateAndStats,
long count,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
this(new QueryPage<>(transformStateAndStats, count, DataFrameField.TRANSFORMS), taskFailures, nodeFailures);
}
private Response(QueryPage<DataFrameTransformStateAndStats> transformsStateAndStats) {
this(transformsStateAndStats, Collections.emptyList(), Collections.emptyList());
}
private Response(QueryPage<DataFrameTransformStateAndStats> transformsStateAndStats,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
super(taskFailures, nodeFailures);
this.transformsStateAndStats = transformsStateAndStats;
this.transformsStateAndStats = ExceptionsHelper.requireNonNull(transformsStateAndStats, "transformsStateAndStats");
}
public Response(StreamInput in) throws IOException {
super(in);
transformsStateAndStats = in.readList(DataFrameTransformStateAndStats::new);
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
transformsStateAndStats = new QueryPage<>(in, DataFrameTransformStateAndStats::new);
} else {
List<DataFrameTransformStateAndStats> stats = in.readList(DataFrameTransformStateAndStats::new);
transformsStateAndStats = new QueryPage<>(stats, stats.size(), DataFrameField.TRANSFORMS);
}
}
public List<DataFrameTransformStateAndStats> getTransformsStateAndStats() {
return transformsStateAndStats;
return transformsStateAndStats.results();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(transformsStateAndStats);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
transformsStateAndStats.writeTo(out);
} else {
out.writeList(transformsStateAndStats.results());
}
}
@Override
@@ -175,8 +198,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformsStateAndStats.size());
builder.field(DataFrameField.TRANSFORMS.getPreferredName(), transformsStateAndStats);
transformsStateAndStats.doXContentBody(builder, params);
builder.endObject();
return builder;
}
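
Mixed-version clusters are the reason for the Version.V_7_3_0 gates above: a 7.3+ node exchanges the whole QueryPage (results plus total count), while an older peer only understands a bare list, in which case the list size is the best available count. A compact sketch of that symmetric read/write pattern, mirroring the change above; BwcPageSerializationSketch is an illustrative class name, the remaining types are the ones used in this file.

import java.io.IOException;
import java.util.List;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;

class BwcPageSerializationSketch {

    private final QueryPage<DataFrameTransformStateAndStats> page;

    // Reading: a 7.3+ peer sends the full page; an older peer sends only the
    // list, so the list size is the only count we can report.
    BwcPageSerializationSketch(StreamInput in) throws IOException {
        if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
            page = new QueryPage<>(in, DataFrameTransformStateAndStats::new);
        } else {
            List<DataFrameTransformStateAndStats> stats = in.readList(DataFrameTransformStateAndStats::new);
            page = new QueryPage<>(stats, stats.size(), DataFrameField.TRANSFORMS);
        }
    }

    // Writing mirrors reading exactly: the stream version decides which shape
    // the remote node understands, so both sides of the connection agree.
    void writeTo(StreamOutput out) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
            page.writeTo(out);
        } else {
            out.writeList(page.results());
        }
    }
}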


@@ -32,7 +32,7 @@ public class GetDataFrameTransformsActionResponseTests extends AbstractWireSeria
transforms.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
transforms.add(DataFrameTransformConfigTests.randomInvalidDataFrameTransformConfig());
Response r = new Response(transforms);
Response r = new Response(transforms, transforms.size());
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
r.toXContent(builder, XContent.EMPTY_PARAMS);
Map<String, Object> responseAsMap = createParser(builder).map();
@@ -52,7 +52,7 @@ public class GetDataFrameTransformsActionResponseTests extends AbstractWireSeria
transforms.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
Response r = new Response(transforms);
Response r = new Response(transforms, transforms.size());
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
r.toXContent(builder, XContent.EMPTY_PARAMS);
Map<String, Object> responseAsMap = createParser(builder).map();
@@ -76,7 +76,7 @@ public class GetDataFrameTransformsActionResponseTests extends AbstractWireSeria
configs.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
return new Response(configs);
return new Response(configs, randomNonNegativeLong());
}
@Override


@@ -32,7 +32,7 @@ public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWire
taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error")));
nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error")));
}
return new Response(stats, taskFailures, nodeFailures);
return new Response(stats, randomLongBetween(stats.size(), 10_000_000L), taskFailures, nodeFailures);
}
@Override


@@ -44,7 +44,7 @@ public class TransportGetDataFrameTransformsAction extends AbstractTransportGetR
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
searchResources(request, ActionListener.wrap(
r -> listener.onResponse(new Response(r.results())),
r -> listener.onResponse(new Response(r.results(), r.count())),
listener::onFailure
));
}


@@ -73,7 +73,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
.collect(Collectors.toList());
List<ElasticsearchException> allFailedNodeExceptions = new ArrayList<>(failedNodeExceptions);
allFailedNodeExceptions.addAll(tasks.stream().flatMap(r -> r.getNodeFailures().stream()).collect(Collectors.toList()));
return new Response(responses, taskOperationFailures, allFailedNodeExceptions);
return new Response(responses, responses.size(), taskOperationFailures, allFailedNodeExceptions);
}
@Override
@@ -83,36 +83,47 @@ public class TransportGetDataFrameTransformsStatsAction extends
String nodeId = state.nodes().getLocalNode().getId();
if (task.isCancelled() == false) {
transformsCheckpointService.getCheckpointStats(task.getTransformId(), task.getCheckpoint(), task.getInProgressCheckpoint(),
ActionListener.wrap(checkpointStats -> {
listener.onResponse(new Response(Collections.singletonList(
new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(), task.getStats(), checkpointStats))));
}, e -> {
listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(),
task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)),
ActionListener.wrap(checkpointStats -> listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(),
task.getState(),
task.getStats(),
checkpointStats)),
1L)),
e -> listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(),
task.getState(),
task.getStats(),
DataFrameTransformCheckpointingInfo.EMPTY)),
1L,
Collections.emptyList(),
Collections.singletonList(new FailedNodeException(nodeId, "Failed to retrieve checkpointing info", e))));
}));
Collections.singletonList(new FailedNodeException(nodeId, "Failed to retrieve checkpointing info", e))))
));
} else {
listener.onResponse(new Response(Collections.emptyList()));
listener.onResponse(new Response(Collections.emptyList(), 0L));
}
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
ids -> {
request.setExpandedIds(ids);
request.setNodes(DataFrameNodes.dataFrameTaskNodes(ids, clusterService.state()));
hitsAndIds -> {
request.setExpandedIds(hitsAndIds.v2());
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
super.doExecute(task, request, ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
response -> collectStatsForTransformsWithoutTasks(request, response, ActionListener.wrap(
finalResponse -> finalListener.onResponse(new Response(finalResponse.getTransformsStateAndStats(),
hitsAndIds.v1(),
finalResponse.getTaskFailures(),
finalResponse.getNodeFailures())),
finalListener::onFailure
)),
finalListener::onFailure
));
},
e -> {
// If the index to search, or the individual config is not there, just return empty
if (e instanceof ResourceNotFoundException) {
finalListener.onResponse(new Response(Collections.emptyList()));
finalListener.onResponse(new Response(Collections.emptyList(), 0L));
} else {
finalListener.onFailure(e);
}
@@ -165,7 +176,10 @@ public class TransportGetDataFrameTransformsStatsAction extends
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), response.getNodeFailures()));
listener.onResponse(new Response(allStateAndStats,
allStateAndStats.size(),
response.getTaskFailures(),
response.getNodeFailures()));
},
e -> {
if (e instanceof IndexNotFoundException) {


@@ -85,12 +85,12 @@ public class TransportStopDataFrameTransformAction extends
}
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
expandedIds -> {
request.setExpandedIds(new HashSet<>(expandedIds));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
super.doExecute(task, request, finalListener);
},
listener::onFailure
hitsAndIds -> {
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
super.doExecute(task, request, finalListener);
},
listener::onFailure
));
}
}


@@ -23,6 +23,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -196,13 +197,16 @@ public class DataFrameTransformsConfigManager {
* @param pageParams The paging params
* @param foundIdsListener The listener on signal on success or failure
*/
public void expandTransformIds(String transformIdsExpression, PageParams pageParams, ActionListener<List<String>> foundIdsListener) {
public void expandTransformIds(String transformIdsExpression,
PageParams pageParams,
ActionListener<Tuple<Long, List<String>>> foundIdsListener) {
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, DataFrameTransformConfig.NAME);
SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setFrom(pageParams.getFrom())
.setTrackTotalHits(true)
.setSize(pageParams.getSize())
.setQuery(queryBuilder)
// We only care about the `id` field, small optimization
@@ -214,6 +218,7 @@ public class DataFrameTransformsConfigManager {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
long totalHits = searchResponse.getHits().getTotalHits().value;
List<String> ids = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
@@ -235,7 +240,7 @@ public class DataFrameTransformsConfigManager {
requiredMatches.unmatchedIdsString())));
return;
}
foundIdsListener.onResponse(ids);
foundIdsListener.onResponse(new Tuple<>(totalHits, ids));
},
foundIdsListener::onFailure
), client::search);
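
Callers of expandTransformIds now receive both pieces of information: v1() of the tuple carries the search's total hit count and v2() the page of matching ids, which is how the stats transport action above fills in the response count. A hedged usage sketch; configManager, the "airline-*" expression, and handleIds are illustrative stand-ins, not code from this commit.

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;

class ExpandIdsUsageSketch {

    private static final Logger logger = LogManager.getLogger(ExpandIdsUsageSketch.class);

    void expand(DataFrameTransformsConfigManager configManager) {
        configManager.expandTransformIds("airline-*", new PageParams(0, 100), ActionListener.wrap(
            hitsAndIds -> {
                long totalMatchingConfigs = hitsAndIds.v1();  // total hits, regardless of paging
                List<String> pageOfIds = hitsAndIds.v2();     // at most PageParams size ids
                handleIds(totalMatchingConfigs, pageOfIds);
            },
            e -> logger.error("failed to expand transform ids", e)
        ));
    }

    private void handleIds(long totalCount, List<String> ids) {
        // Placeholder: a real caller builds a Response with totalCount and then
        // resolves stats or configs for ids, as the transport actions above do.
    }
}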


@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.persistence;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
@@ -159,7 +160,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds(transformConfig1.getId(),
PageParams.defaultParams(),
listener),
Collections.singletonList("transform1_expand"),
new Tuple<>(1L, Collections.singletonList("transform1_expand")),
null,
null);
@@ -168,7 +169,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("transform1_expand,transform2_expand",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand"),
new Tuple<>(2L, Arrays.asList("transform1_expand", "transform2_expand")),
null,
null);
@@ -177,7 +178,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("transform1*,transform2_expand,transform3_expand",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
new Tuple<>(3L, Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand")),
null,
null);
@@ -186,7 +187,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("_all",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
new Tuple<>(3L, Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand")),
null,
null);
@@ -195,7 +196,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("_all",
new PageParams(0, 1),
listener),
Collections.singletonList("transform1_expand"),
new Tuple<>(3L, Collections.singletonList("transform1_expand")),
null,
null);
@@ -204,7 +205,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("_all",
new PageParams(1, 2),
listener),
Arrays.asList("transform2_expand", "transform3_expand"),
new Tuple<>(3L, Arrays.asList("transform2_expand", "transform3_expand")),
null,
null);
@@ -213,7 +214,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
transformsConfigManager.expandTransformIds("unknown,unknown2",
new PageParams(1, 2),
listener),
(List<String>)null,
(Tuple<Long, List<String>>)null,
null,
e -> {
assertThat(e, instanceOf(ResourceNotFoundException.class));


@@ -127,7 +127,7 @@ setup:
transform_id: "airline-transform*"
from: 0
size: 1
- match: { count: 1 }
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform" }
- do:
@@ -135,7 +135,7 @@ setup:
transform_id: "airline-transform*"
from: 1
size: 1
- match: { count: 1 }
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-dos" }
---


@@ -123,7 +123,7 @@ teardown:
transform_id: "_all"
from: 0
size: 1
- match: { count: 1 }
- match: { count: 3 }
- match: { transforms.0.id: "airline-transform-stats" }
- do:
@@ -131,7 +131,7 @@ teardown:
transform_id: "_all"
from: 1
size: 2
- match: { count: 2 }
- match: { count: 3 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.1.id: "airline-transform-stats-the-third" }


@@ -86,7 +86,13 @@ setup:
from: 1
size: 1
- match: { count: 1 }
- match: { count: 2 }
- match:
filters.0:
filter_id: "filter-foo2"
description: "This filter has a description"
items: ["123", "lmnop"]
---
"Test get filters API with expression ID":