diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java index 8168141b60c..283f3a37423 100644 --- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java @@ -6,18 +6,25 @@ package org.elasticsearch.xpack.rollup.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; @@ -41,15 +48,70 @@ public class TransportGetRollupJobAction extends TransportTasksAction listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + + if (nodes.isLocalNodeElectedMaster()) { + if (stateHasRollupJobs(request, state)) { + super.doExecute(task, request, listener); + } else { + // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET + // and we can just send an empty response, no need to go looking for the allocated task + listener.onResponse(new GetRollupJobsAction.Response(Collections.emptyList())); + } + + } else { + // Delegates GetJobs to elected master node, so it becomes the coordinating node. + // Non-master nodes may have a stale cluster state that shows jobs which are cancelled + // on the master, which makes testing difficult. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, GetRollupJobsAction.Response::new)); + } + } + } + + /** + * Check to see if the PersistentTask's cluster state contains the rollup job(s) we are interested in + */ + static boolean stateHasRollupJobs(GetRollupJobsAction.Request request, ClusterState state) { + boolean hasRollupJobs = false; + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + if (pTasksMeta != null) { + // If the request was for _all rollup jobs, we need to look through the list of + // persistent tasks and see if at least once has a RollupJob param + if (request.getId().equals(MetaData.ALL)) { + hasRollupJobs = pTasksMeta.tasks() + .stream() + .anyMatch(persistentTask -> persistentTask.getTaskName().equals(RollupField.TASK_NAME)); + + } else if (pTasksMeta.getTask(request.getId()) != null) { + // If we're looking for a single job, we can just check directly + hasRollupJobs = true; + } + } + return hasRollupJobs; + } + @Override protected void taskOperation(GetRollupJobsAction.Request request, RollupJobTask jobTask, ActionListener listener) { List jobs = Collections.emptyList(); - if (jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL)) { + + assert jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL); + + // Little extra insurance, make sure we only return jobs that aren't cancelled + if (jobTask.isCancelled() == false) { GetRollupJobsAction.JobWrapper wrapper = new GetRollupJobsAction.JobWrapper(jobTask.getConfig(), jobTask.getStats(), - (RollupJobStatus)jobTask.getStatus()); + (RollupJobStatus) jobTask.getStatus()); jobs = Collections.singletonList(wrapper); } + listener.onResponse(new GetRollupJobsAction.Response(jobs)); } diff --git a/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java b/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java index 307a2d5f6da..419feb6f19c 100644 --- a/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java +++ b/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java @@ -6,9 +6,19 @@ package org.elasticsearch.xpack.rollup.action; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction.Request; +import org.elasticsearch.xpack.core.rollup.job.RollupJob; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; public class GetJobsActionRequestTests extends AbstractStreamableTestCase { @@ -25,6 +35,79 @@ public class GetJobsActionRequestTests extends AbstractStreamableTestCase> tasks + = Collections.singletonMap("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", "bar", null, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertFalse(hasRollupJobs); + } + + public void testStateCheckMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("foo"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + Map> tasks + = Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } + + public void testStateCheckAllMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + Map> tasks + = Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } + + public void testStateCheckAllWithSeveralMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + RollupJob job2 = new RollupJob(ConfigTestHelpers.getRollupJob("bar").build(), Collections.emptyMap()); + Map> tasks = new HashMap<>(2); + tasks.put("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + tasks.put("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", RollupJob.NAME, job2, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } } diff --git a/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index 2a353b40445..efbaa763aff 100644 --- a/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -1,7 +1,4 @@ setup: - - skip: - version: "all" - reason: "Timing issues with delete + get APIs at the moment, see: #4041" - do: headers: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser