[ML] Also revert quantiles and model_size_stats (elastic/x-pack-elasticsearch#1369)
When a model snapshot is reverted, we should also revert quantiles and model_size_stats to the ones of the reverted snapshot. relates elastic/x-pack-elasticsearch#1342 Original commit: elastic/x-pack-elasticsearch@ddabe40470
This commit is contained in:
parent
cd99024599
commit
a5a44a2e2e
|
@ -97,7 +97,7 @@ extends Action<GetModelSnapshotsAction.Request, GetModelSnapshotsAction.Response
|
|||
private String sort;
|
||||
private String start;
|
||||
private String end;
|
||||
private boolean desc;
|
||||
private boolean desc = true;
|
||||
private PageParams pageParams = new PageParams();
|
||||
|
||||
Request() {
|
||||
|
|
|
@ -310,9 +310,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
|
|||
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(),
|
||||
request.getJobId()));
|
||||
}
|
||||
// The quantiles can be large, and totally dominate the output -
|
||||
// it's clearer to remove them as they are not necessary for the revert op
|
||||
handler.accept(new ModelSnapshot.Builder(modelSnapshot).setQuantiles(null).build());
|
||||
handler.accept(modelSnapshot);
|
||||
}, errorHandler);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.job;
|
|||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
|
@ -33,13 +35,16 @@ import org.elasticsearch.xpack.ml.job.config.JobUpdate;
|
|||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
@ -49,6 +54,8 @@ import java.util.Objects;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
/**
|
||||
* Allows interactions with jobs. The managed interactions include:
|
||||
* <ul>
|
||||
|
@ -348,17 +355,46 @@ public class JobManager extends AbstractComponent {
|
|||
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
|
||||
ModelSnapshot modelSnapshot) {
|
||||
|
||||
final JobResultsPersister persister = new JobResultsPersister(settings, client);
|
||||
|
||||
// Step 2. After the model size stats is persisted, also persist the snapshot's quantiles and respond
|
||||
// -------
|
||||
CheckedConsumer<IndexResponse, Exception> modelSizeStatsResponseHandler = response -> {
|
||||
persister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE,
|
||||
ActionListener.wrap(quantilesResponse -> {
|
||||
// The quantiles can be large, and totally dominate the output -
|
||||
// it's clearer to remove them as they are not necessary for the revert op
|
||||
ModelSnapshot snapshotWithoutQuantiles = new ModelSnapshot.Builder(modelSnapshot).setQuantiles(null).build();
|
||||
actionListener.onResponse(new RevertModelSnapshotAction.Response(snapshotWithoutQuantiles));
|
||||
}, actionListener::onFailure));
|
||||
};
|
||||
|
||||
// Step 1. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time
|
||||
// so that a search for the latest model size stats returns the reverted one.
|
||||
// -------
|
||||
CheckedConsumer<Boolean, Exception> updateHandler = response -> {
|
||||
if (response) {
|
||||
ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSnapshot.getModelSizeStats())
|
||||
.setLogTime(new Date()).build();
|
||||
persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(
|
||||
modelSizeStatsResponseHandler::accept, actionListener::onFailure));
|
||||
}
|
||||
};
|
||||
|
||||
// Step 0. Kick off the chain of callbacks with the cluster state update
|
||||
// -------
|
||||
clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(),
|
||||
new AckedClusterStateUpdateTask<RevertModelSnapshotAction.Response>(request, actionListener) {
|
||||
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) {
|
||||
|
||||
@Override
|
||||
protected RevertModelSnapshotAction.Response newResponse(boolean acknowledged) {
|
||||
protected Boolean newResponse(boolean acknowledged) {
|
||||
if (acknowledged) {
|
||||
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
|
||||
return new RevertModelSnapshotAction.Response(modelSnapshot);
|
||||
return true;
|
||||
}
|
||||
throw new IllegalStateException("Could not revert modelSnapshot on job ["
|
||||
+ request.getJobId() + "], not acknowledged by master.");
|
||||
actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job ["
|
||||
+ request.getJobId() + "], not acknowledged by master."));
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -6,11 +6,16 @@
|
|||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -32,7 +37,6 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
|
@ -221,18 +225,28 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
public void persistCategoryDefinition(CategoryDefinition category) {
|
||||
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
|
||||
CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId())));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId()));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())).actionGet();
|
||||
// Don't commit as we expect masses of these updates and they're not
|
||||
// read again by this process
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist the quantiles
|
||||
* Persist the quantiles (blocking)
|
||||
*/
|
||||
public void persistQuantiles(Quantiles quantiles) {
|
||||
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
|
||||
Quantiles.documentId(quantiles.getJobId()));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist the quantiles (async)
|
||||
*/
|
||||
public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) {
|
||||
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
|
||||
Quantiles.documentId(quantiles.getJobId()));
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -241,18 +255,31 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
|
||||
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
|
||||
ModelSnapshot.documentId(modelSnapshot));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())).actionGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist the memory usage data (blocking)
|
||||
*/
|
||||
public void persistModelSizeStats(ModelSizeStats modelSizeStats) {
|
||||
String jobId = modelSizeStats.getJobId();
|
||||
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
|
||||
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId());
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).actionGet();
|
||||
// Don't commit as we expect masses of these updates and they're only
|
||||
// for information at the API level
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist the memory usage data
|
||||
*/
|
||||
public void persistModelSizeStats(ModelSizeStats modelSizeStats) {
|
||||
public void persistModelSizeStats(ModelSizeStats modelSizeStats, WriteRequest.RefreshPolicy refreshPolicy,
|
||||
ActionListener<IndexResponse> listener) {
|
||||
String jobId = modelSizeStats.getJobId();
|
||||
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
|
||||
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
|
||||
modelSizeStats.documentId());
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
|
||||
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId());
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), listener);
|
||||
// Don't commit as we expect masses of these updates and they're only
|
||||
// for information at the API level
|
||||
}
|
||||
|
@ -262,7 +289,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
*/
|
||||
public void persistModelPlot(ModelPlot modelPlot) {
|
||||
Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), null);
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId()));
|
||||
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId())).actionGet();
|
||||
// Don't commit as we expect masses of these updates and they're not
|
||||
// read again by this process
|
||||
}
|
||||
|
@ -320,29 +347,37 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
private final ToXContent object;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private WriteRequest.RefreshPolicy refreshPolicy;
|
||||
|
||||
Persistable(String jobId, ToXContent object, String type, String id) {
|
||||
this.jobId = jobId;
|
||||
this.object = object;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
|
||||
}
|
||||
|
||||
boolean persist(String indexName) {
|
||||
if (object == null) {
|
||||
logger.warn("[{}] No {} to persist for job ", jobId, type);
|
||||
return false;
|
||||
}
|
||||
void setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
|
||||
this.refreshPolicy = refreshPolicy;
|
||||
}
|
||||
|
||||
ActionFuture<IndexResponse> persist(String indexName) {
|
||||
PlainActionFuture<IndexResponse> actionFuture = PlainActionFuture.newFuture();
|
||||
persist(indexName, actionFuture);
|
||||
return actionFuture;
|
||||
}
|
||||
|
||||
void persist(String indexName, ActionListener<IndexResponse> listener) {
|
||||
logCall(indexName);
|
||||
|
||||
try (XContentBuilder content = toXContentBuilder(object)) {
|
||||
IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content);
|
||||
client.index(indexRequest).actionGet();
|
||||
return true;
|
||||
IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy);
|
||||
client.index(indexRequest, listener);
|
||||
} catch (IOException e) {
|
||||
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);
|
||||
return false;
|
||||
IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder();
|
||||
notCreatedResponse.setCreated(false);
|
||||
listener.onResponse(notCreatedResponse.build());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -116,7 +116,7 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
private final Date timestamp;
|
||||
private final Date logTime;
|
||||
|
||||
private ModelSizeStats(String jobId, String id, long modelBytes, long totalByFieldCount, long totalOverFieldCount,
|
||||
private ModelSizeStats(String jobId, long modelBytes, long totalByFieldCount, long totalOverFieldCount,
|
||||
long totalPartitionFieldCount, long bucketAllocationFailuresCount, MemoryStatus memoryStatus,
|
||||
Date timestamp, Date logTime) {
|
||||
this.jobId = jobId;
|
||||
|
@ -267,7 +267,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
public static class Builder {
|
||||
|
||||
private final String jobId;
|
||||
private String id;
|
||||
private long modelBytes;
|
||||
private long totalByFieldCount;
|
||||
private long totalOverFieldCount;
|
||||
|
@ -279,50 +278,65 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
|
|||
|
||||
public Builder(String jobId) {
|
||||
this.jobId = jobId;
|
||||
id = RESULT_TYPE_FIELD.getPreferredName();
|
||||
memoryStatus = MemoryStatus.OK;
|
||||
logTime = new Date();
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = Objects.requireNonNull(id);
|
||||
public Builder(ModelSizeStats modelSizeStats) {
|
||||
this.jobId = modelSizeStats.jobId;
|
||||
this.modelBytes = modelSizeStats.modelBytes;
|
||||
this.totalByFieldCount = modelSizeStats.totalByFieldCount;
|
||||
this.totalOverFieldCount = modelSizeStats.totalOverFieldCount;
|
||||
this.totalPartitionFieldCount = modelSizeStats.totalPartitionFieldCount;
|
||||
this.bucketAllocationFailuresCount = modelSizeStats.bucketAllocationFailuresCount;
|
||||
this.memoryStatus = modelSizeStats.memoryStatus;
|
||||
this.timestamp = modelSizeStats.timestamp;
|
||||
this.logTime = modelSizeStats.logTime;
|
||||
}
|
||||
|
||||
public void setModelBytes(long modelBytes) {
|
||||
public Builder setModelBytes(long modelBytes) {
|
||||
this.modelBytes = modelBytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTotalByFieldCount(long totalByFieldCount) {
|
||||
public Builder setTotalByFieldCount(long totalByFieldCount) {
|
||||
this.totalByFieldCount = totalByFieldCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTotalPartitionFieldCount(long totalPartitionFieldCount) {
|
||||
public Builder setTotalPartitionFieldCount(long totalPartitionFieldCount) {
|
||||
this.totalPartitionFieldCount = totalPartitionFieldCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTotalOverFieldCount(long totalOverFieldCount) {
|
||||
public Builder setTotalOverFieldCount(long totalOverFieldCount) {
|
||||
this.totalOverFieldCount = totalOverFieldCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setBucketAllocationFailuresCount(long bucketAllocationFailuresCount) {
|
||||
public Builder setBucketAllocationFailuresCount(long bucketAllocationFailuresCount) {
|
||||
this.bucketAllocationFailuresCount = bucketAllocationFailuresCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setMemoryStatus(MemoryStatus memoryStatus) {
|
||||
public Builder setMemoryStatus(MemoryStatus memoryStatus) {
|
||||
Objects.requireNonNull(memoryStatus, "[" + MEMORY_STATUS_FIELD.getPreferredName() + "] must not be null");
|
||||
this.memoryStatus = memoryStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTimestamp(Date timestamp) {
|
||||
public Builder setTimestamp(Date timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setLogTime(Date logTime) {
|
||||
public Builder setLogTime(Date logTime) {
|
||||
this.logTime = logTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ModelSizeStats build() {
|
||||
return new ModelSizeStats(jobId, id, modelBytes, totalByFieldCount, totalOverFieldCount, totalPartitionFieldCount,
|
||||
return new ModelSizeStats(jobId, modelBytes, totalByFieldCount, totalOverFieldCount, totalPartitionFieldCount,
|
||||
bucketAllocationFailuresCount, memoryStatus, timestamp, logTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -313,7 +313,6 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
|||
|
||||
private ModelSizeStats createModelSizeStats() {
|
||||
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
|
||||
builder.setId(randomAlphaOfLength(20));
|
||||
builder.setTimestamp(new Date(randomNonNegativeLong()));
|
||||
builder.setLogTime(new Date(randomNonNegativeLong()));
|
||||
builder.setBucketAllocationFailuresCount(randomNonNegativeLong());
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction;
|
|||
import org.elasticsearch.xpack.ml.action.FlushJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetJobsAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
|
||||
|
@ -35,6 +36,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction;
|
|||
import org.elasticsearch.xpack.ml.action.PostDataAction;
|
||||
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
|
||||
|
@ -203,6 +205,11 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
|
|||
assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
protected List<Job> getJob(String jobId) {
|
||||
GetJobsAction.Request request = new GetJobsAction.Request(jobId);
|
||||
return client().execute(GetJobsAction.INSTANCE, request).actionGet().getResponse().results();
|
||||
}
|
||||
|
||||
protected List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) {
|
||||
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
|
||||
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
|
||||
|
@ -235,6 +242,11 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
|
|||
return response.getPage().results();
|
||||
}
|
||||
|
||||
protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) {
|
||||
RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId);
|
||||
return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet();
|
||||
}
|
||||
|
||||
protected List<CategoryDefinition> getCategories(String jobId) {
|
||||
GetCategoriesAction.Request getCategoriesRequest =
|
||||
new GetCategoriesAction.Request(jobId);
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
|
||||
import org.junit.After;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* This test pushes data through a job in 2 runs creating
|
||||
* 2 model snapshots. It then reverts to the earlier snapshot
|
||||
* and asserts the reversion worked as expected.
|
||||
*/
|
||||
public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
|
||||
|
||||
@After
|
||||
public void tearDownData() throws Exception {
|
||||
cleanUp();
|
||||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
TimeValue bucketSpan = TimeValue.timeValueHours(1);
|
||||
long startTime = 1491004800000L;
|
||||
|
||||
Job.Builder job = buildAndRegisterJob("revert-model-snapshot-split-it-job", bucketSpan);
|
||||
openJob(job.getId());
|
||||
postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"),
|
||||
(bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining()));
|
||||
closeJob(job.getId());
|
||||
|
||||
ModelSizeStats modelSizeStats1 = getJobStats(job.getId()).get(0).getModelSizeStats();
|
||||
String quantiles1 = getQuantiles(job.getId());
|
||||
|
||||
// We need to wait a second to ensure the second time around model snapshot will have a different ID (it depends on epoch seconds)
|
||||
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
|
||||
|
||||
openJob(job.getId());
|
||||
postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"),
|
||||
(bucketIndex, series) -> 10.0).stream().collect(Collectors.joining()));
|
||||
closeJob(job.getId());
|
||||
|
||||
ModelSizeStats modelSizeStats2 = getJobStats(job.getId()).get(0).getModelSizeStats();
|
||||
String quantiles2 = getQuantiles(job.getId());
|
||||
|
||||
// Check model has grown since a new series was introduced
|
||||
assertThat(modelSizeStats2.getModelBytes(), greaterThan(modelSizeStats1.getModelBytes()));
|
||||
|
||||
// Check quantiles have changed
|
||||
assertThat(quantiles2, not(equalTo(quantiles1)));
|
||||
|
||||
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
|
||||
assertThat(modelSnapshots.size(), equalTo(2));
|
||||
|
||||
// Snapshots are sorted in descending timestamp order so we revert to the last of the list/earliest.
|
||||
assertThat(modelSnapshots.get(0).getTimestamp().getTime(), greaterThan(modelSnapshots.get(1).getTimestamp().getTime()));
|
||||
assertThat(getJob(job.getId()).get(0).getModelSnapshotId(), equalTo(modelSnapshots.get(0).getSnapshotId()));
|
||||
ModelSnapshot revertSnapshot = modelSnapshots.get(1);
|
||||
|
||||
assertThat(revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId()).status(), equalTo(RestStatus.OK));
|
||||
|
||||
// Check model_size_stats has been reverted
|
||||
assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes()));
|
||||
|
||||
// Check quantiles have been reverted
|
||||
assertThat(getQuantiles(job.getId()), equalTo(quantiles1));
|
||||
}
|
||||
|
||||
private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {
|
||||
Detector.Builder detector = new Detector.Builder("mean", "value");
|
||||
detector.setPartitionFieldName("series");
|
||||
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
|
||||
analysisConfig.setBucketSpan(bucketSpan);
|
||||
Job.Builder job = new Job.Builder(jobId);
|
||||
job.setAnalysisConfig(analysisConfig);
|
||||
DataDescription.Builder dataDescription = new DataDescription.Builder();
|
||||
job.setDataDescription(dataDescription);
|
||||
registerJob(job);
|
||||
putJob(job);
|
||||
return job;
|
||||
}
|
||||
|
||||
private static List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount, List<String> series,
|
||||
BiFunction<Integer, String, Double> timeAndSeriesToValueFunction) throws IOException {
|
||||
List<String> data = new ArrayList<>();
|
||||
long now = timestamp;
|
||||
for (int i = 0; i < bucketCount; i++) {
|
||||
for (String field : series) {
|
||||
Map<String, Object> record = new HashMap<>();
|
||||
record.put("time", now);
|
||||
record.put("value", timeAndSeriesToValueFunction.apply(i, field));
|
||||
record.put("series", field);
|
||||
data.add(createJsonRecord(record));
|
||||
}
|
||||
now += bucketSpan.getMillis();
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
private String getQuantiles(String jobId) {
|
||||
SearchResponse response = client().prepareSearch(".ml-state")
|
||||
.setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(jobId)))
|
||||
.setSize(1)
|
||||
.get();
|
||||
SearchHits hits = response.getHits();
|
||||
assertThat(hits.getTotalHits(), equalTo(1L));
|
||||
return hits.getAt(0).getSourceAsString();
|
||||
}
|
||||
}
|
|
@ -82,9 +82,6 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase<ModelSizeSt
|
|||
if (randomBoolean()) {
|
||||
stats.setMemoryStatus(randomFrom(MemoryStatus.values()));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
stats.setId(randomAlphaOfLengthBetween(1, 20));
|
||||
}
|
||||
return stats.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
|
|||
}
|
||||
if (randomBoolean()) {
|
||||
modelSizeStats = new ModelSizeStats.Builder(jobId);
|
||||
modelSizeStats.setId(randomAlphaOfLengthBetween(1, 20));
|
||||
modelSizeStats.setModelBytes(randomNonNegativeLong());
|
||||
} else {
|
||||
modelSizeStats = null;
|
||||
}
|
||||
|
|
|
@ -37,7 +37,17 @@ setup:
|
|||
"snapshot_id": "first",
|
||||
"description": "first snapshot",
|
||||
"latest_record_time_stamp": "2016-06-02T00:00:00Z",
|
||||
"latest_result_time_stamp": "2016-06-02T00:00:00Z"
|
||||
"latest_result_time_stamp": "2016-06-02T00:00:00Z",
|
||||
"model_size_stats": {
|
||||
"job_id": "revert-model-snapshot",
|
||||
"model_bytes": 10,
|
||||
"log_time": "2016-06-02T00:00:00Z"
|
||||
},
|
||||
"quantiles": {
|
||||
"job_id": "revert-model-snapshot",
|
||||
"timestamp": 1464825600000,
|
||||
"quantile_state": "quantiles-1"
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
|
@ -52,7 +62,17 @@ setup:
|
|||
"snapshot_id": "second",
|
||||
"description": "second snapshot",
|
||||
"latest_record_time_stamp": "2016-06-01T00:00:00Z",
|
||||
"latest_result_time_stamp": "2016-06-01T00:00:00Z"
|
||||
"latest_result_time_stamp": "2016-06-01T00:00:00Z",
|
||||
"model_size_stats": {
|
||||
"job_id": "revert-model-snapshot",
|
||||
"model_bytes": 20,
|
||||
"log_time": "2016-06-01T00:00:00Z"
|
||||
},
|
||||
"quantiles": {
|
||||
"job_id": "revert-model-snapshot",
|
||||
"timestamp": 1464739200000,
|
||||
"quantile_state": "quantiles-2"
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
|
|
Loading…
Reference in New Issue