[ML] Tighten up use of aliases rather than concrete indices (#37874)
We have read and write aliases for the ML results indices. However, the job still had methods that purported to reliably return the name of the concrete results index being used by the job. After reindexing prior to an upgrade to 7.x this would be wrong, so the method has been renamed and its comments made more explicit: the returned index name may not be the actual concrete index name for the lifetime of the job. Additionally, the selection of indices when deleting a job has been changed so that it works regardless of concrete index names.

All these changes are nice-to-have for 6.7 and 7.0, but they become critical if we add rolling results indices in the 7.x release stream, as 6.7 and 7.0 nodes may then have to operate in a mixed-version cluster that includes a version that can roll results indices.
parent a5f578f7ea
commit 57d321ed5f
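For context on the naming scheme the diff below relies on: each job's results are reached through a read alias and a write alias built from the `.ml-anomalies-` prefix, while the concrete index behind those aliases may change over the job's lifetime. A minimal sketch of that pattern follows; the constant value and the derive* helper names are illustrative assumptions, not the production AnomalyDetectorsIndex API.

    // Sketch only: mirrors the alias-first convention this commit enforces.
    // RESULTS_INDEX_PREFIX and the derive* helpers are assumed for illustration.
    public final class ResultsAliasSketch {
        private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";

        // Read alias: may fan out to several concrete indices (shared, rolled, reindexed).
        static String deriveReadAlias(String jobId) {
            return RESULTS_INDEX_PREFIX + jobId;
        }

        // Write alias: points at the single index new results should go to.
        static String deriveWriteAlias(String jobId) {
            return RESULTS_INDEX_PREFIX + ".write-" + jobId;
        }

        public static void main(String[] args) {
            System.out.println(deriveReadAlias("farequote"));  // .ml-anomalies-farequote
            System.out.println(deriveWriteAlias("farequote")); // .ml-anomalies-.write-farequote
        }
    }

Because callers only ever see the aliases, the index behind them can be reindexed or rolled without clients noticing, which is exactly why getResultsIndexName() could not keep its promise and is renamed in the diff below.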
@@ -265,18 +265,24 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
     }
 
     /**
-     * The name of the index storing the job's results and state.
-     * This defaults to {@link #getId()} if a specific index name is not set.
-     * @return The job's index name
+     * A good starting name for the index storing the job's results.
+     * This defaults to the shared results index if a specific index name is not set.
+     * This method must <em>only</em> be used during initial job creation.
+     * After that the read/write aliases must always be used to access the job's
+     * results index, as the underlying index may roll or be reindexed.
+     * @return The job's initial results index name
      */
-    public String getResultsIndexName() {
+    public String getInitialResultsIndexName() {
         return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + resultsIndexName;
     }
 
     /**
-     * Private version of getResultsIndexName so that a job can be built from another
-     * job and pass index name validation
-     * @return The job's index name, minus prefix
+     * Get the unmodified <code>results_index_name</code> field from the job.
+     * This is provided to allow a job to be copied via the builder.
+     * After creation this does not necessarily reflect the actual concrete
+     * index used by the job. A job's results must always be read and written
+     * using the read and write aliases.
+     * @return The job's configured "index name"
      */
     private String getResultsIndexNameNoPrefix() {
         return resultsIndexName;
     }
@@ -416,14 +416,14 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
         Job.Builder builder = buildJobBuilder("foo");
         Job job = builder.build();
         assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT,
-                job.getResultsIndexName());
+                job.getInitialResultsIndexName());
     }
 
     public void testBuilder_setsIndexName() {
         Job.Builder builder = buildJobBuilder("foo");
         builder.setResultsIndexName("carol");
         Job job = builder.build();
-        assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName());
+        assertEquals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-carol", job.getInitialResultsIndexName());
     }
 
     public void testBuilder_withInvalidIndexNameThrows() {
@@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -57,7 +58,7 @@ public class MlJobIT extends ESRestTestCase {
         assertThat(responseAsString, containsString("\"job_id\":\"given-farequote-config-job\""));
     }
 
-    public void testGetJob_GivenNoSuchJob() throws Exception {
+    public void testGetJob_GivenNoSuchJob() {
         ResponseException e = expectThrows(ResponseException.class, () ->
                 client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/non-existing-job/_stats")));
@@ -519,8 +520,30 @@ public class MlJobIT extends ESRestTestCase {
         String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
         createFarequoteJob(jobId);
 
-        client().performRequest(new Request("PUT", indexName + "-001"));
-        client().performRequest(new Request("PUT", indexName + "-002"));
+        // Make the job's results span an extra two indices, i.e. three in total.
+        // To do this the job's results alias needs to encompass all three indices.
+        Request extraIndex1 = new Request("PUT", indexName + "-001");
+        extraIndex1.setJsonEntity("{\n" +
+                "    \"aliases\" : {\n" +
+                "        \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "\" : {\n" +
+                "            \"filter\" : {\n" +
+                "                \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" +
+                "            }\n" +
+                "        }\n" +
+                "    }\n" +
+                "}");
+        client().performRequest(extraIndex1);
+        Request extraIndex2 = new Request("PUT", indexName + "-002");
+        extraIndex2.setJsonEntity("{\n" +
+                "    \"aliases\" : {\n" +
+                "        \"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "\" : {\n" +
+                "            \"filter\" : {\n" +
+                "                \"term\" : {\"" + Job.ID + "\" : \"" + jobId + "\" }\n" +
+                "            }\n" +
+                "        }\n" +
+                "    }\n" +
+                "}");
+        client().performRequest(extraIndex2);
 
         String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
         assertThat(indicesBeforeDelete, containsString(indexName));
@@ -267,26 +267,25 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
     private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
                                     CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
 
-        AtomicReference<String> indexName = new AtomicReference<>();
+        AtomicReference<String[]> indexNames = new AtomicReference<>();
 
         final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
                 response -> finishedHandler.accept(response.isAcknowledged()),
                 failureHandler);
 
-        // Step 8. If we did not drop the index and after DBQ state done, we delete the aliases
+        // Step 8. If we did not drop the indices and after DBQ state done, we delete the aliases
        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
                bulkByScrollResponse -> {
-                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
+                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted
                        completionHandler.onResponse(new AcknowledgedResponse(true));
                    } else {
                        if (bulkByScrollResponse.isTimedOut()) {
-                            logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(),
-                                    indexName.get() + "-*");
+                            logger.warn("[{}] DeleteByQuery for indices [{}] timed out.", jobId, String.join(", ", indexNames.get()));
                        }
                        if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
-                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
+                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}].",
                                    jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
-                                    indexName.get(), indexName.get() + "-*");
+                                    String.join(", ", indexNames.get()));
                            for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
                                logger.warn("DBQ failure: " + failure);
                            }
@@ -296,13 +295,12 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                },
                failureHandler);
 
-        // Step 7. If we did not delete the index, we run a delete by query
+        // Step 7. If we did not delete the indices, we run a delete by query
        ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
                response -> {
-                    if (response) {
-                        String indexPattern = indexName.get() + "-*";
-                        logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]");
-                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern);
+                    if (response && indexNames.get().length > 0) {
+                        logger.info("Running DBQ on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
+                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get());
                        ConstantScoreQueryBuilder query =
                                new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
                        request.setQuery(query);
@@ -318,15 +316,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                },
                failureHandler);
 
-        // Step 6. If we have any hits, that means we are NOT the only job on this index, and should not delete it
-        // if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
+        // Step 6. If we have any hits, that means we are NOT the only job on these indices, and should not delete the indices.
+        // If we do not have any hits, we can drop the indices and then skip the DBQ and alias deletion.
        ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
                searchResponse -> {
                    if (searchResponse == null || searchResponse.getHits().getTotalHits().value > 0) {
                        deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
                    } else {
-                        logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]");
-                        DeleteIndexRequest request = new DeleteIndexRequest(indexName.get());
+                        logger.info("Running DELETE Index on [" + String.join(", ", indexNames.get()) + "] for job [" + jobId + "]");
+                        DeleteIndexRequest request = new DeleteIndexRequest(indexNames.get());
                        request.indicesOptions(IndicesOptions.lenientExpandOpen());
                        // If we have deleted the index, then we don't need to delete the aliases or run the DBQ
                        executeAsyncWithOrigin(
@@ -348,14 +346,28 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                }
        );
 
-        // Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
+        // Step 5. Determine if we are on shared indices by looking at whether the initial index was ".ml-anomalies-shared"
+        // or whether the indices that the job's results alias points to contain any documents from other jobs.
+        // TODO: this check is currently assuming that a job's results indices are either ALL shared or ALL
+        // dedicated to the job. We have considered functionality like rolling jobs that generate large
+        // volumes of results from shared to dedicated indices. On deletion such a job would have a mix of
+        // shared indices requiring DBQ and dedicated indices that could be simply dropped. The current
+        // functionality would apply DBQ to all these indices, which is safe but suboptimal. So this functionality
+        // should be revisited when we add rolling results index functionality, especially if we add the ability
+        // to switch a job over to a dedicated index for future results.
        ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
                builder -> {
                    Job job = builder.build();
-                    indexName.set(job.getResultsIndexName());
-                    if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
-                            AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
-                        //don't bother searching the index any further, we are on the default shared
+                    indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
+                            IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
+                    // The job may no longer be using the initial shared index, but if it started off on a
+                    // shared index then it will still be on a shared index even if it's been reindexed
+                    if (job.getInitialResultsIndexName()
+                            .equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
+                        // don't bother searching the index any further, we are on the default shared
                        customIndexSearchHandler.onResponse(null);
+                    } else if (indexNames.get().length == 0) {
+                        // don't bother searching the index any further - it's already been closed or deleted
+                        customIndexSearchHandler.onResponse(null);
                    } else {
                        SearchSourceBuilder source = new SearchSourceBuilder()
@@ -364,7 +376,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                                .query(QueryBuilders.boolQuery().filter(
                                        QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
 
-                        SearchRequest searchRequest = new SearchRequest(indexName.get());
+                        SearchRequest searchRequest = new SearchRequest(indexNames.get());
                        searchRequest.source(source);
                        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
                    }
@@ -372,7 +384,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                failureHandler
        );
 
-        // Step 4. Get the job as the result index name is required
+        // Step 4. Get the job as the initial result index name is required
        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
                response -> {
                    jobConfigProvider.getJob(jobId, getJobHandler);
@@ -135,8 +135,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                                                      int maxMachineMemoryPercent,
                                                      MlMemoryTracker memoryTracker,
                                                      Logger logger) {
-        String resultsIndexName = job.getResultsIndexName();
-        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState);
+        String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
+        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
        if (unavailableIndices.size() != 0) {
            String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
                    String.join(",", unavailableIndices) + "]";
@@ -359,9 +359,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
        return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), resultsIndex, MlMetaIndex.INDEX_NAME};
    }
 
-    static List<String> verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) {
+    static List<String> verifyIndicesPrimaryShardsAreActive(String resultsWriteIndex, ClusterState clusterState) {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
-        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indicesOfInterest(resultsIndex));
+        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(),
+                indicesOfInterest(resultsWriteIndex));
        List<String> unavailableIndices = new ArrayList<>(indices.length);
        for (String index : indices) {
            // Indices are created on demand from templates.
@@ -166,7 +166,7 @@ public class JobResultsProvider {
                .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
                .setIndicesOptions(IndicesOptions.strictExpand());
 
-        String resultsIndexName = job.getResultsIndexName();
+        String resultsIndexName = job.getInitialResultsIndexName();
        SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
                .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                .setQuery(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
@@ -252,7 +252,7 @@ public class JobResultsProvider {
 
        String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
        String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId());
-        String indexName = job.getResultsIndexName();
+        String indexName = job.getInitialResultsIndexName();
 
        final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
            final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
@@ -347,7 +347,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
        when(job.getId()).thenReturn("incompatible_type_job");
        when(job.getJobVersion()).thenReturn(Version.CURRENT);
        when(job.getJobType()).thenReturn("incompatible_type");
-        when(job.getResultsIndexName()).thenReturn("shared");
+        when(job.getInitialResultsIndexName()).thenReturn("shared");
 
        cs.nodes(nodes);
        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
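For readers who want to reproduce the multi-index setup from the MlJobIT test above by hand, the index creation request the test builds as an escaped Java string looks roughly like this once expanded (the job id "farequote" and the `.ml-anomalies-shared-001` index name follow the test; adjust for your own job):

    PUT .ml-anomalies-shared-001
    {
      "aliases": {
        ".ml-anomalies-farequote": {
          "filter": {
            "term": { "job_id": "farequote" }
          }
        }
      }
    }

The term filter scopes the read alias so that searches through it only ever see that job's documents, even on an index shared with other jobs; this is what lets the delete path resolve the alias to concrete indices and decide between dropping them and running a delete-by-query.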