diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/ElasticSearchException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/ElasticSearchException.java index b1354621d2b..64911b726d6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/ElasticSearchException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/ElasticSearchException.java @@ -63,13 +63,15 @@ public class ElasticSearchException extends RuntimeException { public String getDetailedMessage() { if (getCause() != null) { StringBuilder sb = new StringBuilder(); - if (super.getMessage() != null) { - sb.append(super.getMessage()).append("; "); + sb.append(toString()).append("; "); + if (getCause() instanceof ElasticSearchException) { + sb.append(((ElasticSearchException) getCause()).getDetailedMessage()); + } else { + sb.append(getCause()); } - sb.append("nested exception is ").append(getCause()); return sb.toString(); } else { - return super.getMessage(); + return super.toString(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java index 6e007f2887b..816e018b4e5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java @@ -87,6 +87,8 @@ public class MetaDataService extends AbstractComponent { private final NodeMappingCreatedAction nodeMappingCreatedAction; + private final Object mutex = new Object(); + @Inject public MetaDataService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, ShardsRoutingStrategy shardsRoutingStrategy, NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction, NodeMappingCreatedAction nodeMappingCreatedAction) { @@ -102,94 +104,46 @@ public class MetaDataService extends AbstractComponent { // TODO should find nicer solution than sync here, since we block for timeout (same for other ops) - public synchronized IndicesAliasesResult indicesAliases(final List aliasActions) { - ClusterState clusterState = clusterService.state(); + public IndicesAliasesResult indicesAliases(final List aliasActions) { + synchronized (mutex) { + ClusterState clusterState = clusterService.state(); - for (AliasAction aliasAction : aliasActions) { - if (!clusterState.metaData().hasIndex(aliasAction.index())) { - throw new IndexMissingException(new Index(aliasAction.index())); - } - } - - clusterService.submitStateUpdateTask("index-aliases", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - for (AliasAction aliasAction : aliasActions) { - IndexMetaData indexMetaData = builder.get(aliasAction.index()); - if (indexMetaData == null) { - throw new IndexMissingException(new Index(aliasAction.index())); - } - Set indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases")); - if (aliasAction.actionType() == AliasAction.Type.ADD) { - indexAliases.add(aliasAction.alias()); - } else if (aliasAction.actionType() == AliasAction.Type.REMOVE) { - indexAliases.remove(aliasAction.alias()); - } - - Settings settings = settingsBuilder().put(indexMetaData.settings()) - .putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()])) - .build(); - - builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings)); + for (AliasAction aliasAction : aliasActions) { + if (!clusterState.metaData().hasIndex(aliasAction.index())) { + throw new IndexMissingException(new Index(aliasAction.index())); } - return newClusterStateBuilder().state(currentState).metaData(builder).build(); } - }); - return new IndicesAliasesResult(); + clusterService.submitStateUpdateTask("index-aliases", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + for (AliasAction aliasAction : aliasActions) { + IndexMetaData indexMetaData = builder.get(aliasAction.index()); + if (indexMetaData == null) { + throw new IndexMissingException(new Index(aliasAction.index())); + } + Set indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases")); + if (aliasAction.actionType() == AliasAction.Type.ADD) { + indexAliases.add(aliasAction.alias()); + } else if (aliasAction.actionType() == AliasAction.Type.REMOVE) { + indexAliases.remove(aliasAction.alias()); + } + + Settings settings = settingsBuilder().put(indexMetaData.settings()) + .putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()])) + .build(); + + builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings)); + } + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } + }); + + return new IndicesAliasesResult(); + } } - public synchronized CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map mappings, TimeValue timeout) throws IndexAlreadyExistsException { - ClusterState clusterState = clusterService.state(); - - if (clusterState.routingTable().hasIndex(index)) { - throw new IndexAlreadyExistsException(new Index(index)); - } - if (clusterState.metaData().hasIndex(index)) { - throw new IndexAlreadyExistsException(new Index(index)); - } - if (index.contains(" ")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain whitespace"); - } - if (index.contains(",")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain ',"); - } - if (index.contains("#")) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain '#"); - } - if (index.charAt(0) == '_') { - throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'"); - } - if (!index.toLowerCase().equals(index)) { - throw new InvalidIndexNameException(new Index(index), index, "must be lowercase"); - } - if (!Strings.validFileName(index)) { - throw new InvalidIndexNameException(new Index(index), index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); - } - if (clusterState.metaData().aliases().contains(index)) { - throw new InvalidIndexNameException(new Index(index), index, "an alias with the same name already exists"); - } - - // add to the mappings files that exists within the config/mappings location - if (mappings == null) { - mappings = Maps.newHashMap(); - } else { - mappings = Maps.newHashMap(mappings); - } - File mappingsDir = new File(environment.configFile(), "mappings"); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - File defaultMappingsDir = new File(mappingsDir, "_default"); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - addMappings(mappings, defaultMappingsDir); - } - File indexMappingsDir = new File(mappingsDir, index); - if (mappingsDir.exists() && mappingsDir.isDirectory()) { - addMappings(mappings, indexMappingsDir); - } - } - - final Map fMappings = mappings; - + public CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map mappings, TimeValue timeout) throws IndexAlreadyExistsException { final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size()); NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() { @Override public void onNodeIndexCreated(String mIndex, String nodeId) { @@ -198,31 +152,83 @@ public class MetaDataService extends AbstractComponent { } } }; - nodeIndexCreatedAction.add(nodeCreatedListener); - clusterService.submitStateUpdateTask("create-index [" + index + "], cause [" + cause + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(indexSettings); - if (indexSettings.get(SETTING_NUMBER_OF_SHARDS) == null) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); - } - if (indexSettings.get(SETTING_NUMBER_OF_REPLICAS) == null) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } - Settings actualIndexSettings = indexSettingsBuilder.build(); + synchronized (mutex) { + ClusterState clusterState = clusterService.state(); - IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings); - for (Map.Entry entry : fMappings.entrySet()) { - indexMetaData.putMapping(entry.getKey(), entry.getValue()); - } - MetaData newMetaData = newMetaDataBuilder() - .metaData(currentState.metaData()) - .put(indexMetaData) - .build(); - - logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet()); - return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); + if (clusterState.routingTable().hasIndex(index)) { + throw new IndexAlreadyExistsException(new Index(index)); } - }); + if (clusterState.metaData().hasIndex(index)) { + throw new IndexAlreadyExistsException(new Index(index)); + } + if (index.contains(" ")) { + throw new InvalidIndexNameException(new Index(index), index, "must not contain whitespace"); + } + if (index.contains(",")) { + throw new InvalidIndexNameException(new Index(index), index, "must not contain ',"); + } + if (index.contains("#")) { + throw new InvalidIndexNameException(new Index(index), index, "must not contain '#"); + } + if (index.charAt(0) == '_') { + throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'"); + } + if (!index.toLowerCase().equals(index)) { + throw new InvalidIndexNameException(new Index(index), index, "must be lowercase"); + } + if (!Strings.validFileName(index)) { + throw new InvalidIndexNameException(new Index(index), index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); + } + if (clusterState.metaData().aliases().contains(index)) { + throw new InvalidIndexNameException(new Index(index), index, "an alias with the same name already exists"); + } + + // add to the mappings files that exists within the config/mappings location + if (mappings == null) { + mappings = Maps.newHashMap(); + } else { + mappings = Maps.newHashMap(mappings); + } + File mappingsDir = new File(environment.configFile(), "mappings"); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + File defaultMappingsDir = new File(mappingsDir, "_default"); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + addMappings(mappings, defaultMappingsDir); + } + File indexMappingsDir = new File(mappingsDir, index); + if (mappingsDir.exists() && mappingsDir.isDirectory()) { + addMappings(mappings, indexMappingsDir); + } + } + + final Map fMappings = mappings; + + nodeIndexCreatedAction.add(nodeCreatedListener); + clusterService.submitStateUpdateTask("create-index [" + index + "], cause [" + cause + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(indexSettings); + if (indexSettings.get(SETTING_NUMBER_OF_SHARDS) == null) { + indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); + } + if (indexSettings.get(SETTING_NUMBER_OF_REPLICAS) == null) { + indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); + } + Settings actualIndexSettings = indexSettingsBuilder.build(); + + IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings); + for (Map.Entry entry : fMappings.entrySet()) { + indexMetaData.putMapping(entry.getKey(), entry.getValue()); + } + MetaData newMetaData = newMetaDataBuilder() + .metaData(currentState.metaData()) + .put(indexMetaData) + .build(); + + logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet()); + return newClusterStateBuilder().state(currentState).metaData(newMetaData).build(); + } + }); + } boolean acknowledged; try { @@ -279,16 +285,8 @@ public class MetaDataService extends AbstractComponent { } } - public synchronized DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException { + public DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException { ClusterState clusterState = clusterService.state(); - - RoutingTable routingTable = clusterState.routingTable(); - if (!routingTable.hasIndex(index)) { - throw new IndexMissingException(new Index(index)); - } - - logger.info("[{}] deleting index", index); - final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size()); NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() { @Override public void onNodeIndexDeleted(String fIndex, String nodeId) { @@ -298,24 +296,35 @@ public class MetaDataService extends AbstractComponent { } }; nodeIndexDeletedAction.add(listener); - clusterService.submitStateUpdateTask("delete-index [" + index + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); - for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { - if (!indexRoutingTable.index().equals(index)) { - routingTableBuilder.add(indexRoutingTable); - } - } - MetaData newMetaData = newMetaDataBuilder() - .metaData(currentState.metaData()) - .remove(index) - .build(); + synchronized (mutex) { - RoutingTable newRoutingTable = shardsRoutingStrategy.reroute( - newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); - return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build(); + RoutingTable routingTable = clusterState.routingTable(); + if (!routingTable.hasIndex(index)) { + throw new IndexMissingException(new Index(index)); } - }); + + logger.info("[{}] deleting index", index); + + clusterService.submitStateUpdateTask("delete-index [" + index + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder(); + for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) { + if (!indexRoutingTable.index().equals(index)) { + routingTableBuilder.add(indexRoutingTable); + } + } + MetaData newMetaData = newMetaDataBuilder() + .metaData(currentState.metaData()) + .remove(index) + .build(); + + RoutingTable newRoutingTable = shardsRoutingStrategy.reroute( + newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); + return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build(); + } + }); + } + boolean acknowledged; try { acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS); @@ -327,102 +336,40 @@ public class MetaDataService extends AbstractComponent { return new DeleteIndexResult(acknowledged); } - public synchronized void updateMapping(final String index, final String type, final String mappingSource) { - MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); + public void updateMapping(final String index, final String type, final String mappingSource) { + synchronized (mutex) { + MapperService mapperService = indicesService.indexServiceSafe(index).mapperService(); - DocumentMapper existingMapper = mapperService.documentMapper(type); - // parse the updated one - DocumentMapper updatedMapper = mapperService.parse(type, mappingSource); - if (existingMapper == null) { - existingMapper = updatedMapper; - } else { - // merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones) - existingMapper.merge(updatedMapper, mergeFlags().simulate(false)); - } - // build the updated mapping source - final String updatedMappingSource = existingMapper.buildSource(); - if (logger.isDebugEnabled()) { - logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] update mapping [{}] (dynamic)", index, type); - } - // publish the new mapping - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - IndexMetaData indexMetaData = currentState.metaData().index(index); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource)); - return newClusterStateBuilder().state(currentState).metaData(builder).build(); + DocumentMapper existingMapper = mapperService.documentMapper(type); + // parse the updated one + DocumentMapper updatedMapper = mapperService.parse(type, mappingSource); + if (existingMapper == null) { + existingMapper = updatedMapper; + } else { + // merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones) + existingMapper.merge(updatedMapper, mergeFlags().simulate(false)); } - }); + // build the updated mapping source + final String updatedMappingSource = existingMapper.buildSource(); + if (logger.isDebugEnabled()) { + logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] update mapping [{}] (dynamic)", index, type); + } + // publish the new mapping + clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + IndexMetaData indexMetaData = currentState.metaData().index(index); + builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource)); + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } + }); + } } - public synchronized PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException { + public PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException { ClusterState clusterState = clusterService.state(); - if (indices.length == 0) { - throw new IndexMissingException(new Index("_all")); - } - for (String index : indices) { - if (!clusterState.metaData().hasIndex(index)) { - throw new IndexMissingException(new Index(index)); - } - } - - Map newMappers = newHashMap(); - Map existingMappers = newHashMap(); - for (String index : indices) { - IndexService indexService = indicesService.indexService(index); - if (indexService != null) { - // try and parse it (no need to add it here) so we can bail early in case of parsing exception - DocumentMapper newMapper = indexService.mapperService().parse(mappingType, mappingSource); - newMappers.put(index, newMapper); - DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType); - if (existingMapper != null) { - // first, simulate - DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); - // if we have conflicts, and we are not supposed to ignore them, throw an exception - if (!ignoreConflicts && mergeResult.hasConflicts()) { - throw new MergeMappingException(mergeResult.conflicts()); - } - existingMappers.put(index, existingMapper); - } - } else { - throw new IndexMissingException(new Index(index)); - } - } - - if (mappingType == null) { - mappingType = newMappers.values().iterator().next().type(); - } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { - throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); - } - if (mappingType.charAt(0) == '_') { - throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); - } - - final Map> mappings = newHashMap(); - for (Map.Entry entry : newMappers.entrySet()) { - Tuple mapping; - String index = entry.getKey(); - // do the actual merge here on the master, and update the mapping source - DocumentMapper newMapper = entry.getValue(); - if (existingMappers.containsKey(entry.getKey())) { - // we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source - DocumentMapper existingMapper = existingMappers.get(entry.getKey()); - existingMapper.merge(newMapper, mergeFlags().simulate(false)); - // use the merged mapping source - mapping = new Tuple(existingMapper.type(), existingMapper.buildSource()); - } else { - mapping = new Tuple(newMapper.type(), newMapper.buildSource()); - } - mappings.put(index, mapping); - if (logger.isDebugEnabled()) { - logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2()); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] put_mapping [{}]", index, mapping.v1()); - } - } - final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length); final Set indicesSet = newHashSet(indices); final String fMappingType = mappingType; @@ -433,22 +380,88 @@ public class MetaDataService extends AbstractComponent { } } }; - nodeMappingCreatedAction.add(listener); - - clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); - for (String indexName : indices) { - IndexMetaData indexMetaData = currentState.metaData().index(indexName); - if (indexMetaData == null) { - throw new IndexMissingException(new Index(indexName)); - } - Tuple mapping = mappings.get(indexName); - builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2())); - } - return newClusterStateBuilder().state(currentState).metaData(builder).build(); + synchronized (mutex) { + if (indices.length == 0) { + throw new IndexMissingException(new Index("_all")); } - }); + for (String index : indices) { + if (!clusterState.metaData().hasIndex(index)) { + throw new IndexMissingException(new Index(index)); + } + } + + Map newMappers = newHashMap(); + Map existingMappers = newHashMap(); + for (String index : indices) { + IndexService indexService = indicesService.indexService(index); + if (indexService != null) { + // try and parse it (no need to add it here) so we can bail early in case of parsing exception + DocumentMapper newMapper = indexService.mapperService().parse(mappingType, mappingSource); + newMappers.put(index, newMapper); + DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType); + if (existingMapper != null) { + // first, simulate + DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true)); + // if we have conflicts, and we are not supposed to ignore them, throw an exception + if (!ignoreConflicts && mergeResult.hasConflicts()) { + throw new MergeMappingException(mergeResult.conflicts()); + } + existingMappers.put(index, existingMapper); + } + } else { + throw new IndexMissingException(new Index(index)); + } + } + + if (mappingType == null) { + mappingType = newMappers.values().iterator().next().type(); + } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { + throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); + } + if (mappingType.charAt(0) == '_') { + throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); + } + + final Map> mappings = newHashMap(); + for (Map.Entry entry : newMappers.entrySet()) { + Tuple mapping; + String index = entry.getKey(); + // do the actual merge here on the master, and update the mapping source + DocumentMapper newMapper = entry.getValue(); + if (existingMappers.containsKey(entry.getKey())) { + // we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source + DocumentMapper existingMapper = existingMappers.get(entry.getKey()); + existingMapper.merge(newMapper, mergeFlags().simulate(false)); + // use the merged mapping source + mapping = new Tuple(existingMapper.type(), existingMapper.buildSource()); + } else { + mapping = new Tuple(newMapper.type(), newMapper.buildSource()); + } + mappings.put(index, mapping); + if (logger.isDebugEnabled()) { + logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2()); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] put_mapping [{}]", index, mapping.v1()); + } + } + + nodeMappingCreatedAction.add(listener); + + clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + for (String indexName : indices) { + IndexMetaData indexMetaData = currentState.metaData().index(indexName); + if (indexMetaData == null) { + throw new IndexMissingException(new Index(indexName)); + } + Tuple mapping = mappings.get(indexName); + builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2())); + } + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } + }); + } boolean acknowledged; try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java index f70e5331c2b..a064f5efa03 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/PreferUnallocatedShardUnassignedStrategy.java @@ -20,14 +20,17 @@ package org.elasticsearch.cluster.routing.strategy; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.store.IndexStore; @@ -36,6 +39,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import java.util.Iterator; +import java.util.Set; /** * @author kimchy (shay.banon) @@ -53,9 +57,20 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; } - public boolean allocateUnassigned(RoutingNodes routingNodes) { + public boolean allocateUnassigned(RoutingNodes routingNodes, DiscoveryNodes nodes) { boolean changed = false; + Set nodesIds = Sets.newHashSet(); + for (DiscoveryNode node : nodes) { + if (node.dataNode()) { + nodesIds.add(node.id()); + } + } + + if (nodesIds.isEmpty()) { + return changed; + } + Iterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); @@ -68,7 +83,25 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent continue; } - TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false).actionGet(); + if (!shard.primary()) { + // if its a backup, only allocate it if the primary is active + MutableShardRouting primary = routingNodes.findPrimaryForBackup(shard); + if (primary == null || !primary.active()) { + continue; + } + } + + TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodesIds.toArray(new String[nodesIds.size()])).actionGet(); + + if (logger.isWarnEnabled()) { + if (nodesStoreFilesMetaData.failures().length > 0) { + StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:\n"); + for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) { + sb.append(i).append(". ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage()).append("\n"); + } + logger.warn(sb.toString()); + } + } long lastSizeMatched = 0; DiscoveryNode lastDiscoNodeMatched = null; @@ -76,6 +109,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) { DiscoveryNode discoNode = nodeStoreFilesMetaData.node(); + logger.trace("{}: checking node [{}]", shard, discoNode); IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreFilesMetaData.storeFilesMetaData(); if (storeFilesMetaData == null) { @@ -104,16 +138,39 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent try { ImmutableMap indexBlobsMetaData = indexGateway.listIndexBlobs(shard.id()); + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n"); + sb.append(" gateway_files:\n"); + for (BlobMetaData md : indexBlobsMetaData.values()) { + sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n"); + } + sb.append(" node_files:\n"); + for (StoreFileMetaData md : storeFilesMetaData) { + sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n"); + } + logger.debug(sb.toString()); + } + logger.trace("{}: checking for pre_allocation (gateway) on node [{}]\n gateway files", shard, discoNode, indexBlobsMetaData.keySet()); long sizeMatched = 0; for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { - if (indexBlobsMetaData.containsKey(storeFileMetaData.name()) && indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) { - sizeMatched += storeFileMetaData.sizeInBytes(); + if (indexBlobsMetaData.containsKey(storeFileMetaData.name())) { + if (indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) { + logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway (same md5) with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.sizeInBytes())); + sizeMatched += storeFileMetaData.sizeInBytes(); + } else { + logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different md5, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.md5(), indexBlobsMetaData.get(storeFileMetaData.name()).md5()); + } + } else { + logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name()); } } if (sizeMatched > lastSizeMatched) { lastSizeMatched = sizeMatched; lastDiscoNodeMatched = discoNode; lastNodeMatched = node; + logger.trace("{}: node elected for pre_allocation [{}], total_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched)); + } else { + logger.trace("{}: node ignored for pre_allocation [{}], total_size_matched [{}] smaller than last_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched), new ByteSizeValue(lastSizeMatched)); } continue; @@ -152,7 +209,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent if (lastNodeMatched != null) { if (logger.isDebugEnabled()) { - logger.debug("[{}][{}] allocating to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), lastDiscoNodeMatched); + logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } // we found a match changed = true; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java index 292b267e4c7..5527d168fac 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/ShardsRoutingStrategy.java @@ -111,7 +111,7 @@ public class ShardsRoutingStrategy extends AbstractComponent { // now allocate all the unassigned to available nodes if (routingNodes.hasUnassigned()) { if (preferUnallocatedShardUnassignedStrategy != null) { - changed |= preferUnallocatedShardUnassignedStrategy.allocateUnassigned(routingNodes); + changed |= preferUnallocatedShardUnassignedStrategy.allocateUnassigned(routingNodes, nodes); } changed |= allocateUnassigned(routingNodes); // elect primaries again, in case this is needed with unassigned allocation diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index ee209a0d0cc..ebc68ae6ab5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -247,13 +247,17 @@ public class GatewayService extends AbstractLifecycleComponent i // go over the meta data and create indices, we don't really need to copy over // the meta data per index, since we create the index and it will be added automatically for (final IndexMetaData indexMetaData : fMetaData) { - try { - metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueMillis(10)); - } catch (Exception e) { - logger.error("failed to create index [" + indexMetaData.index() + "]", e); - } finally { - latch.countDown(); - } + threadPool.execute(new Runnable() { + @Override public void run() { + try { + metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueSeconds(30)); + } catch (Exception e) { + logger.error("failed to create index [" + indexMetaData.index() + "]", e); + } finally { + latch.countDown(); + } + } + }); } clusterService.submitStateUpdateTask("gateway (remove block)", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index 1f4f6a26aa6..031e7ceb528 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -269,8 +269,8 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close } } else if (cause instanceof IgnoreRecoveryException) { throw (IgnoreRecoveryException) cause; - } else if (cause instanceof NodeNotConnectedException) { - throw new IgnoreRecoveryException("Ignore recovery attemot, remote node not connected", e); + } else if ((cause instanceof NodeNotConnectedException) || (cause instanceof NodeDisconnectedException)) { + throw new IgnoreRecoveryException("Ignore recovery attempt, remote node not connected", e); } throw new RecoveryFailedException(shardId, node, targetNode, e); } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 879cd2f8143..a4f25717864 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -21,14 +21,12 @@ package org.elasticsearch.indices.store; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.Lists; -import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -42,7 +40,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; /** @@ -58,15 +55,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio this.indicesService = indicesService; } - public ActionFuture list(ShardId shardId, boolean onlyUnallocated) { - ClusterState state = clusterService.state(); - Set nodesIds = Sets.newHashSet(); - for (DiscoveryNode node : state.nodes()) { - if (node.dataNode()) { - nodesIds.add(node.id()); - } - } - return execute(new Request(shardId, onlyUnallocated, nodesIds.toArray(new String[nodesIds.size()]))); + public ActionFuture list(ShardId shardId, boolean onlyUnallocated, String[] nodesIds) { + return execute(new Request(shardId, onlyUnallocated, nodesIds)); } @Override protected String transportAction() { @@ -93,27 +83,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio return new NodeStoreFilesMetaData(); } - // only list stores on data node - - @Override protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) { - Set onlyDataNodeIds = Sets.newHashSet(); - for (String nodeId : nodesIds) { - if (nodes.nodeExists(nodeId) && nodes.get(nodeId).dataNode()) { - onlyDataNodeIds.add(nodeId); - } - } - return onlyDataNodeIds.toArray(new String[onlyDataNodeIds.size()]); - } - @Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) { final List nodeStoreFilesMetaDatas = Lists.newArrayList(); + final List failures = Lists.newArrayList(); for (int i = 0; i < responses.length(); i++) { Object resp = responses.get(i); if (resp instanceof NodeStoreFilesMetaData) { // will also filter out null response for unallocated ones nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp); + } else if (resp instanceof FailedNodeException) { + failures.add((FailedNodeException) resp); } } - return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()])); + return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]), + failures.toArray(new FailedNodeException[failures.size()])); } @Override protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws ElasticSearchException { @@ -129,7 +111,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio } @Override protected boolean accumulateExceptions() { - return false; + return true; } static class Request extends NodesOperationRequest { @@ -162,11 +144,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio public static class NodesStoreFilesMetaData extends NodesOperationResponse { + private FailedNodeException[] failures; + NodesStoreFilesMetaData() { } - public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes) { + public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes, FailedNodeException[] failures) { super(clusterName, nodes); + this.failures = failures; + } + + public FailedNodeException[] failures() { + return failures; } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedException.java similarity index 86% rename from modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedException.java index 62e0e656dd6..b33ee732d88 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedException.java @@ -24,9 +24,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; /** * @author kimchy (shay.banon) */ -public class NodeDisconnectedTransportException extends ConnectTransportException { +public class NodeDisconnectedException extends ConnectTransportException { - public NodeDisconnectedTransportException(DiscoveryNode node, String action) { + public NodeDisconnectedException(DiscoveryNode node, String action) { super(node, "disconnected", action, null); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 0f74a1ed16f..a53606b725e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -286,7 +286,7 @@ public class TransportService extends AbstractLifecycleComponent