From fbcac2710ab74639cb6bf7bb0c3e841004387c08 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 13 Feb 2017 12:55:42 +0100 Subject: [PATCH] [ml] changed start datafeed validation Original commit: elastic/x-pack-elasticsearch@266d360cae4356286f9d43a4feca3cf62b1b661a --- .../xpack/ml/action/StartDatafeedAction.java | 36 ++++++------ .../ml/action/StartDatafeedActionTests.java | 58 +++++++++++++++---- 2 files changed, 65 insertions(+), 29 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 167efe6fdbe..b2ce65d6c82 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -243,7 +243,6 @@ public class StartDatafeedAction public static class TransportAction extends TransportPersistentAction { private final DatafeedStateObserver observer; - private final ClusterService clusterService; private final DatafeedJobRunner datafeedJobRunner; @Inject @@ -253,16 +252,12 @@ public class StartDatafeedAction ClusterService clusterService, DatafeedJobRunner datafeedJobRunner) { super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); - this.clusterService = clusterService; this.datafeedJobRunner = datafeedJobRunner; this.observer = new DatafeedStateObserver(threadPool, clusterService); } @Override protected void doExecute(Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - StartDatafeedAction.validate(request.datafeedId, state.metaData().custom(MlMetadata.TYPE), - state.custom(PersistentTasksInProgress.TYPE), true); ActionListener finalListener = ActionListener.wrap(response -> waitForDatafeedStarted(request, response, listener), 
listener::onFailure); super.doExecute(request, finalListener); @@ -289,7 +284,8 @@ public class StartDatafeedAction public void validate(Request request, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); - StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, false); + DiscoveryNodes nodes = clusterState.getNodes(); + StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes); } @Override @@ -308,8 +304,7 @@ public class StartDatafeedAction } - static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, - boolean checkJobState) { + static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, DiscoveryNodes nodes) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); @@ -318,28 +313,31 @@ public class StartDatafeedAction if (job == null) { throw ExceptionsHelper.missingJobException(datafeed.getJobId()); } + DatafeedJobValidator.validate(datafeed, job); if (tasks == null) { return; } - // we only check job state when the user submits the start datafeed request, - // but when we persistent task framework validates the request we don't, - // because we don't know if the job task will be started before datafeed task, - // so we just wait until the job task is opened, which will happen at some point in time. 
- if (checkJobState) { - JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); - if (jobState != JobState.OPENED) { - throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", - RestStatus.CONFLICT, JobState.OPENED, jobState); - } + JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); + if (jobState != JobState.OPENED) { + throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]", + RestStatus.CONFLICT, JobState.OPENED, jobState); + } + PersistentTaskInProgress datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); + if (datafeedTask != null && datafeedTask.getExecutorNode() != null && datafeedState == DatafeedState.STARTED) { + if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) { + // The state is started and the node it was running on no longer exists. + // We can skip the datafeed state check below, because when the node + // disappeared we didn't have time to set the state to failed. 
+ return; + } + } if (datafeedState == DatafeedState.STARTED) { throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]", RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED); } - DatafeedJobValidator.validate(datafeed, job); } static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 23b76975c47..6562e770d99 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.net.InetAddress; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; @@ -41,7 +43,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTaskInProgress task = new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), "node_id"); - task = new PersistentTaskInProgress(task, randomFrom(JobState.FAILED, JobState.CLOSED, + task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING)); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); @@ -59,7 +61,7 @@ public class StartDatafeedActionTests extends ESTestCase { DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build()); assertNull(node); - task = new PersistentTaskInProgress(task, JobState.OPENED); + task = new PersistentTaskInProgress<>(task, 
JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) @@ -75,7 +77,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); Exception e = expectThrows(ResourceNotFoundException.class, - () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, false)); + () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, null)); assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); } @@ -92,12 +94,10 @@ public class StartDatafeedActionTests extends ESTestCase { .putDatafeed(datafeedConfig1) .build(); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, true)); + () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, null)); assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]")); } - - public void testValidate_dataFeedAlreadyStarted() { Job job1 = createScheduledJob("job_id").build(); DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")); @@ -105,16 +105,54 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .putDatafeed(datafeedConfig) .build(); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); - PersistentTaskInProgress task = + PersistentTaskInProgress jobTask = + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), + "node_id"); + jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress datafeedTask = new 
PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), "node_id"); - task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); + Map> taskMap = new HashMap<>(); + taskMap.put(0L, jobTask); + taskMap.put(1L, datafeedTask); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, false)); + () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes)); assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]")); } + public void testValidate_staleTask() { + Job job1 = createScheduledJob("job_id").build(); + DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .putDatafeed(datafeedConfig) + .build(); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTaskInProgress jobTask = + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), + "node_id2"); + jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress datafeedTask = + new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + "node_id1"); + datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); + Map> taskMap = new HashMap<>(); + taskMap.put(0L, 
jobTask); + taskMap.put(1L, datafeedTask); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap); + StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); + } + }