[ML] Shave off DeleteExpiredDataIT runtime (#39557)

This commit parallelizes some parts of the test
and removes an unnecessary refresh call.
On my local machine it shaves off about 15 seconds
for a test execution time of ~64s (down from ~80s).
This test is still slow but progress over perfection.

Relates #37339
This commit is contained in:
Dimitris Athanasiou 2019-03-01 17:17:26 +02:00
parent 0c6b7cfb77
commit 8843832039
1 changed files with 24 additions and 13 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -77,9 +78,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get(); .get();
assertThat(bulkResponse.hasFailures(), is(false)); assertThat(bulkResponse.hasFailures(), is(false));
// Ensure all data is searchable
client().admin().indices().prepareRefresh(DATA_INDEX).get();
} }
@After @After
@ -94,6 +92,17 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
} }
public void testDeleteExpiredData() throws Exception { public void testDeleteExpiredData() throws Exception {
// Index some unused state documents (more than 10K to test scrolling works)
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
ActionFuture<BulkResponse> indexUnusedStateDocsResponse = bulkRequestBuilder.execute();
registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L)); registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L)); registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(1000L));
registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L)); registerJob(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
@ -104,6 +113,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1; long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;
// Start all jobs
for (Job.Builder job : getJobs()) { for (Job.Builder job : getJobs()) {
putJob(job); putJob(job);
@ -117,7 +128,14 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// Run up to a day ago // Run up to a day ago
openJob(job.getId()); openJob(job.getId());
startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis()); startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis());
}
// Now let's wait for all jobs to be closed
for (Job.Builder job : getJobs()) {
waitUntilJobIsClosed(job.getId()); waitUntilJobIsClosed(job.getId());
}
for (Job.Builder job : getJobs()) {
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47))); assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47)));
assertThat(getRecords(job.getId()).size(), equalTo(1)); assertThat(getRecords(job.getId()).size(), equalTo(1));
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId()); List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
@ -143,6 +161,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
waitForecastToFinish(job.getId(), forecastDefaultExpiryId); waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
waitForecastToFinish(job.getId(), forecastNoExpiryId); waitForecastToFinish(job.getId(), forecastNoExpiryId);
} }
// Refresh to ensure the snapshot timestamp updates are visible // Refresh to ensure the snapshot timestamp updates are visible
client().admin().indices().prepareRefresh("*").get(); client().admin().indices().prepareRefresh("*").get();
@ -175,16 +194,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
} }
// Index some unused state documents (more than 10K to test scrolling works) // Before we call the delete-expired-data action we need to make sure the unused state docs were indexed
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK));
// Now call the action under test // Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();