From 6de846d4c66541c0a15f859a8a92d5adaff4be96 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 21 Dec 2016 12:03:17 +0000 Subject: [PATCH] Put model state in the .mlstate-anomalydetectors index (elastic/elasticsearch#589) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Put model state in the .mlstate index * Revert results index rename * Put ModelSnapshots in the results index * Change state index in C++ * Fix logging * Rename state index ‘.ml-state’ Original commit: elastic/x-pack-elasticsearch@dbe5f6b525367a53d68edd3f460cf48d8f22722f --- .../PutModelSnapshotDescriptionAction.java | 3 +- .../xpack/prelert/job/manager/JobManager.java | 7 +- .../PrelertInitializationService.java | 19 +- .../persistence/AnomalyDetectorsIndex.java | 19 +- ...ticsearchBatchedModelSnapshotIterator.java | 2 +- .../ElasticsearchBatchedResultsIterator.java | 2 +- .../persistence/JobDataCountsPersister.java | 2 +- .../job/persistence/JobDataDeleter.java | 13 +- .../prelert/job/persistence/JobProvider.java | 114 +++++++---- .../job/persistence/JobResultsPersister.java | 44 +++-- .../job/persistence/UsagePersister.java | 2 +- .../output/AutoDetectResultProcessor.java | 4 +- .../xpack/prelert/action/ScheduledJobsIT.java | 2 +- .../AutodetectResultProcessorIT.java | 8 +- .../prelert/job/manager/JobManagerTests.java | 4 +- .../PrelertInitializationServiceTests.java | 9 +- .../ElasticsearchMappingsTests.java | 4 +- .../job/persistence/JobDataDeleterTests.java | 25 ++- .../job/persistence/JobProviderTests.java | 187 +++++++++--------- .../AutoDetectResultProcessorTests.java | 4 +- .../test/delete_model_snapshot.yaml | 16 +- .../test/get_model_snapshots.yaml | 1 + .../rest-api-spec/test/jobs_crud.yaml | 5 +- .../test/put_model_snapshot_description.yaml | 1 + .../test/revert_model_snapshot.yaml | 3 + 25 files changed, 319 insertions(+), 181 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutModelSnapshotDescriptionAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutModelSnapshotDescriptionAction.java index 732ea07f894..a35ba241acd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutModelSnapshotDescriptionAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutModelSnapshotDescriptionAction.java @@ -278,8 +278,7 @@ PutModelSnapshotDescriptionAction.RequestBuilder> { modelSnapshot.setDescription(request.getDescriptionString()); // The quantiles can be large, and totally dominate the output - - // it's - // clearer to remove them + // it's clearer to remove them modelSnapshot.setQuantiles(null); listener.onResponse(new Response(modelSnapshot)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index 321cfcb8828..21cf860e592 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -166,7 +166,7 @@ public class JobManager extends AbstractComponent { Job job = request.getJob(); ActionListener delegateListener = ActionListener.wrap(jobSaved -> - jobProvider.createJobRelatedIndices(job, new ActionListener() { + jobProvider.createJobResultIndex(job, new ActionListener() { @Override public void onResponse(Boolean indicesCreated) { audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); @@ -195,12 +195,11 @@ public class JobManager extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { ClusterState cs = updateClusterState(job, request.isOverwrite(), currentState); - if (currentState.metaData().index(AnomalyDetectorsIndex.getJobIndexName(job.getIndexName())) != null) { + if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) { throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, job.getIndexName())); } return cs; } - }); } @@ -390,7 +389,7 @@ public class JobManager extends AbstractComponent { } // Commit so that when the REST API call that triggered the update // returns the updated document is searchable - jobResultsPersister.commitWrites(jobId); + jobResultsPersister.commitStateWrites(jobId); } private static PrelertMetadata.Builder createPrelertMetadataBuilder(ClusterState currentState) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java index dd46c34c4f4..863b38279ec 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener { @@ -62,7 +63,7 @@ public class PrelertInitializationService extends AbstractComponent implements C logger.info("successfully created prelert-usage index"); } else { if (error instanceof ResourceAlreadyExistsException) { - logger.debug("not able to create prelert-usage index", error); + logger.debug("not able to create prelert-usage index as it already exists"); } else { logger.error("not able to create prelert-usage index", error); } @@ -70,6 +71,22 @@ public class PrelertInitializationService extends AbstractComponent implements C }); }); } + String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); + if (metaData.hasIndex(stateIndexName) == false) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + jobProvider.createJobStateIndex((result, error) -> { + if (result) { + logger.info("successfully created {} index", stateIndexName); + } else { + if (error instanceof ResourceAlreadyExistsException) { + logger.debug("not able to create {} index as it already exists", stateIndexName); + } else { + logger.error("not able to create " + stateIndexName + " index", error); + } + } + }); + }); + } } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/AnomalyDetectorsIndex.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/AnomalyDetectorsIndex.java index 0d079d5d13d..6ab6e56e5ba 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/AnomalyDetectorsIndex.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/AnomalyDetectorsIndex.java @@ -9,13 +9,26 @@ package org.elasticsearch.xpack.prelert.job.persistence; * Methods for handling index naming related functions */ public final class AnomalyDetectorsIndex { - private static final String INDEX_PREFIX = "prelertresults-"; + private static final String RESULTS_INDEX_PREFIX = "prelertresults-"; + private static final String STATE_INDEX_NAME = ".ml-state"; private AnomalyDetectorsIndex() { } - public static String getJobIndexName(String jobId) { - return INDEX_PREFIX + jobId; + /** + * The name of the default index where the job's results are stored + * @param jobId Job Id + * @return The index name + */ + public static String jobResultsIndexName(String jobId) { + return RESULTS_INDEX_PREFIX + jobId; } + /** + * The name of the default index where a job's state is stored + * @return The index name + */ + public static String jobStateIndexName() { + return STATE_INDEX_NAME; + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedModelSnapshotIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedModelSnapshotIterator.java index 89dd219bc52..16002dab506 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedModelSnapshotIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedModelSnapshotIterator.java @@ -19,7 +19,7 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot; class ElasticsearchBatchedModelSnapshotIterator extends ElasticsearchBatchedDocumentsIterator { public ElasticsearchBatchedModelSnapshotIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) { - super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parserFieldMatcher); + super(client, AnomalyDetectorsIndex.jobStateIndexName(), parserFieldMatcher); } @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedResultsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedResultsIterator.java index 5d0a5da6a37..ce78bd5fb2a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedResultsIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedResultsIterator.java @@ -13,7 +13,7 @@ import org.elasticsearch.xpack.prelert.job.results.Result; abstract class ElasticsearchBatchedResultsIterator extends ElasticsearchBatchedDocumentsIterator { public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType, ParseFieldMatcher parseFieldMatcher) { - super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parseFieldMatcher, + super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), parseFieldMatcher, new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataCountsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataCountsPersister.java index 456d8547346..9fd0a4b6672 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataCountsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataCountsPersister.java @@ -48,7 +48,7 @@ public class JobDataCountsPersister extends AbstractComponent { public void persistDataCounts(String jobId, DataCounts counts, ActionListener listener) { try { XContentBuilder content = serialiseCounts(counts); - client.prepareIndex(AnomalyDetectorsIndex.getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(), + client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX) .setSource(content).execute(new ActionListener() { @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java index b54463ca832..1e70623dc34 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java @@ -67,7 +67,7 @@ public class JobDataDeleter { * @param listener Response listener */ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { - String index = AnomalyDetectorsIndex.getJobIndexName(jobId); + String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId); RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName()); timeRange.gte(cutoffEpochMs); @@ -108,17 +108,18 @@ public class JobDataDeleter { public void deleteModelSnapshot(ModelSnapshot modelSnapshot) { String snapshotId = modelSnapshot.getSnapshotId(); int docCount = modelSnapshot.getSnapshotDocCount(); - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); // Deduce the document IDs of the state documents from the information // in the snapshot document - we cannot query the state itself as it's // too big and has no mappings for (int i = 0; i < docCount; ++i) { String stateId = snapshotId + '_' + i; - bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId)); + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateId)); ++deletedModelStateCount; } - bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId)); + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()), + ModelSnapshot.TYPE.getPreferredName(), snapshotId)); ++deletedModelSnapshotCount; } @@ -126,7 +127,7 @@ public class JobDataDeleter { * Delete all results marked as interim */ public void deleteInterimResults() { - String index = AnomalyDetectorsIndex.getJobIndexName(jobId); + String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId); QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); @@ -192,7 +193,7 @@ public class JobDataDeleter { } /** - * Repeats a scroll search adding the hits a bulk delete request + * Repeats a scroll search adding the hits to the bulk delete request */ private class RepeatingSearchScrollListener implements ActionListener { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index ffa84e40b89..d3f8b6d9e62 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -130,7 +130,7 @@ public class JobProvider { XContentBuilder usageMapping = ElasticsearchMappings.usageMapping(); LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX); client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX) - .setSettings(prelertIndexSettings()) + .setSettings(mlResultsIndexSettings()) .addMapping(Usage.TYPE, usageMapping) .execute(new ActionListener() { @Override @@ -145,12 +145,12 @@ public class JobProvider { }); } catch (IOException e) { - LOGGER.warn("Error checking the usage metering index", e); + LOGGER.warn("Error creating the usage metering index", e); } } /** - * Build the Elasticsearch index settings that we want to apply to Prelert + * Build the Elasticsearch index settings that we want to apply to results * indexes. It's better to do this in code rather than in elasticsearch.yml * because then the settings can be applied regardless of whether we're * using our own Elasticsearch to store results or a customer's pre-existing @@ -159,7 +159,7 @@ public class JobProvider { * @return An Elasticsearch builder initialised with the desired settings * for Prelert indexes. */ - Settings.Builder prelertIndexSettings() { + Settings.Builder mlResultsIndexSettings() { return Settings.builder() // Our indexes are small and one shard puts the // least possible burden on Elasticsearch @@ -176,39 +176,54 @@ public class JobProvider { .put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES); } + /** + * Build the Elasticsearch index settings that we want to apply to the state + * index. It's better to do this in code rather than in elasticsearch.yml + * because then the settings can be applied regardless of whether we're + * using our own Elasticsearch to store results or a customer's pre-existing + * Elasticsearch. + * + * @return An Elasticsearch builder initialised with the desired settings + * for Prelert indexes. + */ + Settings.Builder mlStateIndexSettings() { + // TODO review these settings + return Settings.builder() + // Our indexes are small and one shard puts the + // least possible burden on Elasticsearch + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + // Sacrifice durability for performance: in the event of power + // failure we can lose the last 5 seconds of changes, but it's + // much faster + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC); + } + /** * Create the Elasticsearch index and the mappings */ - // TODO: rename and move? - public void createJobRelatedIndices(Job job, ActionListener listener) { + public void createJobResultIndex(Job job, ActionListener listener) { Collection termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); try { XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields); - XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping(); - XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); - XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping(); - XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping(); XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping(); XContentBuilder usageMapping = ElasticsearchMappings.usageMapping(); XContentBuilder auditMessageMapping = ElasticsearchMappings.auditMessageMapping(); XContentBuilder auditActivityMapping = ElasticsearchMappings.auditActivityMapping(); + XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping(); String jobId = job.getId(); boolean createIndexAlias = !job.getIndexName().equals(job.getId()); - String indexName = AnomalyDetectorsIndex.getJobIndexName(job.getIndexName()); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName()); LOGGER.trace("ES API CALL: create index {}", indexName); CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - Settings.Builder settingsBuilder = prelertIndexSettings(); - createIndexRequest.settings(settingsBuilder); + createIndexRequest.settings(mlResultsIndexSettings()); createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping); - createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping); - createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); - createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); - createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); + createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); // NORELASE These mappings shouldn't go in the results index once the index // strategy has been reworked createIndexRequest.mapping(Usage.TYPE, usageMapping); @@ -220,7 +235,7 @@ public class JobProvider { final ActionListener responseListener = listener; listener = ActionListener.wrap(aBoolean -> { client.admin().indices().prepareAliases() - .addAlias(indexName, AnomalyDetectorsIndex.getJobIndexName(jobId)) + .addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId)) .execute(new ActionListener() { @Override public void onResponse(IndicesAliasesResponse indicesAliasesResponse) { @@ -254,12 +269,42 @@ public class JobProvider { } } + public void createJobStateIndex(BiConsumer listener) { + try { + XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); + XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); + XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping(); + + LOGGER.trace("ES API CALL: create state index {}", AnomalyDetectorsIndex.jobStateIndexName()); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.jobStateIndexName()); + createIndexRequest.settings(mlStateIndexSettings()); + createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); + createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); + createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); + + client.admin().indices().create(createIndexRequest, new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + listener.accept(true, null); + } + + @Override + public void onFailure(Exception e) { + listener.accept(false, e); + } + }); + } catch (Exception e) { + LOGGER.warn("Error creating the usage metering index", e); + } + } + + /** * Delete all the job related documents from the database. */ // TODO: should live together with createJobRelatedIndices (in case it moves)? public void deleteJobRelatedIndices(String jobId, ActionListener listener) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: delete index {}", indexName); try { @@ -286,7 +331,7 @@ public class JobProvider { * @return The dataCounts or default constructed object if not found */ public DataCounts dataCounts(String jobId) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); try { GetResponse response = client.prepareGet(indexName, DataCounts.TYPE.getPreferredName(), @@ -378,7 +423,7 @@ public class JobProvider { SearchResponse searchResponse; try { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}", Bucket.RESULT_TYPE_VALUE, indexName, from, size); @@ -421,7 +466,7 @@ public class JobProvider { * @throws ResourceNotFoundException If the job id is not recognised */ public QueryPage bucket(String jobId, BucketQueryBuilder.BucketQuery query) throws ResourceNotFoundException { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); SearchHits hits; try { LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName); @@ -504,7 +549,7 @@ public class JobProvider { .filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)); FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); SearchRequestBuilder searchBuilder = client .prepareSearch(indexName) .setQuery(boolQuery) @@ -630,7 +675,7 @@ public class JobProvider { * @return QueryPage of CategoryDefinition */ public QueryPage categoryDefinitions(String jobId, int from, int size) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}", CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); @@ -670,7 +715,7 @@ public class JobProvider { * @return QueryPage CategoryDefinition */ public QueryPage categoryDefinition(String jobId, String categoryId) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); GetResponse response; try { @@ -737,7 +782,7 @@ public class JobProvider { private QueryPage records(String jobId, int from, int size, QueryBuilder recordFilter, FieldSortBuilder sb, List secondarySort, boolean descending) throws ResourceNotFoundException { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); recordFilter = new BoolQueryBuilder() .filter(recordFilter) @@ -804,7 +849,7 @@ public class JobProvider { private QueryPage influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField, boolean sortDescending) throws ResourceNotFoundException { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, () -> (sortField != null) ? @@ -884,13 +929,12 @@ public class JobProvider { * Get the persisted quantiles state for the job */ public Optional getQuantiles(String jobId) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobStateIndexName(); try { String quantilesId = Quantiles.quantilesId(jobId); LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName); - GetResponse response = client.prepareGet( - indexName, Quantiles.TYPE.getPreferredName(), quantilesId).get(); + GetResponse response = client.prepareGet(indexName, Quantiles.TYPE.getPreferredName(), quantilesId).get(); if (!response.isExists()) { LOGGER.info("There are currently no quantiles for job " + jobId); return Optional.empty(); @@ -965,7 +1009,7 @@ public class JobProvider { SearchResponse searchResponse; try { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}", ModelSnapshot.TYPE, indexName, sortField, from, size); @@ -1006,7 +1050,7 @@ public class JobProvider { * @param restoreStream the stream to write the state to */ public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobStateIndexName(); // First try to restore categorizer state. There are no snapshots for this, so the IDs simply // count up until a document is not found. It's NOT an error to have no categorizer state. @@ -1081,7 +1125,7 @@ public class JobProvider { SearchResponse searchResponse; try { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}", ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size); @@ -1115,7 +1159,7 @@ public class JobProvider { * Get the job's model size stats. */ public Optional modelSizeStats(String jobId) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); try { LOGGER.trace("ES API CALL: get result type {} ID {} from index {}", ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName); @@ -1173,6 +1217,6 @@ public class JobProvider { * @return the {@code Auditor} */ public Auditor audit(String jobId) { - return new Auditor(client, AnomalyDetectorsIndex.getJobIndexName(jobId), jobId); + return new Auditor(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), jobId); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index ce129a38b72..74eb3c809ff 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -71,7 +71,7 @@ public class JobResultsPersister extends AbstractComponent { private Builder(String jobId) { this.jobId = Objects.requireNonNull(jobId); - indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); bulkRequest = client.prepareBulk(); } @@ -208,7 +208,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), String.valueOf(category.getCategoryId())); - persistable.persist(); + persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId())); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -219,13 +219,13 @@ public class JobResultsPersister extends AbstractComponent { public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(quantiles.getJobId())); - if (persistable.persist()) { + if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) { // Refresh the index when persisting quantiles so that previously // persisted results will be available for searching. Do this using the // indices API rather than the index API (used to write the quantiles // above), because this will refresh all shards rather than just the // shard that the quantiles document itself was written to. - commitWrites(quantiles.getJobId()); + commitStateWrites(quantiles.getJobId()); } } @@ -235,7 +235,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.getSnapshotId()); - persistable.persist(); + persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId())); } /** @@ -246,9 +246,9 @@ public class JobResultsPersister extends AbstractComponent { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); - persistable.persist(); + persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null); - persistable.persist(); + persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); // Don't commit as we expect masses of these updates and they're only // for information at the API level } @@ -258,7 +258,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) { Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, Result.TYPE.getPreferredName(), null); - persistable.persist(); + persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelDebugOutput.getJobId())); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -293,10 +293,26 @@ public class JobResultsPersister extends AbstractComponent { * Once all the job data has been written this function will be * called to commit the writes to the datastore. * + * @param jobId The job Id * @return True if successful */ - public boolean commitWrites(String jobId) { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + public boolean commitResultWrites(String jobId) { + String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + // Refresh should wait for Lucene to make the data searchable + logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); + client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); + return true; + } + + /** + * Once the job state has been written calling this function makes it + * immediately searchable. + * + * @param jobId The job Id + * @return True if successful + * */ + public boolean commitStateWrites(String jobId) { + String indexName = AnomalyDetectorsIndex.jobStateIndexName(); // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); @@ -329,16 +345,15 @@ public class JobResultsPersister extends AbstractComponent { this.id = id; } - boolean persist() { + boolean persist(String indexName) { if (object == null) { logger.warn("[{}] No {} to persist for job ", jobId, type); return false; } - logCall(); + logCall(indexName); try { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); client.prepareIndex(indexName, type, id) .setSource(toXContentBuilder(object)) .execute().actionGet(); @@ -349,8 +364,7 @@ public class JobResultsPersister extends AbstractComponent { } } - private void logCall() { - String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId); + private void logCall(String indexName) { if (id != null) { logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id); } else { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/UsagePersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/UsagePersister.java index d5d742d92dd..4dff35940b1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/UsagePersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/UsagePersister.java @@ -48,7 +48,7 @@ public class UsagePersister extends AbstractComponent { // update global count updateDocument(jobId, PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead); - updateDocument(jobId, AnomalyDetectorsIndex.getJobIndexName(jobId), docId, bytesRead, + updateDocument(jobId, AnomalyDetectorsIndex.jobResultsIndexName(jobId), docId, bytesRead, fieldsRead, recordsRead); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index 4c12744a16b..9771c49dd1b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -84,6 +84,8 @@ public class AutoDetectResultProcessor { LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } } + + context.bulkResultsPersister.executeRequest(); LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); LOGGER.info("[{}] Parse results Complete", jobId); } catch (Exception e) { @@ -157,7 +159,7 @@ public class AutoDetectResultProcessor { // Commit previous writes here, effectively continuing // the flush from the C++ autodetect process right // through to the data store - persister.commitWrites(context.jobId); + persister.commitResultWrites(context.jobId); flushListener.acknowledgeFlush(flushAcknowledgement.getId()); // Interim results may have been produced by the flush, // which need to be diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java index 9ab84ee63d1..ef980f71299 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java @@ -191,7 +191,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { } private DataCounts getDataCounts(String jobId) { - GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.getJobIndexName(jobId), + GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get(); if (getResponse.isExists() == false) { return new DataCounts(jobId); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java index 01ef8bb4973..f2702c3f190 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -111,7 +111,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { }).start(); resultProcessor.process(JOB_ID, inputStream, false); - jobResultsPersister.commitWrites(JOB_ID); + jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); @@ -178,7 +178,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { }).start(); resultProcessor.process(JOB_ID, inputStream, false); - jobResultsPersister.commitWrites(JOB_ID); + jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); @@ -227,7 +227,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { }).start(); resultProcessor.process(JOB_ID, inputStream, false); - jobResultsPersister.commitWrites(JOB_ID); + jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); @@ -249,7 +249,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { Job.Builder jobBuilder = new Job.Builder(JOB_ID); jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector))); - jobProvider.createJobRelatedIndices(jobBuilder.build(), new ActionListener() { + jobProvider.createJobResultIndex(jobBuilder.build(), new ActionListener() { @Override public void onResponse(Boolean aBoolean) { } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index fa031b733b8..6545023d25e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -155,14 +155,14 @@ public class JobManagerTests extends ESTestCase { PutJobAction.Request request = new PutJobAction.Request(jobBuilder.build()); Index index = mock(Index.class); - when(index.getName()).thenReturn(AnomalyDetectorsIndex.getJobIndexName("my-special-place")); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place")); IndexMetaData indexMetaData = mock(IndexMetaData.class); when(indexMetaData.getIndex()).thenReturn(index); ImmutableOpenMap aliases = ImmutableOpenMap.of(); when(indexMetaData.getAliases()).thenReturn(aliases); ImmutableOpenMap indexMap = ImmutableOpenMap.builder() - .fPut(AnomalyDetectorsIndex.getJobIndexName("my-special-place"), indexMetaData).build(); + .fPut(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"), indexMetaData).build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO).indices(indexMap)).build(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java index e52cee647bd..df2dbf4cd67 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import java.util.concurrent.ExecutorService; @@ -57,6 +58,7 @@ public class PrelertInitializationServiceTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); verify(jobProvider, times(1)).createUsageMeteringIndex(any()); + verify(jobProvider, times(1)).createJobStateIndex(any()); } public void testInitialize_noMasterNode() { @@ -109,12 +111,17 @@ public class PrelertInitializationServiceTests extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) )) + .put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + )) .putCustom(PrelertMetadata.TYPE, new PrelertMetadata.Builder().build())) .build(); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); verify(jobProvider, times(0)).createUsageMeteringIndex(any()); + verify(jobProvider, times(0)).createJobStateIndex(any()); } - } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java index de98662057d..f5750c27a19 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java @@ -92,8 +92,8 @@ public class ElasticsearchMappingsTests extends ESTestCase { overridden.add(Quantiles.TYPE.getPreferredName()); overridden.add(Usage.TYPE); - // These are not reserved because they're in the prelert-int index, not - // prelertresults-* + // These are not reserved because they're in the prelert-int index + // not the job indices overridden.add(ListDocument.ID.getPreferredName()); overridden.add(ListDocument.ITEMS.getPreferredName()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java index 98866ba2891..76327571865 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java @@ -13,15 +13,20 @@ import org.elasticsearch.client.Client; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.prelert.job.ModelSnapshot; +import org.elasticsearch.xpack.prelert.job.ModelState; import org.mockito.Mockito; import java.util.ArrayList; import java.util.Date; import java.util.List; +import static org.elasticsearch.mock.orig.Mockito.mock; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verify; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; public class JobDataDeleterTests extends ESTestCase { @@ -35,7 +40,7 @@ public class JobDataDeleterTests extends ESTestCase { BulkResponse bulkResponse = Mockito.mock(BulkResponse.class); Client client = new MockClientBuilder("myCluster") - .prepareSearchExecuteListener(AnomalyDetectorsIndex.getJobIndexName("foo"), response) + .prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsIndexName("foo"), response) .prepareSearchScrollExecuteListener(response) .prepareBulk(bulkResponse).build(); @@ -73,6 +78,24 @@ public class JobDataDeleterTests extends ESTestCase { verify(client.prepareBulk(), times(1)).execute(bulkListener); } + public void testDeleteModelSnapShot() { + String jobId = "foo"; + ModelSnapshot snapshot = new ModelSnapshot(jobId); + snapshot.setSnapshotDocCount(5); + snapshot.setSnapshotId("snap-1"); + + BulkResponse bulkResponse = Mockito.mock(BulkResponse.class); + Client client = new MockClientBuilder("myCluster").prepareBulk(bulkResponse).build(); + + JobDataDeleter bulkDeleter = new JobDataDeleter(client, jobId); + bulkDeleter.deleteModelSnapshot(snapshot); + verify(client, times(5)) + .prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString()); + verify(client, times(1)) + .prepareDelete(eq(AnomalyDetectorsIndex.jobResultsIndexName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()), + eq("snap-1")); + } + private SearchResponse createSearchResponseWithHits(long totalHitCount, int hitsPerSearchResult) { SearchHits hits = mockSearchHits(totalHitCount, hitsPerSearchResult); SearchResponse searchResponse = Mockito.mock(SearchResponse.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java index 0d7746b7813..a2a4d88bcac 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; @@ -67,28 +66,16 @@ import static org.mockito.Mockito.when; public class JobProviderTests extends ESTestCase { private static final String CLUSTER_NAME = "myCluster"; private static final String JOB_ID = "foo"; - private static final String INDEX_NAME = "prelertresults-foo"; + private static final String STATE_INDEX_NAME = ".ml-state"; @Captor private ArgumentCaptor> mapCaptor; - public void testGetQuantiles_GivenNoIndexForJob() throws InterruptedException, ExecutionException { - - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .throwMissingIndexOnPrepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID)); - - JobProvider provider = createProvider(clientBuilder.build()); - - ESTestCase.expectThrows(IndexNotFoundException.class, () -> provider.getQuantiles(JOB_ID)); - } - public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception { GetResponse getResponse = createGetResponse(false, null); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); + .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); JobProvider provider = createProvider(clientBuilder.build()); @@ -105,8 +92,7 @@ public class JobProviderTests extends ESTestCase { GetResponse getResponse = createGetResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); + .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse); JobProvider provider = createProvider(clientBuilder.build()); @@ -124,8 +110,7 @@ public class JobProviderTests extends ESTestCase { GetResponse getResponse = createGetResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse); + .prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse); JobProvider provider = createProvider(clientBuilder.build()); @@ -146,10 +131,10 @@ public class JobProviderTests extends ESTestCase { clientBuilder.verifyIndexCreated(JobProvider.PRELERT_USAGE_INDEX); } - public void testIndexSettings() { + public void testMlResultsIndexSettings() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); JobProvider provider = createProvider(clientBuilder.build()); - Settings settings = provider.prelertIndexSettings().build(); + Settings settings = provider.mlResultsIndexSettings().build(); assertEquals("1", settings.get("index.number_of_shards")); assertEquals("0", settings.get("index.number_of_replicas")); @@ -158,31 +143,28 @@ public class JobProviderTests extends ESTestCase { assertEquals("all_field_values", settings.get("index.query.default_field")); } - public void testCreateJobRelatedIndicies() { + public void testCreateJobResultsIndex() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor); + clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); Job.Builder job = buildJobBuilder("foo"); JobProvider provider = createProvider(clientBuilder.build()); - provider.createJobRelatedIndices(job.build(), new ActionListener() { + provider.createJobResultIndex(job.build(), new ActionListener() { @Override public void onResponse(Boolean aBoolean) { CreateIndexRequest request = captor.getValue(); assertNotNull(request); - assertEquals(provider.prelertIndexSettings().build(), request.settings()); + assertEquals(provider.mlResultsIndexSettings().build(), request.settings()); assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName())); assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName())); assertTrue(request.mappings().containsKey(Usage.TYPE)); assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName())); assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName())); - assertEquals(10, request.mappings().size()); + assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName())); + assertEquals(7, request.mappings().size()); } @Override @@ -195,15 +177,15 @@ public class JobProviderTests extends ESTestCase { public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor); - clientBuilder.prepareAlias(AnomalyDetectorsIndex.getJobIndexName("bar"), AnomalyDetectorsIndex.getJobIndexName("foo")); + clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); + clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("bar"), AnomalyDetectorsIndex.jobResultsIndexName("foo")); Job.Builder job = buildJobBuilder("foo"); job.setIndexName("bar"); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); - provider.createJobRelatedIndices(job.build(), new ActionListener() { + provider.createJobResultIndex(job.build(), new ActionListener() { @Override public void onResponse(Boolean aBoolean) { verify(client.admin().indices(), times(1)).prepareAliases(); @@ -220,14 +202,14 @@ public class JobProviderTests extends ESTestCase { public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor); + clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); Job.Builder job = buildJobBuilder("foo"); job.setIndexName("foo"); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); - provider.createJobRelatedIndices(job.build(), new ActionListener() { + provider.createJobResultIndex(job.build(), new ActionListener() { @Override public void onResponse(Boolean aBoolean) { verify(client.admin().indices(), never()).prepareAliases(); @@ -240,6 +222,36 @@ public class JobProviderTests extends ESTestCase { }); } + public void testMlStateIndexSettings() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + JobProvider provider = createProvider(clientBuilder.build()); + Settings settings = provider.mlResultsIndexSettings().build(); + + assertEquals("1", settings.get("index.number_of_shards")); + assertEquals("0", settings.get("index.number_of_replicas")); + assertEquals("async", settings.get("index.translog.durability")); + } + + public void testCreateJobStateIndex() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); + clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), captor); + + Job.Builder job = buildJobBuilder("foo"); + JobProvider provider = createProvider(clientBuilder.build()); + + provider.createJobStateIndex((result, error) -> { + assertTrue(result); + CreateIndexRequest request = captor.getValue(); + assertNotNull(request); + assertEquals(provider.mlStateIndexSettings().build(), request.settings()); + assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); + assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); + assertEquals(3, request.mappings().size()); + }); + } + public void testCreateJob() throws InterruptedException, ExecutionException { Job.Builder job = buildJobBuilder("marscapone"); job.setDescription("This is a very cheesy job"); @@ -247,12 +259,13 @@ public class JobProviderTests extends ESTestCase { job.setAnalysisLimits(limits); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).createIndexRequest("prelertresults-" + job.getId(), captor); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) + .createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()), captor); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); AtomicReference resultHolder = new AtomicReference<>(); - provider.createJobRelatedIndices(job.build(), new ActionListener() { + provider.createJobResultIndex(job.build(), new ActionListener() { @Override public void onResponse(Boolean aBoolean) { resultHolder.set(aBoolean); @@ -271,12 +284,12 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") ActionListener actionListener = mock(ActionListener.class); String jobId = "ThisIsMyJob"; - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse(); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); clientBuilder.resetIndices(); - clientBuilder.addIndicesExistsResponse("prelertresults-" + jobId, true).addIndicesDeleteResponse("prelertresults-" + jobId, true, + clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true) + .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true, false, actionListener); clientBuilder.build(); @@ -291,12 +304,12 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") ActionListener actionListener = mock(ActionListener.class); String jobId = "ThisIsMyJob"; - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse(); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); clientBuilder.resetIndices(); - clientBuilder.addIndicesExistsResponse("prelertresults-" + jobId, true).addIndicesDeleteResponse("prelertresults-" + jobId, true, + clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true) + .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true, true, actionListener); clientBuilder.build(); @@ -324,8 +337,8 @@ public class JobProviderTests extends ESTestCase { int from = 0; int size = 10; MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -359,8 +372,8 @@ public class JobProviderTests extends ESTestCase { int from = 99; int size = 17; MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -394,8 +407,8 @@ public class JobProviderTests extends ESTestCase { int from = 99; int size = 17; MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -423,15 +436,11 @@ public class JobProviderTests extends ESTestCase { Date now = new Date(); List> source = new ArrayList<>(); - Map map = new HashMap<>(); - map.put("timestamp", now.getTime()); - // source.add(map); - ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(false, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -457,8 +466,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -487,8 +496,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -529,8 +538,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -581,8 +590,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -640,8 +649,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -679,8 +688,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder); + .prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -711,8 +720,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder); + .prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + Result.TYPE.getPreferredName(), response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -742,8 +751,8 @@ public class JobProviderTests extends ESTestCase { int from = 0; int size = 10; MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, CategoryDefinition.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + CategoryDefinition.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -766,8 +775,8 @@ public class JobProviderTests extends ESTestCase { GetResponse getResponse = createGetResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareGet("prelertresults-" + jobId, CategoryDefinition.TYPE.getPreferredName(), categoryId, getResponse); + .prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + CategoryDefinition.TYPE.getPreferredName(), categoryId, getResponse); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -810,8 +819,7 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); @@ -875,8 +883,7 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); @@ -910,8 +917,7 @@ public class JobProviderTests extends ESTestCase { String jobId = "TestJobIdentificationForInfluencers"; String influencerId = "ThisIsAnInfluencerId"; - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse(); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -952,8 +958,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -1008,8 +1014,8 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); SearchResponse response = createSearchResponse(true, source); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareSearch("prelertresults-" + jobId, ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder); + .prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), + ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder); Client client = clientBuilder.build(); JobProvider provider = createProvider(client); @@ -1038,8 +1044,7 @@ public class JobProviderTests extends ESTestCase { } public void testMergePartitionScoresIntoBucket() throws InterruptedException, ExecutionException { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true).addClusterStatusYellowResponse(); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); JobProvider provider = createProvider(clientBuilder.build()); @@ -1093,8 +1098,7 @@ public class JobProviderTests extends ESTestCase { } public void testMergePartitionScoresIntoBucket_WithEmptyScoresList() throws InterruptedException, ExecutionException { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true).addClusterStatusYellowResponse(); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); JobProvider provider = createProvider(clientBuilder.build()); @@ -1125,11 +1129,10 @@ public class JobProviderTests extends ESTestCase { GetResponse modelStateGetResponse2 = createGetResponse(true, modelState); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() - .addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true) - .prepareGet(INDEX_NAME, CategorizerState.TYPE, JOB_ID + "_1", categorizerStateGetResponse1) - .prepareGet(INDEX_NAME, CategorizerState.TYPE, JOB_ID + "_2", categorizerStateGetResponse2) - .prepareGet(INDEX_NAME, ModelState.TYPE.getPreferredName(), "123_1", modelStateGetResponse1) - .prepareGet(INDEX_NAME, ModelState.TYPE.getPreferredName(), "123_2", modelStateGetResponse2); + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "_1", categorizerStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "_2", categorizerStateGetResponse2) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), "123_1", modelStateGetResponse1) + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), "123_2", modelStateGetResponse2); JobProvider provider = createProvider(clientBuilder.build()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index e8f3c41d17f..1737cab98d3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -204,7 +204,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processor.processResult(context, result); verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); - verify(persister, times(1)).commitWrites(JOB_ID); + verify(persister, times(1)).commitResultWrites(JOB_ID); verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); assertTrue(context.deleteInterimRequired); @@ -230,7 +230,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processor.processResult(context, result); inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); - inOrder.verify(persister, times(1)).commitWrites(JOB_ID); + inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID); inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml index f44d1b5ffe2..c38b47128d8 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/delete_model_snapshot.yaml @@ -43,7 +43,7 @@ setup: - do: index: - index: prelertresults-foo + index: .ml-state type: model_state id: "foo1_0" body: > @@ -53,7 +53,7 @@ setup: - do: index: - index: prelertresults-foo + index: .ml-state type: model_state id: "foo1_1" body: > @@ -77,6 +77,10 @@ setup: "latest_result_time_stamp": "2016-06-01T00:00:00Z" } + - do: + indices.refresh: + index: .ml-state + - do: indices.refresh: index: prelertresults-foo @@ -104,7 +108,7 @@ setup: - do: count: - index: prelertresults-foo + index: .ml-state type: model_state - match: { count: 2 } @@ -119,6 +123,10 @@ setup: indices.refresh: index: prelertresults-foo + - do: + indices.refresh: + index: .ml-state + - do: xpack.prelert.get_model_snapshots: job_id: "foo" @@ -127,7 +135,7 @@ setup: - do: count: - index: prelertresults-foo + index: .ml-state type: model_state - match: { count: 0 } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/get_model_snapshots.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/get_model_snapshots.yaml index fdcc7751e2b..67ef7d11d96 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/get_model_snapshots.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/get_model_snapshots.yaml @@ -10,6 +10,7 @@ setup: type: date "restore_priority": type: integer + - do: index: index: prelertresults-foo diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml index bd98db548d3..272e67f814b 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_crud.yaml @@ -21,7 +21,10 @@ - do: indices.get: index: "prelertresults-farequote" - - is_true: "prelertresults-farequote" + + - do: + indices.get: + index: ".ml-state" - do: xpack.prelert.get_jobs: diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/put_model_snapshot_description.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/put_model_snapshot_description.yaml index 55fc08d4978..9a40f4c00c8 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/put_model_snapshot_description.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/put_model_snapshot_description.yaml @@ -10,6 +10,7 @@ setup: type: date "restore_priority": type: integer + - do: index: index: prelertresults-foo diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index de84614e860..f2bae60b11b 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -128,6 +128,9 @@ setup: indices.refresh: index: prelertresults-foo + - do: + indices.refresh: + index: .ml-state --- "Test revert model with only job_id": - do: