[ml] changed start datafeed validation

Original commit: elastic/x-pack-elasticsearch@266d360cae
This commit is contained in:
Martijn van Groningen 2017-02-13 12:55:42 +01:00
parent 3e43b591df
commit fbcac2710a
2 changed files with 65 additions and 29 deletions

View File

@ -243,7 +243,6 @@ public class StartDatafeedAction
public static class TransportAction extends TransportPersistentAction<Request> {
private final DatafeedStateObserver observer;
private final ClusterService clusterService;
private final DatafeedJobRunner datafeedJobRunner;
@Inject
@ -253,16 +252,12 @@ public class StartDatafeedAction
ClusterService clusterService, DatafeedJobRunner datafeedJobRunner) {
super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry,
actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.datafeedJobRunner = datafeedJobRunner;
this.observer = new DatafeedStateObserver(threadPool, clusterService);
}
@Override
protected void doExecute(Request request, ActionListener<PersistentActionResponse> listener) {
ClusterState state = clusterService.state();
StartDatafeedAction.validate(request.datafeedId, state.metaData().custom(MlMetadata.TYPE),
state.custom(PersistentTasksInProgress.TYPE), true);
ActionListener<PersistentActionResponse> finalListener =
ActionListener.wrap(response -> waitForDatafeedStarted(request, response, listener), listener::onFailure);
super.doExecute(request, finalListener);
@ -289,7 +284,8 @@ public class StartDatafeedAction
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, false);
DiscoveryNodes nodes = clusterState.getNodes();
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes);
}
@Override
@ -308,8 +304,7 @@ public class StartDatafeedAction
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks,
boolean checkJobState) {
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, DiscoveryNodes nodes) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
@ -318,28 +313,31 @@ public class StartDatafeedAction
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedJobValidator.validate(datafeed, job);
if (tasks == null) {
return;
}
// we only check job state when the user submits the start datafeed request,
// but when we persistent task framework validates the request we don't,
// because we don't know if the job task will be started before datafeed task,
// so we just wait until the job task is opened, which will happen at some point in time.
if (checkJobState) {
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState);
}
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState);
}
PersistentTaskInProgress<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedTask != null && datafeedTask.getExecutorNode() != null && datafeedState == DatafeedState.STARTED) {
if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) {
// The state is started and the node were running on no longer exists.
// We can skip the datafeed state check below, because when the node
// disappeared we didn't have time to set the state to failed.
return;
}
}
if (datafeedState == DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
}
DatafeedJobValidator.validate(datafeed, job);
}
static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) {

View File

@ -26,6 +26,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
@ -41,7 +43,7 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), "node_id");
task = new PersistentTaskInProgress<OpenJobAction.Request>(task, randomFrom(JobState.FAILED, JobState.CLOSED,
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED,
JobState.CLOSING, JobState.OPENING));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
@ -59,7 +61,7 @@ public class StartDatafeedActionTests extends ESTestCase {
DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build());
assertNull(node);
task = new PersistentTaskInProgress<OpenJobAction.Request>(task, JobState.OPENED);
task = new PersistentTaskInProgress<>(task, JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()))
@ -75,7 +77,7 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, false));
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, null));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
}
@ -92,12 +94,10 @@ public class StartDatafeedActionTests extends ESTestCase {
.putDatafeed(datafeedConfig1)
.build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, true));
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, null));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]"));
}
public void testValidate_dataFeedAlreadyStarted() {
Job job1 = createScheduledJob("job_id").build();
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
@ -105,16 +105,54 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<StartDatafeedAction.Request> task =
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
"node_id");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
"node_id");
task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
taskMap.put(1L, datafeedTask);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, false));
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes));
assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]"));
}
public void testValidate_staleTask() {
Job job1 = createScheduledJob("job_id").build();
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
"node_id2");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
"node_id1");
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
taskMap.put(1L, datafeedTask);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
}
}