[ML] Ensure annotations index mappings are up to date (#61142)
When the ML annotations index was first added, only the ML UI wrote to it, so the code to create it was designed with this in mind. Now the ML backend also creates annotations, and those mappings can change between versions. In this change: 1. The code that runs on the master node to create the annotations index if it doesn't exist but another ML index does also now ensures the mappings are up-to-date. This is good enough for the ML UI's use of the annotations index, because the upgrade order rules say that the whole Elasticsearch cluster must be upgraded prior to Kibana, so the master node should be on the newer version before Kibana tries to write an annotation with the new fields. 2. We now also check whether the annotations index exists with the correct mappings before starting an autodetect process on a node. This is necessary because ML nodes can be upgraded before the master node, so could write an annotation with the new fields before the master node knows about the new fields. Backport of #61107
This commit is contained in:
parent
7c3bfb9437
commit
d1b60269f4
|
@ -18,9 +18,11 @@ import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.template.TemplateUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||
|
@ -46,6 +48,15 @@ public class AnnotationIndex {
|
|||
|
||||
boolean isHiddenAttributeAvailable = state.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION);
|
||||
|
||||
final ActionListener<Boolean> checkMappingsListener = ActionListener.wrap(success -> {
|
||||
ElasticsearchMappings.addDocMappingIfMissing(
|
||||
WRITE_ALIAS_NAME,
|
||||
AnnotationIndex::annotationsMapping,
|
||||
client,
|
||||
state,
|
||||
finalListener);
|
||||
}, finalListener::onFailure);
|
||||
|
||||
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
|
||||
IndicesAliasesRequest.AliasActions addReadAliasAction =
|
||||
IndicesAliasesRequest.AliasActions.add().index(INDEX_NAME).alias(READ_ALIAS_NAME);
|
||||
|
@ -61,7 +72,8 @@ public class AnnotationIndex {
|
|||
.addAliasAction(addWriteAliasAction)
|
||||
.request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
|
||||
ActionListener.<AcknowledgedResponse>wrap(
|
||||
r -> checkMappingsListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
|
||||
client.admin().indices()::aliases);
|
||||
}, finalListener::onFailure);
|
||||
|
||||
|
@ -107,6 +119,10 @@ public class AnnotationIndex {
|
|||
createAliasListener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check the mappings
|
||||
checkMappingsListener.onResponse(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// Nothing to do, but respond to the listener
|
||||
|
@ -114,7 +130,11 @@ public class AnnotationIndex {
|
|||
}
|
||||
|
||||
private static String annotationsMapping() {
|
||||
return TemplateUtils.loadTemplate(
|
||||
"/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json", Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
|
||||
return annotationsMapping(SINGLE_MAPPING_NAME);
|
||||
}
|
||||
|
||||
private static String annotationsMapping(String mappingType) {
|
||||
return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json",
|
||||
Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE, Collections.singletonMap("xpack.ml.mapping_type", mappingType));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
{
|
||||
"_doc": {
|
||||
"${xpack.ml.mapping_type}": {
|
||||
"_meta" : {
|
||||
"version" : "${xpack.ml.version}"
|
||||
},
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.xpack.core.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
|
||||
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
|
||||
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
|
@ -448,9 +449,21 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|||
);
|
||||
|
||||
// Try adding the results doc mapping - this updates to the latest version if an old mapping is present
|
||||
ActionListener<Boolean> annotationsIndexUpdateHandler = ActionListener.wrap(
|
||||
ack -> ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
|
||||
AnomalyDetectorsIndex::resultsMapping, client, clusterState, resultsMappingUpdateHandler),
|
||||
e -> {
|
||||
// Due to a bug in 7.9.0 it's possible that the annotations index already has incorrect mappings
|
||||
// and it would cause more harm than good to block jobs from opening in subsequent releases
|
||||
logger.warn(new ParameterizedMessage("[{}] ML annotations index could not be updated with latest mappings", jobId), e);
|
||||
ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
|
||||
AnomalyDetectorsIndex::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
|
||||
}
|
||||
);
|
||||
|
||||
// Create the annotations index if necessary - this also updates the mappings if an old mapping is present
|
||||
AnnotationIndex.createAnnotationsIndexIfNecessary(client, clusterState, annotationsIndexUpdateHandler);
|
||||
}
|
||||
|
||||
private boolean createProcessAndSetRunning(ProcessContext processContext,
|
||||
Job job,
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.env.TestEnvironment;
|
|||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
|
||||
|
@ -183,6 +184,18 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
.build())
|
||||
.putAlias(AliasMetadata.builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).build())
|
||||
.build())
|
||||
.fPut(
|
||||
AnnotationIndex.INDEX_NAME,
|
||||
IndexMetadata.builder(AnnotationIndex.INDEX_NAME)
|
||||
.settings(
|
||||
Settings.builder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.build())
|
||||
.putAlias(AliasMetadata.builder(AnnotationIndex.READ_ALIAS_NAME).build())
|
||||
.putAlias(AliasMetadata.builder(AnnotationIndex.WRITE_ALIAS_NAME).build())
|
||||
.build())
|
||||
.build())
|
||||
.build();
|
||||
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
|
||||
|
|
|
@ -147,3 +147,14 @@ setup:
|
|||
job_id: old-cluster-function-shortcut-expansion
|
||||
- match: { count: 1 }
|
||||
- match: { jobs.0.analysis_config.detectors.0.function: "non_zero_count" }
|
||||
|
||||
---
|
||||
"Test annotation index mappings":
|
||||
|
||||
- do:
|
||||
indices.get_mapping:
|
||||
index: .ml-annotations-write
|
||||
|
||||
- match: { \.ml-annotations-6.mappings.properties.type.type: "keyword" }
|
||||
- match: { \.ml-annotations-6.mappings.properties.event.type: "keyword" }
|
||||
- match: { \.ml-annotations-6.mappings.properties.detector_index.type: "integer" }
|
||||
|
|
Loading…
Reference in New Issue