diff --git a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index d0eb29d6b22..f8507e5b689 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -57,7 +56,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) { final DiscoveryNodes nodes = state.nodes(); if (nodes.masterNode() == null) { - logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types())); + logger.warn("can't send mapping refresh for [{}], no master known.", request.index()); return; } transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); @@ -67,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { @Override public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception { - metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types()); + metaDataMappingService.refreshMapping(request.index(), request.indexUUID()); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -76,16 +75,14 @@ public class NodeMappingRefreshAction extends AbstractComponent { private String index; private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; - private String[] types; private String nodeId; public NodeMappingRefreshRequest() { } - public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) { + public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) { this.index = index; this.indexUUID = indexUUID; - this.types = types; this.nodeId = nodeId; } @@ -107,11 +104,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { return indexUUID; } - - public String[] types() { - return types; - } - public String nodeId() { return nodeId; } @@ -120,7 +112,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(index); - out.writeStringArray(types); out.writeString(nodeId); out.writeString(indexUUID); } @@ -129,7 +120,6 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); index = in.readString(); - types = in.readStringArray(); nodeId = in.readString(); indexUUID = in.readString(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index c2725359140..44de399bed4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -69,12 +69,10 @@ public class MetaDataMappingService extends AbstractComponent { static class RefreshTask { final String index; final String indexUUID; - final String[] types; - RefreshTask(String index, final String indexUUID, String[] types) { + RefreshTask(String index, final String indexUUID) { this.index = index; this.indexUUID = indexUUID; - this.types = types; } } @@ -120,13 +118,16 @@ public class MetaDataMappingService extends AbstractComponent { // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep // the latest (based on order) update mapping one per node List allIndexTasks = entry.getValue(); - List tasks = new ArrayList<>(); + boolean hasTaskWithRightUUID = false; for (RefreshTask task : allIndexTasks) { - if (!indexMetaData.isSameUUID(task.indexUUID)) { + if (indexMetaData.isSameUUID(task.indexUUID)) { + hasTaskWithRightUUID = true; + } else { logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task); - continue; } - tasks.add(task); + } + if (hasTaskWithRightUUID == false) { + continue; } // construct the actual index if needed, and make sure the relevant mappings are there @@ -134,24 +135,17 @@ public class MetaDataMappingService extends AbstractComponent { IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); + indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList()); removeIndex = true; - Set typesToIntroduce = new HashSet<>(); - for (RefreshTask task : tasks) { - Collections.addAll(typesToIntroduce, task.types); - } - for (String type : typesToIntroduce) { - // only add the current relevant mapping (if exists) - if (indexMetaData.getMappings().containsKey(type)) { - // don't apply the default mapping, it has been applied when the mapping was created - indexService.mapperService().merge(type, indexMetaData.getMappings().get(type).source(), false, true); - } + for (ObjectCursor metaData : indexMetaData.getMappings().values()) { + // don't apply the default mapping, it has been applied when the mapping was created + indexService.mapperService().merge(metaData.value.type(), metaData.value.source(), false, true); } } IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData); try { - boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder); + boolean indexDirty = refreshIndexMapping(indexService, builder); if (indexDirty) { mdBuilder.put(builder); dirty = true; @@ -169,38 +163,28 @@ public class MetaDataMappingService extends AbstractComponent { return ClusterState.builder(currentState).metaData(mdBuilder).build(); } - private boolean processIndexMappingTasks(List tasks, IndexService indexService, IndexMetaData.Builder builder) { + private boolean refreshIndexMapping(IndexService indexService, IndexMetaData.Builder builder) { boolean dirty = false; String index = indexService.index().name(); - // keep track of what we already refreshed, no need to refresh it again... - Set processedRefreshes = new HashSet<>(); - for (RefreshTask refreshTask : tasks) { - try { - List updatedTypes = new ArrayList<>(); - for (String type : refreshTask.types) { - if (processedRefreshes.contains(type)) { - continue; - } - DocumentMapper mapper = indexService.mapperService().documentMapper(type); - if (mapper == null) { - continue; - } - if (!mapper.mappingSource().equals(builder.mapping(type).source())) { - updatedTypes.add(type); - builder.putMapping(new MappingMetaData(mapper)); - } - processedRefreshes.add(type); + try { + List updatedTypes = new ArrayList<>(); + for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) { + final String type = mapper.type(); + if (!mapper.mappingSource().equals(builder.mapping(type).source())) { + updatedTypes.add(type); } - - if (updatedTypes.isEmpty()) { - continue; - } - - logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); - dirty = true; - } catch (Throwable t) { - logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types); } + + // if a single type is not up-to-date, re-send everything + if (updatedTypes.isEmpty() == false) { + logger.warn("[{}] re-syncing mappings with cluster state because of types [{}]", index, updatedTypes); + dirty = true; + for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) { + builder.putMapping(new MappingMetaData(mapper)); + } + } + } catch (Throwable t) { + logger.warn("[{}] failed to refresh-mapping in cluster state", t, index); } return dirty; } @@ -208,9 +192,9 @@ public class MetaDataMappingService extends AbstractComponent { /** * Refreshes mappings if they are not the same between original and parsed version */ - public void refreshMapping(final String index, final String indexUUID, final String... types) { - final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types); - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", + public void refreshMapping(final String index, final String indexUUID) { + final RefreshTask refreshTask = new RefreshTask(index, indexUUID); + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]", refreshTask, ClusterStateTaskConfig.build(Priority.HIGH), refreshExecutor, @@ -236,18 +220,13 @@ public class MetaDataMappingService extends AbstractComponent { if (indicesService.hasIndex(index) == false) { indicesToClose.add(index); indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); - // make sure to add custom default mapping if exists - if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { - indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes()); + // add mappings for all types, we need them for cross-type validation + for (ObjectCursor mapping : indexMetaData.getMappings().values()) { + indexService.mapperService().merge(mapping.value.type(), mapping.value.source(), false, request.updateAllTypes()); } } else { indexService = indicesService.indexService(index); } - // only add the current relevant mapping (if exists and not yet added) - if (indexMetaData.getMappings().containsKey(request.type()) && - !indexService.mapperService().hasMapping(request.type())) { - indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes()); - } } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index cd60b87765a..fc793a5dfda 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -349,7 +349,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent typesToRefresh = new ArrayList<>(); + boolean requireRefresh = false; String index = indexMetaData.getIndex(); IndexService indexService = indicesService.indexService(index); if (indexService == null) { @@ -358,31 +358,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent cursor : indexMetaData.getMappings().values()) { MappingMetaData mappingMd = cursor.value; String mappingType = mappingMd.type(); CompressedXContent mappingSource = mappingMd.source(); - if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first - continue; - } - boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource); - if (requireRefresh) { - typesToRefresh.add(mappingType); - } + requireRefresh |= processMapping(index, mapperService, mappingType, mappingSource); } - if (!typesToRefresh.isEmpty() && sendRefreshMapping) { + if (requireRefresh && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(event.state(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.getIndexUUID(), - typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()) + event.state().nodes().localNodeId()) ); } } catch (Throwable t) { @@ -398,26 +384,21 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, mappingType))) { - seenMappings.put(new Tuple<>(index, mappingType), true); - } - - // refresh mapping can happen for 2 reasons. The first is less urgent, and happens when the mapping on this - // node is ahead of what there is in the cluster state (yet an update-mapping has been sent to it already, - // it just hasn't been processed yet and published). Eventually, the mappings will converge, and the refresh - // mapping sent is more of a safe keeping (assuming the update mapping failed to reach the master, ...) - // the second case is where the parsing/merging of the mapping from the metadata doesn't result in the same + // refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same // mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the // merge version of it, which it does when refreshing the mappings), and warn log it. boolean requiresRefresh = false; try { - if (!mapperService.hasMapping(mappingType)) { + DocumentMapper existingMapper = mapperService.documentMapper(mappingType); + + if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) { + String op = existingMapper == null ? "adding" : "updating"; if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) { - logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string()); + logger.debug("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); } else if (logger.isTraceEnabled()) { - logger.trace("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string()); + logger.trace("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); } else { - logger.debug("[{}] adding mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType); + logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, op, mappingType); } // we don't apply default, since it has been applied when the mappings were parsed initially mapperService.merge(mappingType, mappingSource, false, true); @@ -425,24 +406,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent tuple : seenMappings.keySet()) { - if (tuple.v1().equals(index)) { - seenMappings.remove(tuple); - } - } - } - private void deleteIndex(String index, String reason) { try { indicesService.deleteIndex(index, reason); } catch (Throwable e) { logger.warn("failed to delete index ({})", e, reason); } - // clear seen mappings as well - clearSeenMappings(index); }