diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a912b5d65f2..e6be2df0aed 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -265,18 +265,24 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } /** - * The name of the index storing the job's results and state. - * This defaults to {@link #getId()} if a specific index name is not set. - * @return The job's index name + * A good starting name for the index storing the job's results. + * This defaults to the shared results index if a specific index name is not set. + * This method must only be used during initial job creation. + * After that the read/write aliases must always be used to access the job's + * results index, as the underlying index may roll or be reindexed. + * @return The job's initial results index name */ - public String getResultsIndexName() { + public String getInitialResultsIndexName() { return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + resultsIndexName; } /** - * Private version of getResultsIndexName so that a job can be built from another - * job and pass index name validation - * @return The job's index name, minus prefix + * Get the unmodified results_index_name field from the job. + * This is provided to allow a job to be copied via the builder. + * After creation this does not necessarily reflect the actual concrete + * index used by the job. A job's results must always be read and written + * using the read and write aliases. + * @return The job's configured "index name" */ private String getResultsIndexNameNoPrefix() { return resultsIndexName; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 1ba774e4231..fc93f5c02a9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -416,14 +416,14 @@ public class JobTests extends AbstractSerializingTestCase { Job.Builder builder = buildJobBuilder("foo"); Job job = builder.build(); assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT, - job.getResultsIndexName()); + job.getInitialResultsIndexName()); } public void testBuilder_setsIndexName() { Job.Builder builder = buildJobBuilder("foo"); builder.setResultsIndexName("carol"); Job job = builder.build(); - assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName()); + assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getInitialResultsIndexName()); } public void testBuilder_withInvalidIndexNameThrows() { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 9f38791bb9f..66bbe908fd0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.ml.MachineLearning; @@ -57,7 +58,7 @@ public class MlJobIT extends ESRestTestCase { assertThat(responseAsString, containsString("\"job_id\":\"given-farequote-config-job\"")); } - public void testGetJob_GivenNoSuchJob() throws Exception { + public void testGetJob_GivenNoSuchJob() { ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/non-existing-job/_stats"))); @@ -519,8 +520,30 @@ public class MlJobIT extends ESRestTestCase { String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; createFarequoteJob(jobId); - client().performRequest(new Request("PUT", indexName + "-001")); - client().performRequest(new Request("PUT", indexName + "-002")); + // Make the job's results span an extra two indices, i.e. three in total. + // To do this the job's results alias needs to encompass all three indices. + Request extraIndex1 = new Request("PUT", indexName + "-001"); + extraIndex1.setJsonEntity("{\n" + + " \"aliases\" : {\n" + + " \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId)+ "\" : {\n" + + " \"filter\" : {\n" + + " \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + client().performRequest(extraIndex1); + Request extraIndex2 = new Request("PUT", indexName + "-002"); + extraIndex2.setJsonEntity("{\n" + + " \"aliases\" : {\n" + + " \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId)+ "\" : {\n" + + " \"filter\" : {\n" + + " \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + client().performRequest(extraIndex2); String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); assertThat(indicesBeforeDelete, containsString(indexName)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 876f2cd1aac..90d8c6e677a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -267,26 +267,25 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler, Consumer failureHandler) { - AtomicReference indexName = new AtomicReference<>(); + AtomicReference indexNames = new AtomicReference<>(); final ActionListener completionHandler = ActionListener.wrap( response -> finishedHandler.accept(response.isAcknowledged()), failureHandler); - // Step 8. If we did not drop the index and after DBQ state done, we delete the aliases + // Step 8. If we did not drop the indices and after DBQ state done, we delete the aliases ActionListener dbqHandler = ActionListener.wrap( bulkByScrollResponse -> { - if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted + if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted completionHandler.onResponse(new AcknowledgedResponse(true)); } else { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(), - indexName.get() + "-*"); + logger.warn("[{}] DeleteByQuery for indices [{}] timed out.", jobId, String.join(", ", indexNames.get())); } if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { - logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].", + logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}].", jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(), - indexName.get(), indexName.get() + "-*"); + String.join(", ", indexNames.get())); for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { logger.warn("DBQ failure: " + failure); } @@ -296,13 +295,12 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteByQueryExecutor = ActionListener.wrap( response -> { - if (response) { - String indexPattern = indexName.get() + "-*"; - logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]"); - DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern); + if (response && indexNames.get().length > 0) { + logger.info("Running DBQ on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]"); + DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get()); ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); request.setQuery(query); @@ -318,15 +316,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction customIndexSearchHandler = ActionListener.wrap( searchResponse -> { if (searchResponse == null || searchResponse.getHits().getTotalHits().value > 0) { deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion } else { - logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]"); - DeleteIndexRequest request = new DeleteIndexRequest(indexName.get()); + logger.info("Running DELETE Index on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]"); + DeleteIndexRequest request = new DeleteIndexRequest(indexNames.get()); request.indicesOptions(IndicesOptions.lenientExpandOpen()); // If we have deleted the index, then we don't need to delete the aliases or run the DBQ executeAsyncWithOrigin( @@ -348,14 +346,28 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction getJobHandler = ActionListener.wrap( builder -> { Job job = builder.build(); - indexName.set(job.getResultsIndexName()); - if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + - AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) { - //don't bother searching the index any further, we are on the default shared + indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(), + IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId))); + // The job may no longer be using the initial shared index, but if it started off on a + // shared index then it will still be on a shared index even if it's been reindexed + if (job.getInitialResultsIndexName() + .equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) { + // don't bother searching the index any further, we are on the default shared + customIndexSearchHandler.onResponse(null); + } else if (indexNames.get().length == 0) { + // don't bother searching the index any further - it's already been closed or deleted customIndexSearchHandler.onResponse(null); } else { SearchSourceBuilder source = new SearchSourceBuilder() @@ -364,7 +376,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteCategorizerStateHandler = ActionListener.wrap( response -> { jobConfigProvider.getJob(jobId, getJobHandler); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 820da6a6213..36381a58378 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -135,8 +135,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState); + String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId); + List unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]"; @@ -359,9 +359,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) { + static List verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) { IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); - String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indicesOfInterest(resultsIndex)); + String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), + indicesOfInterest(resultsWriteIndex)); List unavailableIndices = new ArrayList<>(indices.length); for (String index : indices) { // Indices are created on demand from templates. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index b942c49c14e..5ae1cafc9c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -166,7 +166,7 @@ public class JobResultsProvider { .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId()))) .setIndicesOptions(IndicesOptions.strictExpand()); - String resultsIndexName = job.getResultsIndexName(); + String resultsIndexName = job.getInitialResultsIndexName(); SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) @@ -252,7 +252,7 @@ public class JobResultsProvider { String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId()); - String indexName = job.getResultsIndexName(); + String indexName = job.getInitialResultsIndexName(); final ActionListener createAliasListener = ActionListener.wrap(success -> { final IndicesAliasesRequest request = client.admin().indices().prepareAliases() diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index da54b33d275..b23e0426090 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -347,7 +347,7 @@ public class TransportOpenJobActionTests extends ESTestCase { when(job.getId()).thenReturn("incompatible_type_job"); when(job.getJobVersion()).thenReturn(Version.CURRENT); when(job.getJobType()).thenReturn("incompatible_type"); - when(job.getResultsIndexName()).thenReturn("shared"); + when(job.getInitialResultsIndexName()).thenReturn("shared"); cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);