[ML] add meta data field into mappings and update procedure (elastic/x-pack-elasticsearch#1925)
Adds a update procedure for ML index mappings in order to allow adding new fields. The ES version is stored in the "_meta" field of the ML mappings which then get applied to every index. When opening a job, this version is checked on the (shared) index and a mapping update is performed in case that the version is older than current. Original commit: elastic/x-pack-elasticsearch@211608c7ad
This commit is contained in:
parent
14c12cfcde
commit
261d65a9c4
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.action;
|
package org.elasticsearch.xpack.ml.action;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchStatusException;
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||||
|
@ -27,12 +28,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.CheckedSupplier;
|
import org.elasticsearch.common.CheckedSupplier;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -491,9 +494,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
||||||
}
|
}
|
||||||
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
|
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
|
||||||
.toArray(String[]::new);
|
.toArray(String[]::new);
|
||||||
if (state.metaData().findMappings(concreteIndices, new String[] {ElasticsearchMappings.DOC_TYPE}).isEmpty()) {
|
|
||||||
|
String[] indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, logger);
|
||||||
|
|
||||||
|
if (indicesThatRequireAnUpdate.length > 0) {
|
||||||
try (XContentBuilder mapping = mappingSupplier.get()) {
|
try (XContentBuilder mapping = mappingSupplier.get()) {
|
||||||
PutMappingRequest putMappingRequest = new PutMappingRequest(concreteIndices);
|
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
|
||||||
putMappingRequest.type(ElasticsearchMappings.DOC_TYPE);
|
putMappingRequest.type(ElasticsearchMappings.DOC_TYPE);
|
||||||
putMappingRequest.source(mapping);
|
putMappingRequest.source(mapping);
|
||||||
client.execute(PutMappingAction.INSTANCE, putMappingRequest, ActionListener.wrap(
|
client.execute(PutMappingAction.INSTANCE, putMappingRequest, ActionListener.wrap(
|
||||||
|
@ -502,13 +508,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
||||||
listener.onResponse(true);
|
listener.onResponse(true);
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
|
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
|
||||||
+ Arrays.toString(concreteIndices) + " was not acknowledged"));
|
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
|
||||||
}
|
}
|
||||||
}, listener::onFailure));
|
}, listener::onFailure));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
logger.trace("Mappings are uptodate.");
|
||||||
listener.onResponse(true);
|
listener.onResponse(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -747,4 +754,51 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
||||||
static boolean nodeSupportsJobVersion(Version nodeVersion, Version jobVersion) {
|
static boolean nodeSupportsJobVersion(Version nodeVersion, Version jobVersion) {
|
||||||
return nodeVersion.onOrAfter(Version.V_5_5_0);
|
return nodeVersion.onOrAfter(Version.V_5_5_0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Logger logger) {
|
||||||
|
List<String> indicesToUpdate = new ArrayList<>();
|
||||||
|
|
||||||
|
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
|
||||||
|
new String[] { ElasticsearchMappings.DOC_TYPE });
|
||||||
|
|
||||||
|
for (String index : concreteIndices) {
|
||||||
|
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
|
||||||
|
if (innerMap != null) {
|
||||||
|
MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE);
|
||||||
|
try {
|
||||||
|
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
|
||||||
|
if (meta != null) {
|
||||||
|
String versionString = (String) meta.get("version");
|
||||||
|
if (versionString == null) {
|
||||||
|
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||||
|
indicesToUpdate.add(index);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Version mappingVersion = Version.fromString(versionString);
|
||||||
|
|
||||||
|
if (mappingVersion.equals(Version.CURRENT)) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
|
||||||
|
indicesToUpdate.add(index);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info("Version of mappings for [{}] not found, recreating", index);
|
||||||
|
indicesToUpdate.add(index);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
|
||||||
|
indicesToUpdate.add(index);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info("No mappings found for [{}], recreating", index);
|
||||||
|
indicesToUpdate.add(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.job.persistence;
|
package org.elasticsearch.xpack.ml.job.persistence;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.xpack.ml.job.config.Detector;
|
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
|
@ -111,10 +112,24 @@ public class ElasticsearchMappings {
|
||||||
.endArray();
|
.endArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inserts "_meta" containing useful information like the version into the mapping
|
||||||
|
* template.
|
||||||
|
*
|
||||||
|
* @param builder The builder for the mappings
|
||||||
|
* @throws IOException On write error
|
||||||
|
*/
|
||||||
|
public static void addMetaInformation(XContentBuilder builder) throws IOException {
|
||||||
|
builder.startObject("_meta")
|
||||||
|
.field("version", Version.CURRENT)
|
||||||
|
.endObject();
|
||||||
|
}
|
||||||
|
|
||||||
public static XContentBuilder docMapping() throws IOException {
|
public static XContentBuilder docMapping() throws IOException {
|
||||||
XContentBuilder builder = jsonBuilder();
|
XContentBuilder builder = jsonBuilder();
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.startObject(DOC_TYPE);
|
builder.startObject(DOC_TYPE);
|
||||||
|
addMetaInformation(builder);
|
||||||
addDefaultMapping(builder);
|
addDefaultMapping(builder);
|
||||||
builder.startObject(PROPERTIES);
|
builder.startObject(PROPERTIES);
|
||||||
|
|
||||||
|
@ -523,12 +538,15 @@ public class ElasticsearchMappings {
|
||||||
* by knowing the ID of a particular document.
|
* by knowing the ID of a particular document.
|
||||||
*/
|
*/
|
||||||
public static XContentBuilder stateMapping() throws IOException {
|
public static XContentBuilder stateMapping() throws IOException {
|
||||||
return jsonBuilder()
|
XContentBuilder builder = jsonBuilder();
|
||||||
.startObject()
|
builder.startObject();
|
||||||
.startObject(DOC_TYPE)
|
builder.startObject(DOC_TYPE);
|
||||||
.field(ENABLED, false)
|
addMetaInformation(builder);
|
||||||
.endObject()
|
builder.field(ENABLED, false);
|
||||||
.endObject();
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
|
||||||
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
@ -33,11 +34,13 @@ import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||||
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
|
||||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||||
|
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
|
||||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -339,6 +342,66 @@ public class OpenJobActionTests extends ESTestCase {
|
||||||
assertEquals(indexToRemove, result.get(0));
|
assertEquals(indexToRemove, result.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateNoMapping() throws IOException {
|
||||||
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||||
|
ClusterState cs = csBuilder.build();
|
||||||
|
String[] indices = new String[] { "no_index" };
|
||||||
|
|
||||||
|
assertArrayEquals(new String[] { "no_index" }, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateNullMapping() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
|
||||||
|
String[] indices = new String[] { "null_index" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateNoVersion() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
|
||||||
|
String[] indices = new String[] { "no_version_field" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
|
||||||
|
String[] indices = new String[] { "version_current" };
|
||||||
|
assertArrayEquals(new String[] {}, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(
|
||||||
|
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
|
||||||
|
String[] indices = new String[] { "version_nested" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateOldMappingVersion() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_54", Version.V_5_4_0.toString()));
|
||||||
|
String[] indices = new String[] { "version_54" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
|
||||||
|
String[] indices = new String[] { "version_bogus" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMappingRequiresUpdateSomeVersionMix() throws IOException {
|
||||||
|
Map<String, Object> versionMix = new HashMap<String, Object>();
|
||||||
|
versionMix.put("version_54", Version.V_5_4_0);
|
||||||
|
versionMix.put("version_current", Version.CURRENT);
|
||||||
|
versionMix.put("version_null", null);
|
||||||
|
versionMix.put("version_current2", Version.CURRENT);
|
||||||
|
versionMix.put("version_bogus", "0.0.0");
|
||||||
|
versionMix.put("version_current3", Version.CURRENT);
|
||||||
|
versionMix.put("version_bogus2", "0.0.0");
|
||||||
|
|
||||||
|
ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix);
|
||||||
|
String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" };
|
||||||
|
assertArrayEquals(indices, OpenJobAction.mappingRequiresUpdate(cs, indices, logger));
|
||||||
|
}
|
||||||
|
|
||||||
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
|
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
|
||||||
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
|
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
|
||||||
new Assignment(nodeId, "test assignment"));
|
new Assignment(nodeId, "test assignment"));
|
||||||
|
@ -384,4 +447,40 @@ public class OpenJobActionTests extends ESTestCase {
|
||||||
metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());
|
metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
|
||||||
|
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||||
|
|
||||||
|
for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {
|
||||||
|
|
||||||
|
String indexName = entry.getKey();
|
||||||
|
Object version = entry.getValue();
|
||||||
|
|
||||||
|
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
||||||
|
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||||
|
|
||||||
|
Map<String, Object> mapping = new HashMap<>();
|
||||||
|
Map<String, Object> properties = new HashMap<>();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
properties.put("field" + i, Collections.singletonMap("type", "string"));
|
||||||
|
}
|
||||||
|
mapping.put("properties", properties);
|
||||||
|
|
||||||
|
Map<String, Object> meta = new HashMap<>();
|
||||||
|
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
|
||||||
|
meta.put("version", version);
|
||||||
|
}
|
||||||
|
mapping.put("_meta", meta);
|
||||||
|
|
||||||
|
indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));
|
||||||
|
|
||||||
|
metaDataBuilder.put(indexMetaData);
|
||||||
|
}
|
||||||
|
MetaData metaData = metaDataBuilder.build();
|
||||||
|
|
||||||
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
|
||||||
|
csBuilder.metaData(metaData);
|
||||||
|
return csBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue