Add basic task support for executing enrich policies (#47523)

Changes the execution logic to create a new task using the execute request,
and attaches the new task to the policy runner to be updated. Also, a new
response is now returned from the execute api, which contains either the task
id of the execution, or the completed status of the run. The fields are mutually
exclusive to make it easier to discern what type of response it is.
This commit is contained in:
James Baiera 2019-10-09 14:37:53 -04:00
parent 102016d571
commit 73263c654a
13 changed files with 454 additions and 110 deletions

View File

@ -5,23 +5,26 @@
*/
package org.elasticsearch.xpack.core.enrich.action;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Objects;
public class ExecuteEnrichPolicyAction extends ActionType<AcknowledgedResponse> {
public class ExecuteEnrichPolicyAction extends ActionType<ExecuteEnrichPolicyAction.Response> {
public static final ExecuteEnrichPolicyAction INSTANCE = new ExecuteEnrichPolicyAction();
public static final String NAME = "cluster:admin/xpack/enrich/execute";
private ExecuteEnrichPolicyAction() {
super(NAME, AcknowledgedResponse::new);
super(NAME, ExecuteEnrichPolicyAction.Response::new);
}
public static class Request extends MasterNodeRequest<Request> {
@ -71,4 +74,64 @@ public class ExecuteEnrichPolicyAction extends ActionType<AcknowledgedResponse>
return Objects.hash(name);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private final TaskId taskId;
private final ExecuteEnrichPolicyStatus status;
public Response(ExecuteEnrichPolicyStatus status) {
this.taskId = null;
this.status = status;
}
public Response(TaskId taskId) {
this.taskId = taskId;
this.status = null;
}
public TaskId getTaskId() {
return taskId;
}
public ExecuteEnrichPolicyStatus getStatus() {
return status;
}
public Response(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
this.status = new ExecuteEnrichPolicyStatus(in);
this.taskId = null;
} else {
this.taskId = TaskId.readFromStream(in);
this.status = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
boolean waitedForCompletion = status != null;
out.writeBoolean(waitedForCompletion);
if (waitedForCompletion) {
status.writeTo(out);
} else {
taskId.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
if (taskId != null) {
builder.field("task", taskId.getNodeId() + ":" + taskId.getId());
} else {
builder.field("status", status);
}
}
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.enrich.action;
import java.io.IOException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
public class ExecuteEnrichPolicyStatus implements Task.Status {
public static final class PolicyPhases {
private PolicyPhases() {}
public static final String SCHEDULED = "SCHEDULED";
public static final String RUNNING = "RUNNING";
public static final String COMPLETE = "COMPLETE";
public static final String FAILED = "FAILED";
}
public static final String NAME = "enrich-policy-execution";
private static final String PHASE_FIELD = "phase";
private final String phase;
public ExecuteEnrichPolicyStatus(String phase) {
this.phase = phase;
}
public ExecuteEnrichPolicyStatus(StreamInput in) throws IOException {
this.phase = in.readString();
}
public String getPhase() {
return phase;
}
public boolean isCompleted() {
return PolicyPhases.COMPLETE.equals(phase);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(PHASE_FIELD, phase);
}
builder.endObject();
return builder;
}
}

View File

@ -39,12 +39,12 @@ public class EnrichClient {
public void executeEnrichPolicy(
final ExecuteEnrichPolicyAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
final ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener);
}
public ActionFuture<AcknowledgedResponse> executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
public ActionFuture<ExecuteEnrichPolicyAction.Response> executeEnrichPolicy(final ExecuteEnrichPolicyAction.Request request) {
final PlainActionFuture<ExecuteEnrichPolicyAction.Response> listener = PlainActionFuture.newFuture();
client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, listener);
return listener;
}

View File

@ -14,7 +14,7 @@
- do:
enrich.execute_policy:
name: policy-crud
- is_true: acknowledged
- match: { status.phase: "COMPLETE" }
- do:
enrich.get_policy:

View File

@ -163,14 +163,11 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
}
EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis);
EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService(settings, client,
clusterService, threadPool, enrichPolicyLocks);
enrichPolicyMaintenanceService.initialize();
return Arrays.asList(
enrichPolicyLocks,
enrichPolicyExecutor,
new EnrichCoordinatorProxyAction.Coordinator(client, settings),
enrichPolicyMaintenanceService
);

View File

@ -6,7 +6,9 @@
package org.elasticsearch.xpack.enrich;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import org.elasticsearch.action.ActionListener;
@ -15,13 +17,21 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
public class EnrichPolicyExecutor {
private final ClusterService clusterService;
private final Client client;
private final TaskManager taskManager;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
@ -31,15 +41,17 @@ public class EnrichPolicyExecutor {
private final int maxForceMergeAttempts;
private final Semaphore policyExecutionPermits;
EnrichPolicyExecutor(Settings settings,
public EnrichPolicyExecutor(Settings settings,
ClusterService clusterService,
Client client,
TaskManager taskManager,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks policyLocks,
LongSupplier nowSupplier) {
this.clusterService = clusterService;
this.client = client;
this.taskManager = taskManager;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
@ -68,51 +80,123 @@ public class EnrichPolicyExecutor {
}
}
private class PolicyUnlockingListener implements ActionListener<PolicyExecutionResult> {
private class PolicyCompletionListener implements ActionListener<ExecuteEnrichPolicyStatus> {
private final String policyName;
private final ActionListener<PolicyExecutionResult> listener;
private final ExecuteEnrichPolicyTask task;
private final BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse;
private final BiConsumer<Task, Exception> onFailure;
PolicyUnlockingListener(String policyName, ActionListener<PolicyExecutionResult> listener) {
PolicyCompletionListener(String policyName, ExecuteEnrichPolicyTask task,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
this.policyName = policyName;
this.listener = listener;
this.task = task;
this.onResponse = onResponse;
this.onFailure = onFailure;
}
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) {
public void onResponse(ExecuteEnrichPolicyStatus status) {
assert ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE.equals(status.getPhase()) : "incomplete task returned";
releasePolicy(policyName);
listener.onResponse(policyExecutionResult);
try {
taskManager.unregister(task);
} finally {
onResponse.accept(task, status);
}
}
@Override
public void onFailure(Exception e) {
// Set task status to failed to avoid having to catch and rethrow exceptions everywhere
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
releasePolicy(policyName);
listener.onFailure(e);
try {
taskManager.unregister(task);
} finally {
onFailure.accept(task, e);
}
}
}
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
return new EnrichPolicyRunner(policyName, policy, task, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
fetchSize, maxForceMergeAttempts);
}
public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
// Look up policy in policy store and execute it
EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state());
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), clusterService.state());
if (policy == null) {
throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyId + "]");
} else {
runPolicy(policyId, policy, listener);
throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + request.getName() + "]");
}
return policy;
}
public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
}
public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
}
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, policy, (t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
TaskListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, policy, listener::onResponse, listener::onFailure);
}
private Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
tryLockingPolicy(request.getName());
try {
return runPolicyTask(request, policy, onResponse, onFailure);
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(request.getName());
throw e;
}
}
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
tryLockingPolicy(policyName);
private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
Task asyncTask = taskManager.register("enrich", "policy_execution", new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}
@Override
public TaskId getParentTask() {
return request.getParentTask();
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers);
}
@Override
public String getDescription() {
return request.getName();
}
});
ExecuteEnrichPolicyTask task = (ExecuteEnrichPolicyTask) asyncTask;
try {
Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener));
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
PolicyCompletionListener completionListener = new PolicyCompletionListener(request.getName(), task, onResponse, onFailure);
Runnable runnable = createPolicyRunner(request.getName(), policy, task, completionListener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
return asyncTask;
} catch (Exception e) {
// Be sure to unlock if submission failed.
releasePolicy(policyName);
// Unregister task in case of exception
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
taskManager.unregister(asyncTask);
throw e;
}
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -70,7 +71,8 @@ public class EnrichPolicyRunner implements Runnable {
private final String policyName;
private final EnrichPolicy policy;
private final ActionListener<PolicyExecutionResult> listener;
private final ExecuteEnrichPolicyTask task;
private final ActionListener<ExecuteEnrichPolicyStatus> listener;
private final ClusterService clusterService;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@ -78,11 +80,13 @@ public class EnrichPolicyRunner implements Runnable {
private final int fetchSize;
private final int maxForceMergeAttempts;
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier, int fetchSize, int maxForceMergeAttempts) {
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener, ClusterService clusterService, Client client,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier, int fetchSize,
int maxForceMergeAttempts) {
this.policyName = policyName;
this.policy = policy;
this.task = task;
this.listener = listener;
this.clusterService = clusterService;
this.client = client;
@ -94,8 +98,9 @@ public class EnrichPolicyRunner implements Runnable {
@Override
public void run() {
// Collect the source index information
logger.info("Policy [{}]: Running enrich policy", policyName);
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING));
// Collect the source index information
final String[] sourceIndices = policy.getIndices().toArray(new String[0]);
logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices);
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices);
@ -451,7 +456,9 @@ public class EnrichPolicyRunner implements Runnable {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("Policy [{}]: Policy execution complete", policyName);
listener.onResponse(new PolicyExecutionResult(true));
ExecuteEnrichPolicyStatus completeStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE);
task.setStatus(completeStatus);
listener.onResponse(completeStatus);
}
@Override

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import java.util.Map;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
class ExecuteEnrichPolicyTask extends Task {
private volatile ExecuteEnrichPolicyStatus status;
ExecuteEnrichPolicyTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
}
@Override
public Status getStatus() {
return status;
}
void setStatus(ExecuteEnrichPolicyStatus status) {
this.status = status;
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
public class PolicyExecutionResult {
private final boolean completed;
public PolicyExecutionResult(boolean completed) {
this.completed = completed;
}
public boolean isCompleted() {
return completed;
}
}

View File

@ -7,8 +7,8 @@ package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -16,29 +16,34 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
import org.elasticsearch.xpack.enrich.PolicyExecutionResult;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import java.io.IOException;
public class TransportExecuteEnrichPolicyAction
extends TransportMasterNodeAction<ExecuteEnrichPolicyAction.Request, AcknowledgedResponse> {
extends TransportMasterNodeAction<ExecuteEnrichPolicyAction.Request, ExecuteEnrichPolicyAction.Response> {
private final EnrichPolicyExecutor executor;
@Inject
public TransportExecuteEnrichPolicyAction(TransportService transportService,
public TransportExecuteEnrichPolicyAction(Settings settings,
Client client,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyExecutor executor) {
EnrichPolicyLocks enrichPolicyLocks) {
super(ExecuteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
ExecuteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
this.executor = executor;
this.executor = new EnrichPolicyExecutor(settings, clusterService, client, transportService.getTaskManager(), threadPool,
new IndexNameExpressionResolver(), enrichPolicyLocks, System::currentTimeMillis);
}
@Override
@ -46,22 +51,18 @@ public class TransportExecuteEnrichPolicyAction
return ThreadPool.Names.SAME;
}
protected AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOException {
return new ExecuteEnrichPolicyAction.Response(in);
}
@Override
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
executor.runPolicy(request.getName(), new ActionListener<PolicyExecutionResult>() {
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
executor.runPolicy(request, new ActionListener<ExecuteEnrichPolicyStatus>() {
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) {
listener.onResponse(new AcknowledgedResponse(policyExecutionResult.isCompleted()));
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}
@Override

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.enrich;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -19,10 +18,13 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -31,9 +33,10 @@ import static org.hamcrest.CoreMatchers.containsString;
public class EnrichPolicyExecutorTests extends ESTestCase {
private static ThreadPool testThreadPool;
private static final ActionListener<PolicyExecutionResult> noOpListener = new ActionListener<PolicyExecutionResult>() {
private static TaskManager testTaskManager;
private static final ActionListener<ExecuteEnrichPolicyStatus> noOpListener = new ActionListener<ExecuteEnrichPolicyStatus>() {
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) { }
public void onResponse(ExecuteEnrichPolicyStatus ignored) { }
@Override
public void onFailure(Exception e) { }
@ -42,6 +45,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
@BeforeClass
public static void beforeCLass() {
testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests");
testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet());
}
@AfterClass
@ -54,18 +58,24 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
*/
private static class BlockingTestPolicyRunner implements Runnable {
private final CountDownLatch latch;
private final ActionListener<PolicyExecutionResult> listener;
private final ExecuteEnrichPolicyTask task;
private final ActionListener<ExecuteEnrichPolicyStatus> listener;
BlockingTestPolicyRunner(CountDownLatch latch, ActionListener<PolicyExecutionResult> listener) {
BlockingTestPolicyRunner(CountDownLatch latch, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
this.latch = latch;
this.task = task;
this.listener = listener;
}
@Override
public void run() {
try {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING));
latch.await();
listener.onResponse(new PolicyExecutionResult(true));
ExecuteEnrichPolicyStatus newStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE);
task.setStatus(newStatus);
listener.onResponse(newStatus);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for test framework to continue the test", e);
}
@ -78,33 +88,37 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
*/
private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor {
EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) {
super(settings, clusterService, client, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(), nowSupplier);
EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, TaskManager taskManager,
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
super(settings, clusterService, client, taskManager, threadPool, indexNameExpressionResolver, new EnrichPolicyLocks(),
nowSupplier);
}
private CountDownLatch currentLatch;
CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener<ExecuteEnrichPolicyStatus> listener) {
currentLatch = new CountDownLatch(1);
runPolicy(policyName, policy, listener);
ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(policyName);
runPolicy(request, policy, listener);
return currentLatch;
}
@Override
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener) {
if (currentLatch == null) {
throw new IllegalStateException("Use the testRunPolicy method on this test instance");
}
return new BlockingTestPolicyRunner(currentLatch, listener);
return new BlockingTestPolicyRunner(currentLatch, task, listener);
}
}
public void testNonConcurrentPolicyExecution() throws InterruptedException {
String testPolicyName = "test_policy";
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield",
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool,
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testTaskManager,
testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
// Launch a fake policy run that will block until firstTaskBlock is counted down.
final CountDownLatch firstTaskComplete = new CountDownLatch(1);
@ -143,8 +157,8 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build();
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool,
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testTaskManager,
testThreadPool, new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
// Launch a two fake policy runs that will block until counted down to use up the maximum concurrent
final CountDownLatch firstTaskComplete = new CountDownLatch(1);

View File

@ -12,6 +12,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -49,8 +50,17 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@ -65,6 +75,20 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class);
}
private static ThreadPool testThreadPool;
private static TaskManager testTaskManager;
@BeforeClass
public static void beforeCLass() {
testThreadPool = new TestThreadPool("EnrichPolicyRunnerTests");
testTaskManager = new TaskManager(Settings.EMPTY, testThreadPool, Collections.emptySet());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
}
public void testRunner() throws Exception {
final String sourceIndex = "source-index";
IndexResponse indexRequest = client().index(new IndexRequest()
@ -106,7 +130,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -191,7 +215,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -294,7 +318,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -402,7 +426,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -510,7 +534,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -574,7 +598,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -604,7 +628,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -652,7 +676,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -704,7 +728,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -787,7 +811,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -913,7 +937,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -1046,7 +1070,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -1171,7 +1195,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
@ -1268,14 +1292,50 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
String createdEnrichIndex = ".enrich-test1-" + createTime;
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {}
@Override
public TaskId getParentTask() {
return TaskId.EMPTY_TASK_ID;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers);
}
@Override
public String getDescription() {
return policyName;
}
});
ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask);
// The executor would wrap the listener in order to clean up the task in the
// task manager, but we're just testing the runner, so we make sure to clean
// up after ourselves.
ActionListener<ExecuteEnrichPolicyStatus> wrappedListener = new ActionListener<ExecuteEnrichPolicyStatus>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) {
testTaskManager.unregister(task);
listener.onResponse(policyExecutionResult);
}
@Override
public void onFailure(Exception e) {
testTaskManager.unregister(task);
listener.onFailure(e);
}
};
AtomicInteger forceMergeAttempts = new AtomicInteger(0);
final XContentBuilder unmergedDocument = SmileXContent.contentBuilder()
.startObject().field("field1", "value1.1").field("field2", 2).field("field5", "value5").endObject();
EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver,
() -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) {
EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(),
resolver, () -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) {
@Override
protected void ensureSingleSegment(String destinationIndexName, int attempt) {
forceMergeAttempts.incrementAndGet();
@ -1351,15 +1411,51 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
}
private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
Long createTime) {
private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy,
ActionListener<ExecuteEnrichPolicyStatus> listener, Long createTime) {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime,
Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() {
@Override
public void setParentTask(TaskId taskId) {}
@Override
public TaskId getParentTask() {
return TaskId.EMPTY_TASK_ID;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers);
}
@Override
public String getDescription() {
return policyName;
}
});
ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask);
// The executor would wrap the listener in order to clean up the task in the
// task manager, but we're just testing the runner, so we make sure to clean
// up after ourselves.
ActionListener<ExecuteEnrichPolicyStatus> wrappedListener = new ActionListener<ExecuteEnrichPolicyStatus>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus policyExecutionResult) {
testTaskManager.unregister(task);
listener.onResponse(policyExecutionResult);
}
@Override
public void onFailure(Exception e) {
testTaskManager.unregister(task);
listener.onFailure(e);
}
};
return new EnrichPolicyRunner(policyName, policy, task, wrappedListener, clusterService, client(), resolver, () -> createTime,
randomIntBetween(1, 10000), randomIntBetween(1, 10));
}
private ActionListener<PolicyExecutionResult> createTestListener(final CountDownLatch latch,
private ActionListener<ExecuteEnrichPolicyStatus> createTestListener(final CountDownLatch latch,
final Consumer<Exception> exceptionConsumer) {
return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.info("Run complete"), exceptionConsumer), latch);
}

View File

@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
@ -42,7 +43,8 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
"key1", Collections.singletonList("field1"));
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());
assertThat("Execute failed", client().execute(ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet().getStatus().isCompleted(), equalTo(true));
String pipelineConfig =
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";