diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 1efac00a684..f8611e7ed6c 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -20,13 +20,13 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -106,130 +106,154 @@ public class MetaDataMappingService extends AbstractComponent { * and generate a single cluster change event out of all of those. */ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception { - List tasks = new ArrayList(); - refreshOrUpdateQueue.drainTo(tasks); + List allTasks = new ArrayList(); + refreshOrUpdateQueue.drainTo(allTasks); - if (tasks.isEmpty()) { + if (allTasks.isEmpty()) { return currentState; } - Set indicesToRemove = Sets.newHashSet(); - // keep track of what we already refreshed, no need to refresh it again... - Set> processedRefreshes = Sets.newHashSet(); - try { - boolean dirty = false; - MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); - for (Object task : tasks) { - if (task instanceof RefreshTask) { - RefreshTask refreshTask = (RefreshTask) task; - String index = refreshTask.index; - final IndexMetaData indexMetaData = mdBuilder.get(index); - if (indexMetaData == null) { - // index got delete on us, ignore... - continue; - } - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - indicesToRemove.add(index); + // break down to tasks per index, so we can optimize the on demand index service creation + // to only happen for the duration of a single index processing of its respective events + Map> tasksPerIndex = Maps.newHashMap(); + for (Object task : allTasks) { + String index = null; + if (task instanceof UpdateTask) { + index = ((UpdateTask) task).index; + } else if (task instanceof RefreshTask) { + index = ((RefreshTask) task).index; + } else { + logger.warn("illegal state, got wrong mapping task type [{}]", task); + } + if (index != null) { + List indexTasks = tasksPerIndex.get(index); + if (indexTasks == null) { + indexTasks = new ArrayList(); + tasksPerIndex.put(index, indexTasks); + } + indexTasks.add(task); + } + } + + boolean dirty = false; + MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); + for (Map.Entry> entry : tasksPerIndex.entrySet()) { + String index = entry.getKey(); + List tasks = entry.getValue(); + boolean removeIndex = false; + // keep track of what we already refreshed, no need to refresh it again... + Set processedRefreshes = Sets.newHashSet(); + try { + for (Object task : tasks) { + if (task instanceof RefreshTask) { + RefreshTask refreshTask = (RefreshTask) task; + final IndexMetaData indexMetaData = mdBuilder.get(index); + if (indexMetaData == null) { + // index got delete on us, ignore... + continue; + } + IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + // we need to create the index here, and add the current mapping to it, so we can merge + indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); + removeIndex = true; + for (String type : refreshTask.types) { + // only add the current relevant mapping (if exists) + if (indexMetaData.mappings().containsKey(type)) { + // don't apply the default mapping, it has been applied when the mapping was created + indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); + } + } + } + IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); + List updatedTypes = Lists.newArrayList(); for (String type : refreshTask.types) { + if (processedRefreshes.contains(type)) { + continue; + } + DocumentMapper mapper = indexService.mapperService().documentMapper(type); + if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) { + updatedTypes.add(type); + indexMetaDataBuilder.putMapping(new MappingMetaData(mapper)); + } + processedRefreshes.add(type); + } + + if (updatedTypes.isEmpty()) { + continue; + } + + logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); + mdBuilder.put(indexMetaDataBuilder); + dirty = true; + + } else if (task instanceof UpdateTask) { + UpdateTask updateTask = (UpdateTask) task; + String type = updateTask.type; + CompressedString mappingSource = updateTask.mappingSource; + + // first, check if it really needs to be updated + final IndexMetaData indexMetaData = mdBuilder.get(index); + if (indexMetaData == null) { + // index got delete on us, ignore... + continue; + } + if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) { + continue; + } + + IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + // we need to create the index here, and add the current mapping to it, so we can merge + indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); + removeIndex = true; // only add the current relevant mapping (if exists) if (indexMetaData.mappings().containsKey(type)) { - // don't apply the default mapping, it has been applied when the mapping was created indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); } } - } - IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); - List updatedTypes = Lists.newArrayList(); - for (String type : refreshTask.types) { - Tuple processedRefresh = Tuple.tuple(index, type); - if (processedRefreshes.contains(processedRefresh)) { + + DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource.string(), false); + processedRefreshes.add(type); + + // if we end up with the same mapping as the original once, ignore + if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { continue; } - DocumentMapper mapper = indexService.mapperService().documentMapper(type); - if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) { - updatedTypes.add(type); - indexMetaDataBuilder.putMapping(new MappingMetaData(mapper)); + + // build the updated mapping source + if (logger.isDebugEnabled()) { + try { + logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string()); + } catch (Exception e) { + // ignore + } + } else if (logger.isInfoEnabled()) { + logger.info("[{}] update_mapping [{}] (dynamic)", index, type); } - processedRefreshes.add(processedRefresh); + + mdBuilder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); + dirty = true; + } else { + logger.warn("illegal state, got wrong mapping task type [{}]", task); } - - if (updatedTypes.isEmpty()) { - continue; - } - - logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); - mdBuilder.put(indexMetaDataBuilder); - dirty = true; - - } else if (task instanceof UpdateTask) { - UpdateTask updateTask = (UpdateTask) task; - String index = updateTask.index; - String type = updateTask.type; - CompressedString mappingSource = updateTask.mappingSource; - - // first, check if it really needs to be updated - final IndexMetaData indexMetaData = mdBuilder.get(index); - if (indexMetaData == null) { - // index got delete on us, ignore... - continue; - } - if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) { - continue; - } - - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - indicesToRemove.add(index); - // only add the current relevant mapping (if exists) - if (indexMetaData.mappings().containsKey(type)) { - indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); - } - } - - DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource.string(), false); - processedRefreshes.add(Tuple.tuple(index, type)); - - // if we end up with the same mapping as the original once, ignore - if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { - continue; - } - - // build the updated mapping source - if (logger.isDebugEnabled()) { - try { - logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string()); - } catch (Exception e) { - // ignore - } - } else if (logger.isInfoEnabled()) { - logger.info("[{}] update_mapping [{}] (dynamic)", index, type); - } - - mdBuilder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); - dirty = true; - } else { - logger.warn("illegal state, got wrong mapping task type [{}]", task); } - } - if (!dirty) { - return currentState; - } - return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build(); - } finally { - for (String index : indicesToRemove) { - indicesService.removeIndex(index, "created for mapping processing"); - } - for (Object task : tasks) { - if (task instanceof UpdateTask) { - ((UpdateTask) task).listener.onResponse(new Response(true)); + } finally { + if (removeIndex) { + indicesService.removeIndex(index, "created for mapping processing"); + } + for (Object task : tasks) { + if (task instanceof UpdateTask) { + ((UpdateTask) task).listener.onResponse(new Response(true)); + } } } } + + if (!dirty) { + return currentState; + } + return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build(); } /**