From e8a6c9f5d1eff7566002f46734d1ff0274bdc3a9 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 30 Mar 2018 06:24:29 -0700 Subject: [PATCH] [Rollup] Delegate GetJobs to master (elastic/x-pack-elasticsearch#4247) If a job is deleted and then the GetJobs API is immediately called, it is possible for the deleted job to still be returned in the response. This is likely due to the GetJobs API being executed on a node with a slightly stale cluster state which shows the job as still existing. So we delegate to the master node so the list of jobs/tasks is current. After routing to the master, we need to check if the rollup job is in the PersistentTasks cluster state (CS). A job can be acknowledged as canceled and removed from the CS while its allocated task is still alive. So we first check the CS to make sure it's really there before going to the allocated task to get the status. As an extra precaution, when running local to the task, we also make sure the task isn't canceled before including it in the response. relates elastic/x-pack-elasticsearch#4041 Original commit: elastic/x-pack-elasticsearch@3b6fb65e128661b640bf5e66f4c6dd58c41411d0 --- .../action/TransportGetRollupJobAction.java | 66 ++++++++++++++- .../action/GetJobsActionRequestTests.java | 83 +++++++++++++++++++ .../rest-api-spec/test/rollup/delete_job.yml | 3 - 3 files changed, 147 insertions(+), 5 deletions(-) diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java index 8168141b60c..283f3a37423 100644 --- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupJobAction.java @@ -6,18 +6,25 @@ package org.elasticsearch.xpack.rollup.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import
org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; @@ -41,15 +48,70 @@ public class TransportGetRollupJobAction extends TransportTasksAction listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + + if (nodes.isLocalNodeElectedMaster()) { + if (stateHasRollupJobs(request, state)) { + super.doExecute(task, request, listener); + } else { + // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET + // and we can just send an empty response, no need to go looking for the allocated task + listener.onResponse(new GetRollupJobsAction.Response(Collections.emptyList())); + } + + } else { + // Delegates GetJobs to elected master node, so it becomes the coordinating node. 
+ // Non-master nodes may have a stale cluster state that shows jobs which are cancelled + // on the master, which makes testing difficult. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, GetRollupJobsAction.Response::new)); + } + } + } + + /** + * Check to see if the PersistentTask's cluster state contains the rollup job(s) we are interested in + */ + static boolean stateHasRollupJobs(GetRollupJobsAction.Request request, ClusterState state) { + boolean hasRollupJobs = false; + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + if (pTasksMeta != null) { + // If the request was for _all rollup jobs, we need to look through the list of + // persistent tasks and see if at least one has a RollupJob param + if (request.getId().equals(MetaData.ALL)) { + hasRollupJobs = pTasksMeta.tasks() + .stream() + .anyMatch(persistentTask -> persistentTask.getTaskName().equals(RollupField.TASK_NAME)); + + } else if (pTasksMeta.getTask(request.getId()) != null) { + // If we're looking for a single job, we can just check directly + hasRollupJobs = true; + } + } + return hasRollupJobs; + } + @Override protected void taskOperation(GetRollupJobsAction.Request request, RollupJobTask jobTask, ActionListener listener) { List jobs = Collections.emptyList(); - if (jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL)) { + + assert jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL); + + // Little extra insurance, make sure we only return jobs that aren't cancelled + if (jobTask.isCancelled() == false) { GetRollupJobsAction.JobWrapper wrapper = new GetRollupJobsAction.JobWrapper(jobTask.getConfig(), jobTask.getStats(), -
(RollupJobStatus)jobTask.getStatus()); + (RollupJobStatus) jobTask.getStatus()); jobs = Collections.singletonList(wrapper); } + listener.onResponse(new GetRollupJobsAction.Response(jobs)); } diff --git a/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java b/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java index 307a2d5f6da..419feb6f19c 100644 --- a/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java +++ b/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/GetJobsActionRequestTests.java @@ -6,9 +6,19 @@ package org.elasticsearch.xpack.rollup.action; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction.Request; +import org.elasticsearch.xpack.core.rollup.job.RollupJob; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; public class GetJobsActionRequestTests extends AbstractStreamableTestCase { @@ -25,6 +35,79 @@ public class GetJobsActionRequestTests extends AbstractStreamableTestCase> tasks + = Collections.singletonMap("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", "bar", null, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertFalse(hasRollupJobs); + } + + public void 
testStateCheckMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("foo"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + Map> tasks + = Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } + + public void testStateCheckAllMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + Map> tasks + = Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } + + public void testStateCheckAllWithSeveralMatchingPersistentTasks() { + GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all"); + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + RollupJob job2 = new RollupJob(ConfigTestHelpers.getRollupJob("bar").build(), Collections.emptyMap()); + Map> tasks = new HashMap<>(2); + tasks.put("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null)); + tasks.put("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", 
RollupJob.NAME, job2, 1, null)); + ClusterState state = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, + new PersistentTasksCustomMetaData(0L, tasks))) + .build(); + boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state); + assertTrue(hasRollupJobs); + } } diff --git a/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index 2a353b40445..efbaa763aff 100644 --- a/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -1,7 +1,4 @@ setup: - - skip: - version: "all" - reason: "Timing issues with delete + get APIs at the moment, see: #4041" - do: headers: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser