Core: Avoid null references that may be returned due to concurrent changes or inconsistent cluster state

Closes #7181
This commit is contained in:
Martijn van Groningen 2014-08-06 23:33:58 +02:00
parent ca5a17e4ba
commit 565dd90860
10 changed files with 134 additions and 26 deletions

View File

@ -115,11 +115,7 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (String index : indicesService.indices()) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry().active()) {
// only report on fully started shards

View File

@ -106,7 +106,7 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
}
protected ExplainResponse shardOperation(ExplainRequest request, int shardId) throws ElasticsearchException {
IndexService indexService = indicesService.indexService(request.index());
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(shardId);
Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
Engine.GetResult result = indexShard.get(new Engine.Get(false, uidTerm));

View File

@ -55,6 +55,8 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -284,7 +286,11 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
case NONE:
UpdateResponse update = result.action();
listener.onResponse(update);
indicesService.indexService(request.index()).shard(request.shardId()).indexingService().noopUpdate(request.type());
IndexService indexService = indicesService.indexService(request.index());
if (indexService != null) {
IndexShard indexShard = indexService.shard(request.shardId());
indexShard.indexingService().noopUpdate(request.type());
}
break;
default:
throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());

View File

@ -537,6 +537,10 @@ public class MetaDataMappingService extends AbstractComponent {
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
CompressedString existingSource = null;
if (existingMappers.containsKey(entry.getKey())) {
existingSource = existingMappers.get(entry.getKey()).mappingSource();

View File

@ -151,12 +151,14 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
ClusterState state = clusterService.state();
logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}

View File

@ -200,6 +200,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);

View File

@ -19,14 +19,13 @@
package org.elasticsearch.indices;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.service.IndexService;
import java.util.Set;
/**
*
*/
@ -50,10 +49,24 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndicesLifecycle indicesLifecycle();
Set<String> indices();
/**
* Returns a snapshot of the started indices and the associated {@link IndexService} instances.
*
* The map being returned is not a live view and subsequent calls can return a different view.
*/
ImmutableMap<String, IndexService> indices();
/**
* Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.
*
* Even if the index name appeared in {@link #indices()} <code>null</code> can still be returned as an
* index maybe removed in the meantime, so preferable use the associated {@link IndexService} in order to prevent NPE.
*/
IndexService indexService(String index);
/**
* Returns an IndexService for the specified index if exists otherwise a {@link IndexMissingException} is thrown.
*/
IndexService indexServiceSafe(String index) throws IndexMissingException;
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException;

View File

@ -74,14 +74,12 @@ import org.elasticsearch.plugins.PluginsService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@ -239,8 +237,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
return indices.containsKey(index);
}
public Set<String> indices() {
return newHashSet(indices.keySet());
public ImmutableMap<String, IndexService> indices() {
return indices;
}
public IndexService indexService(String index) {

View File

@ -160,8 +160,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// are going to recover them again once state persistence is disabled (no master / not recovered)
// TODO: this feels a bit hacky here, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
if (event.state().blocks().disableStatePersistence()) {
for (final String index : indicesService.indices()) {
IndexService indexService = indicesService.indexService(index);
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (disabled block persistence)", index, shardId);
try {
@ -218,10 +219,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void applyCleanedIndices(final ClusterChangedEvent event) {
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account
for (final String index : indicesService.indices()) {
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData != null && indexMetaData.state() == IndexMetaData.State.CLOSE) {
IndexService indexService = indicesService.indexService(index);
IndexService indexService = entry.getValue();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (index is closed)", index, shardId);
try {
@ -232,8 +234,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
}
for (final String index : indicesService.indices()) {
if (indicesService.indexService(index).shardIds().isEmpty()) {
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
if (indexService.shardIds().isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index (no shards allocated)", index);
}
@ -244,7 +248,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void applyDeletedIndices(final ClusterChangedEvent event) {
for (final String index : indicesService.indices()) {
for (final String index : indicesService.indices().keySet()) {
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
@ -441,12 +445,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (aliasesChanged(event)) {
// go over and update aliases
for (IndexMetaData indexMetaData : event.state().metaData()) {
if (!indicesService.hasIndex(indexMetaData.index())) {
String index = indexMetaData.index();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we only create / update here
continue;
}
String index = indexMetaData.index();
IndexService indexService = indicesService.indexService(index);
IndexAliasesService indexAliasesService = indexService.aliasesService();
processAliases(index, indexMetaData.aliases().values(), indexAliasesService);
// go over and remove aliases

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 1, numClientNodes = 0, transportClientRatio = 0)
public class RareClusterStateTests extends ElasticsearchIntegrationTest {
@Override
protected int numberOfShards() {
return 1;
}
@Override
protected int numberOfReplicas() {
return 0;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("gateway.type", "local")
.build();
}
@Test
public void testUnassignedShardAndEmptyNodesInRoutingTable() throws Exception {
createIndex("a");
ensureSearchable("a");
ClusterState current = clusterService().state();
LocalGatewayAllocator allocator = internalCluster().getInstance(LocalGatewayAllocator.class);
AllocationDeciders allocationDeciders = new AllocationDeciders(ImmutableSettings.EMPTY, new AllocationDecider[0]);
RoutingNodes routingNodes = new RoutingNodes(
ClusterState.builder(current)
.routingTable(RoutingTable.builder(current.routingTable()).remove("a").addAsRecovery(current.metaData().index("a")))
.nodes(DiscoveryNodes.EMPTY_NODES)
.build()
);
ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.<String, DiskUsage>of(), ImmutableMap.<String, Long>of());
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), clusterInfo);
allocator.allocateUnassigned(routingAllocation);
}
}