From 73263c654ace5a77310d09bf64c70d22a53dd622 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 9 Oct 2019 14:37:53 -0400 Subject: [PATCH] Add basic task support for executing enrich policies (#47523) Changes the execution logic to create a new task using the execute request, and attaches the new task to the policy runner to be updated. Also, a new response is now returned from the execute api, which contains either the task id of the execution, or the completed status of the run. The fields are mutually exclusive to make it easier to discern what type of response it is. --- .../action/ExecuteEnrichPolicyAction.java | 71 ++++++++- .../action/ExecuteEnrichPolicyStatus.java | 67 +++++++++ .../core/enrich/client/EnrichClient.java | 6 +- .../rest-api-spec/test/enrich/10_basic.yml | 2 +- .../xpack/enrich/EnrichPlugin.java | 3 - .../xpack/enrich/EnrichPolicyExecutor.java | 124 +++++++++++++--- .../xpack/enrich/EnrichPolicyRunner.java | 19 ++- .../xpack/enrich/ExecuteEnrichPolicyTask.java | 31 ++++ .../xpack/enrich/PolicyExecutionResult.java | 18 --- .../TransportExecuteEnrichPolicyAction.java | 33 ++--- .../enrich/EnrichPolicyExecutorTests.java | 50 ++++--- .../xpack/enrich/EnrichPolicyRunnerTests.java | 136 +++++++++++++++--- .../xpack/enrich/EnrichPolicyUpdateTests.java | 4 +- 13 files changed, 454 insertions(+), 110 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java delete mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index 6f5804e54ae..0d7fa55ebb6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -5,23 +5,26 @@ */ package org.elasticsearch.xpack.core.enrich.action; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Objects; -public class ExecuteEnrichPolicyAction extends ActionType { +public class ExecuteEnrichPolicyAction extends ActionType { public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction(); public static final String NAME = "cluster:admin/xpack/enrich/execute"; private ExecuteEnrichPolicyAction() { - super(NAME, AcknowledgedResponse::new); + super(NAME, ExecuteEnrichPolicyAction.Response::new); } public static class Request extends MasterNodeRequest { @@ -71,4 +74,64 @@ public class ExecuteEnrichPolicyAction extends ActionType return Objects.hash(name); } } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final TaskId taskId; + private final ExecuteEnrichPolicyStatus status; + + public Response(ExecuteEnrichPolicyStatus status) { + this.taskId = null; + this.status = status; + } + + public Response(TaskId taskId) { + this.taskId = taskId; + this.status = null; + } + + public TaskId getTaskId() { + return taskId; + } + + public ExecuteEnrichPolicyStatus getStatus() { + return status; + } + + public Response(StreamInput in) throws IOException { + super(in); + if (in.readBoolean()) { + this.status = new ExecuteEnrichPolicyStatus(in); + this.taskId = null; + } else { + this.taskId = TaskId.readFromStream(in); + this.status = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + boolean waitedForCompletion = status != null; + out.writeBoolean(waitedForCompletion); + if (waitedForCompletion) { + status.writeTo(out); + } else { + taskId.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + if (taskId != null) { + builder.field("task", taskId.getNodeId() + ":" + taskId.getId()); + } else { + builder.field("status", status); + } + } + builder.endObject(); + return builder; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java new file mode 100644 index 00000000000..96435b6f95f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyStatus.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.enrich.action; + +import java.io.IOException; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; + +public class ExecuteEnrichPolicyStatus implements Task.Status { + + public static final class PolicyPhases { + private PolicyPhases() {} + + public static final String SCHEDULED = "SCHEDULED"; + public static final String RUNNING = "RUNNING"; + public static final String COMPLETE = "COMPLETE"; + public static final String FAILED = "FAILED"; + } + + public static final String NAME = "enrich-policy-execution"; + + private static final String PHASE_FIELD = "phase"; + + private final String phase; + + public ExecuteEnrichPolicyStatus(String phase) { + this.phase = phase; + } + + public ExecuteEnrichPolicyStatus(StreamInput in) throws IOException { + this.phase = in.readString(); + } + + public String getPhase() { + return phase; + } + + public boolean isCompleted() { + return PolicyPhases.COMPLETE.equals(phase); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(phase); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(PHASE_FIELD, phase); + } + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java index 5bfff235a47..c12df5b49d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/client/EnrichClient.java @@ -39,12 +39,12 @@ public class EnrichClient { public void executeEnrichPolicy( final ExecuteEnrichPolicyAction.Request request, - final ActionListener listener) { + final ActionListener listener) { client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener); } - public ActionFuture executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); + public ActionFuture executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener); return listener; } diff --git a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml index c53026b49ce..2a837d9c3b6 100644 --- a/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml +++ b/x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml @@ -14,7 +14,7 @@ - do: enrich.execute_policy: name: policy-crud - - is_true: acknowledged + - match: { status.phase: "COMPLETE" } - do: enrich.get_policy: diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 4219d38903c..be7e40be8c4 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -163,14 +163,11 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { } EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks(); - EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool, - new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis); EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client, clusterService, threadPool, enrichPolicyLocks); enrichPolicyMaintenanceService.initialize(); return Arrays.asList( enrichPolicyLocks, - enrichPolicyExecutor, new EnrichCoordinatorProxyAction.Coordinator(client, settings), enrichPolicyMaintenanceService ); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 79f8e37cbf6..238d8e23758 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -6,7 +6,9 @@ package org.elasticsearch.xpack.enrich; +import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import org.elasticsearch.action.ActionListener; @@ -15,13 +17,21 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskListener; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; public class EnrichPolicyExecutor { private final ClusterService clusterService; private final Client client; + private final TaskManager taskManager; private final ThreadPool threadPool; private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; @@ -31,15 +41,17 @@ public class EnrichPolicyExecutor { private final int maxForceMergeAttempts; private final Semaphore policyExecutionPermits; - EnrichPolicyExecutor(Settings settings, + public EnrichPolicyExecutor(Settings settings, ClusterService clusterService, Client client, + TaskManager taskManager, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, LongSupplier nowSupplier) { this.clusterService = clusterService; this.client = client; + this.taskManager = taskManager; this.threadPool = threadPool; this.indexNameExpressionResolver = indexNameExpressionResolver; this.nowSupplier = nowSupplier; @@ -68,51 +80,123 @@ public class EnrichPolicyExecutor { } } - private class PolicyUnlockingListener implements ActionListener { + private class PolicyCompletionListener implements ActionListener { private final String policyName; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final BiConsumer onResponse; + private final BiConsumer onFailure; - PolicyUnlockingListener(String policyName, ActionListener listener) { + PolicyCompletionListener(String policyName, ExecuteEnrichPolicyTask task, + BiConsumer onResponse, BiConsumer onFailure) { this.policyName = policyName; - this.listener = listener; + this.task = task; + this.onResponse = onResponse; + this.onFailure = onFailure; } @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { + public void onResponse(ExecuteEnrichPolicyStatus status) { + assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned"; releasePolicy(policyName); - listener.onResponse(policyExecutionResult); + try { + taskManager.unregister(task); + } finally { + onResponse.accept(task, status); + } } @Override public void onFailure(Exception e) { + // Set task status to failed to avoid having to catch and rethrow exceptions everywhere + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); releasePolicy(policyName); - listener.onFailure(e); + try { + taskManager.unregister(task); + } finally { + onFailure.accept(task, e); + } } } - protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { - return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier, + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener) { + return new EnrichPolicyRunner(policyName, policy, task, listener, clusterService, client, indexNameExpressionResolver, nowSupplier, fetchSize, maxForceMergeAttempts); } - public void runPolicy(String policyId, ActionListener listener) { + private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { // Look up policy in policy store and execute it - EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state()); + EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state()); if (policy == null) { - throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyId + "]"); - } else { - runPolicy(policyId, policy, listener); + throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]"); + } + return policy; + } + + public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { + runPolicy(request, getPolicy(request), listener); + } + + public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { + runPolicy(request, getPolicy(request), listener); + } + + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + ActionListener listener) { + return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e)); + } + + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + TaskListener listener) { + return runPolicy(request, policy, listener::onResponse, listener::onFailure); + } + + private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + BiConsumer onResponse, BiConsumer onFailure) { + tryLockingPolicy(request.getName()); + try { + return runPolicyTask(request, policy, onResponse, onFailure); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(request.getName()); + throw e; } } - public void runPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - tryLockingPolicy(policyName); + private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, + BiConsumer onResponse, BiConsumer onFailure) { + Task asyncTask = taskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return request.getName(); + } + }); + ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask; try { - Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener)); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED)); + PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure); + Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener); threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + return asyncTask; } catch (Exception e) { - // Be sure to unlock if submission failed. - releasePolicy(policyName); + // Unregister task in case of exception + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED)); + taskManager.unregister(asyncTask); throw e; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 7f4d836545b..412ffeef386 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import java.io.IOException; import java.io.UncheckedIOException; @@ -70,7 +71,8 @@ public class EnrichPolicyRunner implements Runnable { private final String policyName; private final EnrichPolicy policy; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final ActionListener listener; private final ClusterService clusterService; private final Client client; private final IndexNameExpressionResolver indexNameExpressionResolver; @@ -78,11 +80,13 @@ public class EnrichPolicyRunner implements Runnable { private final int fetchSize; private final int maxForceMergeAttempts; - EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener, - ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver, - LongSupplier nowSupplier, int fetchSize, int maxForceMergeAttempts) { + EnrichPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener, ClusterService clusterService, Client client, + IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier, int fetchSize, + int maxForceMergeAttempts) { this.policyName = policyName; this.policy = policy; + this.task = task; this.listener = listener; this.clusterService = clusterService; this.client = client; @@ -94,8 +98,9 @@ public class EnrichPolicyRunner implements Runnable { @Override public void run() { - // Collect the source index information logger.info("Policy [{}]: Running enrich policy", policyName); + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); + // Collect the source index information final String[] sourceIndices = policy.getIndices().toArray(new String[0]); logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices); GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices); @@ -451,7 +456,9 @@ public class EnrichPolicyRunner implements Runnable { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { logger.info("Policy [{}]: Policy execution complete", policyName); - listener.onResponse(new PolicyExecutionResult(true)); + ExecuteEnrichPolicyStatus completeStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); + task.setStatus(completeStatus); + listener.onResponse(completeStatus); } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java new file mode 100644 index 00000000000..dedd45920fa --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExecuteEnrichPolicyTask.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import java.util.Map; + +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; + +class ExecuteEnrichPolicyTask extends Task { + + private volatile ExecuteEnrichPolicyStatus status; + + ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask, + Map headers) { + super(id, type, action, description, parentTask, headers); + } + + @Override + public Status getStatus() { + return status; + } + + void setStatus(ExecuteEnrichPolicyStatus status) { + this.status = status; + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java deleted file mode 100644 index faa48c87521..00000000000 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/PolicyExecutionResult.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.enrich; - -public class PolicyExecutionResult { - private final boolean completed; - - public PolicyExecutionResult(boolean completed) { - this.completed = completed; - } - - public boolean isCompleted() { - return completed; - } -} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index e422e9d14b3..6941d4caf4a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -16,29 +16,34 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; -import org.elasticsearch.xpack.enrich.PolicyExecutionResult; +import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; import java.io.IOException; public class TransportExecuteEnrichPolicyAction - extends TransportMasterNodeAction { + extends TransportMasterNodeAction { private final EnrichPolicyExecutor executor; @Inject - public TransportExecuteEnrichPolicyAction(TransportService transportService, + public TransportExecuteEnrichPolicyAction(Settings settings, + Client client, + TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - EnrichPolicyExecutor executor) { + EnrichPolicyLocks enrichPolicyLocks) { super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters, ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver); - this.executor = executor; + this.executor = new EnrichPolicyExecutor(settings, clusterService, client, transportService.getTaskManager(), threadPool, + new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis); } @Override @@ -46,22 +51,18 @@ public class TransportExecuteEnrichPolicyAction return ThreadPool.Names.SAME; } - protected AcknowledgedResponse newResponse() { - throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); - } - @Override - protected AcknowledgedResponse read(StreamInput in) throws IOException { - return new AcknowledgedResponse(in); + protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOException { + return new ExecuteEnrichPolicyAction.Response(in); } @Override protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state, - ActionListener listener) { - executor.runPolicy(request.getName(), new ActionListener() { + ActionListener listener) { + executor.runPolicy(request, new ActionListener() { @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { - listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted())); + public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { + listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 70ebd20c819..7d11a69e06a 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.enrich; -import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -19,10 +18,13 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -31,9 +33,10 @@ import static org.hamcrest.CoreMatchers.containsString; public class EnrichPolicyExecutorTests extends ESTestCase { private static ThreadPool testThreadPool; - private static final ActionListener noOpListener = new ActionListener() { + private static TaskManager testTaskManager; + private static final ActionListener noOpListener = new ActionListener() { @Override - public void onResponse(PolicyExecutionResult policyExecutionResult) { } + public void onResponse(ExecuteEnrichPolicyStatus ignored) { } @Override public void onFailure(Exception e) { } @@ -42,6 +45,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase { @BeforeClass public static void beforeCLass() { testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); + testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); } @AfterClass @@ -54,18 +58,24 @@ public class EnrichPolicyExecutorTests extends ESTestCase { */ private static class BlockingTestPolicyRunner implements Runnable { private final CountDownLatch latch; - private final ActionListener listener; + private final ExecuteEnrichPolicyTask task; + private final ActionListener listener; - BlockingTestPolicyRunner(CountDownLatch latch, ActionListener listener) { + BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task, + ActionListener listener) { this.latch = latch; + this.task = task; this.listener = listener; } @Override public void run() { try { + task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING)); latch.await(); - listener.onResponse(new PolicyExecutionResult(true)); + ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); + task.setStatus(newStatus); + listener.onResponse(newStatus); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); } @@ -78,33 +88,37 @@ public class EnrichPolicyExecutorTests extends ESTestCase { */ private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { - EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, ThreadPool threadPool, - IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) { - super(settings, clusterService, client, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(), nowSupplier); + EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, TaskManager taskManager, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, + LongSupplier nowSupplier) { + super(settings, clusterService, client, taskManager, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(), + nowSupplier); } private CountDownLatch currentLatch; - CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { + CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { currentLatch = new CountDownLatch(1); - runPolicy(policyName, policy, listener); + ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName); + runPolicy(request, policy, listener); return currentLatch; } @Override - protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task, + ActionListener listener) { if (currentLatch == null) { throw new IllegalStateException("Use the testRunPolicy method on this test instance"); } - return new BlockingTestPolicyRunner(currentLatch, listener); + return new BlockingTestPolicyRunner(currentLatch, task, listener); } } public void testNonConcurrentPolicyExecution() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield", + EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield", Collections.singletonList("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool, - new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testTaskManager, + testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); @@ -143,8 +157,8 @@ public class EnrichPolicyExecutorTests extends ESTestCase { Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield", Collections.singletonList("valuefield")); - final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool, - new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testTaskManager, + testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index de2a3d40b31..b2c28da720a 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -12,6 +12,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -49,8 +50,17 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; +import org.junit.AfterClass; +import org.junit.BeforeClass; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -65,6 +75,20 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class); } + private static ThreadPool testThreadPool; + private static TaskManager testTaskManager; + + @BeforeClass + public static void beforeCLass() { + testThreadPool = new TestThreadPool("EnrichPolicyRunnerTests"); + testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet()); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + } + public void testRunner() throws Exception { final String sourceIndex = "source-index"; IndexResponse indexRequest = client().index(new IndexRequest() @@ -106,7 +130,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -191,7 +215,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -294,7 +318,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -402,7 +426,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -510,7 +534,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -574,7 +598,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -604,7 +628,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -652,7 +676,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -704,7 +728,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -787,7 +811,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -913,7 +937,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1046,7 +1070,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1171,7 +1195,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); logger.info("Starting policy run"); @@ -1268,14 +1292,50 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { String createdEnrichIndex = ".enrich-test1-" + createTime; final AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = createTestListener(latch, exception::set); + ActionListener listener = createTestListener(latch, exception::set); ClusterService clusterService = getInstanceFromNode(ClusterService.class); IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) { + testTaskManager.unregister(task); + listener.onResponse(policyExecutionResult); + } + + @Override + public void onFailure(Exception e) { + testTaskManager.unregister(task); + listener.onFailure(e); + } + }; AtomicInteger forceMergeAttempts = new AtomicInteger(0); final XContentBuilder unmergedDocument = SmileXContent.contentBuilder() .startObject().field("field1", "value1.1").field("field2", 2).field("field5", "value5").endObject(); - EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, - () -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) { + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(), + resolver, () -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) { @Override protected void ensureSingleSegment(String destinationIndexName, int attempt) { forceMergeAttempts.incrementAndGet(); @@ -1351,15 +1411,51 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } - private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener, - Long createTime) { + private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, + ActionListener listener, Long createTime) { ClusterService clusterService = getInstanceFromNode(ClusterService.class); IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); - return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime, + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) { + testTaskManager.unregister(task); + listener.onResponse(policyExecutionResult); + } + + @Override + public void onFailure(Exception e) { + testTaskManager.unregister(task); + listener.onFailure(e); + } + }; + return new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(), resolver, () -> createTime, randomIntBetween(1, 10000), randomIntBetween(1, 10)); } - private ActionListener createTestListener(final CountDownLatch latch, + private ActionListener createTestListener(final CountDownLatch latch, final Consumer exceptionConsumer) { return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.info("Run complete"), exceptionConsumer), latch); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 0b67802d6e9..5663b98aee4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { @@ -42,7 +43,8 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase { "key1", Collections.singletonList("field1")); PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); - assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet()); + assertThat("Execute failed", client().execute(ExecuteEnrichPolicyAction.INSTANCE, + new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet().getStatus().isCompleted(), equalTo(true)); String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";