From 19a7e0f4eb4e094ddc7f6e1195c685ef22537bb8 Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Fri, 11 Jan 2019 08:03:41 -0600
Subject: [PATCH] ML: update .ml-state actions to support > 1 index (#37307)

* ML: Updating .ml-state calls to be able to support > 1 index
* Matching bulk delete behavior with dbq
* Adjusting state name
* refreshing indices before search
* fixing line length
* adjusting index expansion options
---
 .../persistence/AnomalyDetectorsIndex.java    | 10 ++-
 .../AnomalyDetectorsIndexFields.java          |  2 +-
 .../ml/integration/DeleteExpiredDataIT.java   |  3 +-
 .../xpack/ml/MachineLearning.java             |  2 +-
 .../ml/action/TransportDeleteJobAction.java   |  9 +-
 .../TransportDeleteModelSnapshotAction.java   |  6 +-
 .../ml/action/TransportOpenJobAction.java     |  8 +-
 .../ml/job/persistence/JobDataDeleter.java    | 45 ++++++----
 .../job/persistence/JobResultsPersister.java  |  2 +-
 .../job/persistence/JobResultsProvider.java   | 10 +--
 .../ml/job/persistence/StateStreamer.java     | 23 +++--
 .../ml/job/retention/UnusedStateRemover.java  | 53 ++++++------
 .../ml/integration/JobResultsProviderIT.java  |  2 +-
 .../ml/integration/MlConfigMigratorIT.java    | 17 ++--
 .../ml/integration/NetworkDisruptionIT.java   |  6 +-
 .../ml/job/persistence/MockClientBuilder.java |  5 ++
 .../job/persistence/StateStreamerTests.java   | 85 +++++++++++++------
 17 files changed, 183 insertions(+), 105 deletions(-)
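The unifying idea of the patch: ".ml-state" stops being treated as a single concrete index and becomes a prefix, so writes keep targeting the bare name while reads and deletes go through a ".ml-state*" wildcard. A minimal sketch of how callers address state after this change, using only the APIs that appear in the hunks below; the Client instance and the "job-1" job ID are placeholders, not part of the patch:

// Hedged sketch of the pattern-based state access introduced by this patch.
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;

public class StateIndexPatternExample {

    // Writes still use the bare prefix, which resolves against the state index template.
    static String writeIndex() {
        return AnomalyDetectorsIndex.jobStateIndexName();    // ".ml-state"
    }

    // Reads and deletes use the wildcard so more than one physical index can hold state.
    static String readPattern() {
        return AnomalyDetectorsIndex.jobStateIndexPattern(); // ".ml-state*"
    }

    // Example read across every state index; lenient expansion keeps the call from
    // failing while no state index exists yet.
    static SearchResponse findQuantiles(Client client) {
        return client.prepareSearch(readPattern())
            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
            .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId("job-1")))
            .setSize(1)
            .get();
    }
}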
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
index 673e796ef7e..b9f887d2d49 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
@@ -44,7 +44,15 @@ public final class AnomalyDetectorsIndex {
      * @return The index name
      */
     public static String jobStateIndexName() {
-        return AnomalyDetectorsIndexFields.STATE_INDEX_NAME;
+        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
+    }
+
+    /**
+     * The name pattern to capture all .ml-state prefixed indices
+     * @return The .ml-state index pattern
+     */
+    public static String jobStateIndexPattern() {
+        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "*";
     }
 
     /**
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java
index 527ba5dc145..96f21876223 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java
@@ -9,7 +9,7 @@ public final class AnomalyDetectorsIndexFields {
 
     public static final String CONFIG_INDEX = ".ml-config";
     public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
-    public static final String STATE_INDEX_NAME = ".ml-state";
+    public static final String STATE_INDEX_PREFIX = ".ml-state";
     public static final String RESULTS_INDEX_DEFAULT = "shared";
 
     private AnomalyDetectorsIndexFields() {}
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
index 2c5e7326ad5..f2ca43bf53c 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
@@ -239,8 +239,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         }
 
         // Verify .ml-state doesn't contain unused state documents
-        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setFetchSource(false)
+                .setTrackTotalHits(true)
                 .setSize(10000)
                 .get();
 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index cc259f51c1e..11d302470c7 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -702,7 +702,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
         try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
             IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
-                    .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()))
+                    .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
                     // TODO review these settings
                     .settings(Settings.builder()
                         .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
index b186ea21848..876f2cd1aac 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
@@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
@@ -387,7 +386,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction
 
-        ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
+        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
                 bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler),
                 failureHandler);
 
@@ -397,7 +396,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction
     private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
         // The quantiles type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
         request.setQuery(query);
@@ -417,7 +416,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction
-    private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkResponse> listener) {
+    private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkByScrollResponse> listener) {
         GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
         request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
         executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
@@ -432,7 +431,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction
                                        ActionListener<Boolean> finishedHandler) {
         // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
         request.setQuery(query);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
index 61592e4ddfa..9904ecad099 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
@@ -7,12 +7,12 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
@@ -79,9 +79,9 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction
-                new ActionListener<BulkResponse>() {
+                new ActionListener<BulkByScrollResponse>() {
                     @Override
-                    public void onResponse(BulkResponse bulkResponse) {
+                    public void onResponse(BulkByScrollResponse bulkResponse) {
                         String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, deleteCandidate.getSnapshotId(),
                                 deleteCandidate.getDescription());
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
index c81a539fb0e..b7b4fb3aad4 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
@@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
@@ -368,13 +369,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction
     static List<String> verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) {
-        String[] indices = indicesOfInterest(resultsIndex);
+        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
+        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indicesOfInterest(resultsIndex));
         List<String> unavailableIndices = new ArrayList<>(indices.length);
         for (String index : indices) {
             // Indices are created on demand from templates.
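Because ".ml-state*" can match zero or several concrete indices, code that used to assume one fixed index name now resolves the pattern against cluster state before checking shard health. A rough sketch of that idea, assuming the same IndexNameExpressionResolver API the hunk above uses; the primary-shard check is simplified relative to the real TransportOpenJobAction logic:

// Hedged sketch: expand a wildcard into concrete index names, then flag the ones
// whose primaries are not yet active. Helper and class names are illustrative.
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

public class PatternResolutionExample {

    static List<String> unavailableIndices(ClusterState clusterState, String... patterns) {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
        // lenientExpandOpen(): a pattern that matches nothing is ignored instead of failing.
        String[] concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), patterns);
        List<String> unavailable = new ArrayList<>(concreteIndices.length);
        for (String index : concreteIndices) {
            // A concrete index with no active primaries cannot serve state reads yet.
            if (clusterState.routingTable().index(index) == null
                    || clusterState.routingTable().index(index).allPrimaryShardsActive() == false) {
                unavailable.add(index);
            }
        }
        return unavailable;
    }
}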
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
index b65feb68da0..c96388213c8 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
@@ -8,26 +8,28 @@ package org.elasticsearch.xpack.ml.job.persistence;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -50,27 +52,34 @@ public class JobDataDeleter {
      *
      * @param modelSnapshots the model snapshots to delete
      */
-    public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkResponse> listener) {
+    public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkByScrollResponse> listener) {
         if (modelSnapshots.isEmpty()) {
-            listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
+            listener.onResponse(new BulkByScrollResponse(TimeValue.ZERO,
+                new BulkByScrollTask.Status(Collections.emptyList(), null),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                false));
             return;
         }
 
-        String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String stateIndexName = AnomalyDetectorsIndex.jobStateIndexPattern();
 
-        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+        List<String> idsToDelete = new ArrayList<>();
+        Set<String> indices = new HashSet<>();
+        indices.add(stateIndexName);
         for (ModelSnapshot modelSnapshot : modelSnapshots) {
-            for (String stateDocId : modelSnapshot.stateDocumentIds()) {
-                bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ElasticsearchMappings.DOC_TYPE, stateDocId));
-            }
-
-            bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
-                    ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot)));
+            idsToDelete.addAll(modelSnapshot.stateDocumentIds());
+            idsToDelete.add(ModelSnapshot.documentId(modelSnapshot));
+            indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 
-        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0]))
+            .setRefresh(true)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
+
         try {
-            executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), listener);
+            executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
         } catch (Exception e) {
             listener.onFailure(e);
         }
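The JobDataDeleter change above replaces a hand-built bulk of per-document deletes with one delete-by-query that spans the ".ml-state*" pattern plus each snapshot's results alias, which is what "Matching bulk delete behavior with dbq" in the commit message refers to. A compressed sketch of that request shape; the index names and document IDs are examples only, not values from the patch:

// Hedged sketch: one DeleteByQueryRequest addressing several indices by explicit ID.
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

public class DeleteByIdsExample {

    static DeleteByQueryRequest buildRequest() {
        return new DeleteByQueryRequest(".ml-state*", ".ml-anomalies-example-job")
            // Make the deletions searchable immediately, matching the old bulk
            // path's IMMEDIATE refresh policy.
            .setRefresh(true)
            // A wildcard that matches no index must not fail the whole request.
            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
            // Deleting by explicit IDs keeps the semantics of the old per-document bulk.
            .setQuery(new IdsQueryBuilder().addIds("example-state-doc-id", "example-snapshot-doc-id"));
    }
}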
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
index 32a52410f2d..e57d85aefa7 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
@@ -305,7 +305,7 @@ public class JobResultsPersister {
      * @param jobId The job Id
      * */
     public void commitStateWrites(String jobId) {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
         // Refresh should wait for Lucene to make the data searchable
         logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
         RefreshRequest refreshRequest = new RefreshRequest(indexName);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
index 17d173bf22f..b942c49c14e 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
@@ -157,14 +157,14 @@ public class JobResultsProvider {
      */
     public void checkForLeftOverDocuments(Job job, ActionListener<Boolean> listener) {
 
-        SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1),
                         CategorizerState.v54DocumentId(job.getId(), 1)))
-                .setIndicesOptions(IndicesOptions.lenientExpandOpen());
+                .setIndicesOptions(IndicesOptions.strictExpand());
 
-        SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
-                .setIndicesOptions(IndicesOptions.lenientExpandOpen());
+                .setIndicesOptions(IndicesOptions.strictExpand());
 
         String resultsIndexName = job.getResultsIndexName();
         SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
@@ -396,7 +396,7 @@ public class JobResultsProvider {
         AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
 
         String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
-        String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
+        String stateIndex = AnomalyDetectorsIndex.jobStateIndexPattern();
 
         MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
                 .add(createLatestDataCountsSearch(resultsIndex, jobId))
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java
index 9a2c6a4938b..3ed91412042 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java
@@ -9,10 +9,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
@@ -62,7 +63,7 @@ public class StateStreamer {
      * @param restoreStream the stream to write the state to
      */
     public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
 
         // First try to restore model state.
         for (String stateDocId : modelSnapshot.stateDocumentIds()) {
@@ -73,13 +74,16 @@ public class StateStreamer {
             LOGGER.trace("ES API CALL: get ID {} from index {}", stateDocId, indexName);
 
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
-                GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, stateDocId).get();
-                if (!stateResponse.isExists()) {
+                SearchResponse stateResponse = client.prepareSearch(indexName)
+                    .setTypes(ElasticsearchMappings.DOC_TYPE)
+                    .setSize(1)
+                    .setQuery(QueryBuilders.idsQuery().addIds(stateDocId)).get();
+                if (stateResponse.getHits().getHits().length == 0) {
                     LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}",
                             modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId);
                     break;
                 }
-                writeStateToStream(stateResponse.getSourceAsBytesRef(), restoreStream);
+                writeStateToStream(stateResponse.getHits().getAt(0).getSourceRef(), restoreStream);
             }
         }
 
@@ -97,11 +101,14 @@ public class StateStreamer {
             LOGGER.trace("ES API CALL: get ID {} from index {}", docId, indexName);
 
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
-                GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, docId).get();
-                if (!stateResponse.isExists()) {
+                SearchResponse stateResponse = client.prepareSearch(indexName)
+                    .setTypes(ElasticsearchMappings.DOC_TYPE)
+                    .setSize(1)
+                    .setQuery(QueryBuilders.idsQuery().addIds(docId)).get();
+                if (stateResponse.getHits().getHits().length == 0) {
                     break;
                 }
-                writeStateToStream(stateResponse.getHits().getAt(0).getSourceRef(), restoreStream);
+                writeStateToStream(stateResponse.getHits().getAt(0).getSourceRef(), restoreStream);
             }
         }
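A single-document GET cannot target a wildcard expression, which is why the StateStreamer hunks above turn each point read into a size-1 search filtered on _id. A minimal sketch of that substitution, assuming the same transport Client API; the docId value is illustrative:

// Hedged sketch: fetch one document's source across ".ml-state*" via an ids query.
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.query.QueryBuilders;

public class GetByIdViaSearchExample {

    // Returns the document source, or null when no state index holds the ID yet.
    static BytesReference fetchStateDoc(Client client, String docId) {
        SearchResponse response = client.prepareSearch(".ml-state*")
            .setSize(1)                                       // at most one doc carries this ID
            .setQuery(QueryBuilders.idsQuery().addIds(docId)) // ids query replaces the GET lookup
            .get();
        if (response.getHits().getHits().length == 0) {
            return null;
        }
        return response.getHits().getAt(0).getSourceRef();
    }
}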
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
index 66030c56823..249d3761b58 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
@@ -8,11 +8,13 @@ package org.elasticsearch.xpack.ml.job.retention;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -23,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
@@ -51,9 +54,9 @@ public class UnusedStateRemover implements MlDataRemover {
     @Override
     public void remove(ActionListener<Boolean> listener) {
         try {
-            BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
-            if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
-                executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
+            List<String> unusedStateDocIds = findUnusedStateDocIds();
+            if (unusedStateDocIds.size() > 0) {
+                executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
             } else {
                 listener.onResponse(true);
             }
@@ -62,10 +65,11 @@ public class UnusedStateRemover implements MlDataRemover {
         }
     }
 
-    private BulkRequestBuilder findUnusedStateDocs() {
+    private List<String> findUnusedStateDocIds() {
         Set<String> jobIds = getJobIds();
-        BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
-        BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
+        List<String> stateDocIdsToDelete = new ArrayList<>();
+        BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client,
+            AnomalyDetectorsIndex.jobStateIndexPattern());
         while (stateDocIdsIterator.hasNext()) {
             Deque<String> stateDocIds = stateDocIdsIterator.next();
             for (String stateDocId : stateDocIds) {
@@ -75,12 +79,11 @@ public class UnusedStateRemover implements MlDataRemover {
                     continue;
                 }
                 if (jobIds.contains(jobId) == false) {
-                    deleteUnusedStateRequestBuilder.add(new DeleteRequest(
-                            AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
+                    stateDocIdsToDelete.add(stateDocId);
                 }
             }
         }
-        return deleteUnusedStateRequestBuilder;
+        return stateDocIdsToDelete;
     }
 
     private Set<String> getJobIds() {
@@ -98,27 +101,29 @@ public class UnusedStateRemover implements MlDataRemover {
         return jobIds;
     }
 
-    private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
+    private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListener<Boolean> listener) {
         LOGGER.info("Found [{}] unused state documents; attempting to delete",
-                deleteUnusedStateRequestBuilder.numberOfActions());
-        deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
-            @Override
-            public void onResponse(BulkResponse bulkItemResponses) {
-                if (bulkItemResponses.hasFailures()) {
+                unusedDocIds.size());
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .types(ElasticsearchMappings.DOC_TYPE)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
+        client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
+            response -> {
+                if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {
                     LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
-                            bulkItemResponses.buildFailureMessage());
+                        Strings.collectionToCommaDelimitedString(response.getBulkFailures()) +
+                            "," + Strings.collectionToCommaDelimitedString(response.getSearchFailures()));
                 } else {
                     LOGGER.info("Successfully deleted all unused state documents");
                 }
                 listener.onResponse(true);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
+            },
+            e -> {
                 LOGGER.error("Error deleting unused model state documents: ", e);
                 listener.onFailure(e);
             }
-        });
+        ));
     }
 
     private static class JobIdExtractor {
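One behavioural difference worth noting in the UnusedStateRemover hunk: a plain BulkResponse exposes a single hasFailures() flag, while a delete-by-query reports search-phase failures and bulk-phase failures separately, and the new code checks both. A hedged sketch of that result handling; the class and logger names are illustrative:

// Hedged sketch: deciding success for a BulkByScrollResponse the way the hunk above does.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

public class DeleteByQueryResultCheck {

    private static final Logger LOGGER = LogManager.getLogger(DeleteByQueryResultCheck.class);

    // A delete-by-query can fail while searching for victims or while bulk-deleting them,
    // so both failure lists have to be inspected before declaring success.
    static boolean allDeleted(BulkByScrollResponse response) {
        if (response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false) {
            LOGGER.error("Some documents could not be deleted: {}",
                Strings.collectionToCommaDelimitedString(response.getBulkFailures()) + ","
                    + Strings.collectionToCommaDelimitedString(response.getSearchFailures()));
            return false;
        }
        return true;
    }
}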
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
index 3843181a0bc..02cc738477c 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
@@ -379,7 +379,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         Quantiles quantiles = new Quantiles(jobId, new Date(), "quantile-state");
         indexQuantiles(quantiles);
 
-        client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexName(),
+        client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexPattern(),
                 AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).get();
 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java
index 87c0e4ac824..33b24847666 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java
@@ -6,8 +6,8 @@ package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Matchers.eq;
@@ -308,12 +310,17 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
     }
 
     public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException {
-        GetResponse getResponse = client()
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, "ml-config").get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern()).execute();
+        SearchResponse searchResponse = client()
+            .prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setTypes(ElasticsearchMappings.DOC_TYPE)
+            .setSize(1)
+            .setQuery(QueryBuilders.idsQuery().addIds("ml-config"))
+            .get();
 
-        assertTrue(getResponse.isExists());
+        assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
 
-        try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput();
+        try (InputStream stream = searchResponse.getHits().getAt(0).getSourceRef().streamInput();
              XContentParser parser = XContentFactory.xContent(XContentType.JSON)
                      .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
             MlMetadata recoveredMeta = MlMetadata.LENIENT_PARSER.apply(parser, null).build();
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java
index 4ab369120e0..3304963ae35 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java
@@ -93,8 +93,9 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
         assertEquals(newJobNode, finalJobNode);
 
         // The job running on the original node should have been killed, and hence should not have persisted quantiles
-        SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
+                .setTrackTotalHits(true)
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
         assertEquals(0L, searchResponse.getHits().getTotalHits().value);
 
@@ -103,8 +104,9 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
         assertTrue(closeJobResponse.isClosed());
 
         // The relocated job was closed rather than killed, and hence should have persisted quantiles
-        searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
+                .setTrackTotalHits(true)
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
         assertEquals(1L, searchResponse.getHits().getTotalHits().value);
     }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
index 627465f1d4f..4a4284e2d14 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
@@ -271,6 +271,11 @@ public class MockClientBuilder {
         return this;
     }
 
+    public MockClientBuilder prepareSearches(String index, SearchRequestBuilder first, SearchRequestBuilder... searches) {
+        when(client.prepareSearch(eq(index))).thenReturn(first, searches);
+        return this;
+    }
+
     /**
     * Creates a {@link SearchResponse} with a {@link SearchHit} for each element of {@code docs}
    * @param indexName Index being searched
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java
index beae3959308..1629a8bcdba 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java
@@ -5,14 +5,20 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
-import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.mock.orig.Mockito;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
@@ -21,9 +27,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -36,24 +47,24 @@ public class StateStreamerTests extends ESTestCase {
         String snapshotId = "123";
 
         Map<String, Object> categorizerState = new HashMap<>();
         categorizerState.put("catName", "catVal");
-        GetResponse categorizerStateGetResponse1 = createGetResponse(true, categorizerState);
-        GetResponse categorizerStateGetResponse2 = createGetResponse(false, null);
-        Map<String, Object> modelState = new HashMap<>();
-        modelState.put("modName", "modVal1");
-        GetResponse modelStateGetResponse1 = createGetResponse(true, modelState);
-        modelState.put("modName", "modVal2");
-        GetResponse modelStateGetResponse2 = createGetResponse(true, modelState);
+        Map<String, Object> modelState1 = new HashMap<>();
+        modelState1.put("modName1", "modVal1");
+        Map<String, Object> modelState2 = new HashMap<>();
+        modelState2.put("modName2", "modVal2");
 
-        MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        CategorizerState.documentId(JOB_ID, 1), categorizerStateGetResponse1)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        CategorizerState.documentId(JOB_ID, 2), categorizerStateGetResponse2)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        ModelState.documentId(JOB_ID, snapshotId, 1), modelStateGetResponse1)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        ModelState.documentId(JOB_ID, snapshotId, 2), modelStateGetResponse2);
+        SearchRequestBuilder builder1 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(modelState1)),
+            QueryBuilders.idsQuery().addIds(ModelState.documentId(JOB_ID, snapshotId, 1)));
+        SearchRequestBuilder builder2 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(modelState2)),
+            QueryBuilders.idsQuery().addIds(ModelState.documentId(JOB_ID, snapshotId, 2)));
+        SearchRequestBuilder builder3 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(categorizerState)),
+            QueryBuilders.idsQuery().addIds(CategorizerState.documentId(JOB_ID, 1)));
+        SearchRequestBuilder builder4 = prepareSearchBuilder(createSearchResponse(Collections.emptyList()),
+            QueryBuilders.idsQuery().addIds(CategorizerState.documentId(JOB_ID, 2)));
+
+        MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
+            .addClusterStatusYellowResponse()
+            .prepareSearches(AnomalyDetectorsIndex.jobStateIndexPattern(), builder1, builder2, builder3, builder4);
 
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId(snapshotId).setSnapshotDocCount(2).build();
 
@@ -64,8 +75,8 @@ public class StateStreamerTests extends ESTestCase {
 
         String[] restoreData = stream.toString(StandardCharsets.UTF_8.name()).split("\0");
         assertEquals(3, restoreData.length);
-        assertEquals("{\"modName\":\"modVal1\"}", restoreData[0]);
-        assertEquals("{\"modName\":\"modVal2\"}", restoreData[1]);
+        assertEquals("{\"modName1\":\"modVal1\"}", restoreData[0]);
+        assertEquals("{\"modName2\":\"modVal2\"}", restoreData[1]);
         assertEquals("{\"catName\":\"catVal\"}", restoreData[2]);
     }
 
@@ -80,10 +91,32 @@ public class StateStreamerTests extends ESTestCase {
         Mockito.verifyNoMoreInteractions(outputStream);
     }
 
-    private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {
-        GetResponse getResponse = mock(GetResponse.class);
-        when(getResponse.isExists()).thenReturn(exists);
-        when(getResponse.getSourceAsBytesRef()).thenReturn(BytesReference.bytes(XContentFactory.jsonBuilder().map(source)));
-        return getResponse;
+    private static SearchResponse createSearchResponse(List<Map<String, Object>> source) throws IOException {
+        SearchResponse searchResponse = mock(SearchResponse.class);
+        SearchHit[] hits = new SearchHit[source.size()];
+        int i = 0;
+        for (Map<String, Object> s : source) {
+            SearchHit hit = new SearchHit(1).sourceRef(BytesReference.bytes(XContentFactory.jsonBuilder().map(s)));
+            hits[i++] = hit;
+        }
+        SearchHits searchHits = new SearchHits(hits, null, (float)0.0);
+        when(searchResponse.getHits()).thenReturn(searchHits);
+        return searchResponse;
     }
-}
\ No newline at end of file
+
+    private static SearchRequestBuilder prepareSearchBuilder(SearchResponse response, QueryBuilder queryBuilder) {
+        SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
+        when(builder.setTypes(any())).thenReturn(builder);
+        when(builder.addSort(any(SortBuilder.class))).thenReturn(builder);
+        when(builder.setQuery(queryBuilder)).thenReturn(builder);
+        when(builder.setPostFilter(any())).thenReturn(builder);
+        when(builder.setFrom(anyInt())).thenReturn(builder);
+        when(builder.setSize(anyInt())).thenReturn(builder);
+        when(builder.setFetchSource(eq(true))).thenReturn(builder);
+        when(builder.addDocValueField(any(String.class))).thenReturn(builder);
+        when(builder.addDocValueField(any(String.class), any(String.class))).thenReturn(builder);
+        when(builder.addSort(any(String.class), any(SortOrder.class))).thenReturn(builder);
+        when(builder.get()).thenReturn(response);
+        return builder;
+    }
+}