[7.x] Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (#57041) (#57136)
Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (#57041). The queries performed by the expired data removers pull back entire documents when only a few fields are required. For ModelSnapshots in particular this is a problem, as they contain quantiles which may be hundreds of KB in size and the search size is set to 10,000. This change makes the search more efficient by requesting only the fields needed to work out which expired data should be deleted.
This commit is contained in:
parent
6984b3ef6f
commit
571477d0ad
|
@ -46,6 +46,36 @@ public final class TimeUtils {
|
|||
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely parses a string epoch representation to a Long
|
||||
*
|
||||
* Commonly this function is used for parsing Date fields from doc values
|
||||
* requested with the format "epoch_millis".
|
||||
*
|
||||
* Since nanosecond support was added epoch_millis timestamps may have a fractional component.
|
||||
* We discard this, taking just whole milliseconds. Arguably it would be better to retain the
|
||||
* precision here and let the downstream component decide whether it wants the accuracy, but
|
||||
* that makes it hard to pass around the value as a number. The double type doesn't have
|
||||
* enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
|
||||
* work, but that isn't supported by the JSON parser if the number gets round-tripped through
|
||||
* JSON. So String is really the only format that could be used, but the consumers of time
|
||||
* are expecting a number.
|
||||
*
|
||||
* @param epoch The epoch value as a string. This may contain a fractional component.
|
||||
* @return The epoch value.
|
||||
*/
|
||||
public static long parseToEpochMs(String epoch) {
|
||||
int dotPos = epoch.indexOf('.');
|
||||
if (dotPos == -1) {
|
||||
return Long.parseLong(epoch);
|
||||
} else if (dotPos > 0) {
|
||||
return Long.parseLong(epoch.substring(0, dotPos));
|
||||
} else {
|
||||
// The first character is '.' so round down to 0
|
||||
return 0L;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* First tries to parse the date first as a Long and convert that to an
|
||||
* epoch time. If the long number has more than 10 digits it is considered a
|
||||
|
|
|
@ -72,6 +72,12 @@ public class TimeUtilsTests extends ESTestCase {
|
|||
assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500"));
|
||||
}
|
||||
|
||||
public void testParseToEpochMs() {
|
||||
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000"));
|
||||
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005"));
|
||||
assertEquals(0L, TimeUtils.parseToEpochMs(".005"));
|
||||
}
|
||||
|
||||
public void testCheckMultiple_GivenMultiples() {
|
||||
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo"));
|
||||
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo"));
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.extractor;
|
|||
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.common.time.TimeUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
|
@ -44,23 +45,7 @@ public class TimeField extends AbstractField {
|
|||
return value;
|
||||
}
|
||||
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
|
||||
// Since nanosecond support was added epoch_millis timestamps may have a fractional component.
|
||||
// We discard this, taking just whole milliseconds. Arguably it would be better to retain the
|
||||
// precision here and let the downstream component decide whether it wants the accuracy, but
|
||||
// that makes it hard to pass around the value as a number. The double type doesn't have
|
||||
// enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
|
||||
// work, but that isn't supported by the JSON parser if the number gets round-tripped through
|
||||
// JSON. So String is really the only format that could be used, but the ML consumers of time
|
||||
// are expecting a number.
|
||||
String strVal0 = (String) value[0];
|
||||
int dotPos = strVal0.indexOf('.');
|
||||
if (dotPos == -1) {
|
||||
value[0] = Long.parseLong(strVal0);
|
||||
} else if (dotPos > 0) {
|
||||
value[0] = Long.parseLong(strVal0.substring(0, dotPos));
|
||||
} else {
|
||||
value[0] = 0L;
|
||||
}
|
||||
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
|
||||
} else if (value[0] instanceof Long == false) { // pre-6.0 field
|
||||
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
|
||||
}
|
||||
|
|
|
@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchRequest;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
@ -30,6 +25,7 @@ import org.elasticsearch.search.SearchHit;
|
|||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.common.time.TimeUtils;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
|
@ -38,8 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
|
|||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
|
@ -85,6 +79,11 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
|
||||
source.size(MAX_FORECASTS);
|
||||
source.trackTotalHits(true);
|
||||
source.fetchSource(false);
|
||||
source.docValueField(Job.ID.getPreferredName(), null);
|
||||
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
|
||||
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");
|
||||
|
||||
|
||||
// _doc is the most efficient sort order and will also disable scoring
|
||||
source.sort(ElasticsearchMappings.ES_DOC);
|
||||
|
@ -101,11 +100,9 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
ActionListener<Boolean> listener,
|
||||
Supplier<Boolean> isTimedOutSupplier
|
||||
) {
|
||||
List<ForecastRequestStats> forecastsToDelete;
|
||||
try {
|
||||
forecastsToDelete = findForecastsToDelete(searchResponse);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
|
||||
if (forecastsToDelete.isEmpty()) {
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -138,8 +135,8 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
});
|
||||
}
|
||||
|
||||
private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
|
||||
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
|
||||
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
|
||||
List<JobForecastId> forecastsToDelete = new ArrayList<>();
|
||||
|
||||
SearchHits hits = searchResponse.getHits();
|
||||
if (hits.getTotalHits().value > MAX_FORECASTS) {
|
||||
|
@ -147,19 +144,29 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
}
|
||||
|
||||
for (SearchHit hit : hits.getHits()) {
|
||||
try (InputStream stream = hit.getSourceRef().streamInput();
|
||||
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
|
||||
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
|
||||
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
|
||||
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
|
||||
forecastsToDelete.add(forecastRequestStats);
|
||||
}
|
||||
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
|
||||
if (expiryTime == null) {
|
||||
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
|
||||
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
|
||||
continue;
|
||||
}
|
||||
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
|
||||
if (expiryMs < cutoffEpochMs) {
|
||||
JobForecastId idPair = new JobForecastId(
|
||||
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
|
||||
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));
|
||||
|
||||
if (idPair.hasNullValue() == false) {
|
||||
forecastsToDelete.add(idPair);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
return forecastsToDelete;
|
||||
}
|
||||
|
||||
private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
|
||||
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest();
|
||||
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
|
||||
|
||||
|
@ -167,10 +174,12 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
|
||||
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
|
||||
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
|
||||
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
|
||||
boolQuery.should(QueryBuilders.boolQuery()
|
||||
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
|
||||
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
|
||||
for (JobForecastId jobForecastId : ids) {
|
||||
if (jobForecastId.hasNullValue() == false) {
|
||||
boolQuery.should(QueryBuilders.boolQuery()
|
||||
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
|
||||
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
|
||||
}
|
||||
}
|
||||
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
|
||||
request.setQuery(query);
|
||||
|
@ -180,4 +189,18 @@ public class ExpiredForecastsRemover implements MlDataRemover {
|
|||
|
||||
return request;
|
||||
}
|
||||
|
||||
private static class JobForecastId {
|
||||
private final String jobId;
|
||||
private final String forecastId;
|
||||
|
||||
private JobForecastId(String jobId, String forecastId) {
|
||||
this.jobId = jobId;
|
||||
this.forecastId = forecastId;
|
||||
}
|
||||
|
||||
boolean hasNullValue() {
|
||||
return jobId == null || forecastId == null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
|
|||
import org.elasticsearch.search.sort.SortBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.common.time.TimeUtils;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
|
@ -104,13 +105,16 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
|
||||
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
|
||||
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
|
||||
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
|
||||
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
|
||||
.filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName()));
|
||||
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.sort(sortBuilder);
|
||||
searchSourceBuilder.query(snapshotQuery);
|
||||
searchSourceBuilder.size(1);
|
||||
searchSourceBuilder.trackTotalHits(false);
|
||||
searchSourceBuilder.fetchSource(false);
|
||||
searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
|
||||
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
|
@ -124,8 +128,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
// no snapshots found
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
|
||||
listener.onResponse(snapshot.getTimestamp().getTime());
|
||||
String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName());
|
||||
if (timestamp == null) {
|
||||
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId());
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
|
||||
listener.onResponse(timestampMs);
|
||||
}
|
||||
}
|
||||
},
|
||||
listener::onFailure)
|
||||
|
@ -159,8 +169,15 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
.mustNot(activeSnapshotFilter)
|
||||
.mustNot(retainFilter);
|
||||
|
||||
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)
|
||||
.sort(ModelSnapshot.TIMESTAMP.getPreferredName()));
|
||||
SearchSourceBuilder source = new SearchSourceBuilder();
|
||||
source.query(query);
|
||||
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
|
||||
source.sort(ModelSnapshot.TIMESTAMP.getPreferredName());
|
||||
source.fetchSource(false);
|
||||
source.docValueField(Job.ID.getPreferredName(), null);
|
||||
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
|
||||
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
|
||||
searchRequest.source(source);
|
||||
|
||||
long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
|
||||
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
|
||||
|
@ -175,19 +192,29 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
public void onResponse(SearchResponse searchResponse) {
|
||||
long nextToKeepMs = deleteAllBeforeMs;
|
||||
try {
|
||||
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
|
||||
List<JobSnapshotId> snapshotIds = new ArrayList<>();
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef());
|
||||
long timestampMs = modelSnapshot.getTimestamp().getTime();
|
||||
String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName());
|
||||
if (timestamp == null) {
|
||||
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId());
|
||||
continue;
|
||||
}
|
||||
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
|
||||
if (timestampMs >= nextToKeepMs) {
|
||||
do {
|
||||
nextToKeepMs += MS_IN_ONE_DAY;
|
||||
} while (timestampMs >= nextToKeepMs);
|
||||
continue;
|
||||
}
|
||||
modelSnapshots.add(modelSnapshot);
|
||||
JobSnapshotId idPair = new JobSnapshotId(
|
||||
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
|
||||
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));
|
||||
|
||||
if (idPair.hasNullValue() == false) {
|
||||
snapshotIds.add(idPair);
|
||||
}
|
||||
}
|
||||
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
|
||||
deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
|
@ -200,14 +227,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
};
|
||||
}
|
||||
|
||||
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
|
||||
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
|
||||
if (modelSnapshotIterator.hasNext() == false) {
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
|
||||
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
|
||||
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
|
||||
JobSnapshotId idPair = modelSnapshotIterator.next();
|
||||
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
|
||||
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
|
||||
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
|
||||
@Override
|
||||
public void onResponse(AcknowledgedResponse response) {
|
||||
|
@ -220,9 +247,23 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
|
||||
+ modelSnapshot.getSnapshotId() + "]", e));
|
||||
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
|
||||
+ idPair.snapshotId + "]", e));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static class JobSnapshotId {
|
||||
private final String jobId;
|
||||
private final String snapshotId;
|
||||
|
||||
JobSnapshotId(String jobId, String snapshotId) {
|
||||
this.jobId = jobId;
|
||||
this.snapshotId = snapshotId;
|
||||
}
|
||||
|
||||
boolean hasNullValue() {
|
||||
return jobId == null || snapshotId == null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,29 @@
|
|||
package org.elasticsearch.xpack.ml.job.retention;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.document.DocumentField;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public interface MlDataRemover {
|
||||
void remove(float requestsPerSecond, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
|
||||
|
||||
/**
|
||||
* Extract {@code fieldName} from {@code hit} and if it is a string
|
||||
* return the string else {@code null}.
|
||||
* @param hit The search hit
|
||||
* @param fieldName Field to find
|
||||
* @return value iff the docfield is present and it is a string. Otherwise {@code null}
|
||||
*/
|
||||
default String stringFieldValueOrNull(SearchHit hit, String fieldName) {
|
||||
DocumentField docField = hit.field(fieldName);
|
||||
if (docField != null) {
|
||||
Object value = docField.getValue();
|
||||
if (value instanceof String) {
|
||||
return (String)value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,6 +93,14 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
|
|||
return createSearchResponse(toXContents, toXContents.size());
|
||||
}
|
||||
|
||||
static SearchResponse createSearchResponseFromHits(List<SearchHit> hits) {
|
||||
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}),
|
||||
new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f);
|
||||
SearchResponse searchResponse = mock(SearchResponse.class);
|
||||
when(searchResponse.getHits()).thenReturn(searchHits);
|
||||
return searchResponse;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static void givenJobs(Client client, List<Job> jobs) throws IOException {
|
||||
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
|
||||
|
|
|
@ -13,15 +13,19 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
|
||||
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
|
||||
import org.junit.Before;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -91,17 +95,20 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
|
|||
|
||||
Date now = new Date();
|
||||
Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis());
|
||||
ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
|
||||
SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1)));
|
||||
|
||||
// It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
|
||||
Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
|
||||
ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted)));
|
||||
SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAndOneMsAgo);
|
||||
|
||||
ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1)));
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
|
||||
|
||||
searchResponses.add(
|
||||
AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted)));
|
||||
|
||||
SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAndOneMsAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1)));
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList()));
|
||||
|
||||
givenClientRequestsSucceed(searchResponses);
|
||||
createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
|
||||
|
@ -178,18 +185,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
|
|||
|
||||
Date now = new Date();
|
||||
Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
|
||||
ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
|
||||
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1)));
|
||||
|
||||
// It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond
|
||||
Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1);
|
||||
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(
|
||||
List<SearchHit> snapshots1JobSnapshots = Arrays.asList(
|
||||
snapshot1_1,
|
||||
createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo));
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
|
||||
createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo));
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots));
|
||||
|
||||
ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
|
||||
SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2)));
|
||||
|
||||
givenClientDeleteModelSnapshotRequestsFail(searchResponses);
|
||||
createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
|
||||
|
@ -208,12 +215,12 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCalcCutoffEpochMs() throws IOException {
|
||||
public void testCalcCutoffEpochMs() {
|
||||
List<SearchResponse> searchResponses = new ArrayList<>();
|
||||
|
||||
Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
|
||||
ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
|
||||
SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo);
|
||||
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1)));
|
||||
|
||||
givenClientRequests(searchResponses, true, true);
|
||||
|
||||
|
@ -226,6 +233,17 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
|
|||
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime)));
|
||||
}
|
||||
|
||||
public void testJobSnapshotId() {
|
||||
ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b");
|
||||
assertFalse(id.hasNullValue());
|
||||
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b");
|
||||
assertTrue(id.hasNullValue());
|
||||
id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null);
|
||||
assertTrue(id.hasNullValue());
|
||||
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null);
|
||||
assertTrue(id.hasNullValue());
|
||||
}
|
||||
|
||||
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executor = mock(ExecutorService.class);
|
||||
|
@ -245,6 +263,15 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
|
|||
return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build();
|
||||
}
|
||||
|
||||
private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId, Date date) {
|
||||
SearchHitBuilder hitBuilder = new SearchHitBuilder(0);
|
||||
hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId));
|
||||
hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId));
|
||||
String dateAsString = Long.valueOf(date.getTime()).toString();
|
||||
hitBuilder.addField(ModelSnapshot.TIMESTAMP.getPreferredName(), Collections.singletonList(dateAsString));
|
||||
return hitBuilder.build();
|
||||
}
|
||||
|
||||
private void givenClientRequestsSucceed(List<SearchResponse> searchResponses) {
|
||||
givenClientRequests(searchResponses, true, true);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ml.job.retention;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
public class MlDataRemoverTests extends ESTestCase {
|
||||
public void testStringOrNull() {
|
||||
MlDataRemover remover = (requestsPerSecond, listener, isTimedOutSupplier) -> { };
|
||||
|
||||
SearchHitBuilder hitBuilder = new SearchHitBuilder(0);
|
||||
assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing"));
|
||||
|
||||
hitBuilder = new SearchHitBuilder(0);
|
||||
hitBuilder.addField("not_a_string", Collections.singletonList(new Date()));
|
||||
assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string"));
|
||||
|
||||
hitBuilder = new SearchHitBuilder(0);
|
||||
hitBuilder.addField("string_field", Collections.singletonList("actual_string_value"));
|
||||
assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue