Add HLRC support for enrich stats API (#47306)
This PR also includes HLRC docs for the enrich stats api. Relates to #32789
This commit is contained in:
parent
19393fc5a7
commit
aace42d38d
|
@ -24,6 +24,8 @@ import org.elasticsearch.client.enrich.DeletePolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsResponse;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -177,4 +179,49 @@ public final class EnrichClient {
|
||||||
Collections.emptySet()
|
Collections.emptySet()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes the enrich stats api, which retrieves enrich related stats.
|
||||||
|
*
|
||||||
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#stats-api">
|
||||||
|
* the docs</a> for more.
|
||||||
|
*
|
||||||
|
* @param request the {@link StatsRequest}
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @return the response
|
||||||
|
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||||
|
*/
|
||||||
|
public StatsResponse stats(StatsRequest request, RequestOptions options) throws IOException {
|
||||||
|
return restHighLevelClient.performRequestAndParseEntity(
|
||||||
|
request,
|
||||||
|
EnrichRequestConverters::stats,
|
||||||
|
options,
|
||||||
|
StatsResponse::fromXContent,
|
||||||
|
Collections.emptySet()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronously executes the enrich stats api, which retrieves enrich related stats.
|
||||||
|
*
|
||||||
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#stats-api">
|
||||||
|
* the docs</a> for more.
|
||||||
|
*
|
||||||
|
* @param request the {@link StatsRequest}
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @param listener the listener to be notified upon request completion
|
||||||
|
* @return cancellable that may be used to cancel the request
|
||||||
|
*/
|
||||||
|
public Cancellable statsAsync(StatsRequest request,
|
||||||
|
RequestOptions options,
|
||||||
|
ActionListener<StatsResponse> listener) {
|
||||||
|
return restHighLevelClient.performRequestAsyncAndParseEntity(
|
||||||
|
request,
|
||||||
|
EnrichRequestConverters::stats,
|
||||||
|
options,
|
||||||
|
StatsResponse::fromXContent,
|
||||||
|
listener,
|
||||||
|
Collections.emptySet()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.http.client.methods.HttpPut;
|
||||||
import org.elasticsearch.client.enrich.DeletePolicyRequest;
|
import org.elasticsearch.client.enrich.DeletePolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsRequest;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -50,7 +51,7 @@ final class EnrichRequestConverters {
|
||||||
return new Request(HttpDelete.METHOD_NAME, endpoint);
|
return new Request(HttpDelete.METHOD_NAME, endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Request getPolicy(GetPolicyRequest getPolicyRequest) throws IOException {
|
static Request getPolicy(GetPolicyRequest getPolicyRequest) {
|
||||||
String endpoint = new RequestConverters.EndpointBuilder()
|
String endpoint = new RequestConverters.EndpointBuilder()
|
||||||
.addPathPartAsIs("_enrich", "policy")
|
.addPathPartAsIs("_enrich", "policy")
|
||||||
.addCommaSeparatedPathParts(getPolicyRequest.getNames())
|
.addCommaSeparatedPathParts(getPolicyRequest.getNames())
|
||||||
|
@ -58,4 +59,11 @@ final class EnrichRequestConverters {
|
||||||
return new Request(HttpGet.METHOD_NAME, endpoint);
|
return new Request(HttpGet.METHOD_NAME, endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Request stats(StatsRequest statsRequest) {
|
||||||
|
String endpoint = new RequestConverters.EndpointBuilder()
|
||||||
|
.addPathPartAsIs("_enrich", "_stats")
|
||||||
|
.build();
|
||||||
|
return new Request(HttpGet.METHOD_NAME, endpoint);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.client.enrich;
|
||||||
import org.elasticsearch.client.Validatable;
|
import org.elasticsearch.client.Validatable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
|
||||||
public class DeletePolicyRequest implements Validatable {
|
public final class DeletePolicyRequest implements Validatable {
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class PutPolicyRequest implements Validatable, ToXContentObject {
|
public final class PutPolicyRequest implements Validatable, ToXContentObject {
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
private final String type;
|
private final String type;
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.client.enrich;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.Validatable;
|
||||||
|
|
||||||
|
public final class StatsRequest implements Validatable {
|
||||||
|
}
|
|
@ -0,0 +1,191 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.client.enrich;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.ParseField;
|
||||||
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.tasks.TaskInfo;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public final class StatsResponse {
|
||||||
|
|
||||||
|
private static ParseField EXECUTING_POLICIES_FIELD = new ParseField("executing_policies");
|
||||||
|
private static ParseField COORDINATOR_STATS_FIELD = new ParseField("coordinator_stats");
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static final ConstructingObjectParser<StatsResponse, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
|
"stats_response",
|
||||||
|
true,
|
||||||
|
args -> new StatsResponse((List<ExecutingPolicy>) args[0], (List<CoordinatorStats>) args[1])
|
||||||
|
);
|
||||||
|
|
||||||
|
static {
|
||||||
|
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ExecutingPolicy.PARSER::apply, EXECUTING_POLICIES_FIELD);
|
||||||
|
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), CoordinatorStats.PARSER::apply, COORDINATOR_STATS_FIELD);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static StatsResponse fromXContent(XContentParser parser) {
|
||||||
|
return PARSER.apply(parser, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final List<ExecutingPolicy> executingPolicies;
|
||||||
|
private final List<CoordinatorStats> coordinatorStats;
|
||||||
|
|
||||||
|
public StatsResponse(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
|
||||||
|
this.executingPolicies = executingPolicies;
|
||||||
|
this.coordinatorStats = coordinatorStats;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ExecutingPolicy> getExecutingPolicies() {
|
||||||
|
return executingPolicies;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<CoordinatorStats> getCoordinatorStats() {
|
||||||
|
return coordinatorStats;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class CoordinatorStats {
|
||||||
|
|
||||||
|
static ParseField NODE_ID_FIELD = new ParseField("node_id");
|
||||||
|
static ParseField QUEUE_SIZE_FIELD = new ParseField("queue_size");
|
||||||
|
static ParseField REMOTE_REQUESTS_CONCURRENT_FIELD = new ParseField("remote_requests_current");
|
||||||
|
static ParseField REMOTE_REQUESTS_TOTAL_FIELD = new ParseField("remote_requests_total");
|
||||||
|
static ParseField EXECUTED_SEARCHES_FIELD = new ParseField("executed_searches_total");
|
||||||
|
|
||||||
|
private static final ConstructingObjectParser<CoordinatorStats, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
|
"coordinator_stats_item",
|
||||||
|
true,
|
||||||
|
args -> new CoordinatorStats((String) args[0], (int) args[1], (int) args[2], (long) args[3], (long) args[4])
|
||||||
|
);
|
||||||
|
|
||||||
|
static {
|
||||||
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
|
||||||
|
PARSER.declareInt(ConstructingObjectParser.constructorArg(), QUEUE_SIZE_FIELD);
|
||||||
|
PARSER.declareInt(ConstructingObjectParser.constructorArg(), REMOTE_REQUESTS_CONCURRENT_FIELD);
|
||||||
|
PARSER.declareLong(ConstructingObjectParser.constructorArg(), REMOTE_REQUESTS_TOTAL_FIELD);
|
||||||
|
PARSER.declareLong(ConstructingObjectParser.constructorArg(), EXECUTED_SEARCHES_FIELD);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String nodeId;
|
||||||
|
private final int queueSize;
|
||||||
|
private final int remoteRequestsCurrent;
|
||||||
|
private final long remoteRequestsTotal;
|
||||||
|
private final long executedSearchesTotal;
|
||||||
|
|
||||||
|
public CoordinatorStats(String nodeId,
|
||||||
|
int queueSize,
|
||||||
|
int remoteRequestsCurrent,
|
||||||
|
long remoteRequestsTotal,
|
||||||
|
long executedSearchesTotal) {
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
this.queueSize = queueSize;
|
||||||
|
this.remoteRequestsCurrent = remoteRequestsCurrent;
|
||||||
|
this.remoteRequestsTotal = remoteRequestsTotal;
|
||||||
|
this.executedSearchesTotal = executedSearchesTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeId() {
|
||||||
|
return nodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getQueueSize() {
|
||||||
|
return queueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRemoteRequestsCurrent() {
|
||||||
|
return remoteRequestsCurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRemoteRequestsTotal() {
|
||||||
|
return remoteRequestsTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getExecutedSearchesTotal() {
|
||||||
|
return executedSearchesTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
CoordinatorStats stats = (CoordinatorStats) o;
|
||||||
|
return Objects.equals(nodeId, stats.nodeId) &&
|
||||||
|
queueSize == stats.queueSize &&
|
||||||
|
remoteRequestsCurrent == stats.remoteRequestsCurrent &&
|
||||||
|
remoteRequestsTotal == stats.remoteRequestsTotal &&
|
||||||
|
executedSearchesTotal == stats.executedSearchesTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(nodeId, queueSize, remoteRequestsCurrent, remoteRequestsTotal, executedSearchesTotal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ExecutingPolicy {
|
||||||
|
|
||||||
|
static ParseField NAME_FIELD = new ParseField("name");
|
||||||
|
static ParseField TASK_FIELD = new ParseField("task");
|
||||||
|
|
||||||
|
private static final ConstructingObjectParser<ExecutingPolicy, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
|
"executing_policy_item",
|
||||||
|
true,
|
||||||
|
args -> new ExecutingPolicy((String) args[0], (TaskInfo) args[1])
|
||||||
|
);
|
||||||
|
|
||||||
|
static {
|
||||||
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
|
||||||
|
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> TaskInfo.fromXContent(p), TASK_FIELD);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final TaskInfo taskInfo;
|
||||||
|
|
||||||
|
public ExecutingPolicy(String name, TaskInfo taskInfo) {
|
||||||
|
this.name = name;
|
||||||
|
this.taskInfo = taskInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskInfo getTaskInfo() {
|
||||||
|
return taskInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
ExecutingPolicy that = (ExecutingPolicy) o;
|
||||||
|
return name.equals(that.name) &&
|
||||||
|
taskInfo.equals(that.taskInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(name, taskInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -23,11 +23,15 @@ import org.elasticsearch.client.enrich.DeletePolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsResponse;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
public class EnrichIT extends ESRestHighLevelClientTestCase {
|
public class EnrichIT extends ESRestHighLevelClientTestCase {
|
||||||
|
|
||||||
|
@ -46,6 +50,16 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
|
||||||
assertThat(getPolicyResponse.getPolicies().get(0).getMatchField(), equalTo(putPolicyRequest.getMatchField()));
|
assertThat(getPolicyResponse.getPolicies().get(0).getMatchField(), equalTo(putPolicyRequest.getMatchField()));
|
||||||
assertThat(getPolicyResponse.getPolicies().get(0).getEnrichFields(), equalTo(putPolicyRequest.getEnrichFields()));
|
assertThat(getPolicyResponse.getPolicies().get(0).getEnrichFields(), equalTo(putPolicyRequest.getEnrichFields()));
|
||||||
|
|
||||||
|
StatsRequest statsRequest = new StatsRequest();
|
||||||
|
StatsResponse statsResponse = execute(statsRequest, enrichClient::stats, enrichClient::statsAsync);
|
||||||
|
assertThat(statsResponse.getExecutingPolicies().size(), equalTo(0));
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), notNullValue());
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().get(0).getQueueSize(), greaterThanOrEqualTo(0));
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsCurrent(), greaterThanOrEqualTo(0));
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(0L));
|
||||||
|
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), greaterThanOrEqualTo(0L));
|
||||||
|
|
||||||
DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");
|
DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");
|
||||||
AcknowledgedResponse deletePolicyResponse =
|
AcknowledgedResponse deletePolicyResponse =
|
||||||
execute(deletePolicyRequest, enrichClient::deletePolicy, enrichClient::deletePolicyAsync);
|
execute(deletePolicyRequest, enrichClient::deletePolicy, enrichClient::deletePolicyAsync);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.client.enrich.DeletePolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequestTests;
|
import org.elasticsearch.client.enrich.PutPolicyRequestTests;
|
||||||
|
import org.elasticsearch.client.enrich.StatsRequest;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -42,7 +43,7 @@ public class EnrichRequestConvertersTests extends ESTestCase {
|
||||||
RequestConvertersTests.assertToXContentBody(request, result.getEntity());
|
RequestConvertersTests.assertToXContentBody(request, result.getEntity());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeletePolicy() throws Exception {
|
public void testDeletePolicy() {
|
||||||
DeletePolicyRequest request = new DeletePolicyRequest(randomAlphaOfLength(4));
|
DeletePolicyRequest request = new DeletePolicyRequest(randomAlphaOfLength(4));
|
||||||
Request result = EnrichRequestConverters.deletePolicy(request);
|
Request result = EnrichRequestConverters.deletePolicy(request);
|
||||||
|
|
||||||
|
@ -52,7 +53,7 @@ public class EnrichRequestConvertersTests extends ESTestCase {
|
||||||
assertThat(result.getEntity(), nullValue());
|
assertThat(result.getEntity(), nullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetPolicy() throws Exception {
|
public void testGetPolicy() {
|
||||||
GetPolicyRequest request = new GetPolicyRequest(randomAlphaOfLength(4));
|
GetPolicyRequest request = new GetPolicyRequest(randomAlphaOfLength(4));
|
||||||
Request result = EnrichRequestConverters.getPolicy(request);
|
Request result = EnrichRequestConverters.getPolicy(request);
|
||||||
|
|
||||||
|
@ -78,4 +79,14 @@ public class EnrichRequestConvertersTests extends ESTestCase {
|
||||||
assertThat(result.getEntity(), nullValue());
|
assertThat(result.getEntity(), nullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testStats() {
|
||||||
|
StatsRequest request = new StatsRequest();
|
||||||
|
Request result = EnrichRequestConverters.stats(request);
|
||||||
|
|
||||||
|
assertThat(result.getMethod(), equalTo(HttpGet.METHOD_NAME));
|
||||||
|
assertThat(result.getEndpoint(), equalTo("/_enrich/_stats"));
|
||||||
|
assertThat(result.getParameters().size(), equalTo(0));
|
||||||
|
assertThat(result.getEntity(), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,10 @@ import org.elasticsearch.client.enrich.NamedPolicy;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
import org.elasticsearch.client.enrich.GetPolicyRequest;
|
||||||
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
import org.elasticsearch.client.enrich.GetPolicyResponse;
|
||||||
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
import org.elasticsearch.client.enrich.PutPolicyRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsRequest;
|
||||||
|
import org.elasticsearch.client.enrich.StatsResponse;
|
||||||
|
import org.elasticsearch.client.enrich.StatsResponse.CoordinatorStats;
|
||||||
|
import org.elasticsearch.client.enrich.StatsResponse.ExecutingPolicy;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -198,4 +202,53 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testStats() throws Exception {
|
||||||
|
RestHighLevelClient client = highLevelClient();
|
||||||
|
|
||||||
|
// tag::enrich-stats-request
|
||||||
|
StatsRequest statsRequest = new StatsRequest();
|
||||||
|
// end::enrich-stats-request
|
||||||
|
|
||||||
|
// tag::enrich-stats-execute
|
||||||
|
StatsResponse statsResponse =
|
||||||
|
client.enrich().stats(statsRequest, RequestOptions.DEFAULT);
|
||||||
|
// end::enrich-stats-execute
|
||||||
|
|
||||||
|
// tag::enrich-stats-response
|
||||||
|
List<ExecutingPolicy> executingPolicies =
|
||||||
|
statsResponse.getExecutingPolicies(); // <1>
|
||||||
|
List<CoordinatorStats> coordinatorStats =
|
||||||
|
statsResponse.getCoordinatorStats(); // <2>
|
||||||
|
// end::enrich-stats-response
|
||||||
|
|
||||||
|
// tag::enrich-stats-execute-listener
|
||||||
|
ActionListener<StatsResponse> listener =
|
||||||
|
new ActionListener<StatsResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(StatsResponse response) { // <1>
|
||||||
|
List<ExecutingPolicy> executingPolicies =
|
||||||
|
statsResponse.getExecutingPolicies();
|
||||||
|
List<CoordinatorStats> coordinatorStats =
|
||||||
|
statsResponse.getCoordinatorStats();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
// <2>
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// end::enrich-stats-execute-listener
|
||||||
|
|
||||||
|
// Replace the empty listener by a blocking listener in test
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
listener = new LatchedActionListener<>(listener, latch);
|
||||||
|
|
||||||
|
// tag::enrich-stats-execute-async
|
||||||
|
client.enrich().statsAsync(statsRequest, RequestOptions.DEFAULT,
|
||||||
|
listener); // <1>
|
||||||
|
// end::enrich-stats-execute-async
|
||||||
|
|
||||||
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.client.enrich;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.AbstractResponseTestCase;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
import org.elasticsearch.tasks.TaskInfo;
|
||||||
|
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
public class StatsResponseTests extends AbstractResponseTestCase<EnrichStatsAction.Response, StatsResponse> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected EnrichStatsAction.Response createServerTestInstance(XContentType xContentType) {
|
||||||
|
int numExecutingPolicies = randomIntBetween(0, 16);
|
||||||
|
List<EnrichStatsAction.Response.ExecutingPolicy> executingPolicies = new ArrayList<>(numExecutingPolicies);
|
||||||
|
for (int i = 0; i < numExecutingPolicies; i++) {
|
||||||
|
TaskInfo taskInfo = randomTaskInfo();
|
||||||
|
executingPolicies.add(new EnrichStatsAction.Response.ExecutingPolicy(randomAlphaOfLength(4), taskInfo));
|
||||||
|
}
|
||||||
|
int numCoordinatingStats = randomIntBetween(0, 16);
|
||||||
|
List<EnrichStatsAction.Response.CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
|
||||||
|
for (int i = 0; i < numCoordinatingStats; i++) {
|
||||||
|
EnrichStatsAction.Response.CoordinatorStats stats = new EnrichStatsAction.Response.CoordinatorStats(
|
||||||
|
randomAlphaOfLength(4), randomIntBetween(0, 8096), randomIntBetween(0, 8096), randomNonNegativeLong(),
|
||||||
|
randomNonNegativeLong());
|
||||||
|
coordinatorStats.add(stats);
|
||||||
|
}
|
||||||
|
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected StatsResponse doParseToClientInstance(XContentParser parser) throws IOException {
|
||||||
|
return StatsResponse.fromXContent(parser);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertInstances(EnrichStatsAction.Response serverTestInstance, StatsResponse clientInstance) {
|
||||||
|
assertThat(clientInstance.getExecutingPolicies().size(), equalTo(serverTestInstance.getExecutingPolicies().size()));
|
||||||
|
for (int i = 0; i < clientInstance.getExecutingPolicies().size(); i++) {
|
||||||
|
StatsResponse.ExecutingPolicy actual = clientInstance.getExecutingPolicies().get(i);
|
||||||
|
EnrichStatsAction.Response.ExecutingPolicy expected = serverTestInstance.getExecutingPolicies().get(i);
|
||||||
|
assertThat(actual.getName(), equalTo(expected.getName()));
|
||||||
|
assertThat(actual.getTaskInfo(), equalTo(actual.getTaskInfo()));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(clientInstance.getCoordinatorStats().size(), equalTo(serverTestInstance.getCoordinatorStats().size()));
|
||||||
|
for (int i = 0; i < clientInstance.getCoordinatorStats().size(); i++) {
|
||||||
|
StatsResponse.CoordinatorStats actual = clientInstance.getCoordinatorStats().get(i);
|
||||||
|
EnrichStatsAction.Response.CoordinatorStats expected = serverTestInstance.getCoordinatorStats().get(i);
|
||||||
|
assertThat(actual.getNodeId(), equalTo(expected.getNodeId()));
|
||||||
|
assertThat(actual.getQueueSize(), equalTo(expected.getQueueSize()));
|
||||||
|
assertThat(actual.getRemoteRequestsCurrent(), equalTo(expected.getRemoteRequestsCurrent()));
|
||||||
|
assertThat(actual.getRemoteRequestsTotal(), equalTo(expected.getRemoteRequestsTotal()));
|
||||||
|
assertThat(actual.getExecutedSearchesTotal(), equalTo(expected.getExecutedSearchesTotal()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TaskInfo randomTaskInfo() {
|
||||||
|
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong());
|
||||||
|
String type = randomAlphaOfLength(5);
|
||||||
|
String action = randomAlphaOfLength(5);
|
||||||
|
String description = randomAlphaOfLength(5);
|
||||||
|
long startTime = randomLong();
|
||||||
|
long runningTimeNanos = randomLong();
|
||||||
|
boolean cancellable = randomBoolean();
|
||||||
|
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
|
||||||
|
Map<String, String> headers = randomBoolean() ?
|
||||||
|
Collections.emptyMap() :
|
||||||
|
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
|
||||||
|
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
--
|
||||||
|
:api: enrich-stats
|
||||||
|
:request: StatsRequest
|
||||||
|
:response: StatsResponse
|
||||||
|
--
|
||||||
|
|
||||||
|
[id="{upid}-{api}"]
|
||||||
|
=== Stats API
|
||||||
|
|
||||||
|
[id="{upid}-{api}-request"]
|
||||||
|
==== Request
|
||||||
|
|
||||||
|
The stats API returns enrich related stats.
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests-file}[{api}-request]
|
||||||
|
--------------------------------------------------
|
||||||
|
|
||||||
|
[id="{upid}-{api}-response"]
|
||||||
|
==== Response
|
||||||
|
|
||||||
|
The returned +{response}+ includes enrich related stats.
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests-file}[{api}-response]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> List of policies that are currently executing with
|
||||||
|
additional details.
|
||||||
|
<2> List of coordinator stats per ingest node.
|
||||||
|
|
||||||
|
include::../execution.asciidoc[]
|
|
@ -613,7 +613,9 @@ The Java High Level REST Client supports the following Enrich APIs:
|
||||||
* <<{upid}-enrich-put-policy>>
|
* <<{upid}-enrich-put-policy>>
|
||||||
* <<{upid}-enrich-delete-policy>>
|
* <<{upid}-enrich-delete-policy>>
|
||||||
* <<{upid}-enrich-get-policy>>
|
* <<{upid}-enrich-get-policy>>
|
||||||
|
* <<{upid}-enrich-stats-policy>>
|
||||||
|
|
||||||
include::enrich/put_policy.asciidoc[]
|
include::enrich/put_policy.asciidoc[]
|
||||||
include::enrich/delete_policy.asciidoc[]
|
include::enrich/delete_policy.asciidoc[]
|
||||||
include::enrich/get_policy.asciidoc[]
|
include::enrich/get_policy.asciidoc[]
|
||||||
|
include::enrich/stats.asciidoc[]
|
||||||
|
|
|
@ -18,8 +18,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.tasks.TaskInfo;
|
import org.elasticsearch.tasks.TaskInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@ -81,26 +79,18 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.startArray("executing_policies");
|
builder.startArray("executing_policies");
|
||||||
executingPolicies.stream().sorted(Comparator.comparing(ExecutingPolicy::getName)).forEachOrdered(policy -> {
|
for (ExecutingPolicy policy : executingPolicies) {
|
||||||
try {
|
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
policy.toXContent(builder, params);
|
policy.toXContent(builder, params);
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new UncheckedIOException(e);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
builder.endArray();
|
builder.endArray();
|
||||||
builder.startArray("coordinator_stats");
|
builder.startArray("coordinator_stats");
|
||||||
coordinatorStats.stream().sorted(Comparator.comparing(CoordinatorStats::getNodeId)).forEachOrdered(entry -> {
|
for (CoordinatorStats entry : coordinatorStats) {
|
||||||
try {
|
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
entry.toXContent(builder, params);
|
entry.toXContent(builder, params);
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new UncheckedIOException(e);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
builder.endArray();
|
builder.endArray();
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.Exe
|
||||||
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -74,11 +75,13 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
|
||||||
|
|
||||||
List<CoordinatorStats> coordinatorStats = response.getNodes().stream()
|
List<CoordinatorStats> coordinatorStats = response.getNodes().stream()
|
||||||
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
|
.map(EnrichCoordinatorStatsAction.NodeResponse::getCoordinatorStats)
|
||||||
|
.sorted(Comparator.comparing(CoordinatorStats::getNodeId))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
|
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
|
||||||
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
|
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME))
|
||||||
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
|
.map(t -> t.taskInfo(clusterService.localNode().getId(), true))
|
||||||
.map(t -> new ExecutingPolicy(t.getDescription(), t))
|
.map(t -> new ExecutingPolicy(t.getDescription(), t))
|
||||||
|
.sorted(Comparator.comparing(ExecutingPolicy::getName))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
|
listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue