From 6181d8ecde5610878e014ac1b0b957901901c9df Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 26 Mar 2015 19:28:13 +0100 Subject: [PATCH] Fail shard when index service/mappings fails to instantiate When the index service (which holds shards) fails to be created as a result of a shard being allocated on a node, we should fail the relevant shard, otherwise, it will remain stuck. Same goes when there is a failure to process updated mappings from the master. Note, both failures typically happen when the node is misconfigured (e.g. missing plugins, ...), since they get created and processed on the master node before being published. closes #10283 --- .../action/shard/ShardStateAction.java | 7 +- .../cluster/IndicesClusterStateService.java | 178 ++++++++---------- .../IndicesLifecycleListenerTests.java | 30 +++ 3 files changed, 109 insertions(+), 106 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e12b88fc8c9..6c26201bee2 100644 --- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -79,14 +79,9 @@ public class ShardStateAction extends AbstractComponent { public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticsearchException { DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); if (masterNode == null) { - logger.warn("can't send shard failed for {}. 
no master known.", shardRouting); + logger.warn("can't send shard failed for {}, no master known.", shardRouting); return; } - shardFailed(shardRouting, indexUUID, reason, masterNode); - } - - public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException { - logger.warn("{} sending failed shard for {}, indexUUID [{}], reason [{}]", shardRouting.shardId(), shardRouting, indexUUID, reason); innerShardFailed(shardRouting, indexUUID, reason, masterNode); } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index b7082ffaba2..4e7e4957f30 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -311,8 +311,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent cursor : indexMetaData.mappings().values()) { - MappingMetaData mappingMd = cursor.value; - String mappingType = mappingMd.type(); - CompressedString mappingSource = mappingMd.source(); - if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first - continue; + // go over and add the relevant mappings (or update them) + for (ObjectCursor cursor : indexMetaData.mappings().values()) { + MappingMetaData mappingMd = cursor.value; + String mappingType = mappingMd.type(); + CompressedString mappingSource = mappingMd.source(); + if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first + continue; + } + boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource); + if (requireRefresh) { + typesToRefresh.add(mappingType); + } } - boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource); - if (requireRefresh) { - 
typesToRefresh.add(mappingType); + if (!typesToRefresh.isEmpty() && sendRefreshMapping) { + nodeMappingRefreshAction.nodeMappingRefresh(event.state(), + new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(), + typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()) + ); } - } - if (!typesToRefresh.isEmpty() && sendRefreshMapping) { - nodeMappingRefreshAction.nodeMappingRefresh(event.state(), - new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(), - typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()) - ); - } - // go over and remove mappings - for (DocumentMapper documentMapper : mapperService.docMappers(true)) { - if (seenMappings.containsKey(new Tuple<>(index, documentMapper.type())) && !indexMetaData.mappings().containsKey(documentMapper.type())) { - // we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it - mapperService.remove(documentMapper.type()); - seenMappings.remove(new Tuple<>(index, documentMapper.type())); + // go over and remove mappings + for (DocumentMapper documentMapper : mapperService.docMappers(true)) { + if (seenMappings.containsKey(new Tuple<>(index, documentMapper.type())) && !indexMetaData.mappings().containsKey(documentMapper.type())) { + // we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it + mapperService.remove(documentMapper.type()); + seenMappings.remove(new Tuple<>(index, documentMapper.type())); + } + } + } catch (Throwable t) { + // if we failed the mappings anywhere, we need to fail the shards for this index, note, we safeguard + // by creating the processing the mappings on the master, or on the node the mapping was introduced on, + // so this failure typically means wrong node level configuration or something similar + for (IndexShard indexShard : indexService) { + ShardRouting 
shardRouting = indexShard.routingEntry(); + failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t); } } } } - private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedString mappingSource) { + private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedString mappingSource) throws Throwable { if (!seenMappings.containsKey(new Tuple<>(index, mappingType))) { seenMappings.put(new Tuple<>(index, mappingType), true); } @@ -445,6 +455,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent shard = state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED); + assertThat(shard, hasSize(1)); + assertThat(state.nodes().resolveNode(shard.get(0).currentNodeId()).getName(), Matchers.equalTo(node1)); + } + @Test public void testIndexStateShardChanged() throws Throwable {