From bb6df3467136b5b3ad1eea6f247af85ee21bc1b0 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 24 Jul 2013 01:55:32 +0200 Subject: [PATCH] bulk refresh and update mapping cluster events try to batch refresh and update mapping events as much as possible, so they will all be processed in a single go, resulting in fewer cluster change events, and also reducing the load of multiple changes to the same index; also, change the priority for those to HIGH, since we want URGENT ones (like create index, delete index) to execute first --- .../index/NodeMappingRefreshAction.java | 7 +- .../metadata/MetaDataMappingService.java | 197 +++++++++++------- 2 files changed, 118 insertions(+), 86 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index 088690a5242..06f34c9d636 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -60,12 +60,7 @@ public class NodeMappingRefreshAction extends AbstractComponent { public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException { DiscoveryNodes nodes = clusterService.state().nodes(); if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerMappingRefresh(request); - } - }); + innerMappingRefresh(request); } else { transportService.sendRequest(clusterService.state().nodes().masterNode(), NodeMappingRefreshTransportHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index c2cc5df38be..e456ec42e47 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ 
b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.*; @@ -32,6 +31,7 @@ import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -43,11 +43,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; import org.elasticsearch.indices.TypeMissingException; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -68,7 +65,7 @@ public class MetaDataMappingService extends AbstractComponent { private final NodeMappingCreatedAction mappingCreatedAction; - private final Map> indicesAndTypesToRefresh = Maps.newHashMap(); + private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { @@ -78,50 +75,62 @@ public class MetaDataMappingService extends AbstractComponent { this.mappingCreatedAction = mappingCreatedAction; } - /** - * Refreshes mappings if they are not the same between original and parsed version - */ - public 
void refreshMapping(final String index, final String... types) { - synchronized (indicesAndTypesToRefresh) { - Set sTypes = indicesAndTypesToRefresh.get(index); - if (sTypes == null) { - sTypes = Sets.newHashSet(); - indicesAndTypesToRefresh.put(index, sTypes); - } - sTypes.addAll(Arrays.asList(types)); + static class RefreshTask { + final String index; + final String[] types; + + RefreshTask(String index, String[] types) { + this.index = index; + this.types = types; } - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.URGENT, new ClusterStateUpdateTask() { - @Override - public void onFailure(String source, Throwable t) { - logger.warn("failure during [{}]", t, source); - } + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - boolean createdIndex = false; - try { - Set sTypes; - synchronized (indicesAndTypesToRefresh) { - sTypes = indicesAndTypesToRefresh.remove(index); - } - // we already processed those types... - if (sTypes == null || sTypes.isEmpty()) { - return currentState; - } + static class UpdateTask { + final String index; + final String type; + final CompressedString mappingSource; + final Listener listener; - // first, check if it really needs to be updated - final IndexMetaData indexMetaData = currentState.metaData().index(index); + UpdateTask(String index, String type, CompressedString mappingSource, Listener listener) { + this.index = index; + this.type = type; + this.mappingSource = mappingSource; + this.listener = listener; + } + } + + /** + * Batch method to apply all the queued refresh or update operations. The idea is to try and batch as much + * as possible so we won't create the same index all the time for example for the updates on the same mapping + * and generate a single cluster change event out of all of those. 
+ */ + ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception { + List tasks = new ArrayList(); + refreshOrUpdateQueue.drainTo(tasks); + + if (tasks.isEmpty()) { + return currentState; + } + + Set indicesToRemove = Sets.newHashSet(); + try { + boolean dirty = false; + MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); + for (Object task : tasks) { + if (task instanceof RefreshTask) { + RefreshTask refreshTask = (RefreshTask) task; + String index = refreshTask.index; + final IndexMetaData indexMetaData = mdBuilder.get(index); if (indexMetaData == null) { // index got delete on us, ignore... - return currentState; + continue; } - IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - createdIndex = true; - for (String type : sTypes) { + indicesToRemove.add(index); + for (String type : refreshTask.types) { // only add the current relevant mapping (if exists) if (indexMetaData.mappings().containsKey(type)) { // don't apply the default mapping, it has been applied when the mapping was created @@ -131,7 +140,7 @@ public class MetaDataMappingService extends AbstractComponent { } IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); List updatedTypes = Lists.newArrayList(); - for (String type : sTypes) { + for (String type : refreshTask.types) { DocumentMapper mapper = indexService.mapperService().documentMapper(type); if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) { updatedTypes.add(type); @@ -140,49 +149,34 @@ public class MetaDataMappingService extends AbstractComponent { } if (updatedTypes.isEmpty()) { - return currentState; + continue; } logger.warn("[{}] re-syncing mappings with cluster 
state for types [{}]", index, updatedTypes); - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - builder.put(indexMetaDataBuilder); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } finally { - if (createdIndex) { - indicesService.removeIndex(index, "created for mapping processing"); - } - } - } - }); - } + mdBuilder.put(indexMetaDataBuilder); + dirty = true; - public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { + } else if (task instanceof UpdateTask) { + UpdateTask updateTask = (UpdateTask) task; + String index = updateTask.index; + String type = updateTask.type; + CompressedString mappingSource = updateTask.mappingSource; - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); - } - - @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - boolean createdIndex = false; - try { // first, check if it really needs to be updated - final IndexMetaData indexMetaData = currentState.metaData().index(index); + final IndexMetaData indexMetaData = mdBuilder.get(index); if (indexMetaData == null) { // index got delete on us, ignore... 
- return currentState; + continue; } if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) { - return currentState; + continue; } IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); - createdIndex = true; + indicesToRemove.add(index); // only add the current relevant mapping (if exists) if (indexMetaData.mappings().containsKey(type)) { indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); @@ -193,29 +187,72 @@ public class MetaDataMappingService extends AbstractComponent { // if we end up with the same mapping as the original once, ignore if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { - return currentState; + continue; } // build the updated mapping source if (logger.isDebugEnabled()) { try { logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string()); - } catch (IOException e) { + } catch (Exception e) { // ignore } } else if (logger.isInfoEnabled()) { logger.info("[{}] update_mapping [{}] (dynamic)", index, type); } - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); - } finally { - if (createdIndex) { - indicesService.removeIndex(index, "created for mapping processing"); - } + mdBuilder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper))); + dirty = true; + } else { + logger.warn("illegal state, got wrong mapping 
task type [{}]", task); } } + if (!dirty) { + return currentState; + } + return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build(); + } finally { + for (String index : indicesToRemove) { + indicesService.removeIndex(index, "created for mapping processing"); + } + for (Object task : tasks) { + if (task instanceof UpdateTask) { + ((UpdateTask) task).listener.onResponse(new Response(true)); + } + } + } + } + + /** + * Refreshes mappings if they are not the same between original and parsed version + */ + public void refreshMapping(final String index, final String... types) { + refreshOrUpdateQueue.add(new RefreshTask(index, types)); + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() { + @Override + public void onFailure(String source, Throwable t) { + logger.warn("failure during [{}]", t, source); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return executeRefreshOrUpdate(currentState); + } + }); + } + + public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { + refreshOrUpdateQueue.add(new UpdateTask(index, type, mappingSource, listener)); + clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { + @Override + public void onFailure(String source, Throwable t) { + listener.onFailure(t); + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return executeRefreshOrUpdate(currentState); + } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { @@ -225,7 +262,7 @@ public class MetaDataMappingService extends AbstractComponent { } public void removeMapping(final RemoveRequest request, final Listener listener) { - 
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { @Override public TimeValue timeout() { return request.masterTimeout; @@ -275,7 +312,7 @@ public class MetaDataMappingService extends AbstractComponent { public void putMapping(final PutRequest request, final Listener listener) { final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() { @Override public TimeValue timeout() { return request.masterTimeout;