From 3f73748d14f09d2517531fda20cab23ed06aaa2c Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 3 May 2017 14:51:41 +0100 Subject: [PATCH] [ML] Use delete-by-query in JobDataDeleter (elastic/x-pack-elasticsearch#1274) JobDataDeleter handles the deletion logic for 3 cases: 1. deleting a model snapshot and its state docs 2. deleting all results after a timestamp 3. deleting all interim results The last 2 are currently implemented by manually performing a search and scroll and then adding matching hits in a bulk delete action. This operation is exactly what delete-by-query does. This commit changes JobDataDeleter to use delete-by-query. This makes the code simpler and less error-prone. The downside is losing some logging which seems non-critical. Unit tests for JobDataDeleter are also removed as they are heavily mocked tests, adding little value and high maintenance cost. This functionality is tested by integration tests already. relates elastic/x-pack-elasticsearch#821 Original commit: elastic/x-pack-elasticsearch@7da91332bd3d853820982ef607c25031e083ea1e --- .../ml/action/DeleteModelSnapshotAction.java | 13 +- .../ml/action/RevertModelSnapshotAction.java | 12 +- .../ml/job/persistence/JobDataDeleter.java | 250 +++++------------- .../xpack/ml/job/persistence/JobProvider.java | 14 +- .../job/persistence/JobResultsPersister.java | 4 +- .../persistence/JobStorageDeletionTask.java | 8 +- .../autodetect/state/ModelSnapshot.java | 12 + .../AutodetectResultProcessorIT.java | 25 +- .../job/persistence/JobDataDeleterTests.java | 123 --------- .../autodetect/state/ModelSnapshotTests.java | 13 + 10 files changed, 133 insertions(+), 341 deletions(-) delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java index a3225d24d6a..ea776effa87 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java @@ -25,17 +25,18 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; -import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Collections; import java.util.List; public class DeleteModelSnapshotAction extends Action() { + deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate), new ActionListener() { @Override public void onResponse(BulkResponse bulkResponse) { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, @@ -194,8 +194,7 @@ public class DeleteModelSnapshotAction extends Action() { @Override public void onResponse(Boolean success) { - dataDeleter.commit(ActionListener.wrap( - bulkItemResponses -> {listener.onResponse(response);}, - listener::onFailure), - true); + listener.onResponse(response); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index c3a65113e7b..2860cb593cb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.xpack.ml.job.persistence; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteAction; -import org.elasticsearch.action.delete.DeleteRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -22,44 +21,56 @@ import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.common.action.XPackDeleteByQueryAction; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; -import java.util.Date; +import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; public class JobDataDeleter { private static final Logger LOGGER = Loggers.getLogger(JobDataDeleter.class); - private static final int SCROLL_SIZE = 1000; - private static final String SCROLL_CONTEXT_DURATION = "5m"; - private final Client client; private final String jobId; - private final BulkRequestBuilder bulkRequestBuilder; - private long deletedResultCount; - private long deletedModelSnapshotCount; - private long deletedModelStateCount; - private boolean quiet; public JobDataDeleter(Client client, String jobId) { - this(client, jobId, false); - } - - public JobDataDeleter(Client client, String jobId, boolean quiet) { this.client = Objects.requireNonNull(client); this.jobId = Objects.requireNonNull(jobId); - bulkRequestBuilder = client.prepareBulk(); - deletedResultCount = 0; - deletedModelSnapshotCount = 0; - deletedModelStateCount = 0; - this.quiet = quiet; + } + + /** + * Delete a list of model snapshots and their corresponding state documents. + * + * @param modelSnapshots the model snapshots to delete + */ + public void deleteModelSnapshots(List modelSnapshots, ActionListener listener) { + if (modelSnapshots.isEmpty()) { + listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L)); + return; + } + + String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + for (ModelSnapshot modelSnapshot : modelSnapshots) { + for (String stateDocId : modelSnapshot.stateDocumentIds()) { + bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateDocId)); + } + + bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), + ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot))); + } + + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try { + bulkRequestBuilder.execute(listener); + } catch (Exception e) { + listener.onFailure(e); + } } /** @@ -69,181 +80,60 @@ public class JobDataDeleter { * @param listener Response listener */ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { - String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + deleteByQueryHolder.dbqRequest.setRefresh(true); RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()); timeRange.gte(cutoffEpochMs); + deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + deleteByQueryHolder.searchRequest.types(Result.TYPE.getPreferredName()); + deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(timeRange)); + client.execute(XPackDeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + listener.onResponse(true); + } - RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener); - - client.prepareSearch(index) - .setTypes(Result.TYPE.getPreferredName()) - .setFetchSource(false) - .setQuery(timeRange) - .setScroll(SCROLL_CONTEXT_DURATION) - .setSize(SCROLL_SIZE) - .execute(scrollSearchListener); - } - - private void addDeleteRequestForSearchHits(SearchHits hits, String index) { - for (SearchHit hit : hits.getHits()) { - LOGGER.trace("Search hit for result: {}", hit.getId()); - addDeleteRequest(hit, index); - } - deletedResultCount = hits.getTotalHits(); - } - - private void addDeleteRequest(SearchHit hit, String index) { - DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client) - .setIndex(index) - .setType(hit.getType()) - .setId(hit.getId()); - bulkRequestBuilder.add(deleteRequest); - } - - /** - * Delete a {@code ModelSnapshot} - * - * @param modelSnapshot the model snapshot to delete - */ - public void deleteModelSnapshot(ModelSnapshot modelSnapshot) { - String snapshotDocId = ModelSnapshot.documentId(modelSnapshot); - int docCount = modelSnapshot.getSnapshotDocCount(); - String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); - // Deduce the document IDs of the state documents from the information - // in the snapshot document - we cannot query the state itself as it's - // too big and has no mappings. - // Note: state docs are 1-based - for (int i = 1; i <= docCount; ++i) { - String stateId = snapshotDocId + '#' + i; - bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateId)); - ++deletedModelStateCount; - } - - bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()), - ModelSnapshot.TYPE.getPreferredName(), snapshotDocId)); - ++deletedModelSnapshotCount; + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } /** * Delete all results marked as interim */ public void deleteInterimResults() { - String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + request.setRefresh(false); + request.setSlices(5); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + searchRequest.types(Result.TYPE.getPreferredName()); QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); + searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb))); - SearchResponse searchResponse = client.prepareSearch(index) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setTypes(Result.TYPE.getPreferredName()) - .setQuery(new ConstantScoreQueryBuilder(qb)) - .setFetchSource(false) - .setScroll(SCROLL_CONTEXT_DURATION) - .setSize(SCROLL_SIZE) - .get(); - - long totalHits = searchResponse.getHits().getTotalHits(); - long totalDeletedCount = 0; - while (totalDeletedCount < totalHits) { - for (SearchHit hit : searchResponse.getHits()) { - LOGGER.trace("Search hit for result: {}", hit.getId()); - ++totalDeletedCount; - addDeleteRequest(hit, index); - ++deletedResultCount; - } - - searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).get(); - } - - clearScroll(searchResponse.getScrollId()); - } - - private void clearScroll(String scrollId) { try { - client.prepareClearScroll().addScrollId(scrollId).get(); + client.execute(XPackDeleteByQueryAction.INSTANCE, request).get(); } catch (Exception e) { - LOGGER.warn("[{}] Error while clearing scroll with id [{}]", jobId, scrollId); + LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e); } } - /** - * Commit the deletions without enforcing the removal of data from disk. - * @param listener Response listener - * @param refresh If true a refresh is forced with request policy - * {@link WriteRequest.RefreshPolicy#IMMEDIATE} else the default - */ - public void commit(ActionListener listener, boolean refresh) { - if (bulkRequestBuilder.numberOfActions() == 0) { - listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L)); - return; - } + // Wrapper to ensure safety + private static class DeleteByQueryHolder { - Level logLevel = quiet ? Level.DEBUG : Level.INFO; - LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", - deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); + private final SearchRequest searchRequest; + private final DeleteByQueryRequest dbqRequest; - if (refresh) { - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - } - try { - bulkRequestBuilder.execute(listener); - } catch (Exception e) { - listener.onFailure(e); + private DeleteByQueryHolder(String index) { + // The search request has to be constructed and passed to the DeleteByQueryRequest before more details are set to it + searchRequest = new SearchRequest(index); + dbqRequest = new DeleteByQueryRequest(searchRequest); + dbqRequest.setSlices(5); + dbqRequest.setAbortOnVersionConflict(false); } } - - /** - * Blocking version of {@linkplain #commit(ActionListener, boolean)} - */ - public void commit(boolean refresh) { - if (bulkRequestBuilder.numberOfActions() == 0) { - return; - } - - Level logLevel = quiet ? Level.DEBUG : Level.INFO; - LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", - deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); - if (refresh) { - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - } - BulkResponse response = bulkRequestBuilder.get(); - if (response.hasFailures()) { - LOGGER.debug("Bulk request has failures. {}", response.buildFailureMessage()); - } - } - - /** - * Repeats a scroll search adding the hits to the bulk delete request - */ - private class RepeatingSearchScrollListener implements ActionListener { - - private final AtomicLong totalDeletedCount; - private final String index; - private final ActionListener scrollFinishedListener; - - RepeatingSearchScrollListener(String index, ActionListener scrollFinishedListener) { - totalDeletedCount = new AtomicLong(0L); - this.index = index; - this.scrollFinishedListener = scrollFinishedListener; - } - - @Override - public void onResponse(SearchResponse searchResponse) { - addDeleteRequestForSearchHits(searchResponse.getHits(), index); - - totalDeletedCount.addAndGet(searchResponse.getHits().getHits().length); - if (totalDeletedCount.get() < searchResponse.getHits().getTotalHits()) { - client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).execute(this); - } - else { - clearScroll(searchResponse.getScrollId()); - scrollFinishedListener.onResponse(true); - } - } - - @Override - public void onFailure(Exception e) { - scrollFinishedListener.onFailure(e); - } - }; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 4d115328b1e..98127a78501 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -88,7 +88,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.function.BiFunction; @@ -868,24 +867,19 @@ public class JobProvider { public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException { String indexName = AnomalyDetectorsIndex.jobStateIndexName(); - // First try to restore model state. - int numDocs = modelSnapshot.getSnapshotDocCount(); - for (int docNum = 1; docNum <= numDocs; ++docNum) { - String docId = String.format(Locale.ROOT, "%s#%d", ModelSnapshot.documentId(modelSnapshot), docNum); + for (String stateDocId : modelSnapshot.stateDocumentIds()) { + LOGGER.trace("ES API CALL: get ID {} type {} from index {}", stateDocId, ModelState.TYPE, indexName); - LOGGER.trace("ES API CALL: get ID {} type {} from index {}", docId, ModelState.TYPE, indexName); - - GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), docId).get(); + GetResponse stateResponse = client.prepareGet(indexName, ModelState.TYPE.getPreferredName(), stateDocId).get(); if (!stateResponse.isExists()) { LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}", - numDocs, jobId, modelSnapshot.getSnapshotId(), docId); + modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId); break; } writeStateToStream(stateResponse.getSourceAsBytesRef(), restoreStream); } - // Secondly try to restore categorizer state. This must come after model state because that's // the order the C++ process expects. // There are no snapshots for this, so the IDs simply diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 92731f7d8a6..e8ff71ebd62 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -272,9 +272,7 @@ public class JobResultsPersister extends AbstractComponent { * Delete any existing interim results synchronously */ public void deleteInterimResults(String jobId) { - JobDataDeleter deleter = new JobDataDeleter(client, jobId, true); - deleter.deleteInterimResults(); - deleter.commit(false); + new JobDataDeleter(client, jobId).deleteInterimResults(); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java index 184b6eaac17..754a39fe1c1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobStorageDeletionTask.java @@ -117,14 +117,8 @@ public class JobStorageDeletionTask extends Task { jobProvider.modelSnapshots(jobId, 0, 10000, page -> { List deleteCandidates = page.results(); - - // Delete the snapshot and any associated state files JobDataDeleter deleter = new JobDataDeleter(client, jobId); - for (ModelSnapshot deleteCandidate : deleteCandidates) { - deleter.deleteModelSnapshot(deleteCandidate); - } - - deleter.commit(listener, true); + deleter.deleteModelSnapshots(deleteCandidates, listener); }, listener::onFailure); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java index f8c9f5ba4cc..1e72307f1ad 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java @@ -23,7 +23,9 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import java.io.IOException; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.Objects; @@ -258,6 +260,16 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { && this.retain == that.retain; } + public List stateDocumentIds() { + String prefix = documentId(this); + List stateDocumentIds = new ArrayList<>(snapshotDocCount); + // The state documents count suffices are 1-based + for (int i = 1; i <= snapshotDocCount; i++) { + stateDocumentIds.add(prefix + '#' + i); + } + return stateDocumentIds; + } + public static String documentId(ModelSnapshot snapshot) { return documentId(snapshot.getJobId(), snapshot.getSnapshotId()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index d1425f25286..1922524eeae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -13,8 +13,11 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.XPackSingleNodeTestCase; import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.JobTests; @@ -44,6 +47,7 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.EnumSet; @@ -61,7 +65,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { +public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { private static final String JOB_ID = "autodetect-result-processor-it-job"; private Renormalizer renormalizer; @@ -70,6 +74,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { private List capturedUpdateModelSnapshotOnJobRequests; private AutoDetectResultProcessor resultProcessor; + @Override + protected Settings nodeSettings() { + Settings.Builder newSettings = Settings.builder(); + newSettings.put(super.nodeSettings()); + // Disable security otherwise delete-by-query action fails to get authorized + newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + return newSettings.build(); + } + + @Override + protected Collection> getPlugins() { + return pluginList(XPackPlugin.class); + } + @Before public void createComponents() { renormalizer = new NoOpRenormalizer(); @@ -320,7 +340,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { private class ResultsBuilder { private List results = new ArrayList<>(); - FlushAcknowledgement flushAcknowledgement; ResultsBuilder addBucket(Bucket bucket) { results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java deleted file mode 100644 index ac1f97af798..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.persistence; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.text.Text; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.mockito.Mockito; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -import static org.elasticsearch.mock.orig.Mockito.times; -import static org.elasticsearch.mock.orig.Mockito.verify; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class JobDataDeleterTests extends ESTestCase { - - public void testDeleteResultsFromTime() { - - final long TOTAL_HIT_COUNT = 100L; - - SearchResponse response = createSearchResponseWithHits(TOTAL_HIT_COUNT); - BulkResponse bulkResponse = Mockito.mock(BulkResponse.class); - - Client client = new MockClientBuilder("myCluster") - .prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), response) - .prepareSearchScrollExecuteListener(response) - .prepareBulk(bulkResponse).build(); - - JobDataDeleter bulkDeleter = new JobDataDeleter(client, "foo"); - - // because of the mocking this runs in the current thread - bulkDeleter.deleteResultsFromTime(new Date().getTime(), new ActionListener() { - @Override - public void onResponse(Boolean aBoolean) { - assertTrue(aBoolean); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - - verify(client.prepareBulk(), times((int)TOTAL_HIT_COUNT)).add(any(DeleteRequestBuilder.class)); - - ActionListener bulkListener = new ActionListener() { - @Override - public void onResponse(BulkResponse bulkItemResponses) { - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }; - - when(client.prepareBulk().numberOfActions()).thenReturn(new Integer((int)TOTAL_HIT_COUNT)); - - bulkDeleter.commit(bulkListener, true); - verify(client.prepareBulk(), times(1)).execute(bulkListener); - verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - bulkDeleter.commit(bulkListener, false); - verify(client.prepareBulk(), times(2)).execute(bulkListener); - verify(client.prepareBulk(), times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - } - - public void testDeleteModelSnapShot() { - String jobId = "foo"; - ModelSnapshot snapshot = new ModelSnapshot.Builder(jobId).setSnapshotDocCount(5) - .setSnapshotId("snap-1").build(); - - BulkResponse bulkResponse = Mockito.mock(BulkResponse.class); - Client client = new MockClientBuilder("myCluster").prepareBulk(bulkResponse).build(); - - JobDataDeleter bulkDeleter = new JobDataDeleter(client, jobId); - bulkDeleter.deleteModelSnapshot(snapshot); - verify(client, times(5)) - .prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString()); - verify(client, times(1)) - .prepareDelete(eq(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()), - eq("foo-snap-1")); - } - - private SearchResponse createSearchResponseWithHits(long totalHitCount) { - SearchHits hits = mockSearchHits(totalHitCount); - SearchResponse searchResponse = Mockito.mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(hits); - when(searchResponse.getScrollId()).thenReturn("scroll1"); - return searchResponse; - } - - private SearchHits mockSearchHits(long totalHitCount) { - - List hitList = new ArrayList<>(); - for (int i=0; i<20; i++) { - SearchHit hit = new SearchHit(123, "mockSeachHit-" + i, - new Text("mockSearchHit"), Collections.emptyMap()); - hitList.add(hit); - } - - return new SearchHits(hitList.toArray(new SearchHit[0]), totalHitCount, 1); - } -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java index 05ae375f3cd..226e6e2437b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshotTests.java @@ -10,8 +10,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; +import java.util.Arrays; import java.util.Date; +import static org.hamcrest.Matchers.equalTo; + public class ModelSnapshotTests extends AbstractSerializingTestCase { private static final Date DEFAULT_TIMESTAMP = new Date(); private static final String DEFAULT_DESCRIPTION = "a snapshot"; @@ -177,4 +180,14 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase