diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index 0d7fa55ebb6..e5c6ab5eb67 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -30,26 +30,39 @@ public class ExecuteEnrichPolicyAction extends ActionType { private final String name; + private boolean waitForCompletion; public Request(String name) { this.name = Objects.requireNonNull(name, "name cannot be null"); + this.waitForCompletion = true; } public Request(StreamInput in) throws IOException { super(in); name = in.readString(); + waitForCompletion = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(name); + out.writeBoolean(waitForCompletion); } public String getName() { return name; } + public boolean isWaitForCompletion() { + return waitForCompletion; + } + + public Request setWaitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + @Override public ActionRequestValidationException validate() { return null; @@ -66,12 +79,13 @@ public class ExecuteEnrichPolicyAction extends ActionType listener) { - runPolicy(request, getPolicy(request), listener); + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { + return runPolicy(request, getPolicy(request), listener); } - public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { - runPolicy(request, getPolicy(request), listener); + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { + return runPolicy(request, getPolicy(request), listener); } public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 6941d4caf4a..79810aca9b2 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -17,6 +17,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.LoggingTaskListener; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; @@ -59,17 +62,23 @@ public class TransportExecuteEnrichPolicyAction @Override protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, ActionListener listener) { - executor.runPolicy(request, new ActionListener() { - @Override - public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { - listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); - } + if (request.isWaitForCompletion()) { + executor.runPolicy(request, new ActionListener() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { + listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } else { + Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); + TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); + listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); + } } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java index 05955efdc8b..087117a6c1a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java @@ -29,6 +29,7 @@ public class RestExecuteEnrichPolicyAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name")); + request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true)); return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 42878f5b99d..871f37288e4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -23,6 +26,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import java.util.ArrayList; @@ -41,7 +45,9 @@ import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class BasicEnrichTests extends ESSingleNodeTestCase { @@ -214,6 +220,60 @@ public class BasicEnrichTests extends ESSingleNodeTestCase { } } + public void testAsyncTaskExecute() throws Exception { + String policyName = "async-policy"; + String sourceIndexName = "async-policy-source"; + + { + IndexRequest indexRequest = new IndexRequest(sourceIndexName); + indexRequest.source("key", "key", "value", "val1"); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet(); + } + + EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexName), "key", + Collections.singletonList("value")); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + ExecuteEnrichPolicyAction.Response executeResponse = client() + .execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false)) + .actionGet(); + + assertThat(executeResponse.getStatus(), is(nullValue())); + assertThat(executeResponse.getTaskId(), is(not(nullValue()))); + GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true); + assertBusy(() -> { + GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet(); + assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(), + is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE)); + }); + + String pipelineName = "test-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + + "\", \"field\": \"key\", \"target_field\": \"target\"}}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + BulkRequest bulkRequest = new BulkRequest("my-index"); + int numTestDocs = randomIntBetween(3, 10); + for (int i = 0; i < numTestDocs; i++) { + IndexRequest indexRequest = new IndexRequest("my-index"); + indexRequest.id(Integer.toString(i)); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Collections.singletonMap("key", "key")); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + + for (int i = 0; i < numTestDocs; i++) { + GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); + Map source = getResponse.getSourceAsMap(); + assertThat(source.size(), equalTo(2)); + assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val1"))); + } + } + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json index b6115e40ec6..b49486a0620 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json @@ -15,6 +15,13 @@ } } ] + }, + "params":{ + "wait_for_completion":{ + "type":"boolean", + "default":true, + "description":"Should the request should block until the execution is complete." + } } } }