From 2325ffb757a5b5b0cb6da35dc52a612cb49f08e6 Mon Sep 17 00:00:00 2001
From: Michael Basnight <mbasnight@gmail.com>
Date: Wed, 22 May 2019 11:19:27 -0500
Subject: [PATCH] Add enrich policy execute API (#41762)

This commit wires up the Rest calls and Transport calls for execute
enrich policy, as well as tests and rest spec additions.
---
 .../action/DeleteEnrichPolicyAction.java      |  2 +-
 .../action/ExecuteEnrichPolicyAction.java     | 73 +++++++++++++++++++
 .../rest-api-spec/test/enrich/10_basic.yml    |  5 ++
 .../xpack/enrich/EnrichPlugin.java            | 11 ++-
 .../xpack/enrich/EnrichPolicyExecutor.java    |  2 +-
 .../TransportExecuteEnrichPolicyAction.java   | 71 ++++++++++++++++++
 .../rest/RestExecuteEnrichPolicyAction.java   | 36 +++++++++
 ...ExecuteEnrichPolicyActionRequestTests.java | 23 ++++++
 .../api/enrich.execute_policy.json            | 19 +++++
 9 files changed, 237 insertions(+), 5 deletions(-)
 create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
 create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java
 create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java
 create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java
 create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java
index f852f217cda..3997a3364a4 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java
@@ -20,7 +20,7 @@ public class DeleteEnrichPolicyAction extends Action<AcknowledgedResponse> {
     public static final DeleteEnrichPolicyAction INSTANCE = new DeleteEnrichPolicyAction();
     public static final String NAME = "cluster:admin/xpack/enrich/delete";
 
-    protected DeleteEnrichPolicyAction() {
+    private DeleteEnrichPolicyAction() {
         super(NAME);
     }
 
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
new file mode 100644
index 00000000000..89bd874ca18
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.enrich.action;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class ExecuteEnrichPolicyAction extends Action<AcknowledgedResponse> {
+
+    public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction();
+    public static final String NAME = "cluster:admin/xpack/enrich/execute";
+
+    private ExecuteEnrichPolicyAction() {
+        super(NAME);
+    }
+
+    @Override
+    public AcknowledgedResponse newResponse() {
+        return new AcknowledgedResponse();
+    }
+
+    public static class Request extends MasterNodeReadRequest<Request> {
+
+        private final String name;
+
+        public Request(String name) {
+            this.name = Objects.requireNonNull(name, "name cannot be null");
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            name = in.readString();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(name);
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return name.equals(request.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name);
+        }
+    }
+}
diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
index 23ce787301b..eb2c71cf375 100644
--- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
+++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
@@ -11,6 +11,11 @@
           enrich_values: ["a", "b"]
   - is_true: acknowledged
 
+  - do:
+      enrich.execute_policy:
+        name: policy-crud
+  - is_true: acknowledged
+
   - do:
       enrich.list_policy: {}
   - length: { policies: 1 }
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
index 7875f5a0123..5057c1587fe 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
@@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -33,12 +33,15 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.action.TransportListEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.rest.RestDeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.rest.RestExecuteEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
 
@@ -87,7 +90,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
         return Arrays.asList(
             new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
             new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
-            new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class)
+            new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
+            new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class)
         );
     }
 
@@ -102,7 +106,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
         return Arrays.asList(
             new RestDeleteEnrichPolicyAction(settings, restController),
             new RestListEnrichPolicyAction(settings, restController),
-            new RestPutEnrichPolicyAction(settings, restController)
+            new RestPutEnrichPolicyAction(settings, restController),
+            new RestExecuteEnrichPolicyAction(settings, restController)
         );
     }
 
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java
index 6a5886479f0..4a979007ac5 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java
@@ -17,7 +17,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
-class EnrichPolicyExecutor {
+public class EnrichPolicyExecutor {
 
     private final ClusterService clusterService;
     private final Client client;
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java
new file mode 100644
index 00000000000..c8df791dca2
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
+import org.elasticsearch.xpack.enrich.PolicyExecutionResult;
+
+public class TransportExecuteEnrichPolicyAction
+    extends TransportMasterNodeReadAction<ExecuteEnrichPolicyAction.Request, AcknowledgedResponse> {
+
+    private final EnrichPolicyExecutor executor;
+
+    @Inject
+    public TransportExecuteEnrichPolicyAction(TransportService transportService,
+                                              ClusterService clusterService,
+                                              ThreadPool threadPool,
+                                              ActionFilters actionFilters,
+                                              IndexNameExpressionResolver indexNameExpressionResolver,
+                                              EnrichPolicyExecutor executor) {
+        super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
+            ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
+        this.executor = executor;
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse newResponse() {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
+                                   ActionListener<AcknowledgedResponse> listener) {
+        executor.runPolicy(request.getName(), new ActionListener<PolicyExecutionResult>() {
+            @Override
+            public void onResponse(PolicyExecutionResult policyExecutionResult) {
+                listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted()));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(ExecuteEnrichPolicyAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+    }
+}
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java
new file mode 100644
index 00000000000..5129c583f0b
--- /dev/null
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+
+import java.io.IOException;
+
+public class RestExecuteEnrichPolicyAction extends BaseRestHandler {
+
+    public RestExecuteEnrichPolicyAction(final Settings settings, final RestController controller) {
+        super(settings);
+        controller.registerHandler(RestRequest.Method.PUT, "/_enrich/policy/{name}/_execute", this);
+        controller.registerHandler(RestRequest.Method.POST, "/_enrich/policy/{name}/_execute", this);
+    }
+
+    @Override
+    public String getName() {
+        return "execute_enrich_policy";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
+        final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
+        return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java
new file mode 100644
index 00000000000..5e7fb69a129
--- /dev/null
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+
+public class ExecuteEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase<ExecuteEnrichPolicyAction.Request> {
+
+    @Override
+    protected ExecuteEnrichPolicyAction.Request createTestInstance() {
+        return new ExecuteEnrichPolicyAction.Request(randomAlphaOfLength(3));
+    }
+
+    @Override
+    protected Writeable.Reader<ExecuteEnrichPolicyAction.Request> instanceReader() {
+        return ExecuteEnrichPolicyAction.Request::new;
+    }
+}
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json
new file mode 100644
index 00000000000..06b69be213a
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json
@@ -0,0 +1,19 @@
+{
+  "enrich.execute_policy": {
+    "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-execute-policy.html",
+    "methods": [ "PUT" ],
+    "url": {
+      "path": "/_enrich/policy/{name}/_execute",
+      "paths": ["/_enrich/policy/{name}/_execute"],
+      "parts": {
+        "name": {
+          "type" : "string",
+          "description" : "The name of the enrich policy"
+        }
+      },
+      "params": {
+      }
+    },
+    "body": null
+  }
+}