[TEST] Add test that verifies that machine learning works correctly after full cluster restart.
Added missing named xcontent entries, tweaked start datafeed validation logic and fixed status serialization issue that were uncovered by this test. Original commit: elastic/x-pack-elasticsearch@8b3fb012df
This commit is contained in:
parent
4bb57137a9
commit
9a5b410110
|
@ -223,8 +223,20 @@ public class MachineLearning extends Plugin implements ActionPlugin {
|
|||
|
||||
@Override
|
||||
public List<NamedXContentRegistry.Entry> getNamedXContent() {
|
||||
return Arrays.asList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
|
||||
return Arrays.asList(
|
||||
// Custom metadata
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
|
||||
parser -> MlMetadata.ML_METADATA_PARSER.parse(parser, null).build()),
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE),
|
||||
PersistentTasksInProgress::fromXContent),
|
||||
|
||||
// Persistent action requests
|
||||
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(StartDatafeedAction.NAME),
|
||||
StartDatafeedAction.Request::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(OpenJobAction.NAME),
|
||||
OpenJobAction.Request::fromXContent),
|
||||
|
||||
// Task statuses
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobState.NAME), JobState::fromXContent)
|
||||
);
|
||||
|
|
|
@ -30,10 +30,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
|
@ -155,7 +155,7 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
|
||||
builder.field(STATE, datafeedState);
|
||||
builder.field(STATE, datafeedState.toString());
|
||||
builder.endObject();
|
||||
|
||||
return builder;
|
||||
|
|
|
@ -33,10 +33,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
|
@ -193,7 +193,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
if (modelSizeStats != null) {
|
||||
builder.field(MODEL_SIZE_STATS, modelSizeStats);
|
||||
}
|
||||
builder.field(STATE, state);
|
||||
builder.field(STATE, state.toString());
|
||||
builder.endObject();
|
||||
|
||||
return builder;
|
||||
|
|
|
@ -40,9 +40,9 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
|
||||
|
@ -96,6 +96,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
|
|||
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
|
||||
}
|
||||
|
||||
public static Request fromXContent(XContentParser parser) {
|
||||
return parseRequest(null, parser);
|
||||
}
|
||||
|
||||
public static Request parseRequest(String jobId, XContentParser parser) {
|
||||
Request request = PARSER.apply(parser, null);
|
||||
if (jobId != null) {
|
||||
|
|
|
@ -38,13 +38,13 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
|
||||
|
@ -95,6 +95,10 @@ public class StartDatafeedAction
|
|||
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
|
||||
}
|
||||
|
||||
public static Request fromXContent(XContentParser parser) {
|
||||
return parseRequest(null, parser);
|
||||
}
|
||||
|
||||
public static Request parseRequest(String datafeedId, XContentParser parser) {
|
||||
Request request = PARSER.apply(parser, null);
|
||||
if (datafeedId != null) {
|
||||
|
@ -354,11 +358,15 @@ public class StartDatafeedAction
|
|||
|
||||
PersistentTaskInProgress<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
|
||||
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
|
||||
if (datafeedTask != null && datafeedTask.getExecutorNode() != null && datafeedState == DatafeedState.STARTED) {
|
||||
if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) {
|
||||
if (datafeedTask != null && datafeedState == DatafeedState.STARTED) {
|
||||
if (datafeedTask.getExecutorNode() == null) {
|
||||
// We can skip the datafeed state check below, because the task got unassigned after we went into
|
||||
// started state on a node that disappeared and we didn't have the opportunity to set the status to stopped
|
||||
return;
|
||||
} else if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) {
|
||||
// The state is started and the node were running on no longer exists.
|
||||
// We can skip the datafeed state check below, because when the node
|
||||
// disappeared we didn't have time to set the state to failed.
|
||||
// disappeared we didn't have time to set the state to stopped.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,10 +25,10 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
|
||||
|
@ -120,7 +120,7 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
|
|||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class JobTaskRequest<R extends JobTaskRequest<R>> extends BaseTasksRequest<R> {
|
||||
|
|
|
@ -5,21 +5,32 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.datafeed;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
public enum DatafeedState implements Task.Status {
|
||||
|
||||
STARTED, STOPPED;
|
||||
|
||||
public static final String NAME = "DatafeedState";
|
||||
public static final String NAME = StartDatafeedAction.NAME;//"DatafeedState";
|
||||
|
||||
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
|
||||
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(constructorArg(), new ParseField("state"));
|
||||
}
|
||||
|
||||
public static DatafeedState fromString(String name) {
|
||||
return valueOf(name.trim().toUpperCase(Locale.ROOT));
|
||||
|
@ -45,15 +56,19 @@ public enum DatafeedState implements Task.Status {
|
|||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.value(this.toString().toLowerCase(Locale.ROOT));
|
||||
builder.startObject();
|
||||
builder.field("state", name().toLowerCase(Locale.ROOT));
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static DatafeedState fromXContent(XContentParser parser) throws IOException {
|
||||
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
|
||||
throw new ElasticsearchParseException("Unexpected token {}", parser.currentToken());
|
||||
}
|
||||
return fromString(parser.text());
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -5,17 +5,21 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.xpack.ml.action.OpenJobAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
/**
|
||||
* Jobs whether running or complete are in one of these states.
|
||||
* When a job is created it is initialised in to the state closed
|
||||
|
@ -25,7 +29,14 @@ public enum JobState implements Task.Status {
|
|||
|
||||
CLOSING, CLOSED, OPENING, OPENED, FAILED;
|
||||
|
||||
public static final String NAME = "JobState";
|
||||
public static final String NAME = OpenJobAction.NAME;//"JobState";
|
||||
|
||||
private static final ConstructingObjectParser<JobState, Void> PARSER =
|
||||
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(constructorArg(), new ParseField("state"));
|
||||
}
|
||||
|
||||
public static JobState fromString(String name) {
|
||||
return valueOf(name.trim().toUpperCase(Locale.ROOT));
|
||||
|
@ -51,15 +62,20 @@ public enum JobState implements Task.Status {
|
|||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.value(this.toString().toLowerCase(Locale.ROOT));
|
||||
builder.startObject();
|
||||
builder.field("state", name().toLowerCase(Locale.ROOT));
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
public static JobState fromXContent(XContentParser parser) throws IOException {
|
||||
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
|
||||
throw new ElasticsearchParseException("Unexpected token {}", parser.currentToken());
|
||||
}
|
||||
return fromString(parser.text());
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -7,19 +7,10 @@ package org.elasticsearch.xpack.ml.action;
|
|||
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
|
||||
|
@ -27,12 +18,10 @@ import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class DatafeedJobsIT extends BaseMlIntegTestCase {
|
||||
|
||||
|
@ -154,35 +143,4 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
private void indexDocs(String index, long numDocs, long start, long end) {
|
||||
int maxDelta = (int) (end - start - 1);
|
||||
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
IndexRequest indexRequest = new IndexRequest(index, "type");
|
||||
long timestamp = start + randomIntBetween(0, maxDelta);
|
||||
assert timestamp >= start && timestamp < end;
|
||||
indexRequest.source("time", timestamp);
|
||||
bulkRequestBuilder.add(indexRequest);
|
||||
}
|
||||
BulkResponse bulkResponse = bulkRequestBuilder
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||
.get();
|
||||
assertThat(bulkResponse.hasFailures(), is(false));
|
||||
logger.info("Indexed [{}] documents", numDocs);
|
||||
}
|
||||
|
||||
private DataCounts getDataCounts(String jobId) {
|
||||
GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
|
||||
DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get();
|
||||
if (getResponse.isExists() == false) {
|
||||
return new DataCounts(jobId);
|
||||
}
|
||||
|
||||
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, getResponse.getSourceAsBytesRef())) {
|
||||
return DataCounts.PARSER.apply(parser, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,12 +15,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
|
||||
|
@ -146,6 +146,12 @@ public class StartDatafeedActionTests extends ESTestCase {
|
|||
taskMap.put(1L, datafeedTask);
|
||||
PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap);
|
||||
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
|
||||
|
||||
datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
|
||||
false, true, null);
|
||||
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
|
||||
taskMap.put(1L, datafeedTask);
|
||||
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
|
||||
import org.elasticsearch.xpack.ml.action.OpenJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MlFullClusterRestartIT extends BaseMlIntegTestCase {
|
||||
|
||||
public void testFullClusterRestart() throws Exception {
|
||||
internalCluster().ensureAtLeastNumDataNodes(3);
|
||||
ensureStableCluster(3);
|
||||
|
||||
client().admin().indices().prepareCreate("data")
|
||||
.addMapping("type", "time", "type=date")
|
||||
.get();
|
||||
long numDocs1 = randomIntBetween(32, 2048);
|
||||
long now = System.currentTimeMillis();
|
||||
long weekAgo = now - 604800000;
|
||||
long twoWeeksAgo = weekAgo - 604800000;
|
||||
indexDocs("data", numDocs1, twoWeeksAgo, weekAgo);
|
||||
|
||||
Job.Builder job = createScheduledJob("job_id");
|
||||
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
|
||||
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
|
||||
assertTrue(putJobResponse.isAcknowledged());
|
||||
|
||||
DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("data"));
|
||||
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
|
||||
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest)
|
||||
.actionGet();
|
||||
assertTrue(putDatadeedResponse.isAcknowledged());
|
||||
|
||||
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
|
||||
assertBusy(() -> {
|
||||
GetJobsStatsAction.Response statsResponse =
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
|
||||
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
|
||||
});
|
||||
|
||||
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
|
||||
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
|
||||
assertBusy(() -> {
|
||||
DataCounts dataCounts = getDataCounts(job.getId());
|
||||
assertEquals(numDocs1, dataCounts.getProcessedRecordCount());
|
||||
assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
|
||||
});
|
||||
|
||||
logger.info("Restarting all nodes");
|
||||
internalCluster().fullRestart();
|
||||
logger.info("Restarted all nodes");
|
||||
assertBusy(() -> {
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
PersistentTasksInProgress tasks = clusterState.metaData().custom(PersistentTasksInProgress.TYPE);
|
||||
assertNotNull(tasks);
|
||||
assertEquals(2, tasks.taskMap().size());
|
||||
|
||||
Collection<PersistentTaskInProgress<?>> taskCollection = tasks.findTasks(OpenJobAction.NAME, p -> true);
|
||||
assertEquals(1, taskCollection.size());
|
||||
PersistentTaskInProgress<?> task = taskCollection.iterator().next();
|
||||
assertEquals(JobState.OPENED, task.getStatus());
|
||||
|
||||
taskCollection = tasks.findTasks(StartDatafeedAction.NAME, p -> true);
|
||||
assertEquals(1, taskCollection.size());
|
||||
task = taskCollection.iterator().next();
|
||||
assertEquals(DatafeedState.STARTED, task.getStatus());
|
||||
});
|
||||
|
||||
long numDocs2 = randomIntBetween(2, 64);
|
||||
long yesterday = now - 86400000;
|
||||
indexDocs("data", numDocs2, yesterday, now);
|
||||
assertBusy(() -> {
|
||||
DataCounts dataCounts = getDataCounts(job.getId());
|
||||
assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount());
|
||||
assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
cleanupWorkaround(3);
|
||||
}
|
||||
|
||||
}
|
|
@ -6,6 +6,10 @@
|
|||
package org.elasticsearch.xpack.ml.support;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -15,6 +19,7 @@ import org.elasticsearch.test.ESIntegTestCase;
|
|||
import org.elasticsearch.test.SecurityIntegTestCase;
|
||||
import org.elasticsearch.xpack.XPackSettings;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.action.CloseJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
|
||||
|
@ -28,7 +33,7 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
|||
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -37,6 +42,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* A base class for testing datafeed and job lifecycle specifics.
|
||||
|
@ -123,6 +129,33 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
|
|||
cluster().wipe(Collections.emptySet());
|
||||
}
|
||||
|
||||
protected void indexDocs(String index, long numDocs, long start, long end) {
|
||||
int maxDelta = (int) (end - start - 1);
|
||||
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
IndexRequest indexRequest = new IndexRequest(index, "type");
|
||||
long timestamp = start + randomIntBetween(0, maxDelta);
|
||||
assert timestamp >= start && timestamp < end;
|
||||
indexRequest.source("time", timestamp);
|
||||
bulkRequestBuilder.add(indexRequest);
|
||||
}
|
||||
BulkResponse bulkResponse = bulkRequestBuilder
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||
.get();
|
||||
assertThat(bulkResponse.hasFailures(), is(false));
|
||||
logger.info("Indexed [{}] documents", numDocs);
|
||||
}
|
||||
|
||||
protected DataCounts getDataCounts(String jobId) {
|
||||
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
|
||||
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
|
||||
if (response.getResponse().results().isEmpty()) {
|
||||
return new DataCounts(jobId);
|
||||
} else {
|
||||
return response.getResponse().results().get(0).getDataCounts();
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteAllDatafeeds(Client client) throws Exception {
|
||||
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
|
||||
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
|
||||
|
|
Loading…
Reference in New Issue