[Transform] Return parsed count for get transform stats (#62809)

In case of more than 500 transforms, get and stats return paged results which can be requested using
page parameters. For >500 transforms count wasn't parsed out of the server response but taken from
size of the list of transforms.

The change also adds client/server hlrc tests and fixes a wrong type for count in get.

fixes #56245
This commit is contained in:
Hendrik Muhs 2020-09-24 08:36:51 +02:00
parent 1c26926dea
commit a70389015d
7 changed files with 220 additions and 65 deletions

View File

@ -38,20 +38,25 @@ public class GetTransformResponse {
public static final ParseField COUNT = new ParseField("count");
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<InvalidTransforms, Void> INVALID_TRANSFORMS_PARSER =
new ConstructingObjectParser<>("invalid_transforms", true, args -> new InvalidTransforms((List<String>) args[0]));
static final ConstructingObjectParser<InvalidTransforms, Void> INVALID_TRANSFORMS_PARSER = new ConstructingObjectParser<>(
"invalid_transforms",
true,
args -> new InvalidTransforms((List<String>) args[0])
);
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<GetTransformResponse, Void> PARSER = new ConstructingObjectParser<>(
"get_transform", true, args -> new GetTransformResponse(
(List<TransformConfig>) args[0], (int) args[1], (InvalidTransforms) args[2]));
"get_transform",
true,
args -> new GetTransformResponse((List<TransformConfig>) args[0], (long) args[1], (InvalidTransforms) args[2])
);
static {
// Discard the count field which is the size of the transforms array
INVALID_TRANSFORMS_PARSER.declareInt((a, b) -> {}, COUNT);
INVALID_TRANSFORMS_PARSER.declareLong((a, b) -> {}, COUNT);
INVALID_TRANSFORMS_PARSER.declareStringArray(constructorArg(), TRANSFORMS);
PARSER.declareObjectArray(constructorArg(), TransformConfig.PARSER::apply, TRANSFORMS);
PARSER.declareInt(constructorArg(), COUNT);
PARSER.declareLong(constructorArg(), COUNT);
PARSER.declareObject(optionalConstructorArg(), INVALID_TRANSFORMS_PARSER::apply, INVALID_TRANSFORMS);
}
@ -60,12 +65,10 @@ public class GetTransformResponse {
}
private List<TransformConfig> transformConfigurations;
private int count;
private long count;
private InvalidTransforms invalidTransforms;
public GetTransformResponse(List<TransformConfig> transformConfigurations,
int count,
@Nullable InvalidTransforms invalidTransforms) {
public GetTransformResponse(List<TransformConfig> transformConfigurations, long count, @Nullable InvalidTransforms invalidTransforms) {
this.transformConfigurations = transformConfigurations;
this.count = count;
this.invalidTransforms = invalidTransforms;
@ -76,7 +79,7 @@ public class GetTransformResponse {
return invalidTransforms;
}
public int getCount() {
public long getCount() {
return count;
}
@ -101,8 +104,8 @@ public class GetTransformResponse {
final GetTransformResponse that = (GetTransformResponse) other;
return Objects.equals(this.transformConfigurations, that.transformConfigurations)
&& Objects.equals(this.count, that.count)
&& Objects.equals(this.invalidTransforms, that.invalidTransforms);
&& Objects.equals(this.count, that.count)
&& Objects.equals(this.invalidTransforms, that.invalidTransforms);
}
static class InvalidTransforms {
@ -112,7 +115,7 @@ public class GetTransformResponse {
this.transformIds = transformIds;
}
public int getCount() {
public long getCount() {
return transformIds.size();
}

View File

@ -41,18 +41,29 @@ public class GetTransformStatsResponse {
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<GetTransformStatsResponse, Void> PARSER = new ConstructingObjectParser<>(
"get_transform_stats_response", true,
args -> new GetTransformStatsResponse((List<TransformStats>) args[0],
(List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
"get_transform_stats_response",
true,
args -> new GetTransformStatsResponse(
(List<TransformStats>) args[0],
(long) args[1],
(List<TaskOperationFailure>) args[2],
(List<ElasticsearchException>) args[3]
)
);
static {
PARSER.declareObjectArray(constructorArg(), TransformStats.PARSER::apply, TRANSFORMS);
// Discard the count field which is the size of the transforms array
PARSER.declareInt((a, b) -> {}, COUNT);
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p),
AcknowledgedTasksResponse.TASK_FAILURES);
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
AcknowledgedTasksResponse.NODE_FAILURES);
PARSER.declareLong(constructorArg(), COUNT);
PARSER.declareObjectArray(
optionalConstructorArg(),
(p, c) -> TaskOperationFailure.fromXContent(p),
AcknowledgedTasksResponse.TASK_FAILURES
);
PARSER.declareObjectArray(
optionalConstructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
AcknowledgedTasksResponse.NODE_FAILURES
);
}
public static GetTransformStatsResponse fromXContent(final XContentParser parser) {
@ -60,13 +71,18 @@ public class GetTransformStatsResponse {
}
private final List<TransformStats> transformsStats;
private final long count;
private final List<TaskOperationFailure> taskFailures;
private final List<ElasticsearchException> nodeFailures;
public GetTransformStatsResponse(List<TransformStats> transformsStats,
@Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
public GetTransformStatsResponse(
List<TransformStats> transformsStats,
long count,
@Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures
) {
this.transformsStats = transformsStats;
this.count = count;
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
}
@ -75,6 +91,10 @@ public class GetTransformStatsResponse {
return transformsStats;
}
public long getCount() {
return count;
}
public List<ElasticsearchException> getNodeFailures() {
return nodeFailures;
}
@ -85,7 +105,7 @@ public class GetTransformStatsResponse {
@Override
public int hashCode() {
return Objects.hash(transformsStats, nodeFailures, taskFailures);
return Objects.hash(transformsStats, count, nodeFailures, taskFailures);
}
@Override
@ -100,7 +120,8 @@ public class GetTransformStatsResponse {
final GetTransformStatsResponse that = (GetTransformStatsResponse) other;
return Objects.equals(this.transformsStats, that.transformsStats)
&& Objects.equals(this.nodeFailures, that.nodeFailures)
&& Objects.equals(this.taskFailures, that.taskFailures);
&& Objects.equals(this.count, that.count)
&& Objects.equals(this.nodeFailures, that.nodeFailures)
&& Objects.equals(this.taskFailures, that.taskFailures);
}
}

View File

@ -35,21 +35,22 @@ import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class GetTransformStatsResponseTests extends ESTestCase {
public void testXContentParser() throws IOException {
xContentTester(this::createParser,
GetTransformStatsResponseTests::createTestInstance,
GetTransformStatsResponseTests::toXContent,
GetTransformStatsResponse::fromXContent)
.assertEqualsConsumer(GetTransformStatsResponseTests::assertEqualInstances)
.assertToXContentEquivalence(false)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(path -> path.isEmpty() == false)
.test();
xContentTester(
this::createParser,
GetTransformStatsResponseTests::createTestInstance,
GetTransformStatsResponseTests::toXContent,
GetTransformStatsResponse::fromXContent
).assertEqualsConsumer(GetTransformStatsResponseTests::assertEqualInstances)
.assertToXContentEquivalence(false)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(path -> path.isEmpty() == false)
.test();
}
private static GetTransformStatsResponse createTestInstance() {
int count = randomIntBetween(1, 3);
List<TransformStats> stats = new ArrayList<>();
for (int i=0; i<count; i++) {
for (int i = 0; i < count; i++) {
stats.add(TransformStatsTests.randomInstance());
}
@ -57,7 +58,7 @@ public class GetTransformStatsResponseTests extends ESTestCase {
if (randomBoolean()) {
taskFailures = new ArrayList<>();
int numTaskFailures = randomIntBetween(1, 4);
for (int i=0; i<numTaskFailures; i++) {
for (int i = 0; i < numTaskFailures; i++) {
taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(4), randomNonNegativeLong(), new IllegalStateException()));
}
}
@ -65,17 +66,18 @@ public class GetTransformStatsResponseTests extends ESTestCase {
if (randomBoolean()) {
nodeFailures = new ArrayList<>();
int numNodeFailures = randomIntBetween(1, 4);
for (int i=0; i<numNodeFailures; i++) {
for (int i = 0; i < numNodeFailures; i++) {
nodeFailures.add(new ElasticsearchException("GetTransformStatsResponseTests"));
}
}
return new GetTransformStatsResponse(stats, taskFailures, nodeFailures);
return new GetTransformStatsResponse(stats, stats.size() + randomLongBetween(0, 10), taskFailures, nodeFailures);
}
private static void toXContent(GetTransformStatsResponse response, XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("count", response.getCount());
builder.startArray("transforms");
for (TransformStats stats : response.getTransformsStats()) {
TransformStatsTests.toXContent(stats, builder);
@ -90,8 +92,8 @@ public class GetTransformStatsResponseTests extends ESTestCase {
// Serialisation of TaskOperationFailure and ElasticsearchException changes
// the object so use a custom compare method rather than Object.equals
private static void assertEqualInstances(GetTransformStatsResponse expected,
GetTransformStatsResponse actual) {
private static void assertEqualInstances(GetTransformStatsResponse expected, GetTransformStatsResponse actual) {
assertEquals(expected.getCount(), actual.getCount());
assertEquals(expected.getTransformsStats(), actual.getTransformsStats());
AcknowledgedTasksResponseTests.assertTaskOperationFailuresEqual(expected.getTaskFailures(), actual.getTaskFailures());
AcknowledgedTasksResponseTests.assertNodeFailuresEqual(expected.getNodeFailures(), actual.getNodeFailures());

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.hlrc;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.client.transform.GetTransformStatsResponse;
import org.elasticsearch.client.transform.transforms.hlrc.TransformCheckpointingInfoTests;
import org.elasticsearch.client.transform.transforms.hlrc.TransformIndexerStatsTests;
import org.elasticsearch.client.transform.transforms.hlrc.TransformStatsTests;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction.Response;
import org.elasticsearch.xpack.core.transform.transforms.NodeAttributes;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class GetTransformStatsResponseTests extends AbstractResponseTestCase<
GetTransformStatsAction.Response,
org.elasticsearch.client.transform.GetTransformStatsResponse> {
private static NodeAttributes randomNodeAttributes() {
return new NodeAttributes(
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10))
);
}
private static TransformStats randomTransformStats() {
return new TransformStats(
randomAlphaOfLength(10),
randomFrom(TransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : randomNodeAttributes(),
TransformIndexerStatsTests.randomStats(),
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo()
);
}
public static Response randomStatsResponse() {
List<TransformStats> stats = new ArrayList<>();
int totalStats = randomInt(10);
for (int i = 0; i < totalStats; ++i) {
stats.add(randomTransformStats());
}
int totalErrors = randomInt(10);
List<TaskOperationFailure> taskFailures = new ArrayList<>(totalErrors);
List<ElasticsearchException> nodeFailures = new ArrayList<>(totalErrors);
for (int i = 0; i < totalErrors; i++) {
taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error")));
nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error")));
}
return new Response(stats, randomLongBetween(stats.size(), 10_000_000L), taskFailures, nodeFailures);
}
@Override
protected Response createServerTestInstance(XContentType xContentType) {
return randomStatsResponse();
}
@Override
protected GetTransformStatsResponse doParseToClientInstance(XContentParser parser) throws IOException {
return org.elasticsearch.client.transform.GetTransformStatsResponse.fromXContent(parser);
}
@Override
protected void assertInstances(Response serverTestInstance, GetTransformStatsResponse clientInstance) {
assertEquals(serverTestInstance.getTransformsStats().size(), clientInstance.getTransformsStats().size());
Iterator<TransformStats> serverIt = serverTestInstance.getTransformsStats().iterator();
Iterator<org.elasticsearch.client.transform.transforms.TransformStats> clientIt = clientInstance.getTransformsStats().iterator();
while (serverIt.hasNext()) {
TransformStatsTests.assertHlrcEquals(serverIt.next(), clientIt.next());
}
assertThat(serverTestInstance.getCount(), equalTo(clientInstance.getCount()));
}
}

View File

@ -56,6 +56,18 @@ public class TransformStatsTests extends AbstractResponseTestCase<
);
}
public static void assertHlrcEquals(
org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
TransformStats clientInstance
) {
assertThat(serverTestInstance.getId(), equalTo(clientInstance.getId()));
assertThat(serverTestInstance.getState().value(), equalTo(clientInstance.getState().value()));
assertTransformIndexerStats(serverTestInstance.getIndexerStats(), clientInstance.getIndexerStats());
assertTransformCheckpointInfo(serverTestInstance.getCheckpointingInfo(), clientInstance.getCheckpointingInfo());
assertNodeAttributes(serverTestInstance.getNode(), clientInstance.getNode());
assertThat(serverTestInstance.getReason(), equalTo(clientInstance.getReason()));
}
@Override
protected org.elasticsearch.xpack.core.transform.transforms.TransformStats createServerTestInstance(XContentType xContentType) {
return new org.elasticsearch.xpack.core.transform.transforms.TransformStats(
@ -78,15 +90,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<
org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
TransformStats clientInstance
) {
assertThat(serverTestInstance.getId(), equalTo(clientInstance.getId()));
assertThat(serverTestInstance.getState().value(), equalTo(clientInstance.getState().value()));
assertTransformIndexerStats(serverTestInstance.getIndexerStats(), clientInstance.getIndexerStats());
assertTransformCheckpointInfo(serverTestInstance.getCheckpointingInfo(), clientInstance.getCheckpointingInfo());
assertNodeAttributes(serverTestInstance.getNode(), clientInstance.getNode());
assertThat(serverTestInstance.getReason(), equalTo(clientInstance.getReason()));
assertHlrcEquals(serverTestInstance, clientInstance);
}
private void assertNodeAttributes(
private static void assertNodeAttributes(
org.elasticsearch.xpack.core.transform.transforms.NodeAttributes serverTestInstance,
NodeAttributes clientInstance
) {

View File

@ -62,8 +62,10 @@ public class GetTransformAction extends ActionType<GetTransformAction.Response>
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;
if (getPageParams() != null && getPageParams().getSize() > MAX_SIZE_RETURN) {
exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() +
"] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception);
exception = addValidationError(
"Param [" + PageParams.SIZE.getPreferredName() + "] has a max acceptable value of [" + MAX_SIZE_RETURN + "]",
exception
);
}
return exception;
}
@ -95,6 +97,10 @@ public class GetTransformAction extends ActionType<GetTransformAction.Response>
return getResources().results();
}
public long getCount() {
return getResources().count();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
List<String> invalidTransforms = new ArrayList<>();

View File

@ -38,6 +38,7 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
public static final GetTransformStatsAction INSTANCE = new GetTransformStatsAction();
public static final String NAME = "cluster:monitor/transform/stats/get";
public GetTransformStatsAction() {
super(NAME, GetTransformStatsAction.Response::new);
}
@ -120,8 +121,10 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;
if (getPageParams() != null && getPageParams().getSize() > MAX_SIZE_RETURN) {
exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() +
"] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception);
exception = addValidationError(
"Param [" + PageParams.SIZE.getPreferredName() + "] has a max acceptable value of [" + MAX_SIZE_RETURN + "]",
exception
);
}
return exception;
}
@ -140,9 +143,7 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id)
&& Objects.equals(pageParams, other.pageParams)
&& allowNoMatch == other.allowNoMatch;
return Objects.equals(id, other.id) && Objects.equals(pageParams, other.pageParams) && allowNoMatch == other.allowNoMatch;
}
}
@ -153,10 +154,12 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
this(new QueryPage<>(transformStateAndStats, count, TransformField.TRANSFORMS));
}
public Response(List<TransformStats> transformStateAndStats,
long count,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
public Response(
List<TransformStats> transformStateAndStats,
long count,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures
) {
this(new QueryPage<>(transformStateAndStats, count, TransformField.TRANSFORMS), taskFailures, nodeFailures);
}
@ -164,9 +167,11 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
this(transformsStats, Collections.emptyList(), Collections.emptyList());
}
private Response(QueryPage<TransformStats> transformsStats,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
private Response(
QueryPage<TransformStats> transformsStats,
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures
) {
super(taskFailures, nodeFailures);
this.transformsStats = ExceptionsHelper.requireNonNull(transformsStats, "transformsStats");
}
@ -185,6 +190,10 @@ public class GetTransformStatsAction extends ActionType<GetTransformStatsAction.
return transformsStats.results();
}
public long getCount() {
return transformsStats.count();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);