Add enrich policy PUT API (#41383)

This commit wires up the Rest calls and Transport calls for PUT enrich
policy, as well as tests and rest spec additions.
This commit is contained in:
Michael Basnight 2019-04-25 15:13:59 -05:00
parent a61ec11f36
commit fad45ea6bd
9 changed files with 311 additions and 2 deletions

View File

@ -39,6 +39,11 @@ public class XPackSettings {
}
/**
* Setting for controlling whether or not enrich is enabled.
*/
public static final Setting<Boolean> ENRICH_ENABLED_SETTING = Setting.boolSetting("xpack.enrich.enabled", true, Property.NodeScope);
/**
* Setting for controlling whether or not CCR is enabled.
*/

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.enrich.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.io.IOException;
import java.util.Objects;
public class PutEnrichPolicyAction extends Action<AcknowledgedResponse> {
public static final PutEnrichPolicyAction INSTANCE = new PutEnrichPolicyAction();
public static final String NAME = "cluster:admin/xpack/enrich/put";
protected PutEnrichPolicyAction() {
super(NAME);
}
public static Request fromXContent(XContentParser parser, String name) throws IOException {
return new Request(name, EnrichPolicy.fromXContent(parser));
}
@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends MasterNodeRequest<PutEnrichPolicyAction.Request> {
private final EnrichPolicy policy;
private final String name;
public Request(String name, EnrichPolicy policy) {
this.name = Objects.requireNonNull(name, "name cannot be null");
this.policy = policy;
}
public Request(StreamInput in) throws IOException {
super(in);
name = in.readString();
policy = new EnrichPolicy(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
policy.writeTo(out);
}
public String getName() {
return name;
}
public EnrichPolicy getPolicy() {
return policy;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return policy.equals(request.policy) &&
name.equals(request.name);
}
@Override
public int hashCode() {
return Objects.hash(policy, name);
}
}
}

View File

@ -5,25 +5,75 @@
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
public class EnrichPlugin extends Plugin implements IngestPlugin {
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
private final Settings settings;
private final Boolean enabled;
public EnrichPlugin(final Settings settings) {
this.settings = settings;
this.enabled = ENRICH_ENABLED_SETTING.get(settings);
}
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled == false) {
return emptyList();
}
return Arrays.asList(
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class)
);
}
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
if (enabled == false) {
return emptyList();
}
return Arrays.asList(
new RestPutEnrichPolicyAction(settings, restController)
);
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE,

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichStore;
public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<PutEnrichPolicyAction.Request, AcknowledgedResponse> {
@Inject
public TransportPutEnrichPolicyAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(PutEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutEnrichPolicyAction.Request::new, indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
protected void masterOperation(PutEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(e);
}
});
}
@Override
protected ClusterBlockException checkBlock(PutEnrichPolicyAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import java.io.IOException;
public class RestPutEnrichPolicyAction extends BaseRestHandler {
public RestPutEnrichPolicyAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.PUT, "/_enrich/policy/{name}", this);
}
@Override
public String getName() {
return "put_enrich_policy";
}
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final PutEnrichPolicyAction.Request request = createRequest(restRequest);
return channel -> client.execute(PutEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
static PutEnrichPolicyAction.Request createRequest(RestRequest restRequest) throws IOException {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
return PutEnrichPolicyAction.fromXContent(parser, restRequest.param("name"));
}
}
}

View File

@ -43,7 +43,7 @@ public class EnrichPolicyTests extends AbstractSerializingTestCase<EnrichPolicy>
return randomEnrichPolicy(xContentType);
}
static EnrichPolicy randomEnrichPolicy(XContentType xContentType) {
public static EnrichPolicy randomEnrichPolicy(XContentType xContentType) {
final QueryBuilder queryBuilder;
if (randomBoolean()) {
queryBuilder = new MatchAllQueryBuilder();

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
public class PutEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase<PutEnrichPolicyAction.Request> {
@Override
protected PutEnrichPolicyAction.Request createTestInstance() {
final EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
return new PutEnrichPolicyAction.Request(randomAlphaOfLength(3), policy);
}
@Override
protected Writeable.Reader<PutEnrichPolicyAction.Request> instanceReader() {
return PutEnrichPolicyAction.Request::new;
}
}

View File

@ -0,0 +1,21 @@
{
"enrich.put_policy": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-put-policy.html",
"methods": [ "PUT" ],
"url": {
"path": "/_enrich/policy/{name}",
"paths": ["/_enrich/policy/{name}"],
"parts": {
"name": {
"type" : "string",
"description" : "The name of the enrich policy"
}
},
"params": {
}
},
"body": {
"description": "The enrich policy to register"
}
}
}

View File

@ -0,0 +1,14 @@
---
"Test enrich crud apis":
- do:
enrich.put_policy:
name: policy-crud
body:
type: exact_match
index_pattern: "bar*"
enrich_key: baz
enrich_values: ["a", "b"]
schedule: "*/120"
- is_true: acknowledged