[ML] Tweaked the datafeed start logic so that a datafeed will not start when its job hasn't started.

[TEST] Unmuted the ML full cluster restart test.

Original commit: elastic/x-pack-elasticsearch@c8cd9870c9
Martijn van Groningen 2017-02-24 12:26:28 +01:00
parent f67d8c3746
commit cd4e543479
11 changed files with 149 additions and 99 deletions

View File

@@ -315,7 +315,7 @@ public class StartDatafeedAction
         @Override
         public Assignment getAssignment(Request request, ClusterState clusterState) {
-            DiscoveryNode discoveryNode = selectNode(logger, request, clusterState);
+            DiscoveryNode discoveryNode = selectNode(logger, request.getDatafeedId(), clusterState);
             // TODO: Add proper explanation
             if (discoveryNode == null) {
                 return NO_NODE_FOUND;
@@ -378,27 +378,34 @@ public class StartDatafeedAction
             }
         }
         if (datafeedState == DatafeedState.STARTED) {
-            throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
-                    RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
+            throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]",
+                    RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED);
         }
     }

-    static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) {
+    public static DiscoveryNode selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
         MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
         PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
-        DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
+        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
         DiscoveryNodes nodes = clusterState.getNodes();
-        JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
-        if (jobState == JobState.OPENED) {
-            PersistentTaskInProgress task = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
-            return nodes.get(task.getExecutorNode());
-        } else {
-            // lets try again later when the job has been opened:
-            logger.debug("cannot start datafeeder, because job's [{}] state is [{}] while state [{}] is required",
-                    datafeed.getJobId(), jobState, JobState.OPENED);
+        PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
+        if (jobTask == null) {
+            logger.debug("cannot start datafeed [{}], job task doesn't yet exist", datafeed.getId());
             return null;
         }
+        if (jobTask.needsReassignment(nodes)) {
+            logger.debug("cannot start datafeed [{}], job [{}] is unassigned or assigned to a non-existing node",
+                    datafeed.getId(), datafeed.getJobId());
+            return null;
+        }
+        if (jobTask.getStatus() != JobState.OPENED) {
+            // let's try again later when the job has been opened:
+            logger.debug("cannot start datafeed [{}], because job's [{}] state is [{}] while state [{}] is required",
+                    datafeed.getId(), datafeed.getJobId(), jobTask.getStatus(), JobState.OPENED);
+            return null;
+        }
+        return nodes.get(jobTask.getExecutorNode());
     }
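
In outline, the reworked selectNode fails fast for three distinct reasons instead of only checking the job state. A minimal sketch of that guard ordering, using simplified stand-in types (JobTask, JobState and the string node ids below are illustrative, not the X-Pack classes):

import java.util.Set;

// Sketch of the new guard ordering: return null (so the persistent-task
// framework retries later) unless the job task exists, is assigned to a
// live node, and has reached OPENED.
final class SelectNodeSketch {
    enum JobState { OPENED, CLOSED }

    // Hypothetical stand-in for PersistentTaskInProgress.
    record JobTask(String executorNode, JobState status) {
        boolean needsReassignment(Set<String> liveNodes) {
            return executorNode == null || liveNodes.contains(executorNode) == false;
        }
    }

    static String selectNode(JobTask jobTask, Set<String> liveNodes) {
        if (jobTask == null) {
            return null;                           // job task doesn't yet exist
        }
        if (jobTask.needsReassignment(liveNodes)) {
            return null;                           // unassigned, or assigned to a gone node
        }
        if (jobTask.status() != JobState.OPENED) {
            return null;                           // job not opened yet; try again later
        }
        return jobTask.executorNode();             // safe to start the datafeed here
    }
}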

View File

@@ -76,11 +76,13 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
     @Override
     protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
-        PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE);
+        ClusterState state = clusterService.state();
+        PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE);
         JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
         if (jobState == JobState.OPENED) {
             innerTaskOperation(request, task, listener);
         } else {
+            logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion());
             listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState +
                     "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
         }
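
The added warn log ties the rejection to a concrete cluster-state version, which makes races with a lagging local cluster state diagnosable. A small self-contained sketch of the same check (plain-Java stand-ins; the real code throws ElasticsearchStatusException with RestStatus.CONFLICT):

// Sketch: refuse a job task operation unless the job is OPENED, and record
// the cluster-state version so stale-state races show up in the logs.
final class JobStateGuardSketch {
    enum JobState { OPENED, CLOSED, FAILED }

    static void requireOpened(String jobId, JobState jobState, long csVersion) {
        if (jobState != JobState.OPENED) {
            System.err.printf("Unexpected job state based on cluster state version [%d]%n", csVersion);
            throw new IllegalStateException("job [" + jobId + "] state is [" + jobState
                    + "], but must be [" + JobState.OPENED + "] to perform requested action");
        }
    }
}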

View File

@@ -5,9 +5,11 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;

+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
@@ -15,6 +17,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlMetadata;
@@ -66,8 +69,18 @@ public class DatafeedJobRunner extends AbstractComponent {
     }

     public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
-        MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
-        DatafeedConfig datafeed = mlMetadata.getDatafeed(task.getDatafeedId());
+        String datafeedId = task.getDatafeedId();
+        ClusterState state = clusterService.state();
+        // The cluster state on the master node can be ahead of the local node's, so check again
+        // here and fail in case of an unexpected cluster state; the persistent tasks framework
+        // will then retry later.
+        if (StartDatafeedAction.selectNode(logger, datafeedId, state) == null) {
+            handler.accept(new ElasticsearchStatusException("Local cs [{}] isn't ready to start datafeed [{}] yet",
+                    RestStatus.CONFLICT, state.getVersion(), datafeedId));
+            return;
+        }
+        logger.info("Attempt to start datafeed based on cluster state version [{}]", state.getVersion());
+        MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
+        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
         Job job = mlMetadata.getJobs().get(datafeed.getJobId());
         gatherInformation(job.getId(), (buckets, dataCounts) -> {
             long latestFinalBucketEndMs = -1L;
@@ -80,11 +93,13 @@ public class DatafeedJobRunner extends AbstractComponent {
                 latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
             }
             Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
-            UpdatePersistentTaskStatusAction.Request updateDatafeedStatus =
-                    new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED);
-            client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> {
-                innerRun(holder, task.getStartTime(), task.getEndTime());
-            }, handler));
+            updateDatafeedState(task.getPersistentTaskId(), DatafeedState.STARTED, e -> {
+                if (e != null) {
+                    handler.accept(e);
+                } else {
+                    innerRun(holder, task.getStartTime(), task.getEndTime());
+                }
+            });
         }, handler);
     }
@@ -211,6 +226,13 @@ public class DatafeedJobRunner extends AbstractComponent {
         });
     }

+    private void updateDatafeedState(long persistentTaskId, DatafeedState datafeedState, Consumer<Exception> handler) {
+        UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(persistentTaskId, datafeedState);
+        client.execute(UpdatePersistentTaskStatusAction.INSTANCE, request, ActionListener.wrap(r -> {
+            handler.accept(null);
+        }, handler));
+    }
+
     private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
         Long frequency = datafeed.getFrequency();
         Long bucketSpan = job.getAnalysisConfig().getBucketSpan();
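
The extracted updateDatafeedState helper relies on a small adapter: an ActionListener whose two outcomes are funneled into one Consumer<Exception> that receives null on success. A minimal sketch of that adaptation with generic stand-in types (the Listener interface here is illustrative, not the Elasticsearch one):

import java.util.function.Consumer;

// Sketch: adapt a success/failure listener to a Consumer<Exception> handler,
// mirroring ActionListener.wrap(r -> handler.accept(null), handler).
final class ListenerAdapterSketch {
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    static <R> Listener<R> adapt(Consumer<Exception> handler) {
        return new Listener<R>() {
            @Override public void onResponse(R response) { handler.accept(null); }  // success -> null
            @Override public void onFailure(Exception e) { handler.accept(e); }     // failure -> exception
        };
    }
}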

View File

@@ -183,13 +183,13 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
         builder.putDatafeed(datafeedConfig1);

         MlMetadata result = builder.build();
-        assertThat(result.getJobs().get("foo"), sameInstance(job1));
+        assertThat(result.getJobs().get("job_id"), sameInstance(job1));
         assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));

         builder = new MlMetadata.Builder(result);
         builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyMap()));

         result = builder.build();
-        assertThat(result.getJobs().get("foo"), sameInstance(job1));
+        assertThat(result.getJobs().get("job_id"), sameInstance(job1));
         assertThat(result.getDatafeeds().get("datafeed1"), nullValue());
     }
@@ -255,7 +255,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
     }

     public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() {
-        DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("foo");
+        DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id");
         update.setScrollSize(5000);
         expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null).build());
     }
@@ -316,7 +316,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
                 () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null));
         assertThat(e.status(), equalTo(RestStatus.CONFLICT));
-        assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [foo_2]"));
+        assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]"));
     }

     public void testRemoveDatafeed_failBecauseDatafeedStarted() {
@@ -327,7 +327,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
         builder.putDatafeed(datafeedConfig1);

         MlMetadata result = builder.build();
-        assertThat(result.getJobs().get("foo"), sameInstance(job1));
+        assertThat(result.getJobs().get("job_id"), sameInstance(job1));
         assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));

         StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);

View File

@@ -58,8 +58,7 @@ public class StartDatafeedActionTests extends ESTestCase {
                 .putCustom(PersistentTasksInProgress.TYPE, tasks))
                 .nodes(nodes);

-        StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed_id", 0L);
-        DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build());
+        DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
         assertNull(node);

         task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
@@ -68,10 +67,45 @@
                 .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
                 .putCustom(PersistentTasksInProgress.TYPE, tasks))
                 .nodes(nodes);
-        node = StartDatafeedAction.selectNode(logger, request, cs.build());
+        node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
         assertNotNull(node);
         assertEquals("node_id", node.getId());
     }

+    public void testSelectNode_jobTaskStale() {
+        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
+        Job job = createScheduledJob("job_id").build();
+        mlMetadata.putJob(job, false);
+        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
+
+        String nodeId = randomBoolean() ? "node_id2" : null;
+        PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED);
+        PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
+                .build();
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
+                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
+                .putCustom(PersistentTasksInProgress.TYPE, tasks))
+                .nodes(nodes);
+
+        DiscoveryNode node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
+        assertNull(node);
+
+        task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED);
+        tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
+        cs = ClusterState.builder(new ClusterName("cluster_name"))
+                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
+                .putCustom(PersistentTasksInProgress.TYPE, tasks))
+                .nodes(nodes);
+        node = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
+        assertNotNull(node);
+        assertEquals("node_id1", node.getId());
+    }
+
     public void testValidate() {
         Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
         MlMetadata mlMetadata1 = new MlMetadata.Builder()
@@ -88,9 +122,10 @@ public class StartDatafeedActionTests extends ESTestCase {
                 .putJob(job1, false)
                 .build();
         PersistentTaskInProgress<OpenJobAction.Request> task =
-                new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, INITIAL_ASSIGNMENT);
+                new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true,
+                        INITIAL_ASSIGNMENT);
         PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task));
-        DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
+        DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
         MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
                 .putDatafeed(datafeedConfig1)
                 .build();
@@ -123,7 +158,8 @@ public class StartDatafeedActionTests extends ESTestCase {
         Exception e = expectThrows(ElasticsearchStatusException.class,
                 () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes));
-        assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]"));
+        assertThat(e.getMessage(), equalTo("datafeed [datafeed_id] already started, expected datafeed state [stopped], " +
+                "but got [started]"));
     }

     public void testValidate_staleTask() {
public void testValidate_staleTask() {

View File

@@ -6,10 +6,10 @@
 package org.elasticsearch.xpack.ml.action;

 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.xpack.ml.MlMetadata;
 import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.MlMetadata;
 import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;

 import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
@@ -36,7 +36,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableTestCase<S
                 () -> StopDatafeedAction.validate("foo", mlMetadata1));
         assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists"));

-        DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "foo").build();
+        DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build();
         MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false)
                 .putDatafeed(datafeedConfig)
                 .build();

View File

@@ -6,13 +6,17 @@
 package org.elasticsearch.xpack.ml.datafeed;

 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.mock.orig.Mockito;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -22,6 +26,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlMetadata;
 import org.elasticsearch.xpack.ml.action.FlushJobAction;
+import org.elasticsearch.xpack.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.ml.action.PostDataAction;
 import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
@@ -33,23 +38,29 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.ml.job.config.Detector;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
+import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
+import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
 import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
 import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response;
 import org.junit.Before;

 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.net.InetAddress;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;

+import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -78,10 +89,27 @@ public class DatafeedJobRunnerTests extends ESTestCase {
     @Before
     @SuppressWarnings("unchecked")
     public void setUpTests() {
+        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
+        Job job = createDatafeedJob().build();
+        mlMetadata.putJob(job, false);
+        mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
+        PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
+        PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
+                .build();
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
+                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
+                .putCustom(PersistentTasksInProgress.TYPE, tasks))
+                .nodes(nodes);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(cs.build());
+
         client = mock(Client.class);
         jobDataFuture = mock(ActionFuture.class);
         flushJobFuture = mock(ActionFuture.class);
-        clusterService = mock(ClusterService.class);
         JobProvider jobProvider = mock(JobProvider.class);
         Mockito.doAnswer(invocationOnMock -> {
@@ -128,63 +156,41 @@ public class DatafeedJobRunnerTests extends ESTestCase {
     }

     public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception {
-        Job.Builder jobBuilder = createDatafeedJob();
-        DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
-        DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
-        Job job = jobBuilder.build();
-        MlMetadata mlMetadata = new MlMetadata.Builder()
-                .putJob(job, false)
-                .putDatafeed(datafeedConfig)
-                .build();
-        when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
-                .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
-                .build());
-
         DataExtractor dataExtractor = mock(DataExtractor.class);
         when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
         when(dataExtractor.next()).thenReturn(Optional.of(in));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
-        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L);
+        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
         datafeedJobRunner.run(task, handler);

         verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
         verify(threadPool, never()).schedule(any(), any(), any());
-        verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
+        verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id")));
         verify(client).execute(same(FlushJobAction.INSTANCE), any());
     }

-    private static PostDataAction.Request createExpectedPostDataRequest(Job job) {
+    private static PostDataAction.Request createExpectedPostDataRequest(String jobId) {
         DataDescription.Builder expectedDataDescription = new DataDescription.Builder();
         expectedDataDescription.setTimeFormat("epoch_ms");
         expectedDataDescription.setFormat(DataDescription.DataFormat.JSON);
-        PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(job.getId());
+        PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(jobId);
         expectedPostDataRequest.setDataDescription(expectedDataDescription.build());
         return expectedPostDataRequest;
     }

     public void testStart_extractionProblem() throws Exception {
-        Job.Builder jobBuilder = createDatafeedJob();
-        DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
-        DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
-        Job job = jobBuilder.build();
-        MlMetadata mlMetadata = new MlMetadata.Builder()
-                .putJob(job, false)
-                .putDatafeed(datafeedConfig)
-                .build();
-        when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
-                .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
-                .build());
-
         DataExtractor dataExtractor = mock(DataExtractor.class);
         when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
-        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L);
+        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
         datafeedJobRunner.run(task, handler);

         verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
@@ -196,7 +202,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
     public void testStart_emptyDataCountException() throws Exception {
         currentTime = 6000000;
         Job.Builder jobBuilder = createDatafeedJob();
-        DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
+        DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "job_id").build();
         Job job = jobBuilder.build();
         MlMetadata mlMetadata = new MlMetadata.Builder()
                 .putJob(job, false)
@@ -219,7 +225,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
         when(dataExtractor.hasNext()).thenReturn(false);
         Consumer<Exception> handler = mockConsumer();
-        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, null);
+        StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null);
         DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task);

         datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder);
@@ -230,27 +236,16 @@ public class DatafeedJobRunnerTests extends ESTestCase {
     }

     public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
-        Job.Builder jobBuilder = createDatafeedJob();
-        DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
-        DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
-        Job job = jobBuilder.build();
-        MlMetadata mlMetadata = new MlMetadata.Builder()
-                .putJob(job, false)
-                .putDatafeed(datafeedConfig)
-                .build();
-        when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
-                .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
-                .build());
-
         DataExtractor dataExtractor = mock(DataExtractor.class);
         when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
         when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
         InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
         when(dataExtractor.next()).thenReturn(Optional.of(in));
+        DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
         when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
         Consumer<Exception> handler = mockConsumer();
         boolean cancelled = randomBoolean();
-        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed1", 0L);
+        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
         StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest);
         datafeedJobRunner.run(task, handler);
@@ -259,7 +254,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
             task.stop();
             verify(handler).accept(null);
         } else {
-            verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest(job)));
+            verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id")));
             verify(client).execute(same(FlushJobAction.INSTANCE), any());
             verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
         }
@@ -347,7 +342,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
         acBuilder.setBucketSpan(3600L);
         acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));

-        Job.Builder builder = new Job.Builder("foo");
+        Job.Builder builder = new Job.Builder("job_id");
         builder.setAnalysisConfig(acBuilder);
         builder.setCreateTime(new Date());
         return builder;

View File

@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.integration;

-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
@@ -26,10 +25,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;

-@LuceneTestCase.AwaitsFix(bugUrl = "Too noisy, needs to be stabalized first")
 public class MlFullClusterRestartIT extends BaseMlIntegTestCase {

-    @TestLogging("org.elasticsearch.xpack.ml.datafeed:TRACE")
+    @TestLogging("org.elasticsearch.xpack.ml.datafeed:TRACE,org.elasticsearch.xpack.ml.action:TRACE")
     public void testFullClusterRestart() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(3);
         ensureStableCluster(3);
@@ -81,17 +79,19 @@ public class MlFullClusterRestartIT extends BaseMlIntegTestCase {
             Collection<PersistentTaskInProgress<?>> taskCollection = tasks.findTasks(OpenJobAction.NAME, p -> true);
             assertEquals(1, taskCollection.size());
             PersistentTaskInProgress<?> task = taskCollection.iterator().next();
+            assertFalse(task.needsReassignment(clusterState.nodes()));
             assertEquals(JobState.OPENED, task.getStatus());

             taskCollection = tasks.findTasks(StartDatafeedAction.NAME, p -> true);
             assertEquals(1, taskCollection.size());
             task = taskCollection.iterator().next();
             assertEquals(DatafeedState.STARTED, task.getStatus());
+            assertFalse(task.needsReassignment(clusterState.nodes()));
         });

         long numDocs2 = randomIntBetween(2, 64);
-        long yesterday = now - 86400000;
-        indexDocs("data", numDocs2, yesterday, now);
+        long now2 = System.currentTimeMillis();
+        indexDocs("data", numDocs2, now2 + 5000, now2 + 6000);
         assertBusy(() -> {
             DataCounts dataCounts = getDataCounts(job.getId());
             assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount());

View File

@@ -13,9 +13,6 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.FakeRestRequest;
-import org.elasticsearch.xpack.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
-import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;

 import java.util.HashMap;
 import java.util.Map;
@@ -25,8 +22,6 @@ import static org.mockito.Mockito.mock;
 public class RestStartJobDatafeedActionTests extends ESTestCase {

     public void testPrepareRequest() throws Exception {
-        Job.Builder job = DatafeedJobRunnerTests.createDatafeedJob();
-        DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
         RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY, mock(RestController.class));

         Map<String, String> params = new HashMap<>();

View File

@@ -146,13 +146,6 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
     public void cleanupWorkaround() throws Exception {
         deleteAllDatafeeds(client());
         deleteAllJobs(client());
-        int numNodes = internalCluster().size();
-        for (int i = 0; i < numNodes; i++) {
-            internalCluster().stopRandomNode(settings -> true);
-        }
-        internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false));
-        ensureStableCluster(1);
-        cluster().wipe(Collections.emptySet());
     }

     protected void indexDocs(String index, long numDocs, long start, long end) {

View File

@@ -92,7 +92,7 @@ setup:
         "start": 0

   - do:
-      catch: /datafeed already started, expected datafeed state \[stopped\], but got \[started\]/
+      catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/
       xpack.ml.start_datafeed:
         "datafeed_id": "datafeed-1"
         "start": 0