From 571477d0adeba52a49177555f9e586433c177e1a Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 26 May 2020 10:56:42 +0100 Subject: [PATCH] [7.x] Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (#57041) (#57136) Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (#57041) The queries performed by the expired data removers pull back entire documents when only a few fields are required. For ModelSnapshots in particular this is a problem as they contain quantiles which may be 100s of KB and the search size is set to 10,000. This change makes the search more efficient by only requesting the fields needed to work out which expired data should be deleted. --- .../xpack/core/common/time/TimeUtils.java | 30 ++++++++ .../core/common/time/TimeUtilsTests.java | 6 ++ .../xpack/ml/extractor/TimeField.java | 19 +---- .../retention/ExpiredForecastsRemover.java | 75 ++++++++++++------- .../ExpiredModelSnapshotsRemover.java | 73 ++++++++++++++---- .../xpack/ml/job/retention/MlDataRemover.java | 20 +++++ .../AbstractExpiredJobDataRemoverTests.java | 8 ++ .../ExpiredModelSnapshotsRemoverTests.java | 61 ++++++++++----- .../ml/job/retention/MlDataRemoverTests.java | 30 ++++++++ 9 files changed, 246 insertions(+), 76 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java index 01667f8a481..aee26a01830 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java @@ -46,6 +46,36 @@ public final class TimeUtils { "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } + /** + * Safely parses a string epoch representation to a Long + * + * Commonly this function is used for parsing Date fields from doc values + * requested with the format "epoch_millis". + * + * Since nanosecond support was added epoch_millis timestamps may have a fractional component. + * We discard this, taking just whole milliseconds. Arguably it would be better to retain the + * precision here and let the downstream component decide whether it wants the accuracy, but + * that makes it hard to pass around the value as a number. The double type doesn't have + * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would + * work, but that isn't supported by the JSON parser if the number gets round-tripped through + * JSON. So String is really the only format that could be used, but the consumers of time + * are expecting a number. + * + * @param epoch The epoch value as a string. This may contain a fractional component. + * @return The epoch value. + */ + public static long parseToEpochMs(String epoch) { + int dotPos = epoch.indexOf('.'); + if (dotPos == -1) { + return Long.parseLong(epoch); + } else if (dotPos > 0) { + return Long.parseLong(epoch.substring(0, dotPos)); + } else { + // The first character is '.' so round down to 0 + return 0L; + } + } + /** * First tries to parse the date first as a Long and convert that to an * epoch time. If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java index e122202b5fa..0dcb245c780 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java @@ -72,6 +72,12 @@ public class TimeUtilsTests extends ESTestCase { assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500")); } + public void testParseToEpochMs() { + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); + assertEquals(0L, TimeUtils.parseToEpochMs(".005")); + } + public void testCheckMultiple_GivenMultiples() { TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo")); TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java index 24412fe6eb7..9436dddde78 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.extractor; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.util.Collections; import java.util.Objects; @@ -44,23 +45,7 @@ public class TimeField extends AbstractField { return value; } if (value[0] instanceof String) { // doc_value field with the epoch_millis format - // Since nanosecond support was added epoch_millis timestamps may have a fractional component. - // We discard this, taking just whole milliseconds. Arguably it would be better to retain the - // precision here and let the downstream component decide whether it wants the accuracy, but - // that makes it hard to pass around the value as a number. The double type doesn't have - // enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would - // work, but that isn't supported by the JSON parser if the number gets round-tripped through - // JSON. So String is really the only format that could be used, but the ML consumers of time - // are expecting a number. - String strVal0 = (String) value[0]; - int dotPos = strVal0.indexOf('.'); - if (dotPos == -1) { - value[0] = Long.parseLong(strVal0); - } else if (dotPos > 0) { - value[0] = Long.parseLong(strVal0.substring(0, dotPos)); - } else { - value[0] = 0L; - } + value[0] = TimeUtils.parseToEpochMs((String)value[0]); } else if (value[0] instanceof Long == false) { // pre-6.0 field throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index f4f85dc7043..dbe2dc4dea7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +25,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -38,8 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; -import java.io.InputStream; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; @@ -85,6 +79,11 @@ public class ExpiredForecastsRemover implements MlDataRemover { .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); source.trackTotalHits(true); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); + // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -101,11 +100,9 @@ public class ExpiredForecastsRemover implements MlDataRemover { ActionListener listener, Supplier isTimedOutSupplier ) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + listener.onResponse(true); return; } @@ -138,8 +135,8 @@ public class ExpiredForecastsRemover implements MlDataRemover { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits().value > MAX_FORECASTS) { @@ -147,19 +144,29 @@ public class ExpiredForecastsRemover implements MlDataRemover { } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); - } + String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (expiryTime == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; } + long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); + } + + } + } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); @@ -167,10 +174,12 @@ public class ExpiredForecastsRemover implements MlDataRemover { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -180,4 +189,18 @@ public class ExpiredForecastsRemover implements MlDataRemover { return request; } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 241527325dd..de7206d4260 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -104,13 +105,16 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private void latestSnapshotTimeStamp(String jobId, ActionListener listener) { SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); QueryBuilder snapshotQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) + .filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(sortBuilder); searchSourceBuilder.query(snapshotQuery); searchSourceBuilder.size(1); searchSourceBuilder.trackTotalHits(false); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequest searchRequest = new SearchRequest(indexName); @@ -124,8 +128,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover // no snapshots found listener.onResponse(null); } else { - ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); - listener.onResponse(snapshot.getTimestamp().getTime()); + String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId()); + listener.onResponse(null); + } else { + long timestampMs = TimeUtils.parseToEpochMs(timestamp); + listener.onResponse(timestampMs); + } } }, listener::onFailure) @@ -159,8 +169,15 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover .mustNot(activeSnapshotFilter) .mustNot(retainFilter); - searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE) - .sort(ModelSnapshot.TIMESTAMP.getPreferredName())); + SearchSourceBuilder source = new SearchSourceBuilder(); + source.query(query); + source.size(MODEL_SNAPSHOT_SEARCH_SIZE); + source.sort(ModelSnapshot.TIMESTAMP.getPreferredName()); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); + source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); + searchRequest.source(source); long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); @@ -175,19 +192,29 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover public void onResponse(SearchResponse searchResponse) { long nextToKeepMs = deleteAllBeforeMs; try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef()); - long timestampMs = modelSnapshot.getTimestamp().getTime(); + String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId()); + continue; + } + long timestampMs = TimeUtils.parseToEpochMs(timestamp); if (timestampMs >= nextToKeepMs) { do { nextToKeepMs += MS_IN_ONE_DAY; } while (timestampMs >= nextToKeepMs); continue; } - modelSnapshots.add(modelSnapshot); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -200,14 +227,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { @@ -220,9 +247,23 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); } }); } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 34a5335da8c..14685145699 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 432d8ffa3e4..a2e573ac855 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -93,6 +93,14 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { return createSearchResponse(toXContents, toXContents.size()); } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), + new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + @SuppressWarnings("unchecked") static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 9ee4cb177aa..7a644a5fe65 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -13,15 +13,19 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -91,17 +95,20 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { Date now = new Date(); Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAndOneMsAgo); - ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); + + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); @@ -178,18 +185,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { Date now = new Date(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - List snapshots1JobSnapshots = Arrays.asList( + List snapshots1JobSnapshots = Arrays.asList( snapshot1_1, - createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); - ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); @@ -208,12 +215,12 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { } @SuppressWarnings("unchecked") - public void testCalcCutoffEpochMs() throws IOException { + public void testCalcCutoffEpochMs() { List searchResponses = new ArrayList<>(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); givenClientRequests(searchResponses, true, true); @@ -226,6 +233,17 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -245,6 +263,15 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId, Date date) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + String dateAsString = Long.valueOf(date.getTime()).toString(); + hitBuilder.addField(ModelSnapshot.TIMESTAMP.getPreferredName(), Collections.singletonList(dateAsString)); + return hitBuilder.build(); + } + private void givenClientRequestsSucceed(List searchResponses) { givenClientRequests(searchResponses, true, true); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 00000000000..465109e6934 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (requestsPerSecond, listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +}