Merge pull request #15142 from jpountz/fix/index_level_mappings

Treat mappings at an index-level feature.
This commit is contained in:
Adrien Grand 2015-12-02 16:32:49 +01:00
commit 7854368180
3 changed files with 52 additions and 132 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -57,7 +56,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) {
final DiscoveryNodes nodes = state.nodes();
if (nodes.masterNode() == null) {
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
logger.warn("can't send mapping refresh for [{}], no master known.", request.index());
return;
}
transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
@ -67,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
metaDataMappingService.refreshMapping(request.index(), request.indexUUID());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -76,16 +75,14 @@ public class NodeMappingRefreshAction extends AbstractComponent {
private String index;
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
private String[] types;
private String nodeId;
public NodeMappingRefreshRequest() {
}
public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) {
public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) {
this.index = index;
this.indexUUID = indexUUID;
this.types = types;
this.nodeId = nodeId;
}
@ -107,11 +104,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
return indexUUID;
}
public String[] types() {
return types;
}
public String nodeId() {
return nodeId;
}
@ -120,7 +112,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeStringArray(types);
out.writeString(nodeId);
out.writeString(indexUUID);
}
@ -129,7 +120,6 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
types = in.readStringArray();
nodeId = in.readString();
indexUUID = in.readString();
}

View File

@ -69,12 +69,10 @@ public class MetaDataMappingService extends AbstractComponent {
static class RefreshTask {
final String index;
final String indexUUID;
final String[] types;
RefreshTask(String index, final String indexUUID, String[] types) {
RefreshTask(String index, final String indexUUID) {
this.index = index;
this.indexUUID = indexUUID;
this.types = types;
}
}
@ -120,13 +118,16 @@ public class MetaDataMappingService extends AbstractComponent {
// the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
// the latest (based on order) update mapping one per node
List<RefreshTask> allIndexTasks = entry.getValue();
List<RefreshTask> tasks = new ArrayList<>();
boolean hasTaskWithRightUUID = false;
for (RefreshTask task : allIndexTasks) {
if (!indexMetaData.isSameUUID(task.indexUUID)) {
if (indexMetaData.isSameUUID(task.indexUUID)) {
hasTaskWithRightUUID = true;
} else {
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
continue;
}
tasks.add(task);
}
if (hasTaskWithRightUUID == false) {
continue;
}
// construct the actual index if needed, and make sure the relevant mappings are there
@ -134,24 +135,17 @@ public class MetaDataMappingService extends AbstractComponent {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
removeIndex = true;
Set<String> typesToIntroduce = new HashSet<>();
for (RefreshTask task : tasks) {
Collections.addAll(typesToIntroduce, task.types);
}
for (String type : typesToIntroduce) {
// only add the current relevant mapping (if exists)
if (indexMetaData.getMappings().containsKey(type)) {
for (ObjectCursor<MappingMetaData> metaData : indexMetaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
indexService.mapperService().merge(type, indexMetaData.getMappings().get(type).source(), false, true);
}
indexService.mapperService().merge(metaData.value.type(), metaData.value.source(), false, true);
}
}
IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
try {
boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder);
boolean indexDirty = refreshIndexMapping(indexService, builder);
if (indexDirty) {
mdBuilder.put(builder);
dirty = true;
@ -169,38 +163,28 @@ public class MetaDataMappingService extends AbstractComponent {
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
private boolean processIndexMappingTasks(List<RefreshTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
private boolean refreshIndexMapping(IndexService indexService, IndexMetaData.Builder builder) {
boolean dirty = false;
String index = indexService.index().name();
// keep track of what we already refreshed, no need to refresh it again...
Set<String> processedRefreshes = new HashSet<>();
for (RefreshTask refreshTask : tasks) {
try {
List<String> updatedTypes = new ArrayList<>();
for (String type : refreshTask.types) {
if (processedRefreshes.contains(type)) {
continue;
}
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (mapper == null) {
continue;
}
for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) {
final String type = mapper.type();
if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
updatedTypes.add(type);
}
}
// if a single type is not up-to-date, re-send everything
if (updatedTypes.isEmpty() == false) {
logger.warn("[{}] re-syncing mappings with cluster state because of types [{}]", index, updatedTypes);
dirty = true;
for (DocumentMapper mapper : indexService.mapperService().docMappers(true)) {
builder.putMapping(new MappingMetaData(mapper));
}
processedRefreshes.add(type);
}
if (updatedTypes.isEmpty()) {
continue;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
dirty = true;
} catch (Throwable t) {
logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
}
logger.warn("[{}] failed to refresh-mapping in cluster state", t, index);
}
return dirty;
}
@ -208,9 +192,9 @@ public class MetaDataMappingService extends AbstractComponent {
/**
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String indexUUID, final String... types) {
final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]",
public void refreshMapping(final String index, final String indexUUID) {
final RefreshTask refreshTask = new RefreshTask(index, indexUUID);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]",
refreshTask,
ClusterStateTaskConfig.build(Priority.HIGH),
refreshExecutor,
@ -236,18 +220,13 @@ public class MetaDataMappingService extends AbstractComponent {
if (indicesService.hasIndex(index) == false) {
indicesToClose.add(index);
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
// make sure to add custom default mapping if exists
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
indexService.mapperService().merge(mapping.value.type(), mapping.value.source(), false, request.updateAllTypes());
}
} else {
indexService = indicesService.indexService(index);
}
// only add the current relevant mapping (if exists and not yet added)
if (indexMetaData.getMappings().containsKey(request.type()) &&
!indexService.mapperService().hasMapping(request.type())) {
indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
}
}
}
}

View File

@ -349,7 +349,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we only create / update here
continue;
}
List<String> typesToRefresh = new ArrayList<>();
boolean requireRefresh = false;
String index = indexMetaData.getIndex();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
@ -358,31 +358,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
MapperService mapperService = indexService.mapperService();
// first, go over and update the _default_ mapping (if exists)
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
boolean requireRefresh = processMapping(index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
if (requireRefresh) {
typesToRefresh.add(MapperService.DEFAULT_MAPPING);
}
}
// go over and add the relevant mappings (or update them)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMd = cursor.value;
String mappingType = mappingMd.type();
CompressedXContent mappingSource = mappingMd.source();
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
requireRefresh |= processMapping(index, mapperService, mappingType, mappingSource);
}
boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
typesToRefresh.add(mappingType);
}
}
if (!typesToRefresh.isEmpty() && sendRefreshMapping) {
if (requireRefresh && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.getIndexUUID(),
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
event.state().nodes().localNodeId())
);
}
} catch (Throwable t) {
@ -398,50 +384,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedXContent mappingSource) throws Throwable {
if (!seenMappings.containsKey(new Tuple<>(index, mappingType))) {
seenMappings.put(new Tuple<>(index, mappingType), true);
}
// refresh mapping can happen for 2 reasons. The first is less urgent, and happens when the mapping on this
// node is ahead of what there is in the cluster state (yet an update-mapping has been sent to it already,
// it just hasn't been processed yet and published). Eventually, the mappings will converge, and the refresh
// mapping sent is more of a safe keeping (assuming the update mapping failed to reach the master, ...)
// the second case is where the parsing/merging of the mapping from the metadata doesn't result in the same
// refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same
// mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the
// merge version of it, which it does when refreshing the mappings), and warn log it.
boolean requiresRefresh = false;
try {
if (!mapperService.hasMapping(mappingType)) {
if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) {
logger.debug("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string());
} else if (logger.isTraceEnabled()) {
logger.trace("[{}] adding mapping [{}], source [{}]", index, mappingType, mappingSource.string());
} else {
logger.debug("[{}] adding mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType);
}
// we don't apply default, since it has been applied when the mappings were parsed initially
mapperService.merge(mappingType, mappingSource, false, true);
if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) {
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
requiresRefresh = true;
}
} else {
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (!mappingSource.equals(existingMapper.mappingSource())) {
// mapping changed, update it
if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) {
String op = existingMapper == null ? "adding" : "updating";
if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) {
logger.debug("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource.string());
logger.debug("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string());
} else if (logger.isTraceEnabled()) {
logger.trace("[{}] updating mapping [{}], source [{}]", index, mappingType, mappingSource.string());
logger.trace("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string());
} else {
logger.debug("[{}] updating mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, mappingType);
logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, op, mappingType);
}
// we don't apply default, since it has been applied when the mappings were parsed initially
mapperService.merge(mappingType, mappingSource, false, true);
if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) {
requiresRefresh = true;
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
}
requiresRefresh = true;
}
}
} catch (Throwable e) {
@ -781,27 +744,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Throwable e) {
logger.warn("failed to clean index ({})", e, reason);
}
clearSeenMappings(index);
}
private void clearSeenMappings(String index) {
// clear seen mappings as well
for (Tuple<String, String> tuple : seenMappings.keySet()) {
if (tuple.v1().equals(index)) {
seenMappings.remove(tuple);
}
}
}
private void deleteIndex(String index, String reason) {
try {
indicesService.deleteIndex(index, reason);
} catch (Throwable e) {
logger.warn("failed to delete index ({})", e, reason);
}
// clear seen mappings as well
clearSeenMappings(index);
}