[ML] Update ML results mappings on process start (#37706)
This change moves the update of the ML results index mappings from the open job action to the code that starts the autodetect process. When a rolling upgrade is performed, we need to update the mappings for already-open jobs that are reassigned from an old-version node to a new-version node; however, the open job action is not called in that case, so the update must happen at process start instead. Closes #37607
This commit is contained in:
parent
b3f9becf5f
commit
7b3dd3022d
|
@ -5,8 +5,24 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.job.persistence;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.CheckedSupplier;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.plugins.MapperPlugin;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
|
||||
|
@ -38,10 +54,16 @@ import org.elasticsearch.xpack.core.ml.job.results.Result;
|
|||
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
||||
/**
|
||||
* Static methods to create Elasticsearch index mappings for the autodetect
|
||||
|
@ -107,6 +129,8 @@ public class ElasticsearchMappings {
|
|||
|
||||
static final String RAW = "raw";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class);
|
||||
|
||||
private ElasticsearchMappings() {
|
||||
}
|
||||
|
||||
|
@ -964,4 +988,94 @@ public class ElasticsearchMappings {
|
|||
.endObject()
|
||||
.endObject();
|
||||
}
|
||||
|
||||
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException {
|
||||
List<String> indicesToUpdate = new ArrayList<>();
|
||||
|
||||
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
|
||||
new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER);
|
||||
|
||||
for (String index : concreteIndices) {
|
||||
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
|
||||
if (innerMap != null) {
|
||||
MappingMetaData metaData = innerMap.get(DOC_TYPE);
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
|
||||
if (meta != null) {
|
||||
String versionString = (String) meta.get("version");
|
||||
if (versionString == null) {
|
||||
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
|
||||
Version mappingVersion = Version.fromString(versionString);
|
||||
|
||||
if (mappingVersion.onOrAfter(minVersion)) {
|
||||
continue;
|
||||
} else {
|
||||
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
logger.info("No mappings found for [{}], recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
}
|
||||
}
|
||||
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
|
||||
}
|
||||
|
||||
public static void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier,
|
||||
Client client, ClusterState state, ActionListener<Boolean> listener) {
|
||||
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
|
||||
if (aliasOrIndex == null) {
|
||||
// The index has never been created yet
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
|
||||
.toArray(String[]::new);
|
||||
|
||||
String[] indicesThatRequireAnUpdate;
|
||||
try {
|
||||
indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
if (indicesThatRequireAnUpdate.length > 0) {
|
||||
try (XContentBuilder mapping = mappingSupplier.get()) {
|
||||
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
|
||||
putMappingRequest.type(DOC_TYPE);
|
||||
putMappingRequest.source(mapping);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest,
|
||||
ActionListener.wrap(response -> {
|
||||
if (response.isAcknowledged()) {
|
||||
listener.onResponse(true);
|
||||
} else {
|
||||
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
|
||||
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
logger.trace("Mappings are up to date.");
|
||||
listener.onResponse(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,10 +9,18 @@ import com.fasterxml.jackson.core.JsonFactory;
|
|||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
|
||||
|
@ -30,6 +38,8 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -128,6 +138,96 @@ public class ElasticsearchMappingsTests extends ESTestCase {
|
|||
assertNull(instanceMapping);
|
||||
}
|
||||
|
||||
|
||||
public void testMappingRequiresUpdateNoMapping() throws IOException {
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||
ClusterState cs = csBuilder.build();
|
||||
String[] indices = new String[] { "no_index" };
|
||||
|
||||
assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNullMapping() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
|
||||
String[] indices = new String[] { "null_index" };
|
||||
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNoVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
|
||||
String[] indices = new String[] { "no_version_field" };
|
||||
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
|
||||
String[] indices = new String[] { "version_current" };
|
||||
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(
|
||||
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
|
||||
String[] indices = new String[] { "version_nested" };
|
||||
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
|
||||
String[] indices = new String[] { "version_bogus" };
|
||||
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT));
|
||||
String[] indices = new String[] { "version_newer" };
|
||||
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion()));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT));
|
||||
String[] indices = new String[] { "version_newer_minor" };
|
||||
assertArrayEquals(new String[] {},
|
||||
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
|
||||
}
|
||||
|
||||
|
||||
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
|
||||
for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {
|
||||
|
||||
String indexName = entry.getKey();
|
||||
Object version = entry.getValue();
|
||||
|
||||
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
||||
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
||||
Map<String, Object> mapping = new HashMap<>();
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
properties.put("field" + i, Collections.singletonMap("type", "string"));
|
||||
}
|
||||
mapping.put("properties", properties);
|
||||
|
||||
Map<String, Object> meta = new HashMap<>();
|
||||
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
|
||||
meta.put("version", version);
|
||||
}
|
||||
mapping.put("_meta", meta);
|
||||
|
||||
indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));
|
||||
|
||||
metaDataBuilder.put(indexMetaData);
|
||||
}
|
||||
MetaData metaData = metaDataBuilder.build();
|
||||
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||
csBuilder.metaData(metaData);
|
||||
return csBuilder.build();
|
||||
}
|
||||
|
||||
private Set<String> collectResultsDocFieldNames() throws IOException {
|
||||
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
|
||||
return collectFieldNames(ElasticsearchMappings.resultsMapping());
|
||||
|
|
|
@ -7,14 +7,10 @@ package org.elasticsearch.xpack.ml.action;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
|
@ -23,21 +19,14 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedSupplier;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||
|
@ -45,7 +34,6 @@ import org.elasticsearch.persistent.PersistentTaskState;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.plugins.MapperPlugin;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -69,9 +57,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
|
|||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -405,54 +391,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
return job.getAnalysisConfig().getDetectors().stream().anyMatch(d -> d.getRules().isEmpty() == false);
|
||||
}
|
||||
|
||||
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion,
|
||||
Logger logger) throws IOException {
|
||||
List<String> indicesToUpdate = new ArrayList<>();
|
||||
|
||||
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
|
||||
new String[] { ElasticsearchMappings.DOC_TYPE }, MapperPlugin.NOOP_FIELD_FILTER);
|
||||
|
||||
for (String index : concreteIndices) {
|
||||
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
|
||||
if (innerMap != null) {
|
||||
MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE);
|
||||
try {
|
||||
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
|
||||
if (meta != null) {
|
||||
String versionString = (String) meta.get("version");
|
||||
if (versionString == null) {
|
||||
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
|
||||
Version mappingVersion = Version.fromString(versionString);
|
||||
|
||||
if (mappingVersion.onOrAfter(minVersion)) {
|
||||
continue;
|
||||
} else {
|
||||
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
|
||||
indicesToUpdate.add(index);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
logger.info("No mappings found for [{}], recreating", index);
|
||||
indicesToUpdate.add(index);
|
||||
}
|
||||
}
|
||||
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
// This api doesn't do heavy or blocking operations (just delegates PersistentTasksService),
|
||||
|
@ -527,25 +465,18 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
);
|
||||
|
||||
// Try adding state doc mapping
|
||||
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
|
||||
ActionListener<Void> getJobHandler = ActionListener.wrap(
|
||||
response -> {
|
||||
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping,
|
||||
state, jobUpdateListener);
|
||||
ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
|
||||
ElasticsearchMappings::stateMapping, client, state, jobUpdateListener);
|
||||
}, listener::onFailure
|
||||
);
|
||||
|
||||
// Get the job config
|
||||
jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap(
|
||||
builder -> {
|
||||
try {
|
||||
jobParams.setJob(builder.build());
|
||||
|
||||
// Try adding results doc mapping
|
||||
addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()),
|
||||
ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
jobParams.setJob(builder.build());
|
||||
getJobHandler.onResponse(null);
|
||||
},
|
||||
listener::onFailure
|
||||
));
|
||||
|
@ -620,48 +551,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
);
|
||||
}
|
||||
|
||||
private void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier, ClusterState state,
|
||||
ActionListener<Boolean> listener) {
|
||||
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
|
||||
if (aliasOrIndex == null) {
|
||||
// The index has never been created yet
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
|
||||
.toArray(String[]::new);
|
||||
|
||||
String[] indicesThatRequireAnUpdate;
|
||||
try {
|
||||
indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
if (indicesThatRequireAnUpdate.length > 0) {
|
||||
try (XContentBuilder mapping = mappingSupplier.get()) {
|
||||
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
|
||||
putMappingRequest.type(ElasticsearchMappings.DOC_TYPE);
|
||||
putMappingRequest.source(mapping);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest,
|
||||
ActionListener.wrap(response -> {
|
||||
if (response.isAcknowledged()) {
|
||||
listener.onResponse(true);
|
||||
} else {
|
||||
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
|
||||
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
logger.trace("Mappings are uptodate.");
|
||||
listener.onResponse(true);
|
||||
}
|
||||
}
|
||||
|
||||
public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor<OpenJobAction.JobParams> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class);
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
|
||||
|
@ -417,7 +418,9 @@ public class AutodetectProcessManager {
|
|||
public void openJob(JobTask jobTask, ClusterState clusterState, Consumer<Exception> closeHandler) {
|
||||
String jobId = jobTask.getJobId();
|
||||
logger.info("Opening job [{}]", jobId);
|
||||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, ActionListener.wrap(
|
||||
|
||||
// Start the process
|
||||
ActionListener<Boolean> stateAliasHandler = ActionListener.wrap(
|
||||
r -> {
|
||||
jobManager.getJob(jobId, ActionListener.wrap(
|
||||
job -> {
|
||||
|
@ -427,7 +430,6 @@ public class AutodetectProcessManager {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
|
||||
jobResultsProvider.getAutodetectParams(job, params -> {
|
||||
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
|
||||
|
@ -477,7 +479,17 @@ public class AutodetectProcessManager {
|
|||
closeHandler
|
||||
));
|
||||
},
|
||||
closeHandler));
|
||||
closeHandler);
|
||||
|
||||
// Make sure the state index and alias exist
|
||||
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
|
||||
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasHandler),
|
||||
closeHandler
|
||||
);
|
||||
|
||||
// Try adding the results doc mapping - this updates to the latest version if an old mapping is present
|
||||
ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
|
||||
ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
|
||||
}
|
||||
|
||||
private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer<Exception> handler) {
|
||||
|
|
|
@ -15,7 +15,6 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -38,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
||||
|
@ -53,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Operator;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
|
@ -61,7 +58,6 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
|
|||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -486,59 +482,6 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
assertEquals(indexToRemove, result.get(0));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNoMapping() throws IOException {
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||
ClusterState cs = csBuilder.build();
|
||||
String[] indices = new String[] { "no_index" };
|
||||
|
||||
assertArrayEquals(new String[] { "no_index" }, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNullMapping() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
|
||||
String[] indices = new String[] { "null_index" };
|
||||
assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNoVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
|
||||
String[] indices = new String[] { "no_version_field" };
|
||||
assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
|
||||
String[] indices = new String[] { "version_current" };
|
||||
assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(
|
||||
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
|
||||
String[] indices = new String[] { "version_nested" };
|
||||
assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
|
||||
String[] indices = new String[] { "version_bogus" };
|
||||
assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT));
|
||||
String[] indices = new String[] { "version_newer" };
|
||||
assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(),
|
||||
logger));
|
||||
}
|
||||
|
||||
public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
|
||||
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT));
|
||||
String[] indices = new String[] { "version_newer_minor" };
|
||||
assertArrayEquals(new String[] {},
|
||||
TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger));
|
||||
}
|
||||
|
||||
public void testNodeNameAndVersion() {
|
||||
TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
|
@ -641,42 +584,6 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
|
||||
for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {
|
||||
|
||||
String indexName = entry.getKey();
|
||||
Object version = entry.getValue();
|
||||
|
||||
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
||||
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
||||
Map<String, Object> mapping = new HashMap<>();
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
properties.put("field" + i, Collections.singletonMap("type", "string"));
|
||||
}
|
||||
mapping.put("properties", properties);
|
||||
|
||||
Map<String, Object> meta = new HashMap<>();
|
||||
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
|
||||
meta.put("version", version);
|
||||
}
|
||||
mapping.put("_meta", meta);
|
||||
|
||||
indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));
|
||||
|
||||
metaDataBuilder.put(indexMetaData);
|
||||
}
|
||||
MetaData metaData = metaDataBuilder.build();
|
||||
|
||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||
csBuilder.metaData(metaData);
|
||||
return csBuilder.build();
|
||||
}
|
||||
|
||||
private static Job jobWithRules(String jobId) {
|
||||
DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(
|
||||
new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0)
|
||||
|
|
|
@ -141,6 +141,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap);
|
||||
clusterState = mock(ClusterState.class);
|
||||
when(clusterState.getMetaData()).thenReturn(metaData);
|
||||
when(clusterState.metaData()).thenReturn(metaData);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.client.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.client.ml.job.config.Detector;
|
||||
import org.elasticsearch.client.ml.job.config.Job;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase {
|
||||
|
||||
private static final String JOB_ID = "ml-mappings-upgrade-job";
|
||||
|
||||
@Override
|
||||
protected Collection<String> templatesToWaitFor() {
|
||||
return Stream.concat(XPackRestTestHelper.ML_POST_V660_TEMPLATES.stream(),
|
||||
super.templatesToWaitFor().stream()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
|
||||
* index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
|
||||
*/
|
||||
public void testMappingsUpgrade() throws Exception {
|
||||
|
||||
switch (CLUSTER_TYPE) {
|
||||
case OLD:
|
||||
createAndOpenTestJob();
|
||||
break;
|
||||
case MIXED:
|
||||
// We don't know whether the job is on an old or upgraded node, so cannot assert that the mappings have been upgraded
|
||||
break;
|
||||
case UPGRADED:
|
||||
assertUpgradedMappings();
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private void createAndOpenTestJob() throws IOException {
|
||||
|
||||
Detector.Builder d = new Detector.Builder("metric", "responsetime");
|
||||
d.setByFieldName("airline");
|
||||
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
|
||||
analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10));
|
||||
Job.Builder job = new Job.Builder(JOB_ID);
|
||||
job.setAnalysisConfig(analysisConfig);
|
||||
job.setDataDescription(new DataDescription.Builder());
|
||||
|
||||
Request putJob = new Request("PUT", "_ml/anomaly_detectors/" + JOB_ID);
|
||||
putJob.setJsonEntity(Strings.toString(job.build()));
|
||||
Response response = client().performRequest(putJob);
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
|
||||
Request openJob = new Request("POST", "_ml/anomaly_detectors/" + JOB_ID + "/_open");
|
||||
response = client().performRequest(openJob);
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void assertUpgradedMappings() throws Exception {
|
||||
|
||||
assertBusy(() -> {
|
||||
Request getMappings = new Request("GET", AnomalyDetectorsIndex.resultsWriteAlias(JOB_ID) + "/_mappings");
|
||||
Response response = client().performRequest(getMappings);
|
||||
|
||||
Map<String, Object> responseLevel = entityAsMap(response);
|
||||
assertNotNull(responseLevel);
|
||||
Map<String, Object> indexLevel = (Map<String, Object>) responseLevel.get(".ml-anomalies-shared");
|
||||
assertNotNull(indexLevel);
|
||||
Map<String, Object> mappingsLevel = (Map<String, Object>) indexLevel.get("mappings");
|
||||
assertNotNull(mappingsLevel);
|
||||
Map<String, Object> metaLevel = (Map<String, Object>) mappingsLevel.get("_meta");
|
||||
assertEquals(Collections.singletonMap("version", Version.CURRENT.toString()), metaLevel);
|
||||
Map<String, Object> propertiesLevel = (Map<String, Object>) mappingsLevel.get("properties");
|
||||
assertNotNull(propertiesLevel);
|
||||
// TODO: as the years go by, the field we assert on here should be changed
|
||||
// to the most recent field we've added that is NOT of type "keyword"
|
||||
Map<String, Object> fieldLevel = (Map<String, Object>) propertiesLevel.get("multi_bucket_impact");
|
||||
assertEquals(Collections.singletonMap("type", "double"), fieldLevel);
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue