diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 365149c1239..c28de014e2e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -689,21 +689,17 @@ public class MlJobIT extends ESRestTestCase { refreshAllIndices(); - // check that the indices still exist but are empty + // check that the default shared index still exists but is empty String indicesAfterDelete = EntityUtils.toString(client().performRequest( new Request("GET", "/_cat/indices/" + AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*")).getEntity()); assertThat(indicesAfterDelete, containsString(indexName)); - assertThat(indicesAfterDelete, containsString(indexName + "-001")); - assertThat(indicesAfterDelete, containsString(indexName + "-002")); + + // other results indices should be deleted as this test job ID is the only job in those indices + assertThat(indicesAfterDelete, not(containsString(indexName + "-001"))); + assertThat(indicesAfterDelete, not(containsString(indexName + "-002"))); assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "/_count")).getEntity()), containsString("\"count\":0")); - assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-001/_count")).getEntity()), - containsString("\"count\":0")); - assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-002/_count")).getEntity()), - containsString("\"count\":0")); - - expectThrows(ResponseException.class, () -> client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 25e9a0516f9..b6af373c834 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -16,7 +16,9 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.MultiSearchAction; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; @@ -316,15 +318,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteByQueryExecutor = ActionListener.wrap( response -> { if (response && indexNames.get().length > 0) { - logger.info("Running DBQ on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]"); - DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get()); + logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get())); ConstantScoreQueryBuilder query = - new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - request.setQuery(query); - request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); - request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - request.setAbortOnVersionConflict(false); - request.setRefresh(true); + new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get()) + .setQuery(query) + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + .setAbortOnVersionConflict(false) + .setRefresh(true); executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler); } else { // We did not execute DBQ, no need to delete aliases or check the response @@ -333,72 +335,97 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction customIndexSearchHandler = ActionListener.wrap( - searchResponse -> { - if (searchResponse == null || searchResponse.getHits().getTotalHits().value > 0) { - deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion - } else { - logger.info("Running DELETE Index on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]"); - DeleteIndexRequest request = new DeleteIndexRequest(indexNames.get()); - request.indicesOptions(IndicesOptions.lenientExpandOpen()); - // If we have deleted the index, then we don't need to delete the aliases or run the DBQ - executeAsyncWithOrigin( - parentTaskClient.threadPool().getThreadContext(), - ML_ORIGIN, - request, - ActionListener.wrap( - response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias - failureHandler), - parentTaskClient.admin().indices()::delete); - } - }, - failure -> { - if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted - deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias - } else { - failureHandler.accept(failure); - } - } + // Step 6. Handle each multi-search response. There should be one response for each underlying index. + // For each underlying index that contains results ONLY for the current job, we will delete that index. + // If there exists at least 1 index that has another job's results, we will run DBQ. + ActionListener customIndexSearchHandler = ActionListener.wrap( + multiSearchResponse -> { + if (multiSearchResponse == null) { + deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion + return; + } + String defaultSharedIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; + List indicesToDelete = new ArrayList<>(); + boolean needToRunDBQTemp = false; + assert multiSearchResponse.getResponses().length == indexNames.get().length; + int i = 0; + for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) { + if (item.isFailure()) { + ++i; + if (ExceptionsHelper.unwrapCause(item.getFailure()) instanceof IndexNotFoundException) { + // index is already deleted, no need to take action against it + continue; + } else { + failureHandler.accept(item.getFailure()); + return; + } + } + SearchResponse searchResponse = item.getResponse(); + if (searchResponse.getHits().getTotalHits().value > 0 || indexNames.get()[i].equals(defaultSharedIndex)) { + needToRunDBQTemp = true; + } else { + indicesToDelete.add(indexNames.get()[i]); + } + ++i; + } + final boolean needToRunDBQ = needToRunDBQTemp; + if (indicesToDelete.isEmpty()) { + deleteByQueryExecutor.onResponse(needToRunDBQ); + return; + } + logger.info("[{}] deleting the following indices directly {}", jobId, indicesToDelete); + DeleteIndexRequest request = new DeleteIndexRequest(indicesToDelete.toArray(new String[0])); + request.indicesOptions(IndicesOptions.lenientExpandOpenHidden()); + executeAsyncWithOrigin( + parentTaskClient.threadPool().getThreadContext(), + ML_ORIGIN, + request, + ActionListener.wrap( + response -> deleteByQueryExecutor.onResponse(needToRunDBQ), // only run DBQ if there is a shared index + failureHandler), + parentTaskClient.admin().indices()::delete); + }, + failure -> { + if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted + deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias + } else { + failureHandler.accept(failure); + } + } ); - // Step 5. Determine if we are on shared indices by looking at whether the initial index was ".ml-anomalies-shared" - // or whether the indices that the job's results alias points to contain any documents from other jobs. - // TODO: this check is currently assuming that a job's results indices are either ALL shared or ALL - // dedicated to the job. We have considered functionality like rolling jobs that generate large - // volumes of results from shared to dedicated indices. On deletion such a job would have a mix of - // shared indices requiring DBQ and dedicated indices that could be simply dropped. The current - // functionality would apply DBQ to all these indices, which is safe but suboptimal. So this functionality - // should be revisited when we add rolling results index functionality, especially if we add the ability - // to switch a job over to a dedicated index for future results. + // Step 5. If we successfully find a job, gather information about its result indices. + // This will execute a multi-search action for every concrete index behind the job results alias. + // If there are no concrete indices, take no action and go to the next step. ActionListener getJobHandler = ActionListener.wrap( - builder -> { - Job job = builder.build(); - indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(), - IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId))); - // The job may no longer be using the initial shared index, but if it started off on a - // shared index then it will still be on a shared index even if it's been reindexed - if (job.getInitialResultsIndexName() - .equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) { - // don't bother searching the index any further, we are on the default shared - customIndexSearchHandler.onResponse(null); - } else if (indexNames.get().length == 0) { - // don't bother searching the index any further - it's already been closed or deleted - customIndexSearchHandler.onResponse(null); - } else { - SearchSourceBuilder source = new SearchSourceBuilder() - .size(1) - .trackTotalHits(true) - .query(QueryBuilders.boolQuery().filter( - QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)))); - - SearchRequest searchRequest = new SearchRequest(indexNames.get()); - searchRequest.source(source); - executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler); - } - }, - failureHandler + builder -> { + indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(), + IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId))); + if (indexNames.get().length == 0) { + // don't bother searching the index any further - it's already been closed or deleted + customIndexSearchHandler.onResponse(null); + return; + } + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + // It is important that the requests are in the same order as the index names. + // This is because responses are ordered according to their requests. + for (String indexName : indexNames.get()) { + SearchSourceBuilder source = new SearchSourceBuilder() + .size(0) + // if we have just one hit we cannot delete the index + .trackTotalHitsUpTo(1) + .query(QueryBuilders.boolQuery().filter( + QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)))); + multiSearchRequest.add(new SearchRequest(indexName).source(source)); + } + executeAsyncWithOrigin(parentTaskClient, + ML_ORIGIN, + MultiSearchAction.INSTANCE, + multiSearchRequest, + customIndexSearchHandler); + }, + failureHandler ); // Step 4. Get the job as the initial result index name is required diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index 18f7bed0261..543ab85a71b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -173,16 +173,18 @@ public class JobResultsProvider { SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) .setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1), CategorizerState.v54DocumentId(job.getId(), 1))) + .setTrackTotalHits(false) .setIndicesOptions(IndicesOptions.strictExpand()); SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId()))) + .setTrackTotalHits(false) .setIndicesOptions(IndicesOptions.strictExpand()); - String resultsIndexName = job.getInitialResultsIndexName(); - SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + SearchRequestBuilder resultDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setIndicesOptions(IndicesOptions.lenientExpandHidden()) .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) + .setTrackTotalHits(false) .setSize(1); MultiSearchRequestBuilder msearch = client.prepareMultiSearch() diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index 463106c507e..fa3db2956d9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -5,22 +5,82 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; +import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; /** * Test that ML does not touch unnecessary indices when removing job index aliases */ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase { + private static long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis(); private static final String UNRELATED_INDEX = "unrelated-data"; + private JobResultsProvider jobResultsProvider; + private JobResultsPersister jobResultsPersister; + + @Before + public void createComponents() { + Settings settings = nodeSettings(0); + ThreadPool tp = mock(ThreadPool.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + ClusterService.USER_DEFINED_METADATA, + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); + ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings); + jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver()); + jobResultsPersister = new JobResultsPersister( + originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")); + } + public void testUnrelatedIndexNotTouched() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); ensureStableCluster(1); @@ -46,4 +106,106 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase { disableIndexBlock(UNRELATED_INDEX, IndexMetadata.SETTING_READ_ONLY); } + + public void testDeleteDedicatedJobWithDataInShared() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + ensureStableCluster(1); + String jobIdDedicated = "delete-test-job-dedicated"; + + Job.Builder job = createJob(jobIdDedicated, new ByteSizeValue(2, ByteSizeUnit.MB)).setResultsIndexName(jobIdDedicated); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet(); + String dedicatedIndex = job.build().getInitialResultsIndexName(); + awaitJobOpenedAndAssigned(job.getId(), null); + createBuckets(jobIdDedicated, 1, 10); + + String jobIdShared = "delete-test-job-shared"; + job = createJob(jobIdShared, new ByteSizeValue(2, ByteSizeUnit.MB)); + client().execute(PutJobAction.INSTANCE, new PutJobAction.Request(job)).actionGet(); + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).actionGet(); + awaitJobOpenedAndAssigned(job.getId(), null); + createBuckets(jobIdShared, 1, 10); + + // Manually switching over alias info + IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest() + .addAliasAction(IndicesAliasesRequest.AliasActions + .add() + .alias(AnomalyDetectorsIndex.jobResultsAliasedName(jobIdDedicated)) + .isHidden(true) + .index(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared") + .writeIndex(false) + .filter(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobIdDedicated)))) + .addAliasAction(IndicesAliasesRequest.AliasActions + .add() + .alias(AnomalyDetectorsIndex.resultsWriteAlias(jobIdDedicated)) + .index(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "shared") + .isHidden(true) + .writeIndex(true)) + .addAliasAction(IndicesAliasesRequest.AliasActions + .remove() + .alias(AnomalyDetectorsIndex.resultsWriteAlias(jobIdDedicated)) + .index(dedicatedIndex)); + + client().admin().indices().aliases(aliasesRequest).actionGet(); + + createBuckets(jobIdDedicated, 11, 10); + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get(); + AtomicReference> bucketHandler = new AtomicReference<>(); + AtomicReference failureHandler = new AtomicReference<>(); + blockingCall(listener -> jobResultsProvider.buckets(jobIdDedicated, + new BucketsQueryBuilder().from(0).size(22), + listener::onResponse, + listener::onFailure, + client()), bucketHandler, failureHandler); + assertThat(failureHandler.get(), is(nullValue())); + assertThat(bucketHandler.get().count(), equalTo(22L)); + + DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(jobIdDedicated); + deleteJobRequest.setForce(true); + client().execute(DeleteJobAction.INSTANCE, deleteJobRequest).get(); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get(); + // Make sure our shared index job is OK + bucketHandler = new AtomicReference<>(); + failureHandler = new AtomicReference<>(); + blockingCall(listener -> jobResultsProvider.buckets(jobIdShared, + new BucketsQueryBuilder().from(0).size(21), + listener::onResponse, + listener::onFailure, + client()), bucketHandler, failureHandler); + assertThat(failureHandler.get(), is(nullValue())); + assertThat(bucketHandler.get().count(), equalTo(11L)); + + // Make sure dedicated index is gone + assertThat(client().admin() + .indices() + .prepareGetIndex() + .setIndices(dedicatedIndex) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .get() + .indices().length, equalTo(0)); + + // Make sure all results referencing the dedicated job are gone + assertThat(client().prepareSearch() + .setIndices(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setIndicesOptions(IndicesOptions.lenientExpandOpenHidden()) + .setTrackTotalHits(true) + .setSize(0) + .setSource(SearchSourceBuilder.searchSource() + .query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobIdDedicated)))) + .get() + .getHits() + .getTotalHits() + .value, equalTo(0L)); + } + + private void createBuckets(String jobId, int from, int count) { + JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true); + for (int i = from; i <= count + from; ++i) { + Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan); + builder.persistBucket(bucket); + } + builder.executeRequest(); + } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 05c37a69f89..45bb5a5405e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.support; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -66,8 +67,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; @@ -391,6 +394,25 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } } + protected static void blockingCall(Consumer> function, + AtomicReference response, + AtomicReference error) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = ActionListener.wrap( + r -> { + response.set(r); + latch.countDown(); + }, + e -> { + error.set(e); + latch.countDown(); + } + ); + + function.accept(listener); + latch.await(); + } + protected String awaitJobOpenedAndAssigned(String jobId, String queryNode) throws Exception { PersistentTasksClusterService persistentTasksClusterService =