Fix executing enrich policies stats (#48132)

The enrich stats api picked the wrong task to be displayed
in the executing stats section.

In case `wait_for_completion` was set to `false` then no task
was being displayed and if that param was set to `true` then
the wrong task was being displayed (transport action task instead
of enrich policy executor task).

Testing executing policies in enrich stats api is tricky.
I have verified locally that this commit fixes the bug.
This commit is contained in:
Martijn van Groningen 2019-10-22 07:41:11 +02:00
parent c09b62d5bf
commit 0ec0ab64c9
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
3 changed files with 5 additions and 9 deletions

View File

@ -68,12 +68,6 @@ public class ExecuteEnrichPolicyAction extends ActionType<ExecuteEnrichPolicyAct
return null; return null;
} }
// This will be displayed in tasks api and allows stats api to figure out which policies are being executed.
@Override
public String getDescription() {
return name;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;

View File

@ -29,6 +29,8 @@ import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
public class EnrichPolicyExecutor { public class EnrichPolicyExecutor {
public static final String TASK_ACTION = "policy_execution";
private final ClusterService clusterService; private final ClusterService clusterService;
private final Client client; private final Client client;
private final TaskManager taskManager; private final TaskManager taskManager;
@ -165,7 +167,7 @@ public class EnrichPolicyExecutor {
private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, private Task runPolicyTask(final ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) { BiConsumer<Task, ExecuteEnrichPolicyStatus> onResponse, BiConsumer<Task, Exception> onFailure) {
Task asyncTask = taskManager.register("enrich", "policy_execution", new TaskAwareRequest() { Task asyncTask = taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {
@Override @Override
public void setParentTask(TaskId taskId) { public void setParentTask(TaskId taskId) {
request.setParentTask(taskId); request.setParentTask(taskId);

View File

@ -22,7 +22,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
@ -78,7 +78,7 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
.sorted(Comparator.comparing(CoordinatorStats::getNodeId)) .sorted(Comparator.comparing(CoordinatorStats::getNodeId))
.collect(Collectors.toList()); .collect(Collectors.toList());
List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream() List<ExecutingPolicy> policyExecutionTasks = taskManager.getTasks().values().stream()
.filter(t -> t.getAction().equals(ExecuteEnrichPolicyAction.NAME)) .filter(t -> t.getAction().equals(EnrichPolicyExecutor.TASK_ACTION))
.map(t -> t.taskInfo(clusterService.localNode().getId(), true)) .map(t -> t.taskInfo(clusterService.localNode().getId(), true))
.map(t -> new ExecutingPolicy(t.getDescription(), t)) .map(t -> new ExecutingPolicy(t.getDescription(), t))
.sorted(Comparator.comparing(ExecutingPolicy::getName)) .sorted(Comparator.comparing(ExecutingPolicy::getName))