Add enrich policy list API (#41553)

This commit wires up the Rest calls and Transport calls for listing all
enrich policies, as well  as tests and rest spec additions.
This commit is contained in:
Michael Basnight 2019-05-01 13:16:18 -05:00
parent e429cd7f28
commit 2978ac3061
12 changed files with 413 additions and 10 deletions

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -53,16 +54,20 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
declareParserOptions(PARSER);
}
private static void declareParserOptions(ConstructingObjectParser parser) {
parser.declareString(ConstructingObjectParser.constructorArg(), TYPE);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
contentBuilder.generator().copyCurrentStructure(p);
return new QuerySource(BytesReference.bytes(contentBuilder), contentBuilder.contentType());
}, QUERY);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES);
PARSER.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE);
parser.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN);
parser.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY);
parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES);
parser.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE);
}
public static EnrichPolicy fromXContent(XContentParser parser) throws IOException {
@ -228,4 +233,84 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
return Objects.hash(query, contentType);
}
}
public static class NamedPolicy implements Writeable, ToXContent {
static final ParseField NAME = new ParseField("name");
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<NamedPolicy, Void> PARSER = new ConstructingObjectParser<>("named_policy",
args -> {
return new NamedPolicy(
(String) args[0],
new EnrichPolicy((String) args[1],
(QuerySource) args[2],
(String) args[3],
(String) args[4],
(List<String>) args[5],
(String) args[6])
);
}
);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
declareParserOptions(PARSER);
}
private final String name;
private final EnrichPolicy policy;
public NamedPolicy(String name, EnrichPolicy policy) {
this.name = name;
this.policy = policy;
}
public NamedPolicy(StreamInput in) throws IOException {
name = in.readString();
policy = new EnrichPolicy(in);
}
public String getName() {
return name;
}
public EnrichPolicy getPolicy() {
return policy;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
policy.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(NAME.getPreferredName(), name);
policy.toXContent(builder, params);
}
builder.endObject();
return builder;
}
public static NamedPolicy fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NamedPolicy that = (NamedPolicy) o;
return name.equals(that.name) &&
policy.equals(that.policy);
}
@Override
public int hashCode() {
return Objects.hash(name, policy);
}
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.enrich.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
public class ListEnrichPolicyAction extends Action<ListEnrichPolicyAction.Response> {
public static final ListEnrichPolicyAction INSTANCE = new ListEnrichPolicyAction();
public static final String NAME = "cluster:admin/xpack/enrich/list";
private ListEnrichPolicyAction() {
super(NAME);
}
@Override
public Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
public static class Request extends MasterNodeRequest<ListEnrichPolicyAction.Request> {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final List<EnrichPolicy.NamedPolicy> policies;
public Response(Map<String, EnrichPolicy> policies) {
Objects.requireNonNull(policies, "policies cannot be null");
// use a treemap to guarantee ordering in the set, then transform it to the list of named policies
this.policies = new TreeMap<>(policies).entrySet().stream()
.map(entry -> new EnrichPolicy.NamedPolicy(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}
public Response(StreamInput in) throws IOException {
policies = in.readList(EnrichPolicy.NamedPolicy::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(policies);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.startArray("policies");
{
for (EnrichPolicy.NamedPolicy policy: policies) {
policy.toXContent(builder, params);
}
}
builder.endArray();
}
builder.endObject();
return builder;
}
public List<EnrichPolicy.NamedPolicy> getPolicies() {
return policies;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return policies.equals(response.policies);
}
@Override
public int hashCode() {
return Objects.hash(policies);
}
}
}

View File

@ -22,7 +22,7 @@ public class PutEnrichPolicyAction extends Action<AcknowledgedResponse> {
public static final PutEnrichPolicyAction INSTANCE = new PutEnrichPolicyAction();
public static final String NAME = "cluster:admin/xpack/enrich/put";
protected PutEnrichPolicyAction() {
private PutEnrichPolicyAction() {
super(NAME);
}

View File

@ -10,5 +10,14 @@
enrich_key: baz
enrich_values: ["a", "b"]
schedule: "*/120"
- is_true: acknowledged
- do:
enrich.list_policy: {}
- length: { policies: 1 }
- match: { policies.0.name: policy-crud }
- match: { policies.0.type: exact_match }
- match: { policies.0.index_pattern: "bar*" }
- match: { policies.0.enrich_key: baz }
- match: { policies.0.enrich_values: ["a", "b"] }
- match: { policies.0.schedule: "*/120" }

View File

@ -25,8 +25,11 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportListEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
import java.util.Arrays;
@ -68,6 +71,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
}
return Arrays.asList(
new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class)
);
}
@ -81,6 +85,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
}
return Arrays.asList(
new RestListEnrichPolicyAction(settings, restController),
new RestPutEnrichPolicyAction(settings, restController)
);
}

View File

@ -85,7 +85,13 @@ public final class EnrichStore {
return getPolicies(state).get(name);
}
private static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
/**
* Gets all policies in the cluster.
*
* @param state the cluster state
* @return a Map of <code>policyName, EnrichPolicy</code> of the policies
*/
public static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
final Map<String, EnrichPolicy> policies;
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata != null) {

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichStore;
import java.util.Map;
public class TransportListEnrichPolicyAction
extends TransportMasterNodeAction<ListEnrichPolicyAction.Request, ListEnrichPolicyAction.Response> {
@Inject
public TransportListEnrichPolicyAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(ListEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
ListEnrichPolicyAction.Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected ListEnrichPolicyAction.Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
protected void masterOperation(ListEnrichPolicyAction.Request request, ClusterState state,
ActionListener<ListEnrichPolicyAction.Response> listener) throws Exception {
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
listener.onResponse(new ListEnrichPolicyAction.Response(policies));
}
@Override
protected ClusterBlockException checkBlock(ListEnrichPolicyAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import java.io.IOException;
public class RestListEnrichPolicyAction extends BaseRestHandler {
public RestListEnrichPolicyAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, "/_enrich/policy", this);
}
@Override
public String getName() {
return "list_enrich_policy";
}
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final ListEnrichPolicyAction.Request request = new ListEnrichPolicyAction.Request();
return channel -> client.execute(ListEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -81,7 +81,7 @@ public class EnrichPolicyTests extends AbstractSerializingTestCase<EnrichPolicy>
assertEqualPolicies(expectedInstance, newInstance);
}
static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPolicy newInstance) {
public static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPolicy newInstance) {
assertThat(newInstance.getType(), equalTo(expectedInstance.getType()));
if (newInstance.getQuery() != null) {
// testFromXContent, always shuffles the xcontent and then byte wise the query is different, so we check the parsed version:

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@ -31,6 +32,10 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
EnrichPolicy result = EnrichStore.getPolicy(name, clusterService.state());
assertThat(result, equalTo(policy));
Map<String, EnrichPolicy> listPolicies = EnrichStore.getPolicies(clusterService.state());
assertThat(listPolicies.size(), equalTo(1));
assertThat(listPolicies.get(name), equalTo(policy));
error = deleteEnrichPolicy(name, clusterService);
assertThat(error.get(), nullValue());
}
@ -87,6 +92,12 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
assertNull(policy);
}
public void testListValidation() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
assertTrue(policies.isEmpty());
}
private AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.assertEqualPolicies;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
import static org.hamcrest.Matchers.equalTo;
public class ListEnrichPolicyActionResponseTests extends AbstractSerializingTestCase<ListEnrichPolicyAction.Response> {
@Override
protected ListEnrichPolicyAction.Response doParseInstance(XContentParser parser) throws IOException {
Map<String, EnrichPolicy> policies = new HashMap<>();
assert parser.nextToken() == XContentParser.Token.START_OBJECT;
assert parser.nextToken() == XContentParser.Token.FIELD_NAME;
assert parser.currentName().equals("policies");
assert parser.nextToken() == XContentParser.Token.START_ARRAY;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
assert token == XContentParser.Token.START_OBJECT;
EnrichPolicy.NamedPolicy policy = EnrichPolicy.NamedPolicy.fromXContent(parser);
policies.put(policy.getName(), policy.getPolicy());
}
return new ListEnrichPolicyAction.Response(policies);
}
@Override
protected ListEnrichPolicyAction.Response createTestInstance() {
Map<String, EnrichPolicy> items = new HashMap<>();
for (int i = 0; i < randomIntBetween(0, 3); i++) {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
items.put(randomAlphaOfLength(3), policy);
}
return new ListEnrichPolicyAction.Response(items);
}
@Override
protected Writeable.Reader<ListEnrichPolicyAction.Response> instanceReader() {
return ListEnrichPolicyAction.Response::new;
}
@Override
protected void assertEqualInstances(ListEnrichPolicyAction.Response expectedInstance, ListEnrichPolicyAction.Response newInstance) {
assertThat(expectedInstance.getPolicies().size(), equalTo(newInstance.getPolicies().size()));
for (EnrichPolicy.NamedPolicy expectedPolicy: expectedInstance.getPolicies()) {
// contains and indexOf cannot be used here as the query source may be represented differently, so we need to check
// if the name is the same and if it is, use that to ensure the policies are the same
Optional<EnrichPolicy.NamedPolicy> maybePolicy = newInstance.getPolicies().stream()
.filter(p -> p.getName().equals(expectedPolicy.getName())).findFirst();
assertTrue(maybePolicy.isPresent());
EnrichPolicy.NamedPolicy newPolicy = maybePolicy.get();
assertEqualPolicies(expectedPolicy.getPolicy(), newPolicy.getPolicy());
assertThat(expectedPolicy.getName(), equalTo(newPolicy.getName()));
}
}
}

View File

@ -0,0 +1,13 @@
{
"enrich.list_policy": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-list-policy.html",
"methods": [ "GET" ],
"url": {
"path": "/_enrich/policy",
"paths": ["/_enrich/policy"],
"parts": {},
"params": {}
},
"body": null
}
}