[ML] Unused state remover should also account for jobs in index (#37119)
The unused state remover was never adjusted to account for jobs stored in the config index. The result was that when triggered it removed state for all jobs stored in the config index. This commit fixes the issue. Closes #37109
This commit is contained in:
parent
bfe6f091da
commit
0fd27d4d6f
|
@ -43,7 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
||||
|
@ -244,7 +243,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
|
|||
.setFetchSource(false)
|
||||
.setSize(10000)
|
||||
.get();
|
||||
assertThat(stateDocsResponse.getHits().getTotalHits().value, lessThan(10000L));
|
||||
|
||||
// Assert at least one state doc for each job
|
||||
assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L));
|
||||
|
||||
for (SearchHit hit : stateDocsResponse.getHits().getHits()) {
|
||||
assertThat(hit.getId().startsWith("non_existing_job"), is(false));
|
||||
}
|
||||
|
|
|
@ -14,15 +14,18 @@ import org.elasticsearch.action.delete.DeleteRequest;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Deque;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
@ -81,7 +84,18 @@ public class UnusedStateRemover implements MlDataRemover {
|
|||
}
|
||||
|
||||
private Set<String> getJobIds() {
|
||||
return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet();
|
||||
Set<String> jobIds = new HashSet<>();
|
||||
|
||||
// TODO Once at 8.0, we can stop searching for jobs in cluster state
|
||||
// and remove cluster service as a member all together.
|
||||
jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet());
|
||||
|
||||
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
|
||||
while (jobsIterator.hasNext()) {
|
||||
Deque<Job.Builder> jobs = jobsIterator.next();
|
||||
jobs.stream().map(Job.Builder::getId).forEach(jobIds::add);
|
||||
}
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
|
||||
|
|
Loading…
Reference in New Issue