From 79188ed38de72c3d2d1544bd837811b1b8b8bf69 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 1 Dec 2015 11:34:32 +0100 Subject: [PATCH] Treat mappings at an index-level feature. Today we try to have type-level granularity when dealing with mappings. This does not play well with the cross-type validations that we are adding. For instance we prevent the `_parent` field to point to an existing type. This validation would be skipped today in the case of dedicated master nodes, since those master nodes would only create the type that is being updated when updating a mapping. --- .../index/NodeMappingRefreshAction.java | 16 +--- .../metadata/MetaDataMappingService.java | 95 ++++++++----------- .../cluster/IndicesClusterStateService.java | 73 +++----------- 3 files changed, 52 insertions(+), 132 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index d0eb29d6b22..f8507e5b689 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -57,7 +56,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) { final DiscoveryNodes nodes = state.nodes(); if (nodes.masterNode() == null) { - logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types())); + logger.warn("can't send mapping refresh for [{}], no master known.", request.index()); return; } transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); @@ -67,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { @Override public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception { - metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types()); + metaDataMappingService.refreshMapping(request.index(), request.indexUUID()); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -76,16 +75,14 @@ public class NodeMappingRefreshAction extends AbstractComponent { private String index; private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; - private String[] types; private String nodeId; public NodeMappingRefreshRequest() { } - public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) { + public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) { this.index = index; this.indexUUID = indexUUID; - this.types = types; this.nodeId = nodeId; } @@ -107,11 +104,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { return indexUUID; } - - public String[] types() { - return types; - } - public String nodeId() { return nodeId; } @@ -120,7 +112,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(index); - out.writeStringArray(types); out.writeString(nodeId); out.writeString(indexUUID); } @@ -129,7 +120,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); index = in.readString(); - types = in.readStringArray(); nodeId = in.readString(); indexUUID = in.readString(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index c2725359140..44de399bed4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -69,12 +69,10 @@ public class MetaDataMappingService extends AbstractComponent { static class RefreshTask { final String index; final String indexUUID; - final String[] types; - RefreshTask(String index, final String indexUUID, String[] types) { + RefreshTask(String index, final String indexUUID) { this.index = index; this.indexUUID = indexUUID; - this.types = types; } } @@ -120,13 +118,16 @@ public class MetaDataMappingService extends AbstractComponent { // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep // the latest (based on order) update mapping one per node List allIndexTasks = entry.getValue(); - List tasks = new ArrayList<>(); + boolean hasTaskWithRightUUID = false; for (RefreshTask task : allIndexTasks) { - if (!indexMetaData.isSameUUID(task.indexUUID)) { + if (indexMetaData.isSameUUID(task.indexUUID)) { + hasTaskWithRightUUID = true; + } else { logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task); - continue; } - tasks.add(task); + } + if (hasTaskWithRightUUID == false) { + continue; } // construct the actual index if needed, and make sure the relevant mappings are there @@ -134,24 +135,17 @@ public class MetaDataMappingService extends AbstractComponent { IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); + indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList()); removeIndex = true; - Set typesToIntroduce = new HashSet<>(); - for (RefreshTask task : tasks) { - Collections.addAll(typesToIntroduce, task.types); - } - for (String type : typesToIntroduce) { - // only add the current relevant mapping (if exists) - if (indexMetaData.getMappings().containsKey(type)) { - // don't apply the default mapping, it has been applied when the mapping was created - indexService.mapperService().merge(type, indexMetaData.getMappings().get(type).source(), false, true); - } + for (ObjectCursor metaData : indexMetaData.getMappings().values()) { + // don't apply the default mapping, it has been applied when the mapping was created + indexService.mapperService().merge(metaData.value.type(), metaData.value.source(), false, true); } } IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData); try { - boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder); + boolean indexDirty = refreshIndexMapping(indexService, builder); if (indexDirty) { mdBuilder.put(builder); dirty = true; @@ -169,38 +163,28 @@ public class MetaDataMappingService extends AbstractComponent { return ClusterState.builder(currentState).metaData(mdBuilder).build(); } - private boolean processIndexMappingTasks(List tasks, IndexService indexService, IndexMetaData.Builder builder) { + private boolean refreshIndexMapping(IndexService indexService, IndexMetaData.Builder builder) { boolean dirty = false; String index = indexService.index().name(); - // keep track of what we already refreshed, no need to refresh it again... - Set processedRefreshes = new HashSet<>(); - for (RefreshTask refreshTask : tasks) { - try { - List updatedTypes = new ArrayList<>(); - for (String type : refreshTask.types) { - if (processedRefreshes.contains(type)) { - continue; - } - DocumentMapper mapper = indexService.mapperService().documentMapper(type); - if (mapper == null) { - continue; - } - if (!mapper.mappingSource().equals(builder.mapping(type).source())) { - updatedTypes.add(type); - builder.putMapping(new MappingMetaData(mapper)); - } - processedRefreshes.add(type); + try { + List updatedTypes = new ArrayList<>(); + for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) { + final String type = mapper.type(); + if (!mapper.mappingSource().equals(builder.mapping(type).source())) { + updatedTypes.add(type); } - - if (updatedTypes.isEmpty()) { - continue; - } - - logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); - dirty = true; - } catch (Throwable t) { - logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types); } + + // if a single type is not up-to-date, re-send everything + if (updatedTypes.isEmpty() == false) { + logger.warn("[{}] re-syncing mappings with cluster state because of types [{}]", index, updatedTypes); + dirty = true; + for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) { + builder.putMapping(new MappingMetaData(mapper)); + } + } + } catch (Throwable t) { + logger.warn("[{}] failed to refresh-mapping in cluster state", t, index); } return dirty; } @@ -208,9 +192,9 @@ public class MetaDataMappingService extends AbstractComponent { /** * Refreshes mappings if they are not the same between original and parsed version */ - public void refreshMapping(final String index, final String indexUUID, final String... types) { - final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types); - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", + public void refreshMapping(final String index, final String indexUUID) { + final RefreshTask refreshTask = new RefreshTask(index, indexUUID); + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]", refreshTask, ClusterStateTaskConfig.build(Priority.HIGH), refreshExecutor, @@ -236,18 +220,13 @@ public class MetaDataMappingService extends AbstractComponent { if (indicesService.hasIndex(index) == false) { indicesToClose.add(index); indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); - // make sure to add custom default mapping if exists - if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { - indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes()); + // add mappings for all types, we need them for cross-type validation + for (ObjectCursor mapping : indexMetaData.getMappings().values()) { + indexService.mapperService().merge(mapping.value.type(), mapping.value.source(), false, request.updateAllTypes()); } } else { indexService = indicesService.indexService(index); } - // only add the current relevant mapping (if exists and not yet added) - if (indexMetaData.getMappings().containsKey(request.type()) && - !indexService.mapperService().hasMapping(request.type())) { - indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes()); - } } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index cd60b87765a..fc793a5dfda 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -349,7 +349,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent typesToRefresh = new ArrayList<>(); + boolean requireRefresh = false; String index = indexMetaData.getIndex(); IndexService indexService = indicesService.indexService(index); if (indexService == null) { @@ -358,31 +358,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent cursor : indexMetaData.getMappings().values()) { MappingMetaData mappingMd = cursor.value; String mappingType = mappingMd.type(); CompressedXContent mappingSource = mappingMd.source(); - if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first - continue; - } - boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource); - if (requireRefresh) { - typesToRefresh.add(mappingType); - } + requireRefresh |= processMapping(index, mapperService, mappingType, mappingSource); } - if (!typesToRefresh.isEmpty() && sendRefreshMapping) { + if (requireRefresh && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(event.state(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.getIndexUUID(), - typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()) + event.state().nodes().localNodeId()) ); } } catch (Throwable t) { @@ -398,26 +384,21 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, mappingType))) { - seenMappings.put(new Tuple<>(index, mappingType), true); - } - - // refresh mapping can happen for 2 reasons. The first is less urgent, and happens when the mapping on this - // node is ahead of what there is in the cluster state (yet an update-mapping has been sent to it already, - // it just hasn't been processed yet and published). Eventually, the mappings will converge, and the refresh - // mapping sent is more of a safe keeping (assuming the update mapping failed to reach the master, ...) - // the second case is where the parsing/merging of the mapping from the metadata doesn't result in the same + // refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same // mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the // merge version of it, which it does when refreshing the mappings), and warn log it. boolean requiresRefresh = false; try { - if (!mapperService.hasMapping(mappingType)) { + DocumentMapper existingMapper = mapperService.documentMapper(mappingType); + + if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) { + String op = existingMapper == null ? "adding" : "updating"; if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) { - logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string()); + logger.debug("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); } else if (logger.isTraceEnabled()) { - logger.trace("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string()); + logger.trace("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); } else { - logger.debug("[{}] adding mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType); + logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, op, mappingType); } // we don't apply default, since it has been applied when the mappings were parsed initially mapperService.merge(mappingType, mappingSource, false, true); @@ -425,24 +406,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent tuple : seenMappings.keySet()) { - if (tuple.v1().equals(index)) { - seenMappings.remove(tuple); - } - } - } - private void deleteIndex(String index, String reason) { try { indicesService.deleteIndex(index, reason); } catch (Throwable e) { logger.warn("failed to delete index ({})", e, reason); } - // clear seen mappings as well - clearSeenMappings(index); }