From a44d30bb61e0a32ee76b2d2f48ab2411c0e5c5ba Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 4 Aug 2010 09:02:10 +0300 Subject: [PATCH] After gateway recovery, mappings keep being applied on each cluster change, closes #295. --- .../metadata/MetaDataCreateIndexService.java | 72 +++++++++++++------ 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index e815157cfa7..ed176e58c22 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -39,7 +39,12 @@ import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.service.IndexService; import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.timer.TimerService; @@ -66,16 +71,19 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final ClusterService clusterService; + private final IndicesService indicesService; + private final ShardsRoutingStrategy shardsRoutingStrategy; private final NodeIndexCreatedAction nodeIndexCreatedAction; - @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy, - NodeIndexCreatedAction nodeIndexCreatedAction) { + @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, IndicesService indicesService, + ShardsRoutingStrategy shardsRoutingStrategy, NodeIndexCreatedAction nodeIndexCreatedAction) { super(settings); this.environment = environment; this.timerService = timerService; this.clusterService = clusterService; + this.indicesService = indicesService; this.shardsRoutingStrategy = shardsRoutingStrategy; this.nodeIndexCreatedAction = nodeIndexCreatedAction; } @@ -135,7 +143,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { addMappings(mappings, indexMappingsDir); } } - // TODO add basic mapping validation // put this last so index level mappings can override default mappings mappings.putAll(request.mappings); @@ -149,10 +156,30 @@ public class MetaDataCreateIndexService extends AbstractComponent { } Settings actualIndexSettings = indexSettingsBuilder.build(); + // create the index here (on the master) to validate it can be created, as well as adding the mapping + indicesService.createIndex(request.index, actualIndexSettings, clusterService.state().nodes().localNode().id()); + // now add the mappings + IndexService indexService = indicesService.indexServiceSafe(request.index); + MapperService mapperService = indexService.mapperService(); + for (Map.Entry entry : mappings.entrySet()) { + try { + mapperService.add(entry.getKey(), entry.getValue()); + } catch (Exception e) { + indicesService.deleteIndex(request.index); + throw new MapperParsingException("mapping [" + entry.getKey() + "]", e); + } + } + // now, update the mappings with the actual source + mappings.clear(); + for (DocumentMapper mapper : mapperService) { + mappings.put(mapper.type(), mapper.mappingSource()); + } + IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings); for (Map.Entry entry : mappings.entrySet()) { indexMetaData.putMapping(entry.getKey(), entry.getValue()); } + MetaData newMetaData = newMetaDataBuilder() .metaData(currentState.metaData()) .put(indexMetaData) @@ -160,27 +187,32 @@ public class MetaDataCreateIndexService extends AbstractComponent { logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); - final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); + final AtomicInteger counter = new AtomicInteger(currentState.nodes().size() - 1); // -1 since we added it on the master already + if (counter.get() == 0) { + // no nodes to add to + listener.onResponse(new Response(true)); + } else { - final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() { - @Override public void onNodeIndexCreated(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true)); - nodeIndexCreatedAction.remove(this); + final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() { + @Override public void onNodeIndexCreated(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeIndexCreatedAction.remove(this); + } } } - } - }; - nodeIndexCreatedAction.add(nodeIndexCreateListener); + }; + nodeIndexCreatedAction.add(nodeIndexCreateListener); - Timeout timeoutTask = timerService.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - listener.onResponse(new Response(false)); - nodeIndexCreatedAction.remove(nodeIndexCreateListener); - } - }, request.timeout, TimerService.ExecutionType.THREADED); - listener.timeout = timeoutTask; + Timeout timeoutTask = timerService.newTimeout(new TimerTask() { + @Override public void run(Timeout timeout) throws Exception { + listener.onResponse(new Response(false)); + nodeIndexCreatedAction.remove(nodeIndexCreateListener); + } + }, request.timeout, TimerService.ExecutionType.THREADED); + listener.timeout = timeoutTask; + } return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); } catch (Exception e) {