[ml] For the data, flush and update apis check the job state on the node running the job task instead of the coordinating node.

[test] Also re-enabled multi node qa tests.

Original commit: elastic/x-pack-elasticsearch@efb24131f0
This commit is contained in:
Martijn van Groningen 2017-02-18 18:59:01 +01:00
parent a74d1329ba
commit 8a6cea0350
9 changed files with 71 additions and 84 deletions

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -28,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
@ -94,10 +92,6 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
public String getJobId() {
return jobId;
}
public boolean getCalcInterim() {
return calcInterim;
}
@ -248,10 +242,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager, JobManager jobManager, TransportListTasksAction listTasksAction) {
AutodetectProcessManager processManager) {
super(settings, FlushJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FlushJobAction.Request::new, FlushJobAction.Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager,
Request::getJobId, listTasksAction);
FlushJobAction.Request::new, FlushJobAction.Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -262,10 +255,8 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task,
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task,
ActionListener<FlushJobAction.Response> listener) {
jobManager.getJobOrThrowIfUnknown(request.getJobId());
InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
if (request.getAdvanceTime() != null) {

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -223,11 +221,10 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager,
AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, PostDataAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId,
listTasksAction);
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -238,7 +235,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {

View File

@ -7,16 +7,14 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
@ -25,18 +23,18 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
/**
@ -47,55 +45,49 @@ import java.util.function.Supplier;
public abstract class TransportJobTaskAction<OperationTask extends Task, Request extends TransportJobTaskAction.JobTaskRequest<Request>,
Response extends BaseTasksResponse & Writeable> extends TransportTasksAction<OperationTask, Request, Response, Response> {
protected final JobManager jobManager;
protected final AutodetectProcessManager processManager;
private final Function<Request, String> jobIdFromRequest;
private final TransportListTasksAction listTasksAction;
TransportJobTaskAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> requestSupplier,
Supplier<Response> responseSupplier, String nodeExecutor, JobManager jobManager,
AutodetectProcessManager processManager, Function<Request, String> jobIdFromRequest,
TransportListTasksAction listTasksAction) {
Supplier<Response> responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) {
super(settings, actionName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
requestSupplier, responseSupplier, nodeExecutor);
this.jobManager = jobManager;
this.processManager = processManager;
this.jobIdFromRequest = jobIdFromRequest;
this.listTasksAction = listTasksAction;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
// the same validation that exists in AutodetectProcessManager#processData(...) and flush(...) methods
// is required here too because if the job hasn't been opened yet then no task exist for it yet and then
// #taskOperation(...) method will not be invoked, returning an empty result to the client.
// This ensures that we return an understandable error:
String jobId = jobIdFromRequest.apply(request);
jobManager.getJobOrThrowIfUnknown(jobId);
JobState jobState = jobManager.getJobState(jobId);
if (jobState != JobState.OPENED) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + jobState +
String jobId = request.getJobId();
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
// node running the job task.
ClusterState state = clusterService.state();
JobManager.getJobOrThrowIfUnknown(state, jobId);
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
} else {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + jobId;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
request.setTaskId(taskInfo.getTaskId());
super.doExecute(task, request, listener);
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + jobId + "] " + listTasksResponse));
}, listener::onFailure));
request.setNodes(jobTask.getExecutorNode());
super.doExecute(task, request, listener);
}
}
@Override
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE);
JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
if (jobState == JobState.OPENED) {
innerTaskOperation(request, task, listener);
} else {
listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
}
}
protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener<Response> listener);
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
@ -25,7 +24,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -181,10 +179,9 @@ public class UpdateProcessAction extends
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) {
AutodetectProcessManager processManager) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId,
listTasksAction);
Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager);
}
@Override
@ -195,7 +192,7 @@ public class UpdateProcessAction extends
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> {
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -150,7 +149,7 @@ public class JobManager extends AbstractComponent {
* @throws org.elasticsearch.ResourceNotFoundException
* if there is no job with matching the given {@code jobId}
*/
Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {

View File

@ -116,15 +116,9 @@ public class AutodetectProcessManager extends AbstractComponent {
* @return Count of records, fields, bytes, etc written
*/
public DataCounts processData(String jobId, InputStream input, DataLoadParams params) {
JobState jobState = jobManager.getJobState(jobId);
if (jobState != JobState.OPENED) {
throw new IllegalArgumentException("job [" + jobId + "] state is [" + jobState + "], but must be ["
+ JobState.OPENED + "] for processing data");
}
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("job [" + jobId + "] with state [" + jobState + "] hasn't been started");
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
}
try {
return communicator.writeToJob(input, params);

View File

@ -66,19 +66,17 @@ public class JobManagerTests extends ESTestCase {
}
public void testGetJobOrThrowIfUnknown_GivenUnknownJob() {
JobManager jobManager = createJobManager();
ClusterState cs = createClusterState();
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> jobManager.getJobOrThrowIfUnknown(cs, "foo"));
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo"));
}
public void testGetJobOrThrowIfUnknown_GivenKnownJob() {
JobManager jobManager = createJobManager();
Job job = buildJobBuilder("foo").build();
MlMetadata mlMetadata = new MlMetadata.Builder().putJob(job, false).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)).build();
assertEquals(job, jobManager.getJobOrThrowIfUnknown(cs, "foo"));
assertEquals(job, JobManager.getJobOrThrowIfUnknown(cs, "foo"));
}
public void testGetJob_GivenJobIdIsAll() {

View File

@ -94,7 +94,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithState(JobState.OPENED);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
@ -298,7 +297,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator);
givenAllocationWithState(JobState.OPENED);
InputStream inputStream = createInputStream("");
manager.openJob("foo", 1L, false, e -> {});
@ -333,10 +331,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(autodetectProcess, times(1)).close();
}
private void givenAllocationWithState(JobState state) {
when(jobManager.getJobState("foo")).thenReturn(state);
}
private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
Client client = mock(Client.class);
return createManager(communicator, client);

View File

@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -16,8 +16,10 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentType.JSON;
@ -35,7 +37,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertTrue((Boolean) ml.get("enabled"));
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/592")
public void testMiniFarequote() throws Exception {
String jobId = "foo1";
createFarequoteJob(jobId);
@ -43,6 +44,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
@ -89,7 +91,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/592")
public void testMiniFarequoteWithDatafeeder() throws Exception {
String mappings = "{"
+ " \"mappings\": {"
@ -201,4 +202,28 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false);
}
// When open job api returns the cluster state on nodes other than master node or node that acted as coordinating node,
// may not have had the latest update with job state set to opened. This may fail subsequent post data, flush, or
// close calls until that node that is running the job task has applied the cluster state where job state has been set to opened.
// this method waits until all nodes in the cluster have the same cluster state version, so that such failures can be
// avoided in tests. Note that the job has been started on the node running the job task (autodetect process is running),
// this is just a workaround for inconsistency in cluster states that may happen for a small amount of time.
private void assertSameClusterStateOnAllNodes(){
assert getClusterHosts().size() > 1;
Set<Integer> versions = new HashSet<>();
for (HttpHost host : getClusterHosts()) {
try {
// Client round robins between cluster hosts:
Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true"));
assertEquals(200, response.getStatusLine().getStatusCode());
int version = (Integer) responseEntityToMap(response).get("version");
logger.info("Sampled version [{}]", version);
versions.add(version);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
assertEquals(1, versions.size());
}
}