diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java index 286fa40c55b..9eaad98141a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java @@ -230,16 +230,11 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen } void putJobStateIndexTemplate(BiConsumer listener) { - try (XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); - XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); - XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping()) { - + try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) { PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName()); templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName())); templateRequest.settings(mlStateIndexSettings()); - templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); - templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); - templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); + templateRequest.mapping(ElasticsearchMappings.DOC_TYPE, stateMapping); templateRequest.version(Version.CURRENT.id); client.admin().indices().putTemplate(templateRequest, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 0394da6c53c..8cb8b433118 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -7,12 +7,9 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyCause; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -494,39 +491,6 @@ public class ElasticsearchMappings { .endObject(); } - /** - * {@link CategorizerState} mapping. - * The type is disabled so {@link CategorizerState} is not searchable and - * the '_all' field is disabled - * - * @return The builder - * @throws IOException On builder write error - */ - public static XContentBuilder categorizerStateMapping() throws IOException { - return jsonBuilder() - .startObject() - .startObject(CategorizerState.TYPE) - .field(ENABLED, false) - .endObject() - .endObject(); - } - - /** - * Create the Elasticsearch mapping for {@linkplain Quantiles}. - * The type is disabled as is the '_all' field as the document isn't meant to be searched. - *

- * The quantile state string is not searchable (enabled = false) as it could be - * very large. - */ - public static XContentBuilder quantilesMapping() throws IOException { - return jsonBuilder() - .startObject() - .startObject(Quantiles.TYPE.getPreferredName()) - .field(ENABLED, false) - .endObject() - .endObject(); - } - /** * Create the Elasticsearch mapping for {@linkplain CategoryDefinition}. * The '_all' field is disabled as the document isn't meant to be searched. @@ -552,16 +516,15 @@ public class ElasticsearchMappings { } /** - * Create the Elasticsearch mapping for {@linkplain ModelState}. - * The model state could potentially be huge (over a gigabyte in size) - * so all analysis by Elasticsearch is disabled. The only way to - * retrieve the model state is by knowing the ID of a particular - * document or by searching for all documents of this type. + * Create the Elasticsearch mapping for state. State could potentially be + * huge (target document size is 16MB and there can be many documents) so all + * analysis by Elasticsearch is disabled. The only way to retrieve state is + * by knowing the ID of a particular document. */ - public static XContentBuilder modelStateMapping() throws IOException { + public static XContentBuilder stateMapping() throws IOException { return jsonBuilder() .startObject() - .startObject(ModelState.TYPE.getPreferredName()) + .startObject(DOC_TYPE) .field(ENABLED, false) .endObject() .endObject(); @@ -603,7 +566,7 @@ public class ElasticsearchMappings { // end model size stats mapping builder.endObject(); - builder.startObject(Quantiles.TYPE.getPreferredName()) + builder.startObject(ModelSnapshot.QUANTILES.getPreferredName()) .field(ENABLED, false) .endObject() .startObject(ModelSnapshot.LATEST_RECORD_TIME.getPreferredName()) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 05618f74873..a3c9b153aa6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -56,11 +56,18 @@ public class JobDataDeleter { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ModelSnapshot modelSnapshot : modelSnapshots) { for (String stateDocId : modelSnapshot.stateDocumentIds()) { - bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateDocId)); + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ElasticsearchMappings.DOC_TYPE, stateDocId)); + } + // TODO: remove in 7.0 + for (String stateDocId : modelSnapshot.legacyStateDocumentIds()) { + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE, stateDocId)); } bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot))); + // TODO: remove in 7.0 + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), + ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.legacyDocumentId(modelSnapshot))); } bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 94694d2598f..8735177cc92 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -65,7 +65,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -253,6 +252,8 @@ public class JobProvider { MultiSearchRequestBuilder msearch = client.prepareMultiSearch() .add(createLatestDataCountsSearch(resultsIndex, jobId)) .add(createLatestModelSizeStatsSearch(resultsIndex)) + // These next two document IDs never need to be the legacy ones due to the rule + // that you cannot open a 5.4 job in a subsequent version of the product .add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId()))) .add(createDocIdSearch(stateIndex, Quantiles.documentId(jobId))); @@ -523,8 +524,8 @@ public class JobProvider { } String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}", - CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); + LOGGER.trace("ES API CALL: search all of category definitions from index {} sort ascending {} from {} size {}", + indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); @@ -605,8 +606,8 @@ public class JobProvider { searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC); } - LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}", - AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "", + LOGGER.trace("ES API CALL: search all of records from index {}{}{} with filter after sort from {} size {}", + indexName, (sb != null) ? " with sort" : "", secondarySort.isEmpty() ? "" : " with secondary sort", from, size); client.search(searchRequest, ActionListener.wrap(searchResponse -> { List results = new ArrayList<>(); @@ -639,8 +640,7 @@ public class JobProvider { .build(); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", - () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, + LOGGER.trace("ES API CALL: search all of influencers from index {}{} with filter from {} size {}", () -> indexName, () -> (query.getSortField() != null) ? " with sort " + (query.isSortDescending() ? "descending" : "ascending") + " on field " + query.getSortField() : "", query::getFrom, query::getSize); @@ -760,8 +760,8 @@ public class JobProvider { .order(sortDescending ? SortOrder.DESC : SortOrder.ASC); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all {}s from index {} sort ascending {} with filter after sort from {} size {}", - ModelSnapshot.TYPE, indexName, sortField, from, size); + LOGGER.trace("ES API CALL: search all model snapshots from index {} sort ascending {} with filter after sort from {} size {}", + indexName, sortField, from, size); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); @@ -788,6 +788,9 @@ public class JobProvider { * stream. If there are multiple state documents they are separated using '\0' * when written to the stream. * + * Because we have a rule that we will not open a legacy job in the current product version + * we don't have to worry about legacy document IDs here. + * * @param jobId the job id * @param modelSnapshot the model snapshot to be restored * @param restoreStream the stream to write the state to @@ -797,9 +800,9 @@ public class JobProvider { // First try to restore model state. for (String stateDocId : modelSnapshot.stateDocumentIds()) { - LOGGER.trace("ES API CALL: get ID {} type {} from index {}", stateDocId, ModelState.TYPE, indexName); + LOGGER.trace("ES API CALL: get ID {} from index {}", stateDocId, indexName); - GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), stateDocId).get(); + GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, stateDocId).get(); if (!stateResponse.isExists()) { LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}", modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId); @@ -809,16 +812,15 @@ public class JobProvider { } // Secondly try to restore categorizer state. This must come after model state because that's - // the order the C++ process expects. - // There are no snapshots for this, so the IDs simply + // the order the C++ process expects. There are no snapshots for this, so the IDs simply // count up until a document is not found. It's NOT an error to have no categorizer state. int docNum = 0; while (true) { - String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum); + String docId = CategorizerState.documentId(jobId, ++docNum); - LOGGER.trace("ES API CALL: get ID {} type {} from index {}", docId, CategorizerState.TYPE, indexName); + LOGGER.trace("ES API CALL: get ID {} from index {}", docId, indexName); - GetResponse stateResponse = client.prepareGet(indexName, CategorizerState.TYPE, docId).get(); + GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, docId).get(); if (!stateResponse.isExists()) { break; } @@ -850,8 +852,7 @@ public class JobProvider { public QueryPage modelPlot(String jobId, int from, int size) { SearchResponse searchResponse; String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}", - ModelPlot.RESULT_TYPE_VALUE, indexName, from, size); + LOGGER.trace("ES API CALL: search model plots from index {} from {} size {}", indexName, from, size); searchResponse = client.prepareSearch(indexName) .setIndicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 8b305ed6a2f..c107038e7c3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.Result; import java.io.IOException; import java.util.Collections; @@ -96,28 +95,26 @@ public class JobResultsPersister extends AbstractComponent { bucketWithoutRecords.setRecords(Collections.emptyList()); } try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) { - logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", - jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch()); + String id = bucketWithoutRecords.getId(); + logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, bucketWithoutRecords.getId()).source(content)); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e); + logger.error(new ParameterizedMessage("[{}] Error serialising bucket", jobId), e); } return this; } - private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) - throws IOException { + private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) throws IOException { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) { // Need consistent IDs to ensure overwriting on renormalization String id = bucketInfluencer.getId(); - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id); + logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id); bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } @@ -135,9 +132,9 @@ public class JobResultsPersister extends AbstractComponent { try { for (AnomalyRecord record : records) { try (XContentBuilder content = toXContentBuilder(record)) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, record.getId()).source(content)); + String id = record.getId(); + logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, id); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } } catch (IOException e) { @@ -158,9 +155,9 @@ public class JobResultsPersister extends AbstractComponent { try { for (Influencer influencer : influencers) { try (XContentBuilder content = toXContentBuilder(influencer)) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, influencer.getId()).source(content)); + String id = influencer.getId(); + logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, id); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } } catch (IOException e) { @@ -199,8 +196,7 @@ public class JobResultsPersister extends AbstractComponent { * @param category The category to be persisted */ public void persistCategoryDefinition(CategoryDefinition category) { - Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), - category.getId()); + Persistable persistable = new Persistable(category.getJobId(), category, category.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not @@ -211,8 +207,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist the quantiles (blocking) */ public void persistQuantiles(Quantiles quantiles) { - Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), - Quantiles.documentId(quantiles.getJobId())); + Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet(); } @@ -220,8 +215,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist the quantiles (async) */ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener listener) { - Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), - Quantiles.documentId(quantiles.getJobId())); + Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener); } @@ -230,8 +224,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist a model snapshot description */ public void persistModelSnapshot(ModelSnapshot modelSnapshot) { - Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), - ModelSnapshot.documentId(modelSnapshot)); + Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } @@ -241,7 +234,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSizeStats(ModelSizeStats modelSizeStats) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); - Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId()); + Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); // Don't commit as we expect masses of these updates and they're only // for information at the API level @@ -254,7 +247,7 @@ public class JobResultsPersister extends AbstractComponent { ActionListener listener) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); - Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId()); + Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener); // Don't commit as we expect masses of these updates and they're only @@ -265,7 +258,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist model plot output */ public void persistModelPlot(ModelPlot modelPlot) { - Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), modelPlot.getId()); + Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, modelPlot.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -283,9 +276,8 @@ public class JobResultsPersister extends AbstractComponent { * called to commit the writes to the datastore. * * @param jobId The job Id - * @return True if successful */ - public boolean commitResultWrites(String jobId) { + public void commitResultWrites(String jobId) { // We refresh using the read alias in order to ensure all indices will // be refreshed even if a rollover occurs in between. String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); @@ -295,7 +287,6 @@ public class JobResultsPersister extends AbstractComponent { RefreshRequest refreshRequest = new RefreshRequest(indexName); refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); client.admin().indices().refresh(refreshRequest).actionGet(); - return true; } /** @@ -303,19 +294,17 @@ public class JobResultsPersister extends AbstractComponent { * immediately searchable. * * @param jobId The job Id - * @return True if successful * */ - public boolean commitStateWrites(String jobId) { + public void commitStateWrites(String jobId) { String indexName = AnomalyDetectorsIndex.jobStateIndexName(); // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); RefreshRequest refreshRequest = new RefreshRequest(indexName); refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); client.admin().indices().refresh(refreshRequest).actionGet(); - return true; } - XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { + private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { XContentBuilder builder = jsonBuilder(); obj.toXContent(builder, ToXContent.EMPTY_PARAMS); return builder; @@ -325,14 +314,12 @@ public class JobResultsPersister extends AbstractComponent { private final String jobId; private final ToXContent object; - private final String description; private final String id; private WriteRequest.RefreshPolicy refreshPolicy; - Persistable(String jobId, ToXContent object, String description, String id) { + Persistable(String jobId, ToXContent object, String id) { this.jobId = jobId; this.object = object; - this.description = description; this.id = id; this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; } @@ -349,15 +336,12 @@ public class JobResultsPersister extends AbstractComponent { void persist(String indexName, ActionListener listener) { logCall(indexName); - // TODO no_release: this is a temporary hack until we also switch state index to have doc type in which case - // we can remove this line and use DOC_TYPE directly in the index request - String type = AnomalyDetectorsIndex.jobStateIndexName().equals(indexName) ? description : DOC_TYPE; try (XContentBuilder content = toXContentBuilder(object)) { - IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy); + IndexRequest indexRequest = new IndexRequest(indexName, DOC_TYPE, id).source(content).setRefreshPolicy(refreshPolicy); client.index(indexRequest, listener); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, description}), e); + logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e); IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder(); notCreatedResponse.setCreated(false); listener.onResponse(notCreatedResponse.build()); @@ -366,9 +350,9 @@ public class JobResultsPersister extends AbstractComponent { private void logCall(String indexName) { if (id != null) { - logger.trace("[{}] ES API CALL: index {} to index {} with ID {}", jobId, description, indexName, id); + logger.trace("[{}] ES API CALL: to index {} with ID [{}]", jobId, indexName, id); } else { - logger.trace("[{}] ES API CALL: index {} to index {} with auto-generated ID", jobId, description, indexName); + logger.trace("[{}] ES API CALL: to index {} with auto-generated ID", jobId, indexName); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index a130017c3d9..9a1f14e589f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -23,11 +22,9 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; @@ -96,7 +93,7 @@ public class JobStorageDeletionTask extends Task { // Step 3. Delete quantiles done, delete the categorizer state ActionListener deleteQuantilesHandler = ActionListener.wrap( - response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler), + response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler), failureHandler); // Step 2. Delete state done, delete the quantiles @@ -109,12 +106,12 @@ public class JobStorageDeletionTask extends Task { } private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { - // The quantiles doc Id changed in v5.5 so delete both the old and new format + // The quantiles type and doc Id changed in v5.5 so delete both the old and new format BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, Quantiles.documentId(jobId))); bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), - jobId + "-" + Quantiles.TYPE.getPreferredName())); + Quantiles.legacyDocumentId(jobId))); bulkRequestBuilder.execute(ActionListener.wrap( response -> finishedHandler.onResponse(true), e -> { @@ -138,26 +135,35 @@ public class JobStorageDeletionTask extends Task { listener::onFailure); } - private void deleteCategorizerState(String jobId, Client client, ActionListener finishedHandler) { - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); - request.setSlices(5); - - searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - WildcardQueryBuilder query = new WildcardQueryBuilder(UidFieldMapper.NAME, Uid.createUid(CategorizerState.TYPE, jobId + "#*")); - searchRequest.source(new SearchSourceBuilder().query(query)); - client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - finishedHandler.onResponse(true); - } - - @Override - public void onFailure(Exception e) { - logger.error("[" + jobId + "] Failed to delete categorizer state for job.", e); - finishedHandler.onFailure(e); - } - }); + private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener finishedHandler) { + // The categorizer state type and doc Id changed in v5.5 so delete both the old and new format + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(jobId, docNum))); + // TODO: remove in 7.0 + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, + CategorizerState.legacyDocumentId(jobId, docNum))); + bulkRequestBuilder.execute(ActionListener.wrap( + response -> { + // If we successfully deleted either document try the next one; if not we're done + for (BulkItemResponse item : response.getItems()) { + if (item.status() == RestStatus.OK) { + // There's an assumption here that there won't be very many categorizer + // state documents, so the recursion won't go more than, say, 5 levels deep + deleteCategorizerState(jobId, client, docNum + 1, finishedHandler); + return; + } + } + finishedHandler.onResponse(true); + }, + e -> { + // It's not a problem for us if the index wasn't found - it's equivalent to document not found + if (e instanceof IndexNotFoundException) { + finishedHandler.onResponse(true); + } else { + finishedHandler.onFailure(e); + } + })); } private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener finishedHandler ) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index 52231664d98..fe6e076d18b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -14,6 +14,8 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; import java.io.IOException; import java.io.InputStream; @@ -85,7 +87,7 @@ public class StateProcessor extends AbstractComponent { void persist(String jobId, BytesReference bytes) throws IOException { logger.trace("[{}] ES API CALL: bulk index", jobId); BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(bytes, null, null, XContentType.JSON); + bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { client.bulk(bulkRequest).actionGet(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java index bab37a2a646..697a87b3e69 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java @@ -7,18 +7,25 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.state; /** - * The categorizer state does not need to be loaded on the Java side. - * However, the Java process DOES set up a mapping on the Elasticsearch - * index to tell Elasticsearch not to analyse the categorizer state documents - * in any way. + * The categorizer state does not need to be understood on the Java side. + * The Java code only needs to know how to form the document IDs so that + * it can retrieve and delete the correct documents. */ public class CategorizerState { + /** - * The type of this class used when persisting the data + * Legacy type, now used only as a discriminant in the document ID */ public static final String TYPE = "categorizer_state"; - public static final String categorizerStateDocId(String jobId, int docNum) { + public static final String documentId(String jobId, int docNum) { + return jobId + "_" + TYPE + "#" + docNum; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static final String legacyDocumentId(String jobId, int docNum) { return jobId + "#" + docNum; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java index 6c27cc2ec9c..2da848cca75 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java @@ -42,13 +42,14 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { public static final ParseField SNAPSHOT_DOC_COUNT = new ParseField("snapshot_doc_count"); public static final ParseField LATEST_RECORD_TIME = new ParseField("latest_record_time_stamp"); public static final ParseField LATEST_RESULT_TIME = new ParseField("latest_result_time_stamp"); + public static final ParseField QUANTILES = new ParseField("quantiles"); public static final ParseField RETAIN = new ParseField("retain"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("model_snapshots"); /** - * Elasticsearch type + * Legacy type, now used only as a discriminant in the document ID */ public static final ParseField TYPE = new ParseField("model_snapshot"); @@ -86,7 +87,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + LATEST_RESULT_TIME.getPreferredName() + "]"); }, LATEST_RESULT_TIME, ValueType.VALUE); - PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, Quantiles.TYPE); + PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, QUANTILES); PARSER.declareBoolean(Builder::setRetain, RETAIN); } @@ -184,7 +185,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { latestResultTimeStamp.getTime()); } if (quantiles != null) { - builder.field(Quantiles.TYPE.getPreferredName(), quantiles); + builder.field(QUANTILES.getPreferredName(), quantiles); } builder.field(RETAIN.getPreferredName(), retain); builder.endObject(); @@ -260,32 +261,53 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { && this.retain == that.retain; } - private String stateDocumentPrefix() { - return jobId + "-" + snapshotId; - } - public List stateDocumentIds() { - String prefix = stateDocumentPrefix(); List stateDocumentIds = new ArrayList<>(snapshotDocCount); // The state documents count suffices are 1-based for (int i = 1; i <= snapshotDocCount; i++) { - stateDocumentIds.add(prefix + '#' + i); + stateDocumentIds.add(ModelState.documentId(jobId, snapshotId, i)); + } + return stateDocumentIds; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public List legacyStateDocumentIds() { + List stateDocumentIds = new ArrayList<>(snapshotDocCount); + // The state documents count suffices are 1-based + for (int i = 1; i <= snapshotDocCount; i++) { + stateDocumentIds.add(ModelState.legacyDocumentId(jobId, snapshotId, i)); } return stateDocumentIds; } public static String documentIdPrefix(String jobId) { - return jobId + "_model_snapshot_"; + return jobId + "_" + TYPE + "_"; } public static String documentId(ModelSnapshot snapshot) { return documentId(snapshot.getJobId(), snapshot.getSnapshotId()); } + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(ModelSnapshot snapshot) { + return legacyDocumentId(snapshot.getJobId(), snapshot.getSnapshotId()); + } + public static String documentId(String jobId, String snapshotId) { return documentIdPrefix(jobId) + snapshotId; } + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(String jobId, String snapshotId) { + return jobId + "-" + snapshotId; + } + public static ModelSnapshot fromJson(BytesReference bytesReference) { try (XContentParser parser = XContentFactory.xContent(bytesReference).createParser(NamedXContentRegistry.EMPTY, bytesReference)) { return PARSER.apply(parser, null).build(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java index 576693cf79a..3dc319ac82b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java @@ -6,22 +6,28 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.state; -import org.elasticsearch.common.ParseField; - /** - * The serialised models can get very large and only the C++ code - * understands how to decode them, hence there is no reason to load - * them into the Java process. - * However, the Java process DOES set up a mapping on the Elasticsearch - * index to tell Elasticsearch not to analyse the model state documents - * in any way. (Otherwise Elasticsearch would go into a spin trying to - * make sense of such large JSON documents.) + * The model state does not need to be understood on the Java side. + * The Java code only needs to know how to form the document IDs so that + * it can retrieve and delete the correct documents. */ public class ModelState { + /** - * The type of this class used when persisting the data + * Legacy type, now used only as a discriminant in the document ID */ - public static final ParseField TYPE = new ParseField("model_state"); + public static final String TYPE = "model_state"; + + public static final String documentId(String jobId, String snapshotId, int docNum) { + return jobId + "_" + TYPE + "_" + snapshotId + "#" + docNum; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static final String legacyDocumentId(String jobId, String snapshotId, int docNum) { + return jobId + "-" + snapshotId + "#" + docNum; + } private ModelState() { } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java index 552fd6031cd..66aa198ab61 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java @@ -31,7 +31,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable { public static final ParseField QUANTILE_STATE = new ParseField("quantile_state"); /** - * Elasticsearch type + * Legacy type, now used only as a discriminant in the document ID */ public static final ParseField TYPE = new ParseField("quantiles"); @@ -45,7 +45,14 @@ public class Quantiles extends ToXContentToBytes implements Writeable { } public static String documentId(String jobId) { - return jobId + "_" + TYPE.getPreferredName(); + return jobId + "_" + TYPE; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(String jobId) { + return jobId + "-" + TYPE; } private final String jobId; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java index 571e33bf77a..23a47633c05 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java @@ -24,7 +24,11 @@ import java.util.TreeSet; public class CategoryDefinition extends ToXContentToBytes implements Writeable { + /** + * Legacy type, now used only as a discriminant in the document ID + */ public static final ParseField TYPE = new ParseField("category_definition"); + public static final ParseField CATEGORY_ID = new ParseField("category_id"); public static final ParseField TERMS = new ParseField("terms"); public static final ParseField REGEX = new ParseField("regex"); @@ -82,7 +86,7 @@ public class CategoryDefinition extends ToXContentToBytes implements Writeable { } public String getId() { - return jobId + "_category_definition_" + categoryId; + return jobId + "_" + TYPE + "_" + categoryId; } public long getCategoryId() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java index f0928852c3a..b6ffef7b7b8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java @@ -25,10 +25,8 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -236,10 +234,8 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase { PutIndexTemplateRequest request = captor.getValue(); assertNotNull(request); assertEquals(templateRegistry.mlStateIndexSettings().build(), request.settings()); - assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); - assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); - assertEquals(3, request.mappings().size()); + assertTrue(request.mappings().containsKey(ElasticsearchMappings.DOC_TYPE)); + assertEquals(1, request.mappings().size()); assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns()); assertEquals(new Integer(Version.CURRENT.id), request.version()); }); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index d22e12420cb..35b371be034 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -801,6 +801,7 @@ public class JobProviderTests extends ESTestCase { } public void testRestoreStateToStream() throws Exception { + String snapshotId = "123"; Map categorizerState = new HashMap<>(); categorizerState.put("catName", "catVal"); GetResponse categorizerStateGetResponse1 = createGetResponse(true, categorizerState); @@ -812,16 +813,18 @@ public class JobProviderTests extends ESTestCase { GetResponse modelStateGetResponse2 = createGetResponse(true, modelState); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#1", categorizerStateGetResponse1) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#2", categorizerStateGetResponse2) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#1", - modelStateGetResponse1) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#2", - modelStateGetResponse2); + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(JOB_ID, 1), categorizerStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(JOB_ID, 2), categorizerStateGetResponse2) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + ModelState.documentId(JOB_ID, snapshotId, 1), modelStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + ModelState.documentId(JOB_ID, snapshotId, 2), modelStateGetResponse2); JobProvider provider = createProvider(clientBuilder.build()); - ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId("123").setSnapshotDocCount(2).build(); + ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId(snapshotId).setSnapshotDocCount(2).build(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java index 6cb73142068..3e3402f3e37 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java @@ -183,11 +183,12 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"mlcategory"}], + "categorization_field_name": "message" + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "index-layout-state-job" } + + - do: + index: + index: .ml-anomalies-shared + type: doc + id: "index-layout-state-job_model_snapshot_123" + body: > + { + "job_id" : "index-layout-state-job", + "timestamp": "2017-05-02T00:00:00Z", + "snapshot_id": "123", + "snapshot_doc_count": 2, + "retain": false + } + + - do: + index: + index: .ml-anomalies-shared + type: model_snapshot + id: "index-layout-state-job-456" + body: > + { + "job_id" : "index-layout-state-job", + "timestamp": "2017-05-01T00:00:00Z", + "snapshot_id": "456", + "snapshot_doc_count": 2, + "retain": false + } + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_model_state_123#1 + body: + state: new-model-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_model_state_123#2 + body: + state: more-new-model-state + + - do: + index: + index: .ml-state + type: model_state + id: index-layout-state-job-456#1 + body: + state: old-model-state + + - do: + index: + index: .ml-state + type: model_state + id: index-layout-state-job-456#2 + body: + state: more-old-model-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_categorizer_state#1 + body: + state: new-categorizer-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_categorizer_state#2 + body: + state: more-new-categorizer-state + + - do: + index: + index: .ml-state + type: categorizer_state + id: index-layout-state-job#1 + body: + state: old-categorizer-state + + - do: + index: + index: .ml-state + type: categorizer_state + id: index-layout-state-job#2 + body: + state: more-old-categorizer-state + + - do: + indices.refresh: {} + + - do: + xpack.ml.delete_job: + job_id: "index-layout-state-job" + - match: { acknowledged: true } + + - do: + indices.refresh: {} + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 0} + + - do: + count: + index: .ml-state + - match: {count: 0}