From eed0e41a298fd146a433566fb3501e448251dcb3 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 1 Mar 2017 13:28:12 -0500 Subject: [PATCH] [ML] Share job result indices by default (elastic/x-pack-elasticsearch#638) This moves the index structure to using a single, shared index (.ml-anomalies-shared). Custom indices can still be used by manually setting `results_index`. An alias is always created which points from `.ml-anomalies-` to `.ml-anomalies-shared`. User defined indices are prepended with "custom-" Index helper functions have been renamed to make this clear. Furthermore, accessing an index should always be done either by fetching the currently configured index/alias from the state, or using the preconfigured alias. Because the user can specify a custom physical index, it is impossible to determine the physical index "by convention" now. The helpers have been configured to reflect that. Original commit: elastic/x-pack-elasticsearch@a5368eb230f90770c6d7cc1f54f2eaeec60b5a07 --- .../elasticsearch/xpack/ml/MlMetadata.java | 2 +- .../xpack/ml/action/OpenJobAction.java | 6 +- .../xpack/ml/job/JobManager.java | 4 +- .../xpack/ml/job/config/Job.java | 27 +- .../persistence/AnomalyDetectorsIndex.java | 11 +- .../persistence/BatchedResultsIterator.java | 2 +- .../persistence/ElasticsearchMappings.java | 1 - .../persistence/JobDataCountsPersister.java | 2 +- .../ml/job/persistence/JobDataDeleter.java | 6 +- .../xpack/ml/job/persistence/JobProvider.java | 99 +++-- .../job/persistence/JobResultsPersister.java | 16 +- .../persistence/JobStorageDeletionTask.java | 88 ++--- .../ExpiredModelSnapshotsRemover.java | 2 +- .../job/retention/ExpiredResultsRemover.java | 2 +- .../xpack/ml/action/OpenJobActionTests.java | 10 +- .../integration/BasicDistributedJobsIT.java | 2 +- .../xpack/ml/integration/MlJobIT.java | 184 +++++++--- .../integration/MlRestTestStateCleaner.java | 6 + .../xpack/ml/job/config/JobTests.java | 7 +- .../job/persistence/JobDataDeleterTests.java | 4 +- .../ml/job/persistence/JobProviderTests.java | 155 +++----- .../ExpiredModelSnapshotsRemoverTests.java | 12 +- .../retention/ExpiredResultsRemoverTests.java | 8 +- .../rest-api-spec/test/ml/index_layout.yaml | 345 ++++++++++++++++++ 24 files changed, 708 insertions(+), 293 deletions(-) create mode 100644 plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index 67ab188aad7..d36e4c9ce86 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -245,7 +245,7 @@ public class MlMetadata implements MetaData.Custom { JobState jobState = MlMetadata.getJobState(jobId, tasks); if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" + - JobState.CLOSED + "]"); + JobState.CLOSED + " or " + JobState.FAILED + "]"); } Job job = jobs.remove(jobId); if (job == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 7989082a0c1..6650b3f141d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -481,15 +481,15 @@ public class OpenJobAction extends Action verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); Job job = mlMetadata.getJobs().get(jobId); - String[] indices = indicesOfInterest(job); + String[] indices = indicesOfInterest(clusterState, jobId); List unavailableIndices = new ArrayList<>(indices.length); for (String index : indices) { // Indices are created on demand from templates. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 8af8d85dbd2..dcae4aced09 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; @@ -26,8 +27,6 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; @@ -253,7 +252,6 @@ public class JobManager extends AbstractComponent { ActionListener actionListener) { String jobId = request.getJobId(); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); logger.debug("Deleting job '" + jobId + "'"); // Step 3. When the job has been removed from the cluster state, return a response diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 55ace759e38..1a29aa08ac6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -195,6 +196,15 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent * @return The job's index name */ public String getResultsIndexName() { + return AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + resultsIndexName; + } + + /** + * Private version of getResultsIndexName so that a job can be built from another + * job and pass index name validation + * @return The job's index name, minus prefix + */ + private String getResultsIndexNameNoPrefix() { return resultsIndexName; } @@ -525,10 +535,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.backgroundPersistInterval = job.getBackgroundPersistInterval(); this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays(); this.resultsRetentionDays = job.getResultsRetentionDays(); - this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays(); this.customSettings = job.getCustomSettings(); this.modelSnapshotId = job.getModelSnapshotId(); - this.resultsIndexName = job.getResultsIndexName(); + this.resultsIndexName = job.getResultsIndexNameNoPrefix(); this.deleted = job.isDeleted(); } @@ -667,15 +676,23 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent } if (Strings.isNullOrEmpty(resultsIndexName)) { - resultsIndexName = id; + resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; } else if (!MlStrings.isValidId(resultsIndexName)) { - throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName())); + throw new IllegalArgumentException( + Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName)); + } else if (!resultsIndexName.equals(AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT)) { + // User-defined names are prepended with "custom" + // Conditional guards against multiple prepending due to updates instead of first creation + resultsIndexName = resultsIndexName.startsWith("custom-") + ? resultsIndexName + : "custom-" + resultsIndexName; } return new Job( id, description, createTime, finishedTime, lastDataTime, analysisConfig, analysisLimits, dataDescription, modelDebugConfig, renormalizationWindowDays, backgroundPersistInterval, - modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted); + modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, + resultsIndexName, deleted); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java index aead89933b3..4eb1d0aef97 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java @@ -18,8 +18,9 @@ public final class AnomalyDetectorsIndex { */ public static final String ML_META_INDEX = ".ml-meta"; - private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-"; + public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-"; private static final String STATE_INDEX_NAME = ".ml-state"; + public static final String RESULTS_INDEX_DEFAULT = "shared"; private AnomalyDetectorsIndex() { } @@ -33,18 +34,18 @@ public final class AnomalyDetectorsIndex { * @param jobId Job Id * @return The index name */ - public static String jobResultsIndexName(String jobId) { + public static String jobResultsAliasedName(String jobId) { return RESULTS_INDEX_PREFIX + jobId; } /** - * The default index pattern for rollover index results + * Retrieves the currently defined physical index from the job state * @param jobId Job Id * @return The index name */ - public static String getCurrentResultsIndex(ClusterState state, String jobId) { + public static String getPhysicalIndexFromState(ClusterState state, String jobId) { MlMetadata meta = state.getMetaData().custom(MlMetadata.TYPE); - return RESULTS_INDEX_PREFIX + meta.getJobs().get(jobId).getResultsIndexName(); + return meta.getJobs().get(jobId).getResultsIndexName(); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java index eac6a2e1a79..afb7ebb011e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java @@ -13,7 +13,7 @@ public abstract class BatchedResultsIterator extends BatchedDocumentsIterator> { public BatchedResultsIterator(Client client, String jobId, String resultType) { - super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), + super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId), new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 195505b3683..d29adf66d5a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -426,7 +426,6 @@ public class ElasticsearchMappings { return jsonBuilder() .startObject() .startObject(DataCounts.TYPE.getPreferredName()) - .field(ENABLED, false) .startObject(PROPERTIES) .startObject(Job.ID.getPreferredName()) .field(TYPE, KEYWORD) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 7f772afadb0..da04693350a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -48,7 +48,7 @@ public class JobDataCountsPersister extends AbstractComponent { public void persistDataCounts(String jobId, DataCounts counts, ActionListener listener) { try { XContentBuilder content = serialiseCounts(counts); - client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(), + client.prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)) .setSource(content).execute(new ActionListener() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 1fe00118ddd..f1cae309f6b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -67,7 +67,7 @@ public class JobDataDeleter { * @param listener Response listener */ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { - String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()); timeRange.gte(cutoffEpochMs); @@ -119,7 +119,7 @@ public class JobDataDeleter { ++deletedModelStateCount; } - bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()), + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), ModelSnapshot.TYPE.getPreferredName(), snapshotDocId)); ++deletedModelSnapshotCount; } @@ -128,7 +128,7 @@ public class JobDataDeleter { * Delete all results marked as interim */ public void deleteInterimResults() { - String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index da416bc48c2..671c6ef9969 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -28,14 +28,12 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -54,7 +52,6 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.Job; @@ -124,51 +121,50 @@ public class JobProvider { */ public void createJobResultIndex(Job job, ClusterState state, ActionListener listener) { Collection termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); - String jobId = job.getId(); - boolean createIndexAlias = !job.getResultsIndexName().equals(job.getId()); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName()); - if (createIndexAlias) { + + String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); + String indexName = job.getResultsIndexName(); + + final ActionListener responseListener = listener; listener = ActionListener.wrap(aBoolean -> { client.admin().indices().prepareAliases() - .addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId)) + .addAlias(indexName, aliasName) .execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure)); }, listener::onFailure); - } - // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if - // already in the CS - if (!state.getMetaData().hasIndex(indexName)) { - LOGGER.trace("ES API CALL: create index {}", indexName); - CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if + // already in the CS + if (!state.getMetaData().hasIndex(indexName)) { + LOGGER.trace("ES API CALL: create index {}", indexName); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - final ActionListener createdListener = listener; - client.admin().indices().create(createIndexRequest, - ActionListener.wrap( - r -> { - updateIndexMappingWithTermFields(indexName, termFields, createdListener); - }, - e -> { - // Possible that the index was created while the request was executing, - // so we need to handle that possibility - if (e instanceof ResourceAlreadyExistsException) { - LOGGER.info("Index already exists"); - // Create the alias - createdListener.onResponse(true); - } else { - createdListener.onFailure(e); + final ActionListener createdListener = listener; + client.admin().indices().create(createIndexRequest, + ActionListener.wrap( + r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener), + e -> { + // Possible that the index was created while the request was executing, + // so we need to handle that possibility + if (e instanceof ResourceAlreadyExistsException) { + LOGGER.info("Index already exists"); + // Create the alias + createdListener.onResponse(true); + } else { + createdListener.onFailure(e); + } } - } - )); - } else { - // Add the job's term fields to the index mapping - final ActionListener updateMappingListener = listener; - checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( - r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), - updateMappingListener::onFailure)); - } + )); + } else { + // Add the job's term fields to the index mapping + final ActionListener updateMappingListener = listener; + checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( + r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), + updateMappingListener::onFailure)); + } + } private void updateIndexMappingWithTermFields(String indexName, Collection termFields, ActionListener listener) { @@ -230,7 +226,7 @@ public class JobProvider { */ // TODO: should live together with createJobRelatedIndices (in case it moves)? public void deleteJobRelatedIndices(String jobId, ActionListener listener) { - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); LOGGER.trace("ES API CALL: delete index {}", indexName); try { @@ -248,7 +244,7 @@ public class JobProvider { * @param jobId The job id */ public void dataCounts(String jobId, Consumer handler, Consumer errorHandler) { - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); get(jobId, indexName, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId), handler, errorHandler, DataCounts.PARSER, () -> new DataCounts(jobId)); } @@ -256,10 +252,9 @@ public class JobProvider { private void get(String jobId, String indexName, String type, String id, Consumer handler, Consumer errorHandler, BiFunction objectParser, Supplier notFoundSupplier) { GetRequest getRequest = new GetRequest(indexName, type, id); - client.get(getRequest, ActionListener.wrap( - response -> { - if (response.isExists() == false) { + response -> { + if (response.isExists() == false) { handler.accept(notFoundSupplier.get()); } else { BytesReference source = response.getSourceAsBytesRef(); @@ -343,7 +338,7 @@ public class JobProvider { QueryBuilder boolQuery = new BoolQueryBuilder() .filter(rfb.build()) .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE)); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(Result.TYPE.getPreferredName()); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -465,7 +460,7 @@ public class JobProvider { .filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)); FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.sort(sb); sourceBuilder.query(boolQuery); @@ -597,7 +592,7 @@ public class JobProvider { throw new IllegalStateException("Both categoryId and pageParams are specified"); } - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}", CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size); @@ -663,7 +658,7 @@ public class JobProvider { QueryBuilder recordFilter, FieldSortBuilder sb, List secondarySort, boolean descending, Consumer> handler, Consumer errorHandler) { - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); recordFilter = new BoolQueryBuilder() .filter(recordFilter) @@ -718,7 +713,7 @@ public class JobProvider { .interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim()) .build(); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, () -> (query.getSortField() != null) ? @@ -785,7 +780,7 @@ public class JobProvider { handler.accept(null); return; } - get(jobId, AnomalyDetectorsIndex.jobResultsIndexName(jobId), ModelSnapshot.TYPE.getPreferredName(), + get(jobId, AnomalyDetectorsIndex.jobResultsAliasedName(jobId), ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(jobId, modelSnapshotId), handler, errorHandler, ModelSnapshot.PARSER, () -> null); } @@ -861,7 +856,7 @@ public class JobProvider { FieldSortBuilder sb = new FieldSortBuilder(sortField) .order(sortDescending ? SortOrder.DESC : SortOrder.ASC); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}", ModelSnapshot.TYPE, indexName, sortField, from, size); @@ -953,7 +948,7 @@ public class JobProvider { public QueryPage modelDebugOutput(String jobId, int from, int size) { SearchResponse searchResponse; - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}", ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size); @@ -986,7 +981,7 @@ public class JobProvider { LOGGER.trace("ES API CALL: get result type {} ID {} for job {}", ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId); - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); get(jobId, indexName, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId), handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(), () -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 3f6c1362303..f63dc32dc66 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -73,7 +73,7 @@ public class JobResultsPersister extends AbstractComponent { private Builder(String jobId) { this.jobId = Objects.requireNonNull(jobId); - indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); bulkRequest = new BulkRequest(); } @@ -212,7 +212,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId()))); - persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -239,11 +239,11 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); - persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); } public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer handler, Consumer errorHandler) { - String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()); + String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()); IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); try { indexRequest.source(toXContentBuilder(modelSnapshot)); @@ -261,9 +261,9 @@ public class JobResultsPersister extends AbstractComponent { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId)); - persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null); - persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); // Don't commit as we expect masses of these updates and they're only // for information at the API level } @@ -273,7 +273,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) { Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, Result.TYPE.getPreferredName(), null); - persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelDebugOutput.getJobId())); + persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelDebugOutput.getJobId())); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -295,7 +295,7 @@ public class JobResultsPersister extends AbstractComponent { * @return True if successful */ public boolean commitResultWrites(String jobId) { - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index d095c460aec..0585b8eb03f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -7,13 +7,16 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; @@ -38,63 +41,60 @@ public class JobStorageDeletionTask extends Task { CheckedConsumer finishedHandler, Consumer failureHandler) { - String indexName = AnomalyDetectorsIndex.getCurrentResultsIndex(state, jobId); - String indexPattern = indexName + "-*"; + final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId); + final String indexPattern = indexName + "-*"; + final String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - // Step 2. Regardless of if the DBQ succeeds, we delete the physical index + CheckedConsumer deleteAliasHandler = indicesAliasesResponse -> { + if (!indicesAliasesResponse.isAcknowledged()) { + logger.warn("Delete Alias request not acknowledged for alias [" + aliasName + "]."); + } else { + logger.info("Done deleting alias [" + aliasName + "]"); + } + + finishedHandler.accept(true); + }; + + // Step 2. DBQ done, delete the alias // ------- - // TODO this will be removed once shared indices are used + // TODO norelease more robust handling of failures? CheckedConsumer dbqHandler = bulkByScrollResponse -> { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("DeleteByQuery for index [" + indexPattern + "] timed out. Continuing to delete index."); + logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out."); } if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { logger.warn("[" + bulkByScrollResponse.getBulkFailures().size() - + "] failures encountered while running DeleteByQuery on index [" + indexPattern + "]. " - + "Continuing to delete index"); + + "] failures encountered while running DeleteByQuery on indices [" + indexName + ", " + + indexPattern + "]. "); } - - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); - client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> { - logger.info("Deleting index [" + indexName + "] successful"); - - if (deleteIndexResponse.isAcknowledged()) { - logger.info("Index deletion acknowledged"); - } else { - logger.warn("Index deletion not acknowledged"); - } - finishedHandler.accept(deleteIndexResponse.isAcknowledged()); - }, missingIndexHandler(indexName, finishedHandler, failureHandler))); + IndicesAliasesRequest request = new IndicesAliasesRequest() + .addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName)); + client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler, + e -> { + if (e instanceof IndexNotFoundException) { + logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job."); + try { + finishedHandler.accept(false); + } catch (Exception e1) { + failureHandler.accept(e1); + } + } else { + // all other exceptions should die + failureHandler.accept(e); + } + })); }; // Step 1. DeleteByQuery on the index, matching all docs with the right job_id // ------- - SearchRequest searchRequest = new SearchRequest(indexPattern); - searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); + SearchRequest searchRequest = new SearchRequest(indexName, indexPattern); DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); - searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder(Job.ID.getPreferredName(), jobId))); + ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + searchRequest.source(new SearchSourceBuilder().query(query)); + searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setSlices(5); - client.execute(MlDeleteByQueryAction.INSTANCE, request, - ActionListener.wrap(dbqHandler, missingIndexHandler(indexName, finishedHandler, failureHandler))); - } - - // If the index doesn't exist, we need to catch the exception and carry onwards so that the cluster - // state is properly updated - private Consumer missingIndexHandler(String indexName, CheckedConsumer finishedHandler, - Consumer failureHandler) { - return e -> { - if (e instanceof IndexNotFoundException) { - logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job."); - try { - finishedHandler.accept(false); - } catch (Exception e1) { - failureHandler.accept(e1); - } - } else { - // all other exceptions should die - failureHandler.accept(e); - } - }; + client.execute(MlDeleteByQueryAction.INSTANCE, request, ActionListener.wrap(dbqHandler, failureHandler)); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 721df66d671..248f928b988 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -66,7 +66,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover LOGGER.info("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); QueryBuilder excludeFilter = QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId())); + searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); searchRequest.types(ModelSnapshot.TYPE.getPreferredName()); QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter); searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 48c53792577..2f15cd0bd9b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -87,7 +87,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); request.setSlices(5); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId())); + searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); searchRequest.types(type); QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter); searchRequest.source(new SearchSourceBuilder().query(query)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index cb7a32c1f27..d34e8e80a06 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -283,8 +283,7 @@ public class OpenJobActionTests extends ESTestCase { routingTable = new RoutingTable.Builder(cs.routingTable()); MlMetadata mlMetadata = cs.metaData().custom(MlMetadata.TYPE); - Job job = mlMetadata.getJobs().get("job_id"); - String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(job)); + String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(cs, "job_id")); if (randomBoolean()) { routingTable.remove(indexToRemove); } else { @@ -317,9 +316,10 @@ public class OpenJobActionTests extends ESTestCase { indices.add(AnomalyDetectorsIndex.jobStateIndexName()); indices.add(AnomalyDetectorsIndex.ML_META_INDEX); indices.add(Auditor.NOTIFICATIONS_INDEX); - for (String jobId : jobIds) { - indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); - } + + // norelease: randomizing this throws an NPE in the test due to verifyIndicesExistAndPrimaryShardsAreActive() + // returning false. Needs fixing, deferring to a followup PR + indices.add(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT); for (String indexName : indices) { IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); indexMetaData.settings(Settings.builder() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 9081f4beca9..576df2abdf3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -341,7 +341,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { Exception e = expectThrows(ElasticsearchStatusException.class, () -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet()); assertTrue(e.getMessage().startsWith("cannot open job [job_id], no suitable nodes found, allocation explanation")); - assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-job_id]]")); + assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-shared]]")); logger.info("Start data node"); internalCluster().startNode(Settings.builder() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 444a8ec199d..0d659d07d98 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -220,16 +220,15 @@ public class MlJobIT extends ESRestTestCase { String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1"); - Response response = client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/repeated-id" , Collections.emptyMap(), + Response response = client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/repeated-id" , + Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); assertEquals(200, response.getStatusLine().getStatusCode()); final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2"); ResponseException e = expectThrows(ResponseException.class, - () ->client().performRequest("put", MachineLearning.BASE_PATH - + "anomaly_detectors/repeated-id" , Collections.emptyMap(), - new StringEntity(jobConfig2, ContentType.APPLICATION_JSON))); + () ->client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/repeated-id" , + Collections.emptyMap(), new StringEntity(jobConfig2, ContentType.APPLICATION_JSON))); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used.")); @@ -259,27 +258,43 @@ public class MlJobIT extends ESRestTestCase { assertEquals(200, response.getStatusLine().getStatusCode()); String responseAsString = responseEntityToString(response); - assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsIndexName(indexName) - + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId1) + "\":{},\"" + - AnomalyDetectorsIndex.jobResultsIndexName(jobId2))); + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName) + + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "\":{},\"" + + AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, containsString(indexName)); - assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1)))); - assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2)))); + assertThat(responseAsString, containsString(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName)); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)))); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)))); - addBucketResult(indexName, "1234", 1); - addBucketResult(indexName, "1236", 1); - response = client().performRequest("get", MachineLearning.BASE_PATH - + "anomaly_detectors/" + jobId1 + "/results/buckets"); + String bucketResult = String.format(Locale.ROOT, + "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}", + jobId1, "1234", 1); + String id = String.format(Locale.ROOT, + "%s_%s_%s", jobId1, "1234", 1); + response = client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/" + id, + Collections.emptyMap(), new StringEntity(bucketResult, ContentType.APPLICATION_JSON)); + assertEquals(201, response.getStatusLine().getStatusCode()); + + bucketResult = String.format(Locale.ROOT, + "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}", + jobId1, "1236", 1); + id = String.format(Locale.ROOT, + "%s_%s_%s", jobId1, "1236", 1); + response = client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/" + id, + Collections.emptyMap(), new StringEntity(bucketResult, ContentType.APPLICATION_JSON)); + assertEquals(201, response.getStatusLine().getStatusCode()); + + client().performRequest("post", "_refresh"); + + response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1 + "/results/buckets"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":2")); - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName) - + "/result/_search"); + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/_search"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"total\":2")); @@ -287,20 +302,66 @@ public class MlJobIT extends ESRestTestCase { response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - // check index and alias were deleted + // check that indices still exist, but are empty and aliases are gone response = client().performRequest("get", "_aliases"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1)))); - assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2)))); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)))); + assertThat(responseAsString, containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); //job2 still exists response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, not(containsString(indexName))); + assertThat(responseAsString, containsString(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName)); + + client().performRequest("post", "_refresh"); + + response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName + "/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":0")); } public void testCreateJobInSharedIndexUpdatesMapping() throws Exception { + String jobTemplate = "{\n" + + " \"analysis_config\" : {\n" + + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" + + " }\n" + + "}"; + + String jobId1 = "job-1"; + String byFieldName1 = "responsetime"; + String jobId2 = "job-2"; + String byFieldName2 = "cpu-usage"; + String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1); + + Response response = client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Check the index mapping contains the first by_field_name + response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT + "/_mapping?pretty"); + assertEquals(200, response.getStatusLine().getStatusCode()); + String responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString(byFieldName1)); + assertThat(responseAsString, not(containsString(byFieldName2))); + + jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2); + response = client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON)); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Check the index mapping now contains both fields + response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT + "/_mapping?pretty"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString(byFieldName1)); + assertThat(responseAsString, containsString(byFieldName2)); + } + + public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception { String jobTemplate = "{\n" + " \"analysis_config\" : {\n" + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" + @@ -318,7 +379,7 @@ public class MlJobIT extends ESRestTestCase { assertEquals(200, response.getStatusLine().getStatusCode()); // Check the index mapping contains the first by_field_name - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty"); + response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty"); assertEquals(200, response.getStatusLine().getStatusCode()); String responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString(byFieldName1)); @@ -330,7 +391,7 @@ public class MlJobIT extends ESRestTestCase { assertEquals(200, response.getStatusLine().getStatusCode()); // Check the index mapping now contains both fields - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty"); + response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString(byFieldName1)); @@ -339,7 +400,7 @@ public class MlJobIT extends ESRestTestCase { public void testDeleteJob() throws Exception { String jobId = "foo"; - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; createFarequoteJob(jobId); Response response = client().performRequest("get", "_cat/indices"); @@ -350,11 +411,23 @@ public class MlJobIT extends ESRestTestCase { response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - // check index was deleted + // check that the index still exists (it's shared by default) response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, not(containsString(indexName))); + assertThat(responseAsString, containsString(indexName)); + + assertBusy(() -> { + try { + Response r = client().performRequest("get", indexName + "/_count"); + assertEquals(200, r.getStatusLine().getStatusCode()); + String responseString = responseEntityToString(r); + assertThat(responseString, containsString("\"count\":0")); + } catch (Exception e) { + fail(e.getMessage()); + } + + }); // check that the job itself is gone expectThrows(ResponseException.class, () -> @@ -363,7 +436,8 @@ public class MlJobIT extends ESRestTestCase { public void testDeleteJobAfterMissingIndex() throws Exception { String jobId = "foo"; - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; createFarequoteJob(jobId); Response response = client().performRequest("get", "_cat/indices"); @@ -383,6 +457,7 @@ public class MlJobIT extends ESRestTestCase { response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); + assertThat(responseAsString, not(containsString(aliasName))); assertThat(responseAsString, not(containsString(indexName))); expectThrows(ResponseException.class, () -> @@ -391,7 +466,7 @@ public class MlJobIT extends ESRestTestCase { public void testMultiIndexDelete() throws Exception { String jobId = "foo"; - String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); + String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; createFarequoteJob(jobId); Response response = client().performRequest("put", indexName + "-001"); @@ -412,28 +487,32 @@ public class MlJobIT extends ESRestTestCase { String.format(Locale.ROOT, "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", jobId, 123, 1, 1); - client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123, + client().performRequest("put", indexName + "/result/" + 123, Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); - client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123, + client().performRequest("put", indexName + "-001/result/" + 123, Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); - client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123, + client().performRequest("put", indexName + "-002/result/" + 123, + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); + + // Also index a few through the alias for the first job + client().performRequest("put", indexName + "/result/" + 456, Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); client().performRequest("post", "_refresh"); // check for the documents - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/_count"); + response = client().performRequest("get", indexName+ "/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":2")); + + response = client().performRequest("get", indexName + "-001/_count"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":1")); - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count"); - assertEquals(200, response.getStatusLine().getStatusCode()); - responseAsString = responseEntityToString(response); - assertThat(responseAsString, containsString("\"count\":1")); - - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count"); + response = client().performRequest("get", indexName + "-002/_count"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":1")); @@ -444,30 +523,38 @@ public class MlJobIT extends ESRestTestCase { client().performRequest("post", "_refresh"); - // check index was deleted + // check that the indices still exist but are empty response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, not(containsString("\t" + indexName + "\t"))); + assertThat(responseAsString, containsString(indexName)); + assertThat(responseAsString, containsString(indexName + "-001")); + assertThat(responseAsString, containsString(indexName + "-002")); - // The other two indices won't be deleted, but the DBQ should have cleared them out - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count"); + response = client().performRequest("get", indexName + "/_count"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":0")); - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count"); + response = client().performRequest("get", indexName + "-001/_count"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":0")); + response = client().performRequest("get", indexName + "-002/_count"); + assertEquals(200, response.getStatusLine().getStatusCode()); + responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"count\":0")); + + expectThrows(ResponseException.class, () -> client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); } private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception { try { - client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId), + client().performRequest("put", + AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT, Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON)); } catch (ResponseException e) { // it is ok: the index already exists @@ -480,14 +567,15 @@ public class MlJobIT extends ESRestTestCase { jobId, timestamp, bucketSpan); String id = String.format(Locale.ROOT, "%s_%s_%s", jobId, timestamp, bucketSpan); - return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + id, + return client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "/result/" + id, Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult, ContentType.APPLICATION_JSON)); } private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception { try { - client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId), Collections.emptyMap(), - new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON)); + client().performRequest("put", + AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT, + Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON)); } catch (ResponseException e) { // it is ok: the index already exists assertThat(e.getMessage(), containsString("resource_already_exists_exception")); @@ -498,7 +586,7 @@ public class MlJobIT extends ESRestTestCase { String.format(Locale.ROOT, "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", jobId, timestamp, bucketSpan, sequenceNum); - return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + timestamp, + return client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "/result/" + timestamp, Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index acbc1397377..6ba30441c78 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import java.io.IOException; import java.util.Collections; @@ -30,6 +31,7 @@ public class MlRestTestStateCleaner { public void clearMlMetadata() throws IOException { deleteAllDatafeeds(); deleteAllJobs(); + deleteDotML(); } @SuppressWarnings("unchecked") @@ -82,4 +84,8 @@ public class MlRestTestStateCleaner { client.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId); } } + + private void deleteDotML() throws IOException { + client.performRequest("DELETE", ".ml-*?ignore_unavailable=true"); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index 7d47428dd84..c8b541a6ec8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; import java.util.Arrays; @@ -323,21 +324,21 @@ public class JobTests extends AbstractSerializingTestCase { public void testBuilder_setsDefaultIndexName() { Job.Builder builder = buildJobBuilder("foo"); Job job = builder.build(); - assertEquals("foo", job.getResultsIndexName()); + assertEquals(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT, job.getResultsIndexName()); } public void testBuilder_setsIndexName() { Job.Builder builder = buildJobBuilder("foo"); builder.setResultsIndexName("carol"); Job job = builder.build(); - assertEquals("carol", job.getResultsIndexName()); + assertEquals(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName()); } public void testBuilder_withInvalidIndexNameThrows () { Job.Builder builder = buildJobBuilder("foo"); builder.setResultsIndexName("_bad^name"); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build()); - assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName()), e.getMessage()); + assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName(), "_bad^name"), e.getMessage()); } public static Job.Builder buildJobBuilder(String id, Date date) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java index 86a95710432..eaf24f349d0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java @@ -40,7 +40,7 @@ public class JobDataDeleterTests extends ESTestCase { BulkResponse bulkResponse = Mockito.mock(BulkResponse.class); Client client = new MockClientBuilder("myCluster") - .prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsIndexName("foo"), response) + .prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), response) .prepareSearchScrollExecuteListener(response) .prepareBulk(bulkResponse).build(); @@ -92,7 +92,7 @@ public class JobDataDeleterTests extends ESTestCase { verify(client, times(5)) .prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString()); verify(client, times(1)) - .prepareDelete(eq(AnomalyDetectorsIndex.jobResultsIndexName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()), + .prepareDelete(eq(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()), eq("foo-snap-1")); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index e38b287a7c5..240d01c6fda 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -24,15 +24,12 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.Index; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; @@ -133,9 +130,12 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") public void testCreateJobResultsIndex() { + String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); + clientBuilder.createIndexRequest(resultsIndexName, captor); + clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo")); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); Job.Builder job = buildJobBuilder("foo"); @@ -145,13 +145,21 @@ public class JobProviderTests extends ESTestCase { ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build(); + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { CreateIndexRequest request = captor.getValue(); assertNotNull(request); - assertEquals(AnomalyDetectorsIndex.jobResultsIndexName("foo"), request.index()); - clientBuilder.verifyIndexCreated(AnomalyDetectorsIndex.jobResultsIndexName("foo")); + assertEquals(resultsIndexName, request.index()); + clientBuilder.verifyIndexCreated(resultsIndexName); resultHolder.set(aBoolean); } @@ -168,7 +176,8 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") public void testCreateJobWithExistingIndex() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123")); + clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), + AnomalyDetectorsIndex.jobResultsAliasedName("foo123")); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class); @@ -176,7 +185,7 @@ public class JobProviderTests extends ESTestCase { ImmutableOpenMap> mappings = ImmutableOpenMap.>builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build(); + .fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), typeMappings).build(); when(getMappingsResponse.mappings()).thenReturn(mappings); clientBuilder.prepareGetMapping(getMappingsResponse); @@ -185,7 +194,7 @@ public class JobProviderTests extends ESTestCase { JobProvider provider = createProvider(clientBuilder.build()); Index index = mock(Index.class); - when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsAliasedName("foo")); IndexMetaData indexMetaData = mock(IndexMetaData.class); when(indexMetaData.getIndex()).thenReturn(index); @@ -193,26 +202,26 @@ public class JobProviderTests extends ESTestCase { when(indexMetaData.getAliases()).thenReturn(aliases); ImmutableOpenMap indexMap = ImmutableOpenMap.builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); + .fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), indexMetaData).build(); - ClusterState cs = ClusterState.builder(new ClusterName("_name")) + ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); ClusterService clusterService = mock(ClusterService.class); doAnswer(invocationOnMock -> { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; - task.execute(cs); + task.execute(cs2); return null; }).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class)); doAnswer(invocationOnMock -> { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; - task.execute(cs); + task.execute(cs2); return null; }).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class)); - provider.createJobResultIndex(job.build(), cs, new ActionListener() { + provider.createJobResultIndex(job.build(), cs2, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { assertTrue(aBoolean); @@ -228,10 +237,13 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() { + String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar"; + String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo"); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); - clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("bar"), AnomalyDetectorsIndex.jobResultsIndexName("foo")); + clientBuilder.createIndexRequest(indexName, captor); + clientBuilder.prepareAlias(indexName, aliasName); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); Job.Builder job = buildJobBuilder("foo"); @@ -239,19 +251,19 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); - Index index = mock(Index.class); - when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); - IndexMetaData indexMetaData = mock(IndexMetaData.class); - when(indexMetaData.getIndex()).thenReturn(index); - ImmutableOpenMap aliases = ImmutableOpenMap.of(); - when(indexMetaData.getAliases()).thenReturn(aliases); - - ImmutableOpenMap indexMap = ImmutableOpenMap.builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); + ImmutableOpenMap indexMap = ImmutableOpenMap.builder().build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { @@ -265,54 +277,7 @@ public class JobProviderTests extends ESTestCase { }); } - @SuppressWarnings("unchecked") - public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); - clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); - clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); - - GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class); - ImmutableOpenMap typeMappings = ImmutableOpenMap.of(); - ImmutableOpenMap> mappings = - ImmutableOpenMap.>builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build(); - when(getMappingsResponse.mappings()).thenReturn(mappings); - clientBuilder.prepareGetMapping(getMappingsResponse); - - Job.Builder job = buildJobBuilder("foo"); - job.setResultsIndexName("foo"); - Client client = clientBuilder.build(); - JobProvider provider = createProvider(client); - - Index index = mock(Index.class); - when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); - IndexMetaData indexMetaData = mock(IndexMetaData.class); - when(indexMetaData.getIndex()).thenReturn(index); - ImmutableOpenMap aliases = ImmutableOpenMap.of(); - when(indexMetaData.getAliases()).thenReturn(aliases); - - ImmutableOpenMap indexMap = ImmutableOpenMap.builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); - - provider.createJobResultIndex(job.build(), cs, new ActionListener() { - @Override - public void onResponse(Boolean aBoolean) { - verify(client.admin().indices(), never()).prepareAliases(); - verify(clientBuilder.build().admin().indices(), times(1)).preparePutMapping(any()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - } - - public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException { + public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException { @SuppressWarnings("unchecked") ActionListener actionListener = mock(ActionListener.class); String jobId = "ThisIsMyJob"; @@ -320,8 +285,8 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); clientBuilder.resetIndices(); - clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true) - .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true, + clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true) + .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true, false, actionListener); clientBuilder.build(); @@ -340,8 +305,8 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); clientBuilder.resetIndices(); - clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true) - .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true, + clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true) + .addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true, true, actionListener); clientBuilder.build(); @@ -365,7 +330,7 @@ public class JobProviderTests extends ESTestCase { source.add(map); QueryBuilder[] queryBuilderHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); int from = 0; int size = 10; Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response); @@ -399,7 +364,7 @@ public class JobProviderTests extends ESTestCase { source.add(map); QueryBuilder[] queryBuilderHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); int from = 99; int size = 17; @@ -434,7 +399,7 @@ public class JobProviderTests extends ESTestCase { source.add(map); QueryBuilder[] queryBuilderHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); int from = 99; int size = 17; @@ -466,7 +431,7 @@ public class JobProviderTests extends ESTestCase { Long timestamp = 98765432123456789L; List> source = new ArrayList<>(); - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(false, source); Client client = getMockedClient(queryBuilder -> {}, response); JobProvider provider = createProvider(client); @@ -490,7 +455,7 @@ public class JobProviderTests extends ESTestCase { map.put("bucket_span", 22); source.add(map); - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(queryBuilder -> {}, response); JobProvider provider = createProvider(client); @@ -518,7 +483,7 @@ public class JobProviderTests extends ESTestCase { map.put("is_interim", true); source.add(map); - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(queryBuilder -> {}, response); JobProvider provider = createProvider(client); @@ -557,7 +522,7 @@ public class JobProviderTests extends ESTestCase { int from = 14; int size = 2; String sortfield = "minefield"; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -607,7 +572,7 @@ public class JobProviderTests extends ESTestCase { int from = 14; int size = 2; String sortfield = "minefield"; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -665,7 +630,7 @@ public class JobProviderTests extends ESTestCase { int from = 14; int size = 2; String sortfield = "minefield"; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -702,7 +667,7 @@ public class JobProviderTests extends ESTestCase { source.add(recordMap); } - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -731,7 +696,7 @@ public class JobProviderTests extends ESTestCase { source.add(recordMap); } - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -757,7 +722,7 @@ public class JobProviderTests extends ESTestCase { source.add(map); - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); int from = 0; int size = 10; Client client = getMockedClient(q -> {}, response); @@ -783,7 +748,7 @@ public class JobProviderTests extends ESTestCase { source.put("category_id", categoryId); source.put("terms", terms); - SearchResponse response = createSearchResponse(Collections.singletonList(source)); + SearchResponse response = createSearchResponse(true, Collections.singletonList(source)); Client client = getMockedClient(q -> {}, response); JobProvider provider = createProvider(client); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -826,7 +791,7 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; QueryBuilder[] qbHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(q -> qbHolder[0] = q, response); JobProvider provider = createProvider(client); @@ -888,7 +853,7 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; QueryBuilder[] qbHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(q -> qbHolder[0] = q, response); JobProvider provider = createProvider(client); @@ -943,7 +908,7 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> {}, response); JobProvider provider = createProvider(client); @@ -994,7 +959,7 @@ public class JobProviderTests extends ESTestCase { int from = 4; int size = 3; QueryBuilder[] qbHolder = new QueryBuilder[1]; - SearchResponse response = createSearchResponse(source); + SearchResponse response = createSearchResponse(true, source); Client client = getMockedClient(qb -> qbHolder[0] = qb, response); JobProvider provider = createProvider(client); @@ -1149,7 +1114,7 @@ public class JobProviderTests extends ESTestCase { return getResponse; } - private static SearchResponse createSearchResponse(List> source) throws IOException { + private static SearchResponse createSearchResponse(boolean exists, List> source) throws IOException { SearchResponse response = mock(SearchResponse.class); List list = new ArrayList<>(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 9e3d3f8c78c..d59e1677e6f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -103,9 +103,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); @@ -137,9 +137,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); } @@ -162,9 +162,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index cd5eff0e53b..0376ef6dd2e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -98,9 +98,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase { assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")})); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); dbqRequest = capturedDeleteByQueryRequests.get(1); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")})); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); } public void testOnTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { @@ -115,9 +115,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase { assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")})); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); dbqRequest = capturedDeleteByQueryRequests.get(1); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")})); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); } private void givenClientRequestsSucceed() { diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml new file mode 100644 index 00000000000..9ea537a9f9f --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml @@ -0,0 +1,345 @@ +setup: + - skip: + features: ["headers"] + +--- +"Test CRUD on two jobs in shared index": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"JSON", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + + - do: + xpack.ml.put_job: + job_id: farequote2 + body: > + { + "job_id":"farequote2", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"JSON", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote2" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + xpack.ml.open_job: + job_id: farequote2 + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote2 + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + xpack.ml.flush_job: + job_id: farequote2 + - match: { flushed: true } + + + - do: + xpack.ml.close_job: + job_id: farequote + - match: { closed: true } + + - do: + xpack.ml.close_job: + job_id: farequote2 + - match: { closed: true } + + + - do: + indices.refresh: {} + + - do: + indices.exists: + index: ".ml-anomalies-farequote" + - is_true: '' + + - do: + indices.exists: + index: ".ml-anomalies-farequote" + - is_true: '' + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 8} + + - do: + count: + index: .ml-anomalies-farequote + body: + query: + constant_score: + filter: + term: + job_id: farequote + + - match: {count: 4} + + - do: + count: + index: .ml-anomalies-shared + body: + query: + constant_score: + filter: + term: + job_id: farequote + + - match: {count: 4} + + - do: + count: + index: .ml-anomalies-farequote2 + body: + query: + constant_score: + filter: + term: + job_id: farequote2 + + - match: {count: 4} + + - do: + count: + index: .ml-anomalies-shared + body: + query: + constant_score: + filter: + term: + job_id: farequote2 + - match: {count: 4} + + - do: + xpack.ml.delete_job: + job_id: "farequote" + - match: { acknowledged: true } + + - do: + indices.refresh: {} + + - do: + indices.exists: + index: ".ml-anomalies-shared" + - is_true: '' + + - do: + count: + index: .ml-anomalies-shared + body: + query: + constant_score: + filter: + term: + job_id: farequote + - match: {count: 0} + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 4} + + + - do: + count: + index: .ml-anomalies-farequote2 + body: + query: + constant_score: + filter: + term: + job_id: farequote2 + + - match: {count: 4} + + - do: + count: + index: .ml-anomalies-shared + body: + query: + constant_score: + filter: + term: + job_id: farequote2 + + - match: {count: 4} + + - do: + xpack.ml.delete_job: + job_id: "farequote2" + - match: { acknowledged: true } + + - do: + indices.refresh: {} + + - do: + indices.exists: + index: ".ml-anomalies-shared" + - is_true: '' + + - do: + indices.exists: + index: ".ml-anomalies-farequote" + - is_false: '' + + - do: + indices.exists: + index: ".ml-anomalies-farequote2" + - is_false: '' + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 0} + + +--- +"Test unrelated index": + + - do: + xpack.ml.put_job: + job_id: farequote + body: > + { + "job_id":"farequote", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"JSON", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: "farequote" } + + - do: + xpack.ml.open_job: + job_id: farequote + + - do: + #set the header so we won't randomize it + headers: + Content-Type: application/json + xpack.ml.post_data: + job_id: farequote + body: > + {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + + - do: + indices.create: + index: foo + + - do: + indices.create: + index: .ml-anomalies-foo + + - do: + index: + index: foo + type: foo + body: + key: value + + - do: + index: + index: .ml-anomalies-foo + type: foo + body: + key: value + + - do: + index: + index: .ml-anomalies-foo + type: foo + body: + key: value + job_id: foo + + - do: + xpack.ml.flush_job: + job_id: farequote + - match: { flushed: true } + + - do: + xpack.ml.close_job: + job_id: farequote + - match: { closed: true } + + - do: + xpack.ml.delete_job: + job_id: "farequote" + - match: { acknowledged: true } + + - do: + indices.refresh: {} + + - do: + count: + index: .ml-anomalies-shared + - match: {count: 0} + + - do: + count: + index: foo + - match: {count: 1} + + - do: + count: + index: .ml-anomalies-foo + - match: {count: 2} + + +