diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index 088690a5242..06f34c9d636 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -60,12 +60,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException { DiscoveryNodes nodes = clusterService.state().nodes(); if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerMappingRefresh(request); - } - }); + innerMappingRefresh(request); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), NodeMappingRefreshTransportHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index c2cc5df38be..e456ec42e47 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.*; @@ -32,6 +31,7 @@ import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -43,11 +43,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; import org.elasticsearch.indices.TypeMissingException; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -68,7 +65,7 @@ public class MetaDataMappingService extends AbstractComponent { private final NodeMappingCreatedAction mappingCreatedAction; - private final Map> indicesAndTypesToRefresh = Maps.newHashMap(); + private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { @@ -78,50 +75,62 @@ public class MetaDataMappingService extends AbstractComponent { this.mappingCreatedAction = mappingCreatedAction; } - /** - * Refreshes mappings if they are not the same between original and parsed version - */ - public void refreshMapping(final String index, final String... types) { - synchronized (indicesAndTypesToRefresh) { - Set sTypes = indicesAndTypesToRefresh.get(index); - if (sTypes == null) { - sTypes = Sets.newHashSet(); - indicesAndTypesToRefresh.put(index, sTypes); - } - sTypes.addAll(Arrays.asList(types)); + static class RefreshTask { + final String index; + final String[] types; + + RefreshTask(String index, String[] types) { + this.index = index; + this.types = types; } - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.URGENT, new ClusterStateUpdateTask() { - @Override - public void onFailure(String source, Throwable t) { - logger.warn("failure during [{}]", t, source); - } + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - boolean createdIndex = false; - try { - Set sTypes; - synchronized (indicesAndTypesToRefresh) { - sTypes = indicesAndTypesToRefresh.remove(index); - } - // we already processed those types... - if (sTypes == null || sTypes.isEmpty()) { - return currentState; - } + static class UpdateTask { + final String index; + final String type; + final CompressedString mappingSource; + final Listener listener; - // first, check if it really needs to be updated - final IndexMetaData indexMetaData = currentState.metaData().index(index); + UpdateTask(String index, String type, CompressedString mappingSource, Listener listener) { + this.index = index; + this.type = type; + this.mappingSource = mappingSource; + this.listener = listener; + } + } + + /** + * Batch method to apply all the queued refresh or update operations. The idea is to try and batch as much + * as possible so we won't create the same index all the time for example for the updates on the same mapping + * and generate a single cluster change event out of all of those. + */ + ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception { + List tasks = new ArrayList(); + refreshOrUpdateQueue.drainTo(tasks); + + if (tasks.isEmpty()) { + return currentState; + } + + Set indicesToRemove = Sets.newHashSet(); + try { + boolean dirty = false; + MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); + for (Object task : tasks) { + if (task instanceof RefreshTask) { + RefreshTask refreshTask = (RefreshTask) task; + String index = refreshTask.index; + final IndexMetaData indexMetaData = mdBuilder.get(index); if (indexMetaData == null) { // index got delete on us, ignore... - return currentState; + continue; } - IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - createdIndex = true; - for (String type : sTypes) { + indicesToRemove.add(index); + for (String type : refreshTask.types) { // only add the current relevant mapping (if exists) if (indexMetaData.mappings().containsKey(type)) { // don't apply the default mapping, it has been applied when the mapping was created @@ -131,7 +140,7 @@ public class MetaDataMappingService extends AbstractComponent { } IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); List updatedTypes = Lists.newArrayList(); - for (String type : sTypes) { + for (String type : refreshTask.types) { DocumentMapper mapper = indexService.mapperService().documentMapper(type); if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) { updatedTypes.add(type); @@ -140,49 +149,34 @@ public class MetaDataMappingService extends AbstractComponent { } if (updatedTypes.isEmpty()) { - return currentState; + continue; } logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - builder.put(indexMetaDataBuilder); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } finally { - if (createdIndex) { - indicesService.removeIndex(index, "created for mapping processing"); - } - } - } - }); - } + mdBuilder.put(indexMetaDataBuilder); + dirty = true; - public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { + } else if (task instanceof UpdateTask) { + UpdateTask updateTask = (UpdateTask) task; + String index = updateTask.index; + String type = updateTask.type; + CompressedString mappingSource = updateTask.mappingSource; - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); - } - - @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - boolean createdIndex = false; - try { // first, check if it really needs to be updated - final IndexMetaData indexMetaData = currentState.metaData().index(index); + final IndexMetaData indexMetaData = mdBuilder.get(index); if (indexMetaData == null) { // index got delete on us, ignore... - return currentState; + continue; } if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) { - return currentState; + continue; } IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - createdIndex = true; + indicesToRemove.add(index); // only add the current relevant mapping (if exists) if (indexMetaData.mappings().containsKey(type)) { indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); @@ -193,29 +187,72 @@ public class MetaDataMappingService extends AbstractComponent { // if we end up with the same mapping as the original once, ignore if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { - return currentState; + continue; } // build the updated mapping source if (logger.isDebugEnabled()) { try { logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string()); - } catch (IOException e) { + } catch (Exception e) { // ignore } } else if (logger.isInfoEnabled()) { logger.info("[{}] update_mapping [{}] (dynamic)", index, type); } - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } finally { - if (createdIndex) { - indicesService.removeIndex(index, "created for mapping processing"); - } + mdBuilder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); + dirty = true; + } else { + logger.warn("illegal state, got wrong mapping task type [{}]", task); } } + if (!dirty) { + return currentState; + } + return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build(); + } finally { + for (String index : indicesToRemove) { + indicesService.removeIndex(index, "created for mapping processing"); + } + for (Object task : tasks) { + if (task instanceof UpdateTask) { + ((UpdateTask) task).listener.onResponse(new Response(true)); + } + } + } + } + + /** + * Refreshes mappings if they are not the same between original and parsed version + */ + public void refreshMapping(final String index, final String... types) { + refreshOrUpdateQueue.add(new RefreshTask(index, types)); + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() { + @Override + public void onFailure(String source, Throwable t) { + logger.warn("failure during [{}]", t, source); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return executeRefreshOrUpdate(currentState); + } + }); + } + + public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { + refreshOrUpdateQueue.add(new UpdateTask(index, type, mappingSource, listener)); + clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { + @Override + public void onFailure(String source, Throwable t) { + listener.onFailure(t); + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return executeRefreshOrUpdate(currentState); + } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { @@ -225,7 +262,7 @@ public class MetaDataMappingService extends AbstractComponent { } public void removeMapping(final RemoveRequest request, final Listener listener) { - clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { @Override public TimeValue timeout() { return request.masterTimeout; @@ -275,7 +312,7 @@ public class MetaDataMappingService extends AbstractComponent { public void putMapping(final PutRequest request, final Listener listener) { final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { @Override public TimeValue timeout() { return request.masterTimeout;