diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index 6f5804e54ae..0d7fa55ebb6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -5,23 +5,26 @@ */ package org.elasticsearch.xpack.core.enrich.action; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Objects; -public class ExecuteEnrichPolicyAction extends ActionType { +public class ExecuteEnrichPolicyAction extends ActionType { public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction(); public static final String NAME = "cluster:admin/xpack/enrich/execute"; private ExecuteEnrichPolicyAction() { - super(NAME, AcknowledgedResponse::new); + super(NAME, ExecuteEnrichPolicyAction.Response::new); } public static class Request extends MasterNodeRequest { @@ -71,4 +74,64 @@ public class ExecuteEnrichPolicyAction extends ActionType return Objects.hash(name); } } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final TaskId taskId; + private final ExecuteEnrichPolicyStatus status; + + public Response(ExecuteEnrichPolicyStatus status) { + this.taskId = null; + this.status = status; + } + + public Response(TaskId taskId) { + this.taskId = taskId; + this.status = null; + } + + public TaskId getTaskId() { + return taskId; + } + + public ExecuteEnrichPolicyStatus getStatus() { + return status; + } + + public Response(StreamInput in) throws IOException { + super(in); + if (in.readBoolean()) { + this.status = new ExecuteEnrichPolicyStatus(in); + this.taskId = null; + } else { + this.taskId = TaskId.readFromStream(in); + this.status = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + boolean waitedForCompletion = status != null; + out.writeBoolean(waitedForCompletion); + if (waitedForCompletion) { + status.writeTo(out); + } else { + taskId.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + if (taskId != null) { + builder.field("task", taskId.getNodeId() + ":" + taskId.getId()); + } else { + builder.field("status", status); + } + } + builder.endObject(); + return builder; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java new file mode 100644 index 00000000000..96435b6f95f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.enrich.action; + +import java.io.IOException; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; + +public class ExecuteEnrichPolicyStatus implements Task.Status { + + public static final class PolicyPhases { + private PolicyPhases() {} + + public static final String SCHEDULED = "SCHEDULED"; + public static final String RUNNING = "RUNNING"; + public static final String COMPLETE = "COMPLETE"; + public static final String FAILED = "FAILED"; + } + + public static final String NAME = "enrich-policy-execution"; + + private static final String PHASE_FIELD = "phase"; + + private final String phase; + + public ExecuteEnrichPolicyStatus(String phase) { + this.phase = phase; + } + + public ExecuteEnrichPolicyStatus(StreamInput in) throws IOException { + this.phase = in.readString(); + } + + public String getPhase() { + return phase; + } + + public boolean isCompleted() { + return PolicyPhases.COMPLETE.equals(phase); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(phase); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(PHASE_FIELD, phase); + } + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java index 5bfff235a47..c12df5b49d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java @@ -39,12 +39,12 @@ public class EnrichClient { public void executeEnrichPolicy( final ExecuteEnrichPolicyAction.Request request, - final ActionListener listener) { + final ActionListener listener) { client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener); } - public ActionFuture executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); + public ActionFuture executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener); return listener; } diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml index c53026b49ce..2a837d9c3b6 100644 --- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml +++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml @@ -14,7 +14,7 @@ - do: enrich.execute_policy: name: policy-crud - - is_true: acknowledged + - match: { status.phase: "COMPLETE" } - do: enrich.get_policy: diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 4219d38903c..be7e40be8c4 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -163,14 +163,11 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { } EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks(); - EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool, - new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis); EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client, clusterService, threadPool, enrichPolicyLocks); enrichPolicyMaintenanceService.initialize(); return Arrays.asList( enrichPolicyLocks, - enrichPolicyExecutor, new EnrichCoordinatorProxyAction.Coordinator(client, settings), enrichPolicyMaintenanceService ); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 79f8e37cbf6..238d8e23758 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -6,7 +6,9 @@ package org.elasticsearch.xpack.enrich; +import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import org.elasticsearch.action.ActionListener; @@ -15,13 +17,21 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskListener; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; + private final TaskManager taskManager; private final ThreadPool threadPool; private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; @@ -31,15 +41,17 @@ public class EnrichPolicyExecutor { private final int maxForceMergeAttempts; private final Semaphore policyExecutionPermits; - EnrichPolicyExecutor(Settings settings, + public EnrichPolicyExecutor(Settings settings, ClusterService clusterService, Client client, + TaskManager taskManager, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, LongSupplier nowSupplier) { this.clusterService = clusterService; this.client = client; + this.taskManager = taskManager; this.threadPool = threadPool; this.indexNameExpressionResolver = indexNameExpressionResolver; this.nowSupplier = nowSupplier; @@ -68,51 +80,123 @@ public class EnrichPolicyExecutor { } } - private class PolicyUnlockingListener implements ActionListener { + private class PolicyCompletionListener implements ActionListener { private final String policyName; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final BiConsumer onResponse; + private final BiConsumer onFailure; - PolicyUnlockingListener(String policyName, ActionListener listener) { + PolicyCompletionListener(String policyName, ExecuteEnrichPolicyTask task, + BiConsumer onResponse, BiConsumer onFailure) { this.policyName = policyName; - this.listener = listener; + this.task = task; + this.onResponse = onResponse; + this.onFailure = onFailure; } @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { + public void onResponse(ExecuteEnrichPolicyStatus status) { + assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned"; releasePolicy(policyName); - listener.onResponse(policyExecutionResult); + try { + taskManager.unregister(task); + } finally { + onResponse.accept(task, status); + } } @Override public void onFailure(Exception e) { + // Set task status to failed to avoid having to catch and rethrow exceptions everywhere + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); releasePolicy(policyName); - listener.onFailure(e); + try { + taskManager.unregister(task); + } finally { + onFailure.accept(task, e); + } } } - protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { - return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier, + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener) { + return new EnrichPolicyRunner(policyName, policy, task, listener, clusterService, client, indexNameExpressionResolver, nowSupplier, fetchSize, maxForceMergeAttempts); } - public void runPolicy(String policyId, ActionListener listener) { + private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { // Look up policy in policy store and execute it - EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state()); + EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state()); if (policy == null) { - throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyId + "]"); - } else { - runPolicy(policyId, policy, listener); + throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]"); + } + return policy; + } + + public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { + runPolicy(request, getPolicy(request), listener); + } + + public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { + runPolicy(request, getPolicy(request), listener); + } + + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + ActionListener listener) { + return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e)); + } + + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + TaskListener listener) { + return runPolicy(request, policy, listener::onResponse, listener::onFailure); + } + + private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + BiConsumer onResponse, BiConsumer onFailure) { + tryLockingPolicy(request.getName()); + try { + return runPolicyTask(request, policy, onResponse, onFailure); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(request.getName()); + throw e; } } - public void runPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - tryLockingPolicy(policyName); + private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + BiConsumer onResponse, BiConsumer onFailure) { + Task asyncTask = taskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return request.getName(); + } + }); + ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask; try { - Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener)); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); + PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure); + Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener); threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + return asyncTask; } catch (Exception e) { - // Be sure to unlock if submission failed. - releasePolicy(policyName); + // Unregister task in case of exception + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); + taskManager.unregister(asyncTask); throw e; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 7f4d836545b..412ffeef386 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import java.io.IOException; import java.io.UncheckedIOException; @@ -70,7 +71,8 @@ public class EnrichPolicyRunner implements Runnable { private final String policyName; private final EnrichPolicy policy; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final ActionListener listener; private final ClusterService clusterService; private final Client client; private final IndexNameExpressionResolver indexNameExpressionResolver; @@ -78,11 +80,13 @@ public class EnrichPolicyRunner implements Runnable { private final int fetchSize; private final int maxForceMergeAttempts; - EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener, - ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver, - LongSupplier nowSupplier, int fetchSize, int maxForceMergeAttempts) { + EnrichPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener, ClusterService clusterService, Client client, + IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier, int fetchSize, + int maxForceMergeAttempts) { this.policyName = policyName; this.policy = policy; + this.task = task; this.listener = listener; this.clusterService = clusterService; this.client = client; @@ -94,8 +98,9 @@ public class EnrichPolicyRunner implements Runnable { @Override public void run() { - // Collect the source index information logger.info("Policy [{}]: Running enrich policy", policyName); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); + // Collect the source index information final String[] sourceIndices = policy.getIndices().toArray(new String[0]); logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices); GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices); @@ -451,7 +456,9 @@ public class EnrichPolicyRunner implements Runnable { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { logger.info("Policy [{}]: Policy execution complete", policyName); - listener.onResponse(new PolicyExecutionResult(true)); + ExecuteEnrichPolicyStatus completeStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); + task.setStatus(completeStatus); + listener.onResponse(completeStatus); } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java new file mode 100644 index 00000000000..dedd45920fa --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import java.util.Map; + +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; + +class ExecuteEnrichPolicyTask extends Task { + + private volatile ExecuteEnrichPolicyStatus status; + + ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, + Map headers) { + super(id, type, action, description, parentTask, headers); + } + + @Override + public Status getStatus() { + return status; + } + + void setStatus(ExecuteEnrichPolicyStatus status) { + this.status = status; + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java deleted file mode 100644 index faa48c87521..00000000000 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.enrich; - -public class PolicyExecutionResult { - private final boolean completed; - - public PolicyExecutionResult(boolean completed) { - this.completed = completed; - } - - public boolean isCompleted() { - return completed; - } -} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index e422e9d14b3..6941d4caf4a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -16,29 +16,34 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; -import org.elasticsearch.xpack.enrich.PolicyExecutionResult; +import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; import java.io.IOException; public class TransportExecuteEnrichPolicyAction - extends TransportMasterNodeAction { + extends TransportMasterNodeAction { private final EnrichPolicyExecutor executor; @Inject - public TransportExecuteEnrichPolicyAction(TransportService transportService, + public TransportExecuteEnrichPolicyAction(Settings settings, + Client client, + TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - EnrichPolicyExecutor executor) { + EnrichPolicyLocks enrichPolicyLocks) { super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters, ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver); - this.executor = executor; + this.executor = new EnrichPolicyExecutor(settings, clusterService, client, transportService.getTaskManager(), threadPool, + new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis); } @Override @@ -46,22 +51,18 @@ public class TransportExecuteEnrichPolicyAction return ThreadPool.Names.SAME; } - protected AcknowledgedResponse newResponse() { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override - protected AcknowledgedResponse read(StreamInput in) throws IOException { - return new AcknowledgedResponse(in); + protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOException { + return new ExecuteEnrichPolicyAction.Response(in); } @Override protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, - ActionListener listener) { - executor.runPolicy(request.getName(), new ActionListener() { + ActionListener listener) { + executor.runPolicy(request, new ActionListener() { @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { - listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted())); + public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { + listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 70ebd20c819..7d11a69e06a 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.enrich; -import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -19,10 +18,13 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -31,9 +33,10 @@ import static org.hamcrest.CoreMatchers.containsString; public class EnrichPolicyExecutorTests extends ESTestCase { private static ThreadPool testThreadPool; - private static final ActionListener noOpListener = new ActionListener() { + private static TaskManager testTaskManager; + private static final ActionListener noOpListener = new ActionListener() { @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { } + public void onResponse(ExecuteEnrichPolicyStatus ignored) { } @Override public void onFailure(Exception e) { } @@ -42,6 +45,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase { @BeforeClass public static void beforeCLass() { testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); + testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); } @AfterClass @@ -54,18 +58,24 @@ public class EnrichPolicyExecutorTests extends ESTestCase { */ private static class BlockingTestPolicyRunner implements Runnable { private final CountDownLatch latch; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final ActionListener listener; - BlockingTestPolicyRunner(CountDownLatch latch, ActionListener listener) { + BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, + ActionListener listener) { this.latch = latch; + this.task = task; this.listener = listener; } @Override public void run() { try { + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); latch.await(); - listener.onResponse(new PolicyExecutionResult(true)); + ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); + task.setStatus(newStatus); + listener.onResponse(newStatus); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); } @@ -78,33 +88,37 @@ public class EnrichPolicyExecutorTests extends ESTestCase { */ private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { - EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) { - super(settings, clusterService, client, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(), nowSupplier); + EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, TaskManager taskManager, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, + LongSupplier nowSupplier) { + super(settings, clusterService, client, taskManager, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(), + nowSupplier); } private CountDownLatch currentLatch; - CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { + CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { currentLatch = new CountDownLatch(1); - runPolicy(policyName, policy, listener); + ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName); + runPolicy(request, policy, listener); return currentLatch; } @Override - protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener) { if (currentLatch == null) { throw new IllegalStateException("Use the testRunPolicy method on this test instance"); } - return new BlockingTestPolicyRunner(currentLatch, listener); + return new BlockingTestPolicyRunner(currentLatch, task, listener); } } public void testNonConcurrentPolicyExecution() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield", + EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield", Collections.singletonList("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool, - new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testTaskManager, + testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); @@ -143,8 +157,8 @@ public class EnrichPolicyExecutorTests extends ESTestCase { Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield", Collections.singletonList("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool, - new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testTaskManager, + testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index de2a3d40b31..b2c28da720a 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -12,6 +12,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -49,8 +50,17 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.junit.AfterClass; +import org.junit.BeforeClass; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -65,6 +75,20 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class); } + private static ThreadPool testThreadPool; + private static TaskManager testTaskManager; + + @BeforeClass + public static void beforeCLass() { + testThreadPool = new TestThreadPool("EnrichPolicyRunnerTests"); + testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + } + public void testRunner() throws Exception { final String sourceIndex = "source-index"; IndexResponse indexRequest = client().index(new IndexRequest() @@ -106,7 +130,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -191,7 +215,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -294,7 +318,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -402,7 +426,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -510,7 +534,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -574,7 +598,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -604,7 +628,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -652,7 +676,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -704,7 +728,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -787,7 +811,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -913,7 +937,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1046,7 +1070,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1171,7 +1195,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1268,14 +1292,50 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { String createdEnrichIndex = ".enrich-test1-" + createTime; final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); ClusterService clusterService = getInstanceFromNode(ClusterService.class); IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) { + testTaskManager.unregister(task); + listener.onResponse(policyExecutionResult); + } + + @Override + public void onFailure(Exception e) { + testTaskManager.unregister(task); + listener.onFailure(e); + } + }; AtomicInteger forceMergeAttempts = new AtomicInteger(0); final XContentBuilder unmergedDocument = SmileXContent.contentBuilder() .startObject().field("field1", "value1.1").field("field2", 2).field("field5", "value5").endObject(); - EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, - () -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) { + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(), + resolver, () -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) { @Override protected void ensureSingleSegment(String destinationIndexName, int attempt) { forceMergeAttempts.incrementAndGet(); @@ -1351,15 +1411,51 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } - private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener, - Long createTime) { + private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, + ActionListener listener, Long createTime) { ClusterService clusterService = getInstanceFromNode(ClusterService.class); IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); - return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime, + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) { + testTaskManager.unregister(task); + listener.onResponse(policyExecutionResult); + } + + @Override + public void onFailure(Exception e) { + testTaskManager.unregister(task); + listener.onFailure(e); + } + }; + return new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(), resolver, () -> createTime, randomIntBetween(1, 10000), randomIntBetween(1, 10)); } - private ActionListener createTestListener(final CountDownLatch latch, + private ActionListener createTestListener(final CountDownLatch latch, final Consumer exceptionConsumer) { return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.info("Run complete"), exceptionConsumer), latch); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 0b67802d6e9..5663b98aee4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { @@ -42,7 +43,8 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { "key1", Collections.singletonList("field1")); PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); - assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet()); + assertThat("Execute failed", client().execute(ExecuteEnrichPolicyAction.INSTANCE, + new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet().getStatus().isCompleted(), equalTo(true)); String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";