Treat mappings at an index-level feature.

Today we try to have type-level granularity when dealing with mappings. This
does not play well with the cross-type validations that we are adding. For
instance we prevent the `_parent` field to point to an existing type. This
validation would be skipped today in the case of dedicated master nodes, since
those master nodes would only create the type that is being updated when
updating a mapping.
This commit is contained in:
Adrien Grand 2015-12-01 11:34:32 +01:00
parent b7f497627f
commit 79188ed38d
3 changed files with 52 additions and 132 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -57,7 +56,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) {
final DiscoveryNodes nodes = state.nodes();
if (nodes.masterNode() == null) {
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
logger.warn("can't send mapping refresh for [{}], no master known.", request.index());
return;
}
transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
@ -67,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
metaDataMappingService.refreshMapping(request.index(), request.indexUUID());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -76,16 +75,14 @@ public class NodeMappingRefreshAction extends AbstractComponent {
private String index;
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
private String[] types;
private String nodeId;
public NodeMappingRefreshRequest() {
}
public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) {
public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) {
this.index = index;
this.indexUUID = indexUUID;
this.types = types;
this.nodeId = nodeId;
}
@ -107,11 +104,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
return indexUUID;
}
public String[] types() {
return types;
}
public String nodeId() {
return nodeId;
}
@ -120,7 +112,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeStringArray(types);
out.writeString(nodeId);
out.writeString(indexUUID);
}
@ -129,7 +120,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
types = in.readStringArray();
nodeId = in.readString();
indexUUID = in.readString();
}

View File

@ -69,12 +69,10 @@ public class MetaDataMappingService extends AbstractComponent {
static class RefreshTask {
final String index;
final String indexUUID;
final String[] types;
RefreshTask(String index, final String indexUUID, String[] types) {
RefreshTask(String index, final String indexUUID) {
this.index = index;
this.indexUUID = indexUUID;
this.types = types;
}
}
@ -120,13 +118,16 @@ public class MetaDataMappingService extends AbstractComponent {
// the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
// the latest (based on order) update mapping one per node
List<RefreshTask> allIndexTasks = entry.getValue();
List<RefreshTask> tasks = new ArrayList<>();
boolean hasTaskWithRightUUID = false;
for (RefreshTask task : allIndexTasks) {
if (!indexMetaData.isSameUUID(task.indexUUID)) {
if (indexMetaData.isSameUUID(task.indexUUID)) {
hasTaskWithRightUUID = true;
} else {
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
continue;
}
tasks.add(task);
}
if (hasTaskWithRightUUID == false) {
continue;
}
// construct the actual index if needed, and make sure the relevant mappings are there
@ -134,24 +135,17 @@ public class MetaDataMappingService extends AbstractComponent {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
removeIndex = true;
Set<String> typesToIntroduce = new HashSet<>();
for (RefreshTask task : tasks) {
Collections.addAll(typesToIntroduce, task.types);
}
for (String type : typesToIntroduce) {
// only add the current relevant mapping (if exists)
if (indexMetaData.getMappings().containsKey(type)) {
// don't apply the default mapping, it has been applied when the mapping was created
indexService.mapperService().merge(type, indexMetaData.getMappings().get(type).source(), false, true);
}
for (ObjectCursor<MappingMetaData> metaData : indexMetaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
indexService.mapperService().merge(metaData.value.type(), metaData.value.source(), false, true);
}
}
IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
try {
boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder);
boolean indexDirty = refreshIndexMapping(indexService, builder);
if (indexDirty) {
mdBuilder.put(builder);
dirty = true;
@ -169,38 +163,28 @@ public class MetaDataMappingService extends AbstractComponent {
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
private boolean processIndexMappingTasks(List<RefreshTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
private boolean refreshIndexMapping(IndexService indexService, IndexMetaData.Builder builder) {
boolean dirty = false;
String index = indexService.index().name();
// keep track of what we already refreshed, no need to refresh it again...
Set<String> processedRefreshes = new HashSet<>();
for (RefreshTask refreshTask : tasks) {
try {
List<String> updatedTypes = new ArrayList<>();
for (String type : refreshTask.types) {
if (processedRefreshes.contains(type)) {
continue;
}
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (mapper == null) {
continue;
}
if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
updatedTypes.add(type);
builder.putMapping(new MappingMetaData(mapper));
}
processedRefreshes.add(type);
try {
List<String> updatedTypes = new ArrayList<>();
for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) {
final String type = mapper.type();
if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
updatedTypes.add(type);
}
if (updatedTypes.isEmpty()) {
continue;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
dirty = true;
} catch (Throwable t) {
logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
}
// if a single type is not up-to-date, re-send everything
if (updatedTypes.isEmpty() == false) {
logger.warn("[{}] re-syncing mappings with cluster state because of types [{}]", index, updatedTypes);
dirty = true;
for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) {
builder.putMapping(new MappingMetaData(mapper));
}
}
} catch (Throwable t) {
logger.warn("[{}] failed to refresh-mapping in cluster state", t, index);
}
return dirty;
}
@ -208,9 +192,9 @@ public class MetaDataMappingService extends AbstractComponent {
/**
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String indexUUID, final String... types) {
final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]",
public void refreshMapping(final String index, final String indexUUID) {
final RefreshTask refreshTask = new RefreshTask(index, indexUUID);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]",
refreshTask,
ClusterStateTaskConfig.build(Priority.HIGH),
refreshExecutor,
@ -236,18 +220,13 @@ public class MetaDataMappingService extends AbstractComponent {
if (indicesService.hasIndex(index) == false) {
indicesToClose.add(index);
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
// make sure to add custom default mapping if exists
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
indexService.mapperService().merge(mapping.value.type(), mapping.value.source(), false, request.updateAllTypes());
}
} else {
indexService = indicesService.indexService(index);
}
// only add the current relevant mapping (if exists and not yet added)
if (indexMetaData.getMappings().containsKey(request.type()) &&
!indexService.mapperService().hasMapping(request.type())) {
indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
}
}
}
}

View File

@ -349,7 +349,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we only create / update here
continue;
}
List<String> typesToRefresh = new ArrayList<>();
boolean requireRefresh = false;
String index = indexMetaData.getIndex();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
@ -358,31 +358,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
MapperService mapperService = indexService.mapperService();
// first, go over and update the _default_ mapping (if exists)
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
boolean requireRefresh = processMapping(index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
if (requireRefresh) {
typesToRefresh.add(MapperService.DEFAULT_MAPPING);
}
}
// go over and add the relevant mappings (or update them)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMd = cursor.value;
String mappingType = mappingMd.type();
CompressedXContent mappingSource = mappingMd.source();
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
}
boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
typesToRefresh.add(mappingType);
}
requireRefresh |= processMapping(index, mapperService, mappingType, mappingSource);
}
if (!typesToRefresh.isEmpty() && sendRefreshMapping) {
if (requireRefresh && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.getIndexUUID(),
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
event.state().nodes().localNodeId())
);
}
} catch (Throwable t) {
@ -398,26 +384,21 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedXContent mappingSource) throws Throwable {
if (!seenMappings.containsKey(new Tuple<>(index, mappingType))) {
seenMappings.put(new Tuple<>(index, mappingType), true);
}
// refresh mapping can happen for 2 reasons. The first is less urgent, and happens when the mapping on this
// node is ahead of what there is in the cluster state (yet an update-mapping has been sent to it already,
// it just hasn't been processed yet and published). Eventually, the mappings will converge, and the refresh
// mapping sent is more of a safe keeping (assuming the update mapping failed to reach the master, ...)
// the second case is where the parsing/merging of the mapping from the metadata doesn't result in the same
// refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same
// mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the
// merge version of it, which it does when refreshing the mappings), and warn log it.
boolean requiresRefresh = false;
try {
if (!mapperService.hasMapping(mappingType)) {
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) {
String op = existingMapper == null ? "adding" : "updating";
if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) {
logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string());
logger.debug("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string());
} else if (logger.isTraceEnabled()) {
logger.trace("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string());
logger.trace("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string());
} else {
logger.debug("[{}] adding mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType);
logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, op, mappingType);
}
// we don't apply default, since it has been applied when the mappings were parsed initially
mapperService.merge(mappingType, mappingSource, false, true);
@ -425,24 +406,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
requiresRefresh = true;
}
} else {
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (!mappingSource.equals(existingMapper.mappingSource())) {
// mapping changed, update it
if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) {
logger.debug("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource.string());
} else if (logger.isTraceEnabled()) {
logger.trace("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource.string());
} else {
logger.debug("[{}] updating mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType);
}
// we don't apply default, since it has been applied when the mappings were parsed initially
mapperService.merge(mappingType, mappingSource, false, true);
if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) {
requiresRefresh = true;
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
}
}
}
} catch (Throwable e) {
logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index, mappingType, mappingSource);
@ -781,27 +744,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Throwable e) {
logger.warn("failed to clean index ({})", e, reason);
}
clearSeenMappings(index);
}
private void clearSeenMappings(String index) {
// clear seen mappings as well
for (Tuple<String, String> tuple : seenMappings.keySet()) {
if (tuple.v1().equals(index)) {
seenMappings.remove(tuple);
}
}
}
private void deleteIndex(String index, String reason) {
try {
indicesService.deleteIndex(index, reason);
} catch (Throwable e) {
logger.warn("failed to delete index ({})", e, reason);
}
// clear seen mappings as well
clearSeenMappings(index);
}