[ML] Upon job close, validate that the datafeed has been stopped

and fail otherwise.

Original commit: elastic/x-pack-elasticsearch@d13bfe1041
This commit is contained in:
Martijn van Groningen 2017-03-16 18:01:11 +01:00
parent 06ce981fed
commit 06277c3677
5 changed files with 91 additions and 34 deletions

View File

@ -81,6 +81,10 @@ public class MlMetadata implements MetaData.Custom {
return datafeeds.get(datafeedId); return datafeeds.get(datafeedId);
} }
/**
 * Looks up the datafeed that is configured for the given job.
 *
 * @param jobId the id of the job whose datafeed is wanted
 * @return the first datafeed whose job id equals {@code jobId}, or an empty
 *         {@link Optional} when no registered datafeed refers to that job
 */
public Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
    // Walk the registered datafeeds in map order and stop at the first match.
    for (DatafeedConfig candidate : datafeeds.values()) {
        if (candidate.getJobId().equals(jobId)) {
            return Optional.of(candidate);
        }
    }
    return Optional.empty();
}
@Override @Override
public Version getMinimalSupportedVersion() { public Version getMinimalSupportedVersion() {
return Version.V_5_4_0_UNRELEASED; return Version.V_5_4_0_UNRELEASED;

View File

@ -38,6 +38,8 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -49,7 +51,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.Predicate; import java.util.Optional;
public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> { public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> {
@ -285,25 +287,26 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
} }
PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE); PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE);
if (tasks != null) { Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
Predicate<PersistentTask<?>> p = t -> { if (datafeed.isPresent()) {
OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
return storedRequest.getJobId().equals(jobId); if (datafeedState != DatafeedState.STOPPED) {
}; throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped",
for (PersistentTask<?> task : tasks.findTasks(OpenJobAction.NAME, p)) { RestStatus.CONFLICT, jobId);
OpenJobAction.Request storedRequest = (OpenJobAction.Request) task.getRequest(); }
if (storedRequest.getJobId().equals(jobId)) { }
JobState jobState = (JobState) task.getStatus();
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) {
JobState jobState = (JobState) jobTask.getStatus();
if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) { if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
throw new ElasticsearchStatusException("cannot close job, expected job state [{}], but got [{}]", throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState); RestStatus.CONFLICT, jobId, JobState.OPENED, jobState);
} }
return task; return jobTask;
} }
} throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]",
} RestStatus.CONFLICT, jobId, JobState.OPENED, JobState.CLOSED);
throw new ElasticsearchStatusException("cannot close job, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, JobState.CLOSED);
} }
static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) { static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
@ -338,6 +339,9 @@ public class DatafeedJobRunner extends AbstractComponent {
} }
private void closeJob() { private void closeJob() {
DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService);
observer.waitForState(datafeed.getId(), TimeValue.timeValueSeconds(20), DatafeedState.STOPPED, e1 -> {
if (e1 == null) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() { client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {
@ -353,6 +357,10 @@ public class DatafeedJobRunner extends AbstractComponent {
logger.error("[" + datafeed.getJobId() + "] failed to auto-close job", e); logger.error("[" + datafeed.getJobId() + "] failed to auto-close job", e);
} }
}); });
} else {
logger.error("Cannot auto close job [" + datafeed.getJobId() + "]", e1);
}
});
} }
} }
} }

View File

@ -10,13 +10,18 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasks; import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
@ -57,13 +62,50 @@ public class CloseJobActionTests extends ESTestCase {
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task)))); .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task))));
ElasticsearchStatusException result = ElasticsearchStatusException result =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build())); expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build()));
assertEquals("cannot close job, expected job state [opened], but got [opening]", result.getMessage()); assertEquals("cannot close job [job_id], expected job state [opened], but got [opening]", result.getMessage());
ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name")) ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap()))); .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap())));
result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build())); result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build()));
assertEquals("cannot close job, expected job state [opened], but got [closed]", result.getMessage()); assertEquals("cannot close job [job_id], expected job state [opened], but got [closed]", result.getMessage());
}
/**
 * Verifies that {@code CloseJobAction.validateAndFindTask} refuses to close a job while its
 * datafeed task is in the {@code STARTED} state, and returns the job task once the datafeed
 * task is {@code STOPPED} or no datafeed task is present at all.
 */
public void testCloseJob_datafeedNotStopped() {
// Register a job together with a datafeed that refers to it.
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")));
// Scenario 1: the job is opened and its datafeed task is still started.
Map<Long, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask(1L, "job_id", null, JobState.OPENED);
tasks.put(1L, jobTask);
tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build();
// Closing must be rejected with a CONFLICT status and a message naming the job.
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndFindTask("job_id", cs1));
assertEquals(RestStatus.CONFLICT, e.status());
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
// Scenario 2: same job task, but the datafeed task is either stopped or missing entirely
// (chosen at random), both of which must allow the close to proceed.
tasks = new HashMap<>();
tasks.put(1L, jobTask);
if (randomBoolean()) {
tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED));
}
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build();
// Validation succeeds and hands back the job's persistent task.
assertEquals(jobTask, CloseJobAction.validateAndFindTask("job_id", cs2));
}
/**
 * Test helper that builds a persistent task for a started-datafeed request.
 *
 * @param id            the persistent task id
 * @param datafeedId    the datafeed id placed in the {@code StartDatafeedAction.Request}
 * @param startTime     the start time placed in the {@code StartDatafeedAction.Request}
 * @param nodeId        the node id used for the task's assignment (callers in this file pass {@code null})
 * @param datafeedState the state attached to the returned task
 * @return a task named {@code StartDatafeedAction.NAME} carrying the given request and state
 */
public static PersistentTask<StartDatafeedAction.Request> createDatafeedTask(long id, String datafeedId, long startTime,
String nodeId, DatafeedState datafeedState) {
// NOTE(review): the meaning of the two boolean arguments (false, true) is not visible here;
// confirm against the PersistentTask constructor.
PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), false, true,
new PersistentTasks.Assignment(nodeId, "test assignment"));
// Re-create the task with the datafeed state attached; presumably this sets the task status
// (TODO confirm against PersistentTask).
task = new PersistentTask<>(task, datafeedState);
return task;
} }
} }

View File

@ -84,8 +84,8 @@ public class MlRestTestStateCleaner {
logger.error("Got status code " + statusCode + " when closing job " + jobId); logger.error("Got status code " + statusCode + " when closing job " + jobId);
} }
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]") if (e.getMessage().contains("expected job state [opened], but got [closed]")
|| e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) { || e.getMessage().contains("expected job state [opened], but got [closing]")) {
logger.debug("job [" + jobId + "] has already been closed", e); logger.debug("job [" + jobId + "] has already been closed", e);
} else { } else {
logger.warn("failed to close job [" + jobId + "]", e); logger.warn("failed to close job [" + jobId + "]", e);