diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 2aca79346d5..d52f11bbd3f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -315,7 +315,7 @@ public class StartDatafeedAction @Override public Assignment getAssignment(Request request, ClusterState clusterState) { - DiscoveryNode discoveryNode = selectNode(logger, request, clusterState); + DiscoveryNode discoveryNode = selectNode(logger, request.getDatafeedId(), clusterState); // TODO: Add proper explanation if (discoveryNode == null) { return NO_NODE_FOUND; @@ -378,27 +378,34 @@ public class StartDatafeedAction } } if (datafeedState == DatafeedState.STARTED) { - throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]", - RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED); + throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]", + RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED); } } - static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) { + public static DiscoveryNode selectNode(Logger logger, String datafeedId, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); DiscoveryNodes nodes = clusterState.getNodes(); - JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); - if (jobState == JobState.OPENED) { - PersistentTaskInProgress task = 
MlMetadata.getJobTask(datafeed.getJobId(), tasks); - return nodes.get(task.getExecutorNode()); - } else { - // lets try again later when the job has been opened: - logger.debug("cannot start datafeeder, because job's [{}] state is [{}] while state [{}] is required", - datafeed.getJobId(), jobState, JobState.OPENED); + PersistentTaskInProgress jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); + if (jobTask == null) { + logger.debug("cannot start datafeed [{}], job task doesn't yet exist", datafeed.getId()); return null; } + if (jobTask.needsReassignment(nodes)) { + logger.debug("cannot start datafeed [{}], job [{}] is unassigned or unassigned to a non existing node", + datafeed.getId(), datafeed.getJobId()); + return null; + } + if (jobTask.getStatus() != JobState.OPENED) { + // lets try again later when the job has been opened: + logger.debug("cannot start datafeed [{}], because job's [{}] state is [{}] while state [{}] is required", + datafeed.getId(), datafeed.getJobId(), jobTask.getStatus(), JobState.OPENED); + return null; + } + return nodes.get(jobTask.getExecutorNode()); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 2ee2b32dcad..d372c1d0ae7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -76,11 +76,13 @@ public abstract class TransportJobTaskAction listener) { - PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE); + ClusterState state = clusterService.state(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks); if (jobState == JobState.OPENED) { innerTaskOperation(request, task, listener); } else { + 
logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion()); listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 4ef2dfc222a..2167862f99e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -5,9 +5,11 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -15,6 +17,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; @@ -66,8 +69,18 @@ public class DatafeedJobRunner extends AbstractComponent { } public void run(StartDatafeedAction.DatafeedTask task, Consumer handler) { - MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - DatafeedConfig datafeed = mlMetadata.getDatafeed(task.getDatafeedId()); + String datafeedId = task.getDatafeedId(); + ClusterState state = 
clusterService.state(); + // CS on master node can be ahead of the node where job and datafeed tasks run, + // so check again and fail in case of unexpected cs. Persist tasks will retry later then. + if (StartDatafeedAction.selectNode(logger, datafeedId, state) == null) { + handler.accept(new ElasticsearchStatusException("Local cs [{}] isn't ready to start datafeed [{}] yet", + RestStatus.CONFLICT, state.getVersion(), datafeedId)); + return; + } + logger.info("Attempt to start datafeed based on cluster state version [{}]", state.getVersion()); + MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); + DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); gatherInformation(job.getId(), (buckets, dataCounts) -> { long latestFinalBucketEndMs = -1L; @@ -80,11 +93,13 @@ public class DatafeedJobRunner extends AbstractComponent { latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task); - UpdatePersistentTaskStatusAction.Request updateDatafeedStatus = - new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED); - client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> { - innerRun(holder, task.getStartTime(), task.getEndTime()); - }, handler)); + updateDatafeedState(task.getPersistentTaskId(), DatafeedState.STARTED, e -> { + if (e != null) { + handler.accept(e); + } else { + innerRun(holder, task.getStartTime(), task.getEndTime()); + } + }); }, handler); } @@ -211,6 +226,13 @@ public class DatafeedJobRunner extends AbstractComponent { }); } + private void updateDatafeedState(long persistentTaskId, DatafeedState datafeedState, Consumer handler) { + UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(persistentTaskId, datafeedState); + 
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, request, ActionListener.wrap(r -> { + handler.accept(null); + }, handler)); + } + private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { Long frequency = datafeed.getFrequency(); Long bucketSpan = job.getAnalysisConfig().getBucketSpan(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 5220a1121a0..7d3ddce0cc0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -183,13 +183,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase { builder.putDatafeed(datafeedConfig1); MlMetadata result = builder.build(); - assertThat(result.getJobs().get("foo"), sameInstance(job1)); + assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); builder = new MlMetadata.Builder(result); builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyMap())); result = builder.build(); - assertThat(result.getJobs().get("foo"), sameInstance(job1)); + assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getDatafeeds().get("datafeed1"), nullValue()); } @@ -255,7 +255,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { - DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("foo"); + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id"); update.setScrollSize(5000); expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null).build()); } @@ -316,7 +316,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { ElasticsearchStatusException e = 
expectThrows(ElasticsearchStatusException.class, () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [foo_2]")); + assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]")); } public void testRemoveDatafeed_failBecauseDatafeedStarted() { @@ -327,7 +327,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { builder.putDatafeed(datafeedConfig1); MlMetadata result = builder.build(); - assertThat(result.getJobs().get("foo"), sameInstance(job1)); + assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 855f6b3f49b..cf47a4611dd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -58,8 +58,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putCustom(PersistentTasksInProgress.TYPE, tasks)) .nodes(nodes); - StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed_id", 0L); - DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build()); + DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); assertNull(node); task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); @@ -68,10 +67,45 @@ public class StartDatafeedActionTests extends ESTestCase { .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) 
.putCustom(PersistentTasksInProgress.TYPE, tasks)) .nodes(nodes); - node = StartDatafeedAction.selectNode(logger, request, cs.build()); + node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + assertNotNull(node); assertEquals("node_id", node.getId()); } + public void testSelectNode_jobTaskStale() { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(); + mlMetadata.putJob(job, false); + mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); + + String nodeId = randomBoolean() ? "node_id2" : null; + PersistentTaskInProgress task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .nodes(nodes); + + DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + assertNull(node); + + task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED); + tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .nodes(nodes); + node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); + assertNotNull(node); + assertEquals("node_id1", node.getId()); + } + public void testValidate() { Job job1 = 
DatafeedJobRunnerTests.createDatafeedJob().build(); MlMetadata mlMetadata1 = new MlMetadata.Builder() @@ -88,9 +122,10 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, INITIAL_ASSIGNMENT); + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, + INITIAL_ASSIGNMENT); PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); - DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); + DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) .build(); @@ -123,7 +158,8 @@ public class StartDatafeedActionTests extends ESTestCase { Exception e = expectThrows(ElasticsearchStatusException.class, () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes)); - assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]")); + assertThat(e.getMessage(), equalTo("datafeed [datafeed_id] already started, expected datafeed state [stopped], " + + "but got [started]")); } public void testValidate_staleTask() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index fb4bacc4f6e..c47fbeb7496 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -6,10 +6,10 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; 
+import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; @@ -36,7 +36,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase StopDatafeedAction.validate("foo", mlMetadata1)); assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists")); - DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "foo").build(); + DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) .putDatafeed(datafeedConfig) .build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 7f426b2b2d8..30a1b1f6df8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -6,13 +6,17 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.transport.TransportAddress; import 
org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -22,6 +26,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.FlushJobAction; +import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; @@ -33,23 +38,29 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response; import org.junit.Before; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.net.InetAddress; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; 
import static org.mockito.Matchers.anyLong; @@ -78,10 +89,27 @@ public class DatafeedJobRunnerTests extends ESTestCase { @Before @SuppressWarnings("unchecked") public void setUpTests() { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job job = createDatafeedJob().build(); + mlMetadata.putJob(job, false); + mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); + PersistentTaskInProgress task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .nodes(nodes); + + clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(cs.build()); + client = mock(Client.class); jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); - clusterService = mock(ClusterService.class); JobProvider jobProvider = mock(JobProvider.class); Mockito.doAnswer(invocationOnMock -> { @@ -128,63 +156,41 @@ public class DatafeedJobRunnerTests extends ESTestCase { } public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception { - Job.Builder jobBuilder = createDatafeedJob(); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); - DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - Job job = jobBuilder.build(); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job, false) - .putDatafeed(datafeedConfig) - .build(); - 
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build()); - DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); - verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); } - private static PostDataAction.Request createExpectedPostDataRequest(Job job) { + private static PostDataAction.Request createExpectedPostDataRequest(String jobId) { DataDescription.Builder expectedDataDescription = new DataDescription.Builder(); expectedDataDescription.setTimeFormat("epoch_ms"); expectedDataDescription.setFormat(DataDescription.DataFormat.JSON); - PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(job.getId()); + PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(jobId); expectedPostDataRequest.setDataDescription(expectedDataDescription.build()); return 
expectedPostDataRequest; } public void testStart_extractionProblem() throws Exception { - Job.Builder jobBuilder = createDatafeedJob(); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); - DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - Job job = jobBuilder.build(); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job, false) - .putDatafeed(datafeedConfig) - .build(); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build()); - DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); @@ -196,7 +202,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { public void testStart_emptyDataCountException() throws Exception { currentTime = 6000000; Job.Builder jobBuilder = createDatafeedJob(); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "job_id").build(); Job job = jobBuilder.build(); MlMetadata mlMetadata = new MlMetadata.Builder() .putJob(job, false) @@ -219,7 +225,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { 
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(false); Consumer handler = mockConsumer(); - StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, null); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); @@ -230,27 +236,16 @@ public class DatafeedJobRunnerTests extends ESTestCase { } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { - Job.Builder jobBuilder = createDatafeedJob(); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); - DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - Job job = jobBuilder.build(); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job, false) - .putDatafeed(datafeedConfig) - .build(); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build()); - DataExtractor dataExtractor = mock(DataExtractor.class); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); - StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed1", 0L); + StartDatafeedAction.Request startDatafeedRequest = new 
StartDatafeedAction.Request("datafeed_id", 0L); StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest); datafeedJobRunner.run(task, handler); @@ -259,7 +254,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { task.stop(); verify(handler).accept(null); } else { - verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job))); + verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); } @@ -347,7 +342,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { acBuilder.setBucketSpan(3600L); acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); - Job.Builder builder = new Job.Builder("foo"); + Job.Builder builder = new Job.Builder("job_id"); builder.setAnalysisConfig(acBuilder); builder.setCreateTime(new Date()); return builder; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java index 3b5b87c7425..e726c0e1349 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; @@ -26,10 +25,9 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.TimeUnit; -@LuceneTestCase.AwaitsFix(bugUrl = "Too noisy, needs to be 
stabalized first") public class MlFullClusterRestartIT extends BaseMlIntegTestCase { - @TestLogging("org.elasticsearch.xpack.ml.datafeed:TRACE") + @TestLogging("org.elasticsearch.xpack.ml.datafeed:TRACE,org.elasticsearch.xpack.ml.action:TRACE") public void testFullClusterRestart() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); ensureStableCluster(3); @@ -81,17 +79,19 @@ public class MlFullClusterRestartIT extends BaseMlIntegTestCase { Collection> taskCollection = tasks.findTasks(OpenJobAction.NAME, p -> true); assertEquals(1, taskCollection.size()); PersistentTaskInProgress task = taskCollection.iterator().next(); + assertFalse(task.needsReassignment(clusterState.nodes())); assertEquals(JobState.OPENED, task.getStatus()); taskCollection = tasks.findTasks(StartDatafeedAction.NAME, p -> true); assertEquals(1, taskCollection.size()); task = taskCollection.iterator().next(); assertEquals(DatafeedState.STARTED, task.getStatus()); + assertFalse(task.needsReassignment(clusterState.nodes())); }); long numDocs2 = randomIntBetween(2, 64); - long yesterday = now - 86400000; - indexDocs("data", numDocs2, yesterday, now); + long now2 = System.currentTimeMillis(); + indexDocs("data", numDocs2, now2 + 5000, now2 + 6000); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java index 07406d7d107..ed2e02ebb2f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java @@ -13,9 +13,6 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import 
org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; -import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import java.util.HashMap; import java.util.Map; @@ -25,8 +22,6 @@ import static org.mockito.Mockito.mock; public class RestStartJobDatafeedActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { - Job.Builder job = DatafeedJobRunnerTests.createDatafeedJob(); - DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY, mock(RestController.class)); Map params = new HashMap<>(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index fce95ab713b..a82f877812a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -146,13 +146,6 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { public void cleanupWorkaround() throws Exception { deleteAllDatafeeds(client()); deleteAllJobs(client()); - int numNodes = internalCluster().size(); - for (int i = 0; i < numNodes; i++) { - internalCluster().stopRandomNode(settings -> true); - } - internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false)); - ensureStableCluster(1); - cluster().wipe(Collections.emptySet()); } protected void indexDocs(String index, long numDocs, long start, long end) { diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml index 03d86a30e26..fb9f723f4b0 100644 --- 
a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml @@ -92,7 +92,7 @@ setup: "start": 0 - do: - catch: /datafeed already started, expected datafeed state \[stopped\], but got \[started\]/ + catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/ xpack.ml.start_datafeed: "datafeed_id": "datafeed-1" "start": 0