diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetModelSnapshotsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetModelSnapshotsAction.java index ca49e972409..43978b74297 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetModelSnapshotsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetModelSnapshotsAction.java @@ -97,7 +97,7 @@ extends Action @@ -348,17 +355,46 @@ public class JobManager extends AbstractComponent { public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener actionListener, ModelSnapshot modelSnapshot) { + final JobResultsPersister persister = new JobResultsPersister(settings, client); + + // Step 2. After the model size stats is persisted, also persist the snapshot's quantiles and respond + // ------- + CheckedConsumer modelSizeStatsResponseHandler = response -> { + persister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE, + ActionListener.wrap(quantilesResponse -> { + // The quantiles can be large, and totally dominate the output - + // it's clearer to remove them as they are not necessary for the revert op + ModelSnapshot snapshotWithoutQuantiles = new ModelSnapshot.Builder(modelSnapshot).setQuantiles(null).build(); + actionListener.onResponse(new RevertModelSnapshotAction.Response(snapshotWithoutQuantiles)); + }, actionListener::onFailure)); + }; + + // Step 1. When the model_snapshot_id is updated on the job, persist the snapshot's model size stats with a touched log time + // so that a search for the latest model size stats returns the reverted one. + // ------- + CheckedConsumer updateHandler = response -> { + if (response) { + ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSnapshot.getModelSizeStats()) + .setLogTime(new Date()).build(); + persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap( + modelSizeStatsResponseHandler::accept, actionListener::onFailure)); + } + }; + + // Step 0. Kick off the chain of callbacks with the cluster state update + // ------- clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { + new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { @Override - protected RevertModelSnapshotAction.Response newResponse(boolean acknowledged) { + protected Boolean newResponse(boolean acknowledged) { if (acknowledged) { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - return new RevertModelSnapshotAction.Response(modelSnapshot); + return true; } - throw new IllegalStateException("Could not revert modelSnapshot on job [" - + request.getJobId() + "], not acknowledged by master."); + actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" + + request.getJobId() + "], not acknowledged by master.")); + return false; } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index ab9fc6bd79a..c8141290bcd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -6,11 +6,16 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -32,7 +37,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -221,18 +225,28 @@ public class JobResultsPersister extends AbstractComponent { public void persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId()))); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process } /** - * Persist the quantiles + * Persist the quantiles (blocking) */ public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(quantiles.getJobId())); - persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()); + persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet(); + } + + /** + * Persist the quantiles (async) + */ + public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener listener) { + Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), + Quantiles.documentId(quantiles.getJobId())); + persistable.setRefreshPolicy(refreshPolicy); + persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener); } /** @@ -241,18 +255,31 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())).actionGet(); + } + + /** + * Persist the memory usage data (blocking) + */ + public void persistModelSizeStats(ModelSizeStats modelSizeStats) { + String jobId = modelSizeStats.getJobId(); + logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); + Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId()); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).actionGet(); + // Don't commit as we expect masses of these updates and they're only + // for information at the API level } /** * Persist the memory usage data */ - public void persistModelSizeStats(ModelSizeStats modelSizeStats) { + public void persistModelSizeStats(ModelSizeStats modelSizeStats, WriteRequest.RefreshPolicy refreshPolicy, + ActionListener listener) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); - Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), - modelSizeStats.documentId()); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId()); + persistable.setRefreshPolicy(refreshPolicy); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), listener); // Don't commit as we expect masses of these updates and they're only // for information at the API level } @@ -262,7 +289,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelPlot(ModelPlot modelPlot) { Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), null); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -320,29 +347,37 @@ public class JobResultsPersister extends AbstractComponent { private final ToXContent object; private final String type; private final String id; + private WriteRequest.RefreshPolicy refreshPolicy; Persistable(String jobId, ToXContent object, String type, String id) { this.jobId = jobId; this.object = object; this.type = type; this.id = id; + this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; } - boolean persist(String indexName) { - if (object == null) { - logger.warn("[{}] No {} to persist for job ", jobId, type); - return false; - } + void setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + } + ActionFuture persist(String indexName) { + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + persist(indexName, actionFuture); + return actionFuture; + } + + void persist(String indexName, ActionListener listener) { logCall(indexName); try (XContentBuilder content = toXContentBuilder(object)) { - IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content); - client.index(indexRequest).actionGet(); - return true; + IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy); + client.index(indexRequest, listener); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e); - return false; + IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder(); + notCreatedResponse.setCreated(false); + listener.onResponse(notCreatedResponse.build()); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java index 103a9282505..075f45c4d03 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java @@ -116,7 +116,7 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { private final Date timestamp; private final Date logTime; - private ModelSizeStats(String jobId, String id, long modelBytes, long totalByFieldCount, long totalOverFieldCount, + private ModelSizeStats(String jobId, long modelBytes, long totalByFieldCount, long totalOverFieldCount, long totalPartitionFieldCount, long bucketAllocationFailuresCount, MemoryStatus memoryStatus, Date timestamp, Date logTime) { this.jobId = jobId; @@ -267,7 +267,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { public static class Builder { private final String jobId; - private String id; private long modelBytes; private long totalByFieldCount; private long totalOverFieldCount; @@ -279,50 +278,65 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { public Builder(String jobId) { this.jobId = jobId; - id = RESULT_TYPE_FIELD.getPreferredName(); memoryStatus = MemoryStatus.OK; logTime = new Date(); } - public void setId(String id) { - this.id = Objects.requireNonNull(id); + public Builder(ModelSizeStats modelSizeStats) { + this.jobId = modelSizeStats.jobId; + this.modelBytes = modelSizeStats.modelBytes; + this.totalByFieldCount = modelSizeStats.totalByFieldCount; + this.totalOverFieldCount = modelSizeStats.totalOverFieldCount; + this.totalPartitionFieldCount = modelSizeStats.totalPartitionFieldCount; + this.bucketAllocationFailuresCount = modelSizeStats.bucketAllocationFailuresCount; + this.memoryStatus = modelSizeStats.memoryStatus; + this.timestamp = modelSizeStats.timestamp; + this.logTime = modelSizeStats.logTime; } - public void setModelBytes(long modelBytes) { + public Builder setModelBytes(long modelBytes) { this.modelBytes = modelBytes; + return this; } - public void setTotalByFieldCount(long totalByFieldCount) { + public Builder setTotalByFieldCount(long totalByFieldCount) { this.totalByFieldCount = totalByFieldCount; + return this; } - public void setTotalPartitionFieldCount(long totalPartitionFieldCount) { + public Builder setTotalPartitionFieldCount(long totalPartitionFieldCount) { this.totalPartitionFieldCount = totalPartitionFieldCount; + return this; } - public void setTotalOverFieldCount(long totalOverFieldCount) { + public Builder setTotalOverFieldCount(long totalOverFieldCount) { this.totalOverFieldCount = totalOverFieldCount; + return this; } - public void setBucketAllocationFailuresCount(long bucketAllocationFailuresCount) { + public Builder setBucketAllocationFailuresCount(long bucketAllocationFailuresCount) { this.bucketAllocationFailuresCount = bucketAllocationFailuresCount; + return this; } - public void setMemoryStatus(MemoryStatus memoryStatus) { + public Builder setMemoryStatus(MemoryStatus memoryStatus) { Objects.requireNonNull(memoryStatus, "[" + MEMORY_STATUS_FIELD.getPreferredName() + "] must not be null"); this.memoryStatus = memoryStatus; + return this; } - public void setTimestamp(Date timestamp) { + public Builder setTimestamp(Date timestamp) { this.timestamp = timestamp; + return this; } - public void setLogTime(Date logTime) { + public Builder setLogTime(Date logTime) { this.logTime = logTime; + return this; } public ModelSizeStats build() { - return new ModelSizeStats(jobId, id, modelBytes, totalByFieldCount, totalOverFieldCount, totalPartitionFieldCount, + return new ModelSizeStats(jobId, modelBytes, totalByFieldCount, totalOverFieldCount, totalPartitionFieldCount, bucketAllocationFailuresCount, memoryStatus, timestamp, logTime); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 1922524eeae..1a3663eefdc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -313,7 +313,6 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { private ModelSizeStats createModelSizeStats() { ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID); - builder.setId(randomAlphaOfLength(20)); builder.setTimestamp(new Date(randomNonNegativeLong())); builder.setLogTime(new Date(randomNonNegativeLong())); builder.setBucketAllocationFailuresCount(randomNonNegativeLong()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index d1a1e4f1b76..8e720fb5c26 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.GetBucketsAction; import org.elasticsearch.xpack.ml.action.GetCategoriesAction; +import org.elasticsearch.xpack.ml.action.GetJobsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction; @@ -35,6 +36,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; @@ -203,6 +205,11 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS); } + protected List getJob(String jobId) { + GetJobsAction.Request request = new GetJobsAction.Request(jobId); + return client().execute(GetJobsAction.INSTANCE, request).actionGet().getResponse().results(); + } + protected List getJobStats(String jobId) { GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); @@ -235,6 +242,11 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { return response.getPage().results(); } + protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) { + RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId); + return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet(); + } + protected List getCategories(String jobId) { GetCategoriesAction.Request getCategoriesRequest = new GetCategoriesAction.Request(jobId); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java new file mode 100644 index 00000000000..2d423e48d30 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; + +/** + * This test pushes data through a job in 2 runs creating + * 2 model snapshots. It then reverts to the earlier snapshot + * and asserts the reversion worked as expected. + */ +public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { + + @After + public void tearDownData() throws Exception { + cleanUp(); + } + + public void test() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueHours(1); + long startTime = 1491004800000L; + + Job.Builder job = buildAndRegisterJob("revert-model-snapshot-split-it-job", bucketSpan); + openJob(job.getId()); + postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"), + (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining())); + closeJob(job.getId()); + + ModelSizeStats modelSizeStats1 = getJobStats(job.getId()).get(0).getModelSizeStats(); + String quantiles1 = getQuantiles(job.getId()); + + // We need to wait a second to ensure the second time around model snapshot will have a different ID (it depends on epoch seconds) + awaitBusy(() -> false, 1, TimeUnit.SECONDS); + + openJob(job.getId()); + postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"), + (bucketIndex, series) -> 10.0).stream().collect(Collectors.joining())); + closeJob(job.getId()); + + ModelSizeStats modelSizeStats2 = getJobStats(job.getId()).get(0).getModelSizeStats(); + String quantiles2 = getQuantiles(job.getId()); + + // Check model has grown since a new series was introduced + assertThat(modelSizeStats2.getModelBytes(), greaterThan(modelSizeStats1.getModelBytes())); + + // Check quantiles have changed + assertThat(quantiles2, not(equalTo(quantiles1))); + + List modelSnapshots = getModelSnapshots(job.getId()); + assertThat(modelSnapshots.size(), equalTo(2)); + + // Snapshots are sorted in descending timestamp order so we revert to the last of the list/earliest. + assertThat(modelSnapshots.get(0).getTimestamp().getTime(), greaterThan(modelSnapshots.get(1).getTimestamp().getTime())); + assertThat(getJob(job.getId()).get(0).getModelSnapshotId(), equalTo(modelSnapshots.get(0).getSnapshotId())); + ModelSnapshot revertSnapshot = modelSnapshots.get(1); + + assertThat(revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId()).status(), equalTo(RestStatus.OK)); + + // Check model_size_stats has been reverted + assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes())); + + // Check quantiles have been reverted + assertThat(getQuantiles(job.getId()), equalTo(quantiles1)); + } + + private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { + Detector.Builder detector = new Detector.Builder("mean", "value"); + detector.setPartitionFieldName("series"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + job.setDataDescription(dataDescription); + registerJob(job); + putJob(job); + return job; + } + + private static List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, List series, + BiFunction timeAndSeriesToValueFunction) throws IOException { + List data = new ArrayList<>(); + long now = timestamp; + for (int i = 0; i < bucketCount; i++) { + for (String field : series) { + Map record = new HashMap<>(); + record.put("time", now); + record.put("value", timeAndSeriesToValueFunction.apply(i, field)); + record.put("series", field); + data.add(createJsonRecord(record)); + } + now += bucketSpan.getMillis(); + } + return data; + } + + private String getQuantiles(String jobId) { + SearchResponse response = client().prepareSearch(".ml-state") + .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(jobId))) + .setSize(1) + .get(); + SearchHits hits = response.getHits(); + assertThat(hits.getTotalHits(), equalTo(1L)); + return hits.getAt(0).getSourceAsString(); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStatsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStatsTests.java index a5f86a0d9ac..b46eb8e829e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStatsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStatsTests.java @@ -82,9 +82,6 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase