[Rollup] Delegate GetJobs to master (elastic/x-pack-elasticsearch#4247)
If a job is deleted and then GetJobs API is immediately called, it is possible for a job to be returned in the response. This is likely due to the GetJobs API being executed on a node with a slightly stale cluster state which shows the job as still existing. So we delegate to the master node so the list of jobs/tasks is current. After routing to the master, we need to check if the rollup job is in the PersistentTask's CS. A job can be acknowledged canceled, removed from the CS, but the allocated task is still alive. So we first check the CS to make sure it's really there before going to the allocated task to get the status. As extra precaution, when running local to the task, we also make sure the task isn't canceled before including it in the response. relates elastic/x-pack-elasticsearch#4041 Original commit: elastic/x-pack-elasticsearch@3b6fb65e12
This commit is contained in:
parent
54539a1eb0
commit
e8a6c9f5d1
|
@ -6,18 +6,25 @@
|
||||||
package org.elasticsearch.xpack.rollup.action;
|
package org.elasticsearch.xpack.rollup.action;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||||
import org.elasticsearch.action.FailedNodeException;
|
import org.elasticsearch.action.FailedNodeException;
|
||||||
import org.elasticsearch.action.TaskOperationFailure;
|
import org.elasticsearch.action.TaskOperationFailure;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||||
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.core.rollup.RollupField;
|
||||||
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
|
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
|
||||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
|
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
|
||||||
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
|
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
|
||||||
|
@ -41,15 +48,70 @@ public class TransportGetRollupJobAction extends TransportTasksAction<RollupJobT
|
||||||
GetRollupJobsAction.Response::new, ThreadPool.Names.SAME);
|
GetRollupJobsAction.Response::new, ThreadPool.Names.SAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doExecute(Task task, GetRollupJobsAction.Request request, ActionListener<GetRollupJobsAction.Response> listener) {
|
||||||
|
final ClusterState state = clusterService.state();
|
||||||
|
final DiscoveryNodes nodes = state.nodes();
|
||||||
|
|
||||||
|
if (nodes.isLocalNodeElectedMaster()) {
|
||||||
|
if (stateHasRollupJobs(request, state)) {
|
||||||
|
super.doExecute(task, request, listener);
|
||||||
|
} else {
|
||||||
|
// If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET
|
||||||
|
// and we can just send an empty response, no need to go looking for the allocated task
|
||||||
|
listener.onResponse(new GetRollupJobsAction.Response(Collections.emptyList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Delegates GetJobs to elected master node, so it becomes the coordinating node.
|
||||||
|
// Non-master nodes may have a stale cluster state that shows jobs which are cancelled
|
||||||
|
// on the master, which makes testing difficult.
|
||||||
|
if (nodes.getMasterNode() == null) {
|
||||||
|
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
|
||||||
|
} else {
|
||||||
|
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
|
||||||
|
new ActionListenerResponseHandler<>(listener, GetRollupJobsAction.Response::new));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check to see if the PersistentTask's cluster state contains the rollup job(s) we are interested in
|
||||||
|
*/
|
||||||
|
static boolean stateHasRollupJobs(GetRollupJobsAction.Request request, ClusterState state) {
|
||||||
|
boolean hasRollupJobs = false;
|
||||||
|
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||||
|
|
||||||
|
if (pTasksMeta != null) {
|
||||||
|
// If the request was for _all rollup jobs, we need to look through the list of
|
||||||
|
// persistent tasks and see if at least once has a RollupJob param
|
||||||
|
if (request.getId().equals(MetaData.ALL)) {
|
||||||
|
hasRollupJobs = pTasksMeta.tasks()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(persistentTask -> persistentTask.getTaskName().equals(RollupField.TASK_NAME));
|
||||||
|
|
||||||
|
} else if (pTasksMeta.getTask(request.getId()) != null) {
|
||||||
|
// If we're looking for a single job, we can just check directly
|
||||||
|
hasRollupJobs = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return hasRollupJobs;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void taskOperation(GetRollupJobsAction.Request request, RollupJobTask jobTask,
|
protected void taskOperation(GetRollupJobsAction.Request request, RollupJobTask jobTask,
|
||||||
ActionListener<GetRollupJobsAction.Response> listener) {
|
ActionListener<GetRollupJobsAction.Response> listener) {
|
||||||
List<GetRollupJobsAction.JobWrapper> jobs = Collections.emptyList();
|
List<GetRollupJobsAction.JobWrapper> jobs = Collections.emptyList();
|
||||||
if (jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL)) {
|
|
||||||
|
assert jobTask.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL);
|
||||||
|
|
||||||
|
// Little extra insurance, make sure we only return jobs that aren't cancelled
|
||||||
|
if (jobTask.isCancelled() == false) {
|
||||||
GetRollupJobsAction.JobWrapper wrapper = new GetRollupJobsAction.JobWrapper(jobTask.getConfig(), jobTask.getStats(),
|
GetRollupJobsAction.JobWrapper wrapper = new GetRollupJobsAction.JobWrapper(jobTask.getConfig(), jobTask.getStats(),
|
||||||
(RollupJobStatus)jobTask.getStatus());
|
(RollupJobStatus) jobTask.getStatus());
|
||||||
jobs = Collections.singletonList(wrapper);
|
jobs = Collections.singletonList(wrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
listener.onResponse(new GetRollupJobsAction.Response(jobs));
|
listener.onResponse(new GetRollupJobsAction.Response(jobs));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,9 +6,19 @@
|
||||||
package org.elasticsearch.xpack.rollup.action;
|
package org.elasticsearch.xpack.rollup.action;
|
||||||
|
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||||
|
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
|
||||||
|
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
|
||||||
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction.Request;
|
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction.Request;
|
||||||
|
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
public class GetJobsActionRequestTests extends AbstractStreamableTestCase<Request> {
|
public class GetJobsActionRequestTests extends AbstractStreamableTestCase<Request> {
|
||||||
|
@ -25,6 +35,79 @@ public class GetJobsActionRequestTests extends AbstractStreamableTestCase<Reques
|
||||||
protected Request createBlankInstance() {
|
protected Request createBlankInstance() {
|
||||||
return new Request();
|
return new Request();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testStateCheckNoPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("foo");
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertFalse(hasRollupJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStateCheckAllNoPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all");
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertFalse(hasRollupJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStateCheckNoMatchingPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("foo");
|
||||||
|
Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks
|
||||||
|
= Collections.singletonMap("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", "bar", null, 1, null));
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, tasks)))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertFalse(hasRollupJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStateCheckMatchingPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("foo");
|
||||||
|
RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap());
|
||||||
|
Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks
|
||||||
|
= Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null));
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, tasks)))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertTrue(hasRollupJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStateCheckAllMatchingPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all");
|
||||||
|
RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap());
|
||||||
|
Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks
|
||||||
|
= Collections.singletonMap("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null));
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, tasks)))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertTrue(hasRollupJobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStateCheckAllWithSeveralMatchingPersistentTasks() {
|
||||||
|
GetRollupJobsAction.Request request = new GetRollupJobsAction.Request("_all");
|
||||||
|
RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap());
|
||||||
|
RollupJob job2 = new RollupJob(ConfigTestHelpers.getRollupJob("bar").build(), Collections.emptyMap());
|
||||||
|
Map<String, PersistentTasksCustomMetaData.PersistentTask<?>> tasks = new HashMap<>(2);
|
||||||
|
tasks.put("foo", new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupJob.NAME, job, 1, null));
|
||||||
|
tasks.put("bar", new PersistentTasksCustomMetaData.PersistentTask<>("bar", RollupJob.NAME, job2, 1, null));
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
|
new PersistentTasksCustomMetaData(0L, tasks)))
|
||||||
|
.build();
|
||||||
|
boolean hasRollupJobs = TransportGetRollupJobAction.stateHasRollupJobs(request, state);
|
||||||
|
assertTrue(hasRollupJobs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
setup:
|
setup:
|
||||||
- skip:
|
|
||||||
version: "all"
|
|
||||||
reason: "Timing issues with delete + get APIs at the moment, see: #4041"
|
|
||||||
- do:
|
- do:
|
||||||
headers:
|
headers:
|
||||||
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||||
|
|
Loading…
Reference in New Issue