After gateway recovery, mappings keep being applied on each cluster change, closes #295.

This commit is contained in:
kimchy 2010-08-04 09:02:10 +03:00
parent 959eb0e703
commit a44d30bb61
1 changed files with 52 additions and 20 deletions

View File

@ -39,7 +39,12 @@ import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.timer.TimerService;
@ -66,16 +71,19 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final ShardsRoutingStrategy shardsRoutingStrategy;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy,
NodeIndexCreatedAction nodeIndexCreatedAction) {
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, IndicesService indicesService,
ShardsRoutingStrategy shardsRoutingStrategy, NodeIndexCreatedAction nodeIndexCreatedAction) {
super(settings);
this.environment = environment;
this.timerService = timerService;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardsRoutingStrategy = shardsRoutingStrategy;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
}
@ -135,7 +143,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
addMappings(mappings, indexMappingsDir);
}
}
// TODO add basic mapping validation
// put this last so index level mappings can override default mappings
mappings.putAll(request.mappings);
@ -149,10 +156,30 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
Settings actualIndexSettings = indexSettingsBuilder.build();
// create the index here (on the master) to validate it can be created, as well as adding the mapping
indicesService.createIndex(request.index, actualIndexSettings, clusterService.state().nodes().localNode().id());
// now add the mappings
IndexService indexService = indicesService.indexServiceSafe(request.index);
MapperService mapperService = indexService.mapperService();
for (Map.Entry<String, String> entry : mappings.entrySet()) {
try {
mapperService.add(entry.getKey(), entry.getValue());
} catch (Exception e) {
indicesService.deleteIndex(request.index);
throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);
}
}
// now, update the mappings with the actual source
mappings.clear();
for (DocumentMapper mapper : mapperService) {
mappings.put(mapper.type(), mapper.mappingSource());
}
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
for (Map.Entry<String, String> entry : mappings.entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.put(indexMetaData)
@ -160,27 +187,32 @@ public class MetaDataCreateIndexService extends AbstractComponent {
logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size() - 1); // -1 since we added it on the master already
if (counter.get() == 0) {
// no nodes to add to
listener.onResponse(new Response(true));
} else {
final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexCreatedAction.remove(this);
final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexCreatedAction.remove(this);
}
}
}
}
};
nodeIndexCreatedAction.add(nodeIndexCreateListener);
};
nodeIndexCreatedAction.add(nodeIndexCreateListener);
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
nodeIndexCreatedAction.remove(nodeIndexCreateListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
nodeIndexCreatedAction.remove(nodeIndexCreateListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
}
return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
} catch (Exception e) {