Add wait for completion for Enrich policy execution (#47886)

This PR adds the ability to run the enrich policy execution task in the background,
returning a task id instead of waiting for the completed operation.
This commit is contained in:
James Baiera 2019-10-14 15:38:07 -04:00
parent 7fc9198d46
commit 18d7e32b7d
6 changed files with 107 additions and 16 deletions

View File

@ -30,26 +30,39 @@ public class ExecuteEnrichPolicyAction extends ActionType<ExecuteEnrichPolicyAct
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
private final String name; private final String name;
private boolean waitForCompletion;
public Request(String name) { public Request(String name) {
this.name = Objects.requireNonNull(name, "name cannot be null"); this.name = Objects.requireNonNull(name, "name cannot be null");
this.waitForCompletion = true;
} }
public Request(StreamInput in) throws IOException { public Request(StreamInput in) throws IOException {
super(in); super(in);
name = in.readString(); name = in.readString();
waitForCompletion = in.readBoolean();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(name); out.writeString(name);
out.writeBoolean(waitForCompletion);
} }
public String getName() { public String getName() {
return name; return name;
} }
public boolean isWaitForCompletion() {
return waitForCompletion;
}
public Request setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
return null; return null;
@ -66,12 +79,13 @@ public class ExecuteEnrichPolicyAction extends ActionType<ExecuteEnrichPolicyAct
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o; Request request = (Request) o;
return name.equals(request.name); return waitForCompletion == request.waitForCompletion &&
Objects.equals(name, request.name);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(name); return Objects.hash(name, waitForCompletion);
} }
} }

View File

@ -133,12 +133,12 @@ public class EnrichPolicyExecutor {
return policy; return policy;
} }
public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) { public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener); return runPolicy(request, getPolicy(request), listener);
} }
public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) { public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener); return runPolicy(request, getPolicy(request), listener);
} }
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,

View File

@ -17,6 +17,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
@ -59,17 +62,23 @@ public class TransportExecuteEnrichPolicyAction
@Override @Override
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<ExecuteEnrichPolicyAction.Response> listener) { ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
executor.runPolicy(request, new ActionListener<ExecuteEnrichPolicyStatus>() { if (request.isWaitForCompletion()) {
@Override executor.runPolicy(request, new ActionListener<ExecuteEnrichPolicyStatus>() {
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { @Override
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
} listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
}); });
} else {
Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance());
TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId());
listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId));
}
} }
@Override @Override

View File

@ -29,6 +29,7 @@ public class RestExecuteEnrichPolicyAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name")); final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
} }
} }

View File

@ -5,6 +5,9 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
@ -23,6 +26,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import java.util.ArrayList; import java.util.ArrayList;
@ -41,7 +45,9 @@ import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class BasicEnrichTests extends ESSingleNodeTestCase { public class BasicEnrichTests extends ESSingleNodeTestCase {
@ -214,6 +220,60 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
} }
} }
public void testAsyncTaskExecute() throws Exception {
String policyName = "async-policy";
String sourceIndexName = "async-policy-source";
{
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
indexRequest.source("key", "key", "value", "val1");
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
}
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexName), "key",
Collections.singletonList("value"));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
ExecuteEnrichPolicyAction.Response executeResponse = client()
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
.actionGet();
assertThat(executeResponse.getStatus(), is(nullValue()));
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
assertBusy(() -> {
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
});
String pipelineName = "test-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
BulkRequest bulkRequest = new BulkRequest("my-index");
int numTestDocs = randomIntBetween(3, 10);
for (int i = 0; i < numTestDocs; i++) {
IndexRequest indexRequest = new IndexRequest("my-index");
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(pipelineName);
indexRequest.source(Collections.singletonMap("key", "key"));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
for (int i = 0; i < numTestDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val1")));
}
}
private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) { private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>(); Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) { for (int id = 0; id < numKeys; id++) {

View File

@ -15,6 +15,13 @@
} }
} }
] ]
},
"params":{
"wait_for_completion":{
"type":"boolean",
"default":true,
"description":"Should the request should block until the execution is complete."
}
} }
} }
} }