From 261d65a9c4aada72254b1d84f35d14ab195f3052 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 11 Jul 2017 07:44:41 +0200 Subject: [PATCH] [ML] add meta data field into mappings and update procedure (elastic/x-pack-elasticsearch#1925) Adds an update procedure for ML index mappings in order to allow adding new fields. The ES version is stored in the "_meta" field of the ML mappings, which then get applied to every index. When opening a job, this version is checked on the (shared) index and a mapping update is performed if the version is older than the current one. Original commit: elastic/x-pack-elasticsearch@211608c7adb6d1bccacb9142e1ab94657092a8c9 --- .../xpack/ml/action/OpenJobAction.java | 60 ++++++++++- .../persistence/ElasticsearchMappings.java | 30 ++++-- .../xpack/ml/action/OpenJobActionTests.java | 99 +++++++++++++++++++ 3 files changed, 180 insertions(+), 9 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index b14d9745494..b19437cdbda 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; @@ -27,12 +28,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import 
org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -491,9 +494,12 @@ public class OpenJobAction extends Action 0) { try (XContentBuilder mapping = mappingSupplier.get()) { - PutMappingRequest putMappingRequest = new PutMappingRequest(concreteIndices); + PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); putMappingRequest.type(ElasticsearchMappings.DOC_TYPE); putMappingRequest.source(mapping); client.execute(PutMappingAction.INSTANCE, putMappingRequest, ActionListener.wrap( @@ -502,13 +508,14 @@ public class OpenJobAction extends Action indicesToUpdate = new ArrayList<>(); + + ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, + new String[] { ElasticsearchMappings.DOC_TYPE }); + + for (String index : concreteIndices) { + ImmutableOpenMap innerMap = currentMapping.get(index); + if (innerMap != null) { + MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE); + try { + Map meta = (Map) metaData.sourceAsMap().get("_meta"); + if (meta != null) { + String versionString = (String) meta.get("version"); + if (versionString == null) { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + + Version mappingVersion = Version.fromString(versionString); + + if (mappingVersion.equals(Version.CURRENT)) { + continue; + } else { + logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("Version of 
mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + } catch (Exception e) { + logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("No mappings found for [{}], recreating", index); + indicesToUpdate.add(index); + } + } + return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index a3a83aeb15d..8de44e59cd5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.Version; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; @@ -111,10 +112,24 @@ public class ElasticsearchMappings { .endArray(); } + /** + * Inserts "_meta" containing useful information like the version into the mapping + * template. + * + * @param builder The builder for the mappings + * @throws IOException On write error + */ + public static void addMetaInformation(XContentBuilder builder) throws IOException { + builder.startObject("_meta") + .field("version", Version.CURRENT) + .endObject(); + } + public static XContentBuilder docMapping() throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); builder.startObject(DOC_TYPE); + addMetaInformation(builder); addDefaultMapping(builder); builder.startObject(PROPERTIES); @@ -523,12 +538,15 @@ public class ElasticsearchMappings { * by knowing the ID of a particular document. 
*/ public static XContentBuilder stateMapping() throws IOException { - return jsonBuilder() - .startObject() - .startObject(DOC_TYPE) - .field(ENABLED, false) - .endObject() - .endObject(); + XContentBuilder builder = jsonBuilder(); + builder.startObject(); + builder.startObject(DOC_TYPE); + addMetaInformation(builder); + builder.field(ENABLED, false); + builder.endObject(); + builder.endObject(); + + return builder; } /** diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index ed7cbdf0319..972a78908ac 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -33,11 +34,13 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; +import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -339,6 +342,66 @@ public class 
OpenJobActionTests extends ESTestCase { assertEquals(indexToRemove, result.get(0)); } + public void testMappingRequiresUpdateNoMapping() throws IOException { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + ClusterState cs = csBuilder.build(); + String[] indices = new String[] { "no_index" }; + + assertArrayEquals(new String[] { "no_index" }, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateNullMapping() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); + String[] indices = new String[] { "null_index" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateNoVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); + String[] indices = new String[] { "no_version_field" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); + String[] indices = new String[] { "version_current" }; + assertArrayEquals(new String[] {}, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData( + Collections.singletonMap("version_nested", Collections.singletonMap("nested", "1.0"))); /* index name must match the one queried below so the nested, non-String "version" value is what gets checked */ + String[] indices = new String[] { "version_nested" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateOldMappingVersion() throws IOException { + ClusterState cs = 
getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_54", Version.V_5_4_0.toString())); + String[] indices = new String[] { "version_54" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); + String[] indices = new String[] { "version_bogus" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + + public void testMappingRequiresUpdateSomeVersionMix() throws IOException { + Map versionMix = new HashMap(); + versionMix.put("version_54", Version.V_5_4_0); + versionMix.put("version_current", Version.CURRENT); + versionMix.put("version_null", null); + versionMix.put("version_current2", Version.CURRENT); + versionMix.put("version_bogus", "0.0.0"); + versionMix.put("version_current3", Version.CURRENT); + versionMix.put("version_bogus2", "0.0.0"); + + ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); + String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; + assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger)); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId), new Assignment(nodeId, "test assignment")); @@ -384,4 +447,40 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()); } + private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { + MetaData.Builder metaDataBuilder = MetaData.builder(); + + for (Map.Entry entry : namesAndVersions.entrySet()) { + + String indexName = entry.getKey(); + 
Object version = entry.getValue(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + Map mapping = new HashMap<>(); + Map properties = new HashMap<>(); + for (int i = 0; i < 10; i++) { + properties.put("field" + i, Collections.singletonMap("type", "string")); + } + mapping.put("properties", properties); + + Map meta = new HashMap<>(); + if (version != null && version.equals("NO_VERSION_FIELD") == false) { + meta.put("version", version); + } + mapping.put("_meta", meta); + + indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); + + metaDataBuilder.put(indexMetaData); + } + MetaData metaData = metaDataBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metaData(metaData); + return csBuilder.build(); + } + }