From fffe424625c8bed407e5b6de94be32eb4c2f7fb4 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 26 May 2017 10:51:29 +0100 Subject: [PATCH] [ML] Switch state to use _type "doc" (elastic/x-pack-elasticsearch#1552) This commit means that newly created ML state indices will have a single type named "doc", and newly persisted state documents will have type "doc" too. Retrieving state is only supported for type "doc". When deleting state, documents with the old types are deleted in addition to those with type "doc". This means jobs created by the beta can be fully deleted. Relates elastic/x-pack-elasticsearch#668 Original commit: elastic/x-pack-elasticsearch@29c07d40f15baee93958bfd8a17f93b5a5c827ef --- .../ml/MachineLearningTemplateRegistry.java | 9 +- .../persistence/ElasticsearchMappings.java | 51 +------ .../ml/job/persistence/JobDataDeleter.java | 9 +- .../xpack/ml/job/persistence/JobProvider.java | 37 ++--- .../job/persistence/JobResultsPersister.java | 70 ++++----- .../persistence/JobStorageDeletionTask.java | 62 ++++---- .../autodetect/output/StateProcessor.java | 4 +- .../autodetect/state/CategorizerState.java | 19 ++- .../autodetect/state/ModelSnapshot.java | 42 ++++-- .../process/autodetect/state/ModelState.java | 28 ++-- .../process/autodetect/state/Quantiles.java | 11 +- .../ml/job/results/CategoryDefinition.java | 6 +- .../MachineLearningTemplateRegistryTests.java | 10 +- .../ml/job/persistence/JobProviderTests.java | 17 ++- .../autodetect/state/ModelSnapshotTests.java | 5 +- .../rest-api-spec/test/ml/index_layout.yml | 137 +++++++++++++++++- 16 files changed, 328 insertions(+), 189 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java index 286fa40c55b..9eaad98141a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java @@ -230,16 +230,11 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen } void putJobStateIndexTemplate(BiConsumer listener) { - try (XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); - XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); - XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping()) { - + try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) { PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName()); templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName())); templateRequest.settings(mlStateIndexSettings()); - templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); - templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); - templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); + templateRequest.mapping(ElasticsearchMappings.DOC_TYPE, stateMapping); templateRequest.version(Version.CURRENT.id); client.admin().indices().putTemplate(templateRequest, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 0394da6c53c..8cb8b433118 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -7,12 +7,9 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyCause; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -494,39 +491,6 @@ public class ElasticsearchMappings { .endObject(); } - /** - * {@link CategorizerState} mapping. - * The type is disabled so {@link CategorizerState} is not searchable and - * the '_all' field is disabled - * - * @return The builder - * @throws IOException On builder write error - */ - public static XContentBuilder categorizerStateMapping() throws IOException { - return jsonBuilder() - .startObject() - .startObject(CategorizerState.TYPE) - .field(ENABLED, false) - .endObject() - .endObject(); - } - - /** - * Create the Elasticsearch mapping for {@linkplain Quantiles}. - * The type is disabled as is the '_all' field as the document isn't meant to be searched. - *

- * The quantile state string is not searchable (enabled = false) as it could be - * very large. - */ - public static XContentBuilder quantilesMapping() throws IOException { - return jsonBuilder() - .startObject() - .startObject(Quantiles.TYPE.getPreferredName()) - .field(ENABLED, false) - .endObject() - .endObject(); - } - /** * Create the Elasticsearch mapping for {@linkplain CategoryDefinition}. * The '_all' field is disabled as the document isn't meant to be searched. @@ -552,16 +516,15 @@ public class ElasticsearchMappings { } /** - * Create the Elasticsearch mapping for {@linkplain ModelState}. - * The model state could potentially be huge (over a gigabyte in size) - * so all analysis by Elasticsearch is disabled. The only way to - * retrieve the model state is by knowing the ID of a particular - * document or by searching for all documents of this type. + * Create the Elasticsearch mapping for state. State could potentially be + * huge (target document size is 16MB and there can be many documents) so all + * analysis by Elasticsearch is disabled. The only way to retrieve state is + * by knowing the ID of a particular document. */ - public static XContentBuilder modelStateMapping() throws IOException { + public static XContentBuilder stateMapping() throws IOException { return jsonBuilder() .startObject() - .startObject(ModelState.TYPE.getPreferredName()) + .startObject(DOC_TYPE) .field(ENABLED, false) .endObject() .endObject(); @@ -603,7 +566,7 @@ public class ElasticsearchMappings { // end model size stats mapping builder.endObject(); - builder.startObject(Quantiles.TYPE.getPreferredName()) + builder.startObject(ModelSnapshot.QUANTILES.getPreferredName()) .field(ENABLED, false) .endObject() .startObject(ModelSnapshot.LATEST_RECORD_TIME.getPreferredName()) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 05618f74873..a3c9b153aa6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -56,11 +56,18 @@ public class JobDataDeleter { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (ModelSnapshot modelSnapshot : modelSnapshots) { for (String stateDocId : modelSnapshot.stateDocumentIds()) { - bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateDocId)); + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ElasticsearchMappings.DOC_TYPE, stateDocId)); + } + // TODO: remove in 7.0 + for (String stateDocId : modelSnapshot.legacyStateDocumentIds()) { + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE, stateDocId)); } bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot))); + // TODO: remove in 7.0 + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), + ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.legacyDocumentId(modelSnapshot))); } bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 94694d2598f..8735177cc92 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -65,7 +65,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -253,6 +252,8 @@ public class JobProvider { MultiSearchRequestBuilder msearch = client.prepareMultiSearch() .add(createLatestDataCountsSearch(resultsIndex, jobId)) .add(createLatestModelSizeStatsSearch(resultsIndex)) + // These next two document IDs never need to be the legacy ones due to the rule + // that you cannot open a 5.4 job in a subsequent version of the product .add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId()))) .add(createDocIdSearch(stateIndex, Quantiles.documentId(jobId))); @@ -523,8 +524,8 @@ public class JobProvider { } String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}", - CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); + LOGGER.trace("ES API CALL: search all of category definitions from index {} sort ascending {} from {} size {}", + indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); @@ -605,8 +606,8 @@ public class JobProvider { searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC); } - LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}", - AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "", + LOGGER.trace("ES API CALL: search all of records from index {}{}{} with filter after sort from {} size {}", + indexName, (sb != null) ? " with sort" : "", secondarySort.isEmpty() ? "" : " with secondary sort", from, size); client.search(searchRequest, ActionListener.wrap(searchResponse -> { List results = new ArrayList<>(); @@ -639,8 +640,7 @@ public class JobProvider { .build(); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", - () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, + LOGGER.trace("ES API CALL: search all of influencers from index {}{} with filter from {} size {}", () -> indexName, () -> (query.getSortField() != null) ? " with sort " + (query.isSortDescending() ? "descending" : "ascending") + " on field " + query.getSortField() : "", query::getFrom, query::getSize); @@ -760,8 +760,8 @@ public class JobProvider { .order(sortDescending ? SortOrder.DESC : SortOrder.ASC); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search all {}s from index {} sort ascending {} with filter after sort from {} size {}", - ModelSnapshot.TYPE, indexName, sortField, from, size); + LOGGER.trace("ES API CALL: search all model snapshots from index {} sort ascending {} with filter after sort from {} size {}", + indexName, sortField, from, size); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); @@ -788,6 +788,9 @@ public class JobProvider { * stream. If there are multiple state documents they are separated using '\0' * when written to the stream. * + * Because we have a rule that we will not open a legacy job in the current product version + * we don't have to worry about legacy document IDs here. + * * @param jobId the job id * @param modelSnapshot the model snapshot to be restored * @param restoreStream the stream to write the state to @@ -797,9 +800,9 @@ public class JobProvider { // First try to restore model state. for (String stateDocId : modelSnapshot.stateDocumentIds()) { - LOGGER.trace("ES API CALL: get ID {} type {} from index {}", stateDocId, ModelState.TYPE, indexName); + LOGGER.trace("ES API CALL: get ID {} from index {}", stateDocId, indexName); - GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), stateDocId).get(); + GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, stateDocId).get(); if (!stateResponse.isExists()) { LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}", modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId); @@ -809,16 +812,15 @@ public class JobProvider { } // Secondly try to restore categorizer state. This must come after model state because that's - // the order the C++ process expects. - // There are no snapshots for this, so the IDs simply + // the order the C++ process expects. There are no snapshots for this, so the IDs simply // count up until a document is not found. It's NOT an error to have no categorizer state. int docNum = 0; while (true) { - String docId = CategorizerState.categorizerStateDocId(jobId, ++docNum); + String docId = CategorizerState.documentId(jobId, ++docNum); - LOGGER.trace("ES API CALL: get ID {} type {} from index {}", docId, CategorizerState.TYPE, indexName); + LOGGER.trace("ES API CALL: get ID {} from index {}", docId, indexName); - GetResponse stateResponse = client.prepareGet(indexName, CategorizerState.TYPE, docId).get(); + GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, docId).get(); if (!stateResponse.isExists()) { break; } @@ -850,8 +852,7 @@ public class JobProvider { public QueryPage modelPlot(String jobId, int from, int size) { SearchResponse searchResponse; String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}", - ModelPlot.RESULT_TYPE_VALUE, indexName, from, size); + LOGGER.trace("ES API CALL: search model plots from index {} from {} size {}", indexName, from, size); searchResponse = client.prepareSearch(indexName) .setIndicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 8b305ed6a2f..c107038e7c3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.Result; import java.io.IOException; import java.util.Collections; @@ -96,28 +95,26 @@ public class JobResultsPersister extends AbstractComponent { bucketWithoutRecords.setRecords(Collections.emptyList()); } try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) { - logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", - jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch()); + String id = bucketWithoutRecords.getId(); + logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, bucketWithoutRecords.getId()).source(content)); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e); + logger.error(new ParameterizedMessage("[{}] Error serialising bucket", jobId), e); } return this; } - private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) - throws IOException { + private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) throws IOException { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) { // Need consistent IDs to ensure overwriting on renormalization String id = bucketInfluencer.getId(); - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id); + logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id); bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } @@ -135,9 +132,9 @@ public class JobResultsPersister extends AbstractComponent { try { for (AnomalyRecord record : records) { try (XContentBuilder content = toXContentBuilder(record)) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, record.getId()).source(content)); + String id = record.getId(); + logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, id); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } } catch (IOException e) { @@ -158,9 +155,9 @@ public class JobResultsPersister extends AbstractComponent { try { for (Influencer influencer : influencers) { try (XContentBuilder content = toXContentBuilder(influencer)) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, influencer.getId()).source(content)); + String id = influencer.getId(); + logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, id); + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); } } } catch (IOException e) { @@ -199,8 +196,7 @@ public class JobResultsPersister extends AbstractComponent { * @param category The category to be persisted */ public void persistCategoryDefinition(CategoryDefinition category) { - Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), - category.getId()); + Persistable persistable = new Persistable(category.getJobId(), category, category.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not @@ -211,8 +207,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist the quantiles (blocking) */ public void persistQuantiles(Quantiles quantiles) { - Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), - Quantiles.documentId(quantiles.getJobId())); + Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet(); } @@ -220,8 +215,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist the quantiles (async) */ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener listener) { - Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), - Quantiles.documentId(quantiles.getJobId())); + Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener); } @@ -230,8 +224,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist a model snapshot description */ public void persistModelSnapshot(ModelSnapshot modelSnapshot) { - Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), - ModelSnapshot.documentId(modelSnapshot)); + Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } @@ -241,7 +234,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSizeStats(ModelSizeStats modelSizeStats) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); - Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId()); + Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); // Don't commit as we expect masses of these updates and they're only // for information at the API level @@ -254,7 +247,7 @@ public class JobResultsPersister extends AbstractComponent { ActionListener listener) { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); - Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.getId()); + Persistable persistable = new Persistable(jobId, modelSizeStats, modelSizeStats.getId()); persistable.setRefreshPolicy(refreshPolicy); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener); // Don't commit as we expect masses of these updates and they're only @@ -265,7 +258,7 @@ public class JobResultsPersister extends AbstractComponent { * Persist model plot output */ public void persistModelPlot(ModelPlot modelPlot) { - Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), modelPlot.getId()); + Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, modelPlot.getId()); persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -283,9 +276,8 @@ public class JobResultsPersister extends AbstractComponent { * called to commit the writes to the datastore. * * @param jobId The job Id - * @return True if successful */ - public boolean commitResultWrites(String jobId) { + public void commitResultWrites(String jobId) { // We refresh using the read alias in order to ensure all indices will // be refreshed even if a rollover occurs in between. String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); @@ -295,7 +287,6 @@ public class JobResultsPersister extends AbstractComponent { RefreshRequest refreshRequest = new RefreshRequest(indexName); refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); client.admin().indices().refresh(refreshRequest).actionGet(); - return true; } /** @@ -303,19 +294,17 @@ public class JobResultsPersister extends AbstractComponent { * immediately searchable. * * @param jobId The job Id - * @return True if successful * */ - public boolean commitStateWrites(String jobId) { + public void commitStateWrites(String jobId) { String indexName = AnomalyDetectorsIndex.jobStateIndexName(); // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); RefreshRequest refreshRequest = new RefreshRequest(indexName); refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); client.admin().indices().refresh(refreshRequest).actionGet(); - return true; } - XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { + private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { XContentBuilder builder = jsonBuilder(); obj.toXContent(builder, ToXContent.EMPTY_PARAMS); return builder; @@ -325,14 +314,12 @@ public class JobResultsPersister extends AbstractComponent { private final String jobId; private final ToXContent object; - private final String description; private final String id; private WriteRequest.RefreshPolicy refreshPolicy; - Persistable(String jobId, ToXContent object, String description, String id) { + Persistable(String jobId, ToXContent object, String id) { this.jobId = jobId; this.object = object; - this.description = description; this.id = id; this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; } @@ -349,15 +336,12 @@ public class JobResultsPersister extends AbstractComponent { void persist(String indexName, ActionListener listener) { logCall(indexName); - // TODO no_release: this is a temporary hack until we also switch state index to have doc type in which case - // we can remove this line and use DOC_TYPE directly in the index request - String type = AnomalyDetectorsIndex.jobStateIndexName().equals(indexName) ? description : DOC_TYPE; try (XContentBuilder content = toXContentBuilder(object)) { - IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content).setRefreshPolicy(refreshPolicy); + IndexRequest indexRequest = new IndexRequest(indexName, DOC_TYPE, id).source(content).setRefreshPolicy(refreshPolicy); client.index(indexRequest, listener); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, description}), e); + logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e); IndexResponse.Builder notCreatedResponse = new IndexResponse.Builder(); notCreatedResponse.setCreated(false); listener.onResponse(notCreatedResponse.build()); @@ -366,9 +350,9 @@ public class JobResultsPersister extends AbstractComponent { private void logCall(String indexName) { if (id != null) { - logger.trace("[{}] ES API CALL: index {} to index {} with ID {}", jobId, description, indexName, id); + logger.trace("[{}] ES API CALL: to index {} with ID [{}]", jobId, indexName, id); } else { - logger.trace("[{}] ES API CALL: index {} to index {} with auto-generated ID", jobId, description, indexName); + logger.trace("[{}] ES API CALL: to index {} with auto-generated ID", jobId, indexName); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index a130017c3d9..9a1f14e589f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -23,11 +22,9 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; @@ -96,7 +93,7 @@ public class JobStorageDeletionTask extends Task { // Step 3. Delete quantiles done, delete the categorizer state ActionListener deleteQuantilesHandler = ActionListener.wrap( - response -> deleteCategorizerState(jobId, client, deleteCategorizerStateHandler), + response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler), failureHandler); // Step 2. Delete state done, delete the quantiles @@ -109,12 +106,12 @@ public class JobStorageDeletionTask extends Task { } private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { - // The quantiles doc Id changed in v5.5 so delete both the old and new format + // The quantiles type and doc Id changed in v5.5 so delete both the old and new format BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, Quantiles.documentId(jobId))); bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), Quantiles.TYPE.getPreferredName(), - jobId + "-" + Quantiles.TYPE.getPreferredName())); + Quantiles.legacyDocumentId(jobId))); bulkRequestBuilder.execute(ActionListener.wrap( response -> finishedHandler.onResponse(true), e -> { @@ -138,26 +135,35 @@ public class JobStorageDeletionTask extends Task { listener::onFailure); } - private void deleteCategorizerState(String jobId, Client client, ActionListener finishedHandler) { - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); - request.setSlices(5); - - searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - WildcardQueryBuilder query = new WildcardQueryBuilder(UidFieldMapper.NAME, Uid.createUid(CategorizerState.TYPE, jobId + "#*")); - searchRequest.source(new SearchSourceBuilder().query(query)); - client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - finishedHandler.onResponse(true); - } - - @Override - public void onFailure(Exception e) { - logger.error("[" + jobId + "] Failed to delete categorizer state for job.", e); - finishedHandler.onFailure(e); - } - }); + private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener finishedHandler) { + // The categorizer state type and doc Id changed in v5.5 so delete both the old and new format + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(jobId, docNum))); + // TODO: remove in 7.0 + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, + CategorizerState.legacyDocumentId(jobId, docNum))); + bulkRequestBuilder.execute(ActionListener.wrap( + response -> { + // If we successfully deleted either document try the next one; if not we're done + for (BulkItemResponse item : response.getItems()) { + if (item.status() == RestStatus.OK) { + // There's an assumption here that there won't be very many categorizer + // state documents, so the recursion won't go more than, say, 5 levels deep + deleteCategorizerState(jobId, client, docNum + 1, finishedHandler); + return; + } + } + finishedHandler.onResponse(true); + }, + e -> { + // It's not a problem for us if the index wasn't found - it's equivalent to document not found + if (e instanceof IndexNotFoundException) { + finishedHandler.onResponse(true); + } else { + finishedHandler.onFailure(e); + } + })); } private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener finishedHandler ) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index 52231664d98..fe6e076d18b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -14,6 +14,8 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; import java.io.IOException; import java.io.InputStream; @@ -85,7 +87,7 @@ public class StateProcessor extends AbstractComponent { void persist(String jobId, BytesReference bytes) throws IOException { logger.trace("[{}] ES API CALL: bulk index", jobId); BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(bytes, null, null, XContentType.JSON); + bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { client.bulk(bulkRequest).actionGet(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java index bab37a2a646..697a87b3e69 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/CategorizerState.java @@ -7,18 +7,25 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.state; /** - * The categorizer state does not need to be loaded on the Java side. - * However, the Java process DOES set up a mapping on the Elasticsearch - * index to tell Elasticsearch not to analyse the categorizer state documents - * in any way. + * The categorizer state does not need to be understood on the Java side. + * The Java code only needs to know how to form the document IDs so that + * it can retrieve and delete the correct documents. */ public class CategorizerState { + /** - * The type of this class used when persisting the data + * Legacy type, now used only as a discriminant in the document ID */ public static final String TYPE = "categorizer_state"; - public static final String categorizerStateDocId(String jobId, int docNum) { + public static final String documentId(String jobId, int docNum) { + return jobId + "_" + TYPE + "#" + docNum; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static final String legacyDocumentId(String jobId, int docNum) { return jobId + "#" + docNum; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java index 6c27cc2ec9c..2da848cca75 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java @@ -42,13 +42,14 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { public static final ParseField SNAPSHOT_DOC_COUNT = new ParseField("snapshot_doc_count"); public static final ParseField LATEST_RECORD_TIME = new ParseField("latest_record_time_stamp"); public static final ParseField LATEST_RESULT_TIME = new ParseField("latest_result_time_stamp"); + public static final ParseField QUANTILES = new ParseField("quantiles"); public static final ParseField RETAIN = new ParseField("retain"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("model_snapshots"); /** - * Elasticsearch type + * Legacy type, now used only as a discriminant in the document ID */ public static final ParseField TYPE = new ParseField("model_snapshot"); @@ -86,7 +87,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + LATEST_RESULT_TIME.getPreferredName() + "]"); }, LATEST_RESULT_TIME, ValueType.VALUE); - PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, Quantiles.TYPE); + PARSER.declareObject(Builder::setQuantiles, Quantiles.PARSER, QUANTILES); PARSER.declareBoolean(Builder::setRetain, RETAIN); } @@ -184,7 +185,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { latestResultTimeStamp.getTime()); } if (quantiles != null) { - builder.field(Quantiles.TYPE.getPreferredName(), quantiles); + builder.field(QUANTILES.getPreferredName(), quantiles); } builder.field(RETAIN.getPreferredName(), retain); builder.endObject(); @@ -260,32 +261,53 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { && this.retain == that.retain; } - private String stateDocumentPrefix() { - return jobId + "-" + snapshotId; - } - public List stateDocumentIds() { - String prefix = stateDocumentPrefix(); List stateDocumentIds = new ArrayList<>(snapshotDocCount); // The state documents count suffices are 1-based for (int i = 1; i <= snapshotDocCount; i++) { - stateDocumentIds.add(prefix + '#' + i); + stateDocumentIds.add(ModelState.documentId(jobId, snapshotId, i)); + } + return stateDocumentIds; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public List legacyStateDocumentIds() { + List stateDocumentIds = new ArrayList<>(snapshotDocCount); + // The state documents count suffices are 1-based + for (int i = 1; i <= snapshotDocCount; i++) { + stateDocumentIds.add(ModelState.legacyDocumentId(jobId, snapshotId, i)); } return stateDocumentIds; } public static String documentIdPrefix(String jobId) { - return jobId + "_model_snapshot_"; + return jobId + "_" + TYPE + "_"; } public static String documentId(ModelSnapshot snapshot) { return documentId(snapshot.getJobId(), snapshot.getSnapshotId()); } + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(ModelSnapshot snapshot) { + return legacyDocumentId(snapshot.getJobId(), snapshot.getSnapshotId()); + } + public static String documentId(String jobId, String snapshotId) { return documentIdPrefix(jobId) + snapshotId; } + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(String jobId, String snapshotId) { + return jobId + "-" + snapshotId; + } + public static ModelSnapshot fromJson(BytesReference bytesReference) { try (XContentParser parser = XContentFactory.xContent(bytesReference).createParser(NamedXContentRegistry.EMPTY, bytesReference)) { return PARSER.apply(parser, null).build(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java index 576693cf79a..3dc319ac82b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelState.java @@ -6,22 +6,28 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.state; -import org.elasticsearch.common.ParseField; - /** - * The serialised models can get very large and only the C++ code - * understands how to decode them, hence there is no reason to load - * them into the Java process. - * However, the Java process DOES set up a mapping on the Elasticsearch - * index to tell Elasticsearch not to analyse the model state documents - * in any way. (Otherwise Elasticsearch would go into a spin trying to - * make sense of such large JSON documents.) + * The model state does not need to be understood on the Java side. + * The Java code only needs to know how to form the document IDs so that + * it can retrieve and delete the correct documents. */ public class ModelState { + /** - * The type of this class used when persisting the data + * Legacy type, now used only as a discriminant in the document ID */ - public static final ParseField TYPE = new ParseField("model_state"); + public static final String TYPE = "model_state"; + + public static final String documentId(String jobId, String snapshotId, int docNum) { + return jobId + "_" + TYPE + "_" + snapshotId + "#" + docNum; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static final String legacyDocumentId(String jobId, String snapshotId, int docNum) { + return jobId + "-" + snapshotId + "#" + docNum; + } private ModelState() { } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java index 552fd6031cd..66aa198ab61 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/Quantiles.java @@ -31,7 +31,7 @@ public class Quantiles extends ToXContentToBytes implements Writeable { public static final ParseField QUANTILE_STATE = new ParseField("quantile_state"); /** - * Elasticsearch type + * Legacy type, now used only as a discriminant in the document ID */ public static final ParseField TYPE = new ParseField("quantiles"); @@ -45,7 +45,14 @@ public class Quantiles extends ToXContentToBytes implements Writeable { } public static String documentId(String jobId) { - return jobId + "_" + TYPE.getPreferredName(); + return jobId + "_" + TYPE; + } + + /** + * This is how the IDs were formed in v5.4 + */ + public static String legacyDocumentId(String jobId) { + return jobId + "-" + TYPE; } private final String jobId; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java index 571e33bf77a..23a47633c05 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinition.java @@ -24,7 +24,11 @@ import java.util.TreeSet; public class CategoryDefinition extends ToXContentToBytes implements Writeable { + /** + * Legacy type, now used only as a discriminant in the document ID + */ public static final ParseField TYPE = new ParseField("category_definition"); + public static final ParseField CATEGORY_ID = new ParseField("category_id"); public static final ParseField TERMS = new ParseField("terms"); public static final ParseField REGEX = new ParseField("regex"); @@ -82,7 +86,7 @@ public class CategoryDefinition extends ToXContentToBytes implements Writeable { } public String getId() { - return jobId + "_category_definition_" + categoryId; + return jobId + "_" + TYPE + "_" + categoryId; } public long getCategoryId() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java index f0928852c3a..b6ffef7b7b8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java @@ -25,10 +25,8 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -236,10 +234,8 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase { PutIndexTemplateRequest request = captor.getValue(); assertNotNull(request); assertEquals(templateRegistry.mlStateIndexSettings().build(), request.settings()); - assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); - assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); - assertEquals(3, request.mappings().size()); + assertTrue(request.mappings().containsKey(ElasticsearchMappings.DOC_TYPE)); + assertEquals(1, request.mappings().size()); assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns()); assertEquals(new Integer(Version.CURRENT.id), request.version()); }); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index d22e12420cb..35b371be034 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -801,6 +801,7 @@ public class JobProviderTests extends ESTestCase { } public void testRestoreStateToStream() throws Exception { + String snapshotId = "123"; Map categorizerState = new HashMap<>(); categorizerState.put("catName", "catVal"); GetResponse categorizerStateGetResponse1 = createGetResponse(true, categorizerState); @@ -812,16 +813,18 @@ public class JobProviderTests extends ESTestCase { GetResponse modelStateGetResponse2 = createGetResponse(true, modelState); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#1", categorizerStateGetResponse1) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "#2", categorizerStateGetResponse2) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#1", - modelStateGetResponse1) - .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), JOB_ID + "-123#2", - modelStateGetResponse2); + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(JOB_ID, 1), categorizerStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + CategorizerState.documentId(JOB_ID, 2), categorizerStateGetResponse2) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + ModelState.documentId(JOB_ID, snapshotId, 1), modelStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, + ModelState.documentId(JOB_ID, snapshotId, 2), modelStateGetResponse2); JobProvider provider = createProvider(clientBuilder.build()); - ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId("123").setSnapshotDocCount(2).build(); + ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId(snapshotId).setSnapshotDocCount(2).build(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java index 6cb73142068..3e3402f3e37 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java @@ -183,11 +183,12 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"mlcategory"}], + "categorization_field_name": "message" + }, + "data_description" : { + "format":"xcontent" + } + } + - match: { job_id: "index-layout-state-job" } + + - do: + index: + index: .ml-anomalies-shared + type: doc + id: "index-layout-state-job_model_snapshot_123" + body: > + { + "job_id" : "index-layout-state-job", + "timestamp": "2017-05-02T00:00:00Z", + "snapshot_id": "123", + "snapshot_doc_count": 2, + "retain": false + } + + - do: + index: + index: .ml-anomalies-shared + type: model_snapshot + id: "index-layout-state-job-456" + body: > + { + "job_id" : "index-layout-state-job", + "timestamp": "2017-05-01T00:00:00Z", + "snapshot_id": "456", + "snapshot_doc_count": 2, + "retain": false + } + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_model_state_123#1 + body: + state: new-model-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_model_state_123#2 + body: + state: more-new-model-state + + - do: + index: + index: .ml-state + type: model_state + id: index-layout-state-job-456#1 + body: + state: old-model-state + + - do: + index: + index: .ml-state + type: model_state + id: index-layout-state-job-456#2 + body: + state: more-old-model-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_categorizer_state#1 + body: + state: new-categorizer-state + + - do: + index: + index: .ml-state + type: doc + id: index-layout-state-job_categorizer_state#2 + body: + state: more-new-categorizer-state + + - do: + index: + index: .ml-state + type: categorizer_state + id: index-layout-state-job#1 + body: + state: old-categorizer-state + + - do: + index: + index: .ml-state + type: categorizer_state + id: index-layout-state-job#2 + body: + state: more-old-categorizer-state + + - do: + indices.refresh: {} + + - do: + xpack.ml.delete_job: + job_id: "index-layout-state-job" + - match: { acknowledged: true } + + - do: + indices.refresh: {} + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 0} + + - do: + count: + index: .ml-state + - match: {count: 0}