diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index c63bb4a7e94..e89259ad8ce 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -115,11 +115,7 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction shardsStats = new ArrayList<>(); - for (String index : indicesService.indices()) { - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - continue; - } + for (IndexService indexService : indicesService.indices().values()) { for (IndexShard indexShard : indexService) { if (indexShard.routingEntry().active()) { // only report on fully started shards diff --git a/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index e45d6ac35ea..2663ab1b985 100644 --- a/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -106,7 +106,7 @@ public class TransportExplainAction extends TransportShardSingleOperationAction< } protected ExplainResponse shardOperation(ExplainRequest request, int shardId) throws ElasticsearchException { - IndexService indexService = indicesService.indexService(request.index()); + IndexService indexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = indexService.shardSafe(shardId); Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id())); Engine.GetResult result = indexShard.get(new Engine.Get(false, uidTerm)); diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 47de27948d8..fab55537365 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -55,6 +55,8 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -284,7 +286,11 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio case NONE: UpdateResponse update = result.action(); listener.onResponse(update); - indicesService.indexService(request.index()).shard(request.shardId()).indexingService().noopUpdate(request.type()); + IndexService indexService = indicesService.indexService(request.index()); + if (indexService != null) { + IndexShard indexShard = indexService.shard(request.shardId()); + indexShard.indexingService().noopUpdate(request.type()); + } break; default: throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation()); diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index d269f8bfd3a..1a1aac47525 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -537,6 +537,10 @@ public class MetaDataMappingService extends AbstractComponent { // do the actual merge here on the master, and update the mapping source DocumentMapper newMapper = entry.getValue(); IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + continue; + } + CompressedString existingSource = null; if (existingMappers.containsKey(entry.getKey())) { existingSource = existingMappers.get(entry.getKey()).mappingSource(); diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index eb22095dc65..b33804de564 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -151,12 +151,14 @@ public class RoutingService extends AbstractLifecycleComponent i @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + ClusterState state = clusterService.state(); + logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint()); } }); routingTableDirty = false; } catch (Exception e) { - logger.warn("Failed to reroute routing table", e); + ClusterState state = clusterService.state(); + logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint()); } } diff --git a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index bd9d8b21282..8eefed977f0 100644 --- a/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -200,6 +200,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA Set noNodes = Sets.newHashSet(); for (DiscoveryNode discoNode : nodesWithHighestVersion) { RoutingNode node = routingNodes.node(discoNode.id()); + if (node == null) { + continue; + } + Decision decision = allocation.deciders().canAllocate(shard, node, allocation); if (decision.type() == Decision.Type.THROTTLE) { throttledNodes.add(discoNode); diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index 9365a7aabd4..f8edce8e294 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -19,14 +19,13 @@ package org.elasticsearch.indices; +import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.service.IndexService; -import java.util.Set; - /** * */ @@ -50,10 +49,24 @@ public interface IndicesService extends Iterable, LifecycleCompone IndicesLifecycle indicesLifecycle(); - Set indices(); + /** + * Returns a snapshot of the started indices and the associated {@link IndexService} instances. + * + * The map being returned is not a live view and subsequent calls can return a different view. + */ + ImmutableMap indices(); + /** + * Returns an IndexService for the specified index if exists otherwise returns null. + * + * Even if the index name appeared in {@link #indices()} null can still be returned as an + * index maybe removed in the meantime, so preferable use the associated {@link IndexService} in order to prevent NPE. + */ IndexService indexService(String index); + /** + * Returns an IndexService for the specified index if exists otherwise a {@link IndexMissingException} is thrown. + */ IndexService indexServiceSafe(String index) throws IndexMissingException; IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException; diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 71cb2575282..dd7f765dce8 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -74,14 +74,12 @@ import org.elasticsearch.plugins.PluginsService; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static com.google.common.collect.Maps.newHashMap; -import static com.google.common.collect.Sets.newHashSet; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; @@ -239,8 +237,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent indices() { - return newHashSet(indices.keySet()); + public ImmutableMap indices() { + return indices; } public IndexService indexService(String index) { diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 1575ad08394..a71290a3241 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -160,8 +160,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent entry : indicesService.indices().entrySet()) { + String index = entry.getKey(); + IndexService indexService = entry.getValue(); for (Integer shardId : indexService.shardIds()) { logger.debug("[{}][{}] removing shard (disabled block persistence)", index, shardId); try { @@ -218,10 +219,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent entry : indicesService.indices().entrySet()) { + String index = entry.getKey(); IndexMetaData indexMetaData = event.state().metaData().index(index); if (indexMetaData != null && indexMetaData.state() == IndexMetaData.State.CLOSE) { - IndexService indexService = indicesService.indexService(index); + IndexService indexService = entry.getValue(); for (Integer shardId : indexService.shardIds()) { logger.debug("[{}][{}] removing shard (index is closed)", index, shardId); try { @@ -232,8 +234,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent entry : indicesService.indices().entrySet()) { + String index = entry.getKey(); + IndexService indexService = entry.getValue(); + if (indexService.shardIds().isEmpty()) { if (logger.isDebugEnabled()) { logger.debug("[{}] cleaning index (no shards allocated)", index); } @@ -244,7 +248,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponentof(), ImmutableMap.of()); + + RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), clusterInfo); + allocator.allocateUnassigned(routingAllocation); + } + +}