diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java index f852f217cda..3997a3364a4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java @@ -20,7 +20,7 @@ public class DeleteEnrichPolicyAction extends Action { public static final DeleteEnrichPolicyAction INSTANCE = new DeleteEnrichPolicyAction(); public static final String NAME = "cluster:admin/xpack/enrich/delete"; - protected DeleteEnrichPolicyAction() { + private DeleteEnrichPolicyAction() { super(NAME); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java new file mode 100644 index 00000000000..89bd874ca18 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.enrich.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class ExecuteEnrichPolicyAction extends Action { + + public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction(); + public static final String NAME = "cluster:admin/xpack/enrich/execute"; + + private ExecuteEnrichPolicyAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends MasterNodeReadRequest { + + private final String name; + + public Request(String name) { + this.name = Objects.requireNonNull(name, "name cannot be null"); + } + + public Request(StreamInput in) throws IOException { + super(in); + name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + public String getName() { + return name; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } +} diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml index 23ce787301b..eb2c71cf375 100644 --- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml +++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml @@ -11,6 +11,11 @@ enrich_values: ["a", "b"] - is_true: acknowledged + - do: + enrich.execute_policy: + name: policy-crud + - is_true: acknowledged + - do: enrich.list_policy: {} - length: { policies: 1 } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 7875f5a0123..5057c1587fe 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -33,12 +33,15 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportListEnrichPolicyAction; import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction; import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction; @@ -87,7 +90,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { return Arrays.asList( new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class), new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class), - new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class) + new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class), + new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class) ); } @@ -102,7 +106,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { return Arrays.asList( new RestDeleteEnrichPolicyAction(settings, restController), new RestListEnrichPolicyAction(settings, restController), - new RestPutEnrichPolicyAction(settings, restController) + new RestPutEnrichPolicyAction(settings, restController), + new RestExecuteEnrichPolicyAction(settings, restController) ); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 6a5886479f0..4a979007ac5 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -17,7 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -class EnrichPolicyExecutor { +public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java new file mode 100644 index 00000000000..c8df791dca2 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; +import org.elasticsearch.xpack.enrich.PolicyExecutionResult; + +public class TransportExecuteEnrichPolicyAction + extends TransportMasterNodeReadAction { + + private final EnrichPolicyExecutor executor; + + @Inject + public TransportExecuteEnrichPolicyAction(TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + EnrichPolicyExecutor executor) { + super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters, + ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver); + this.executor = executor; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, + ActionListener listener) { + executor.runPolicy(request.getName(), new ActionListener() { + @Override + public void onResponse(PolicyExecutionResult policyExecutionResult) { + listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted())); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(ExecuteEnrichPolicyAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java new file mode 100644 index 00000000000..5129c583f0b --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; + +import java.io.IOException; + +public class RestExecuteEnrichPolicyAction extends BaseRestHandler { + + public RestExecuteEnrichPolicyAction(final Settings settings, final RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.PUT, "/_enrich/policy/{name}/_execute", this); + controller.registerHandler(RestRequest.Method.POST, "/_enrich/policy/{name}/_execute", this); + } + + @Override + public String getName() { + return "execute_enrich_policy"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { + final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name")); + return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java new file mode 100644 index 00000000000..5e7fb69a129 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; + +public class ExecuteEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected ExecuteEnrichPolicyAction.Request createTestInstance() { + return new ExecuteEnrichPolicyAction.Request(randomAlphaOfLength(3)); + } + + @Override + protected Writeable.Reader instanceReader() { + return ExecuteEnrichPolicyAction.Request::new; + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json new file mode 100644 index 00000000000..06b69be213a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json @@ -0,0 +1,19 @@ +{ + "enrich.execute_policy": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-execute-policy.html", + "methods": [ "PUT" ], + "url": { + "path": "/_enrich/policy/{name}/_execute", + "paths": ["/_enrich/policy/{name}/_execute"], + "parts": { + "name": { + "type" : "string", + "description" : "The name of the enrich policy" + } + }, + "params": { + } + }, + "body": null + } +}