Fail shard when index service/mappings fails to instantiate

When the index service (which holds shards) fails to be created as a result of a shard being allocated on a node, we should fail the relevant shard, otherwise, it will remain stuck.
Same goes when there is a failure to process updated mappings form the master.

Note, both failures typically happen when the node is misconfigured (i.e. missing plugins, ...), since they get created and processed on the master node before being published.
closes #10283
This commit is contained in:
Shay Banon 2015-03-26 19:28:13 +01:00
parent 71bd40517d
commit 6181d8ecde
3 changed files with 109 additions and 106 deletions

View File

@ -79,14 +79,9 @@ public class ShardStateAction extends AbstractComponent {
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticsearchException {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("can't send shard failed for {}. no master known.", shardRouting);
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
return;
}
shardFailed(shardRouting, indexUUID, reason, masterNode);
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException {
logger.warn("{} sending failed shard for {}, indexUUID [{}], reason [{}]", shardRouting.shardId(), shardRouting, indexUUID, reason);
innerShardFailed(shardRouting, indexUUID, reason, masterNode);
}

View File

@ -311,8 +311,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
} catch (Exception e) {
logger.warn("[{}] failed to create index", e, indexMetaData.index());
} catch (Throwable e) {
sendFailShard(shard, indexMetaData.getUUID(), "failed to create index", e);
}
}
}
@ -357,46 +357,56 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// got deleted on us, ignore (closing the node)
return;
}
MapperService mapperService = indexService.mapperService();
// first, go over and update the _default_ mapping (if exists)
if (indexMetaData.mappings().containsKey(MapperService.DEFAULT_MAPPING)) {
boolean requireRefresh = processMapping(index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
if (requireRefresh) {
typesToRefresh.add(MapperService.DEFAULT_MAPPING);
try {
MapperService mapperService = indexService.mapperService();
// first, go over and update the _default_ mapping (if exists)
if (indexMetaData.mappings().containsKey(MapperService.DEFAULT_MAPPING)) {
boolean requireRefresh = processMapping(index, mapperService, MapperService.DEFAULT_MAPPING, indexMetaData.mapping(MapperService.DEFAULT_MAPPING).source());
if (requireRefresh) {
typesToRefresh.add(MapperService.DEFAULT_MAPPING);
}
}
}
// go over and add the relevant mappings (or update them)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.mappings().values()) {
MappingMetaData mappingMd = cursor.value;
String mappingType = mappingMd.type();
CompressedString mappingSource = mappingMd.source();
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
// go over and add the relevant mappings (or update them)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.mappings().values()) {
MappingMetaData mappingMd = cursor.value;
String mappingType = mappingMd.type();
CompressedString mappingSource = mappingMd.source();
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
}
boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
typesToRefresh.add(mappingType);
}
}
boolean requireRefresh = processMapping(index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
typesToRefresh.add(mappingType);
if (!typesToRefresh.isEmpty() && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(),
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
);
}
}
if (!typesToRefresh.isEmpty() && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(),
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
);
}
// go over and remove mappings
for (DocumentMapper documentMapper : mapperService.docMappers(true)) {
if (seenMappings.containsKey(new Tuple<>(index, documentMapper.type())) && !indexMetaData.mappings().containsKey(documentMapper.type())) {
// we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it
mapperService.remove(documentMapper.type());
seenMappings.remove(new Tuple<>(index, documentMapper.type()));
// go over and remove mappings
for (DocumentMapper documentMapper : mapperService.docMappers(true)) {
if (seenMappings.containsKey(new Tuple<>(index, documentMapper.type())) && !indexMetaData.mappings().containsKey(documentMapper.type())) {
// we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it
mapperService.remove(documentMapper.type());
seenMappings.remove(new Tuple<>(index, documentMapper.type()));
}
}
} catch (Throwable t) {
// if we failed the mappings anywhere, we need to fail the shards for this index, note, we safeguard
// by creating the processing the mappings on the master, or on the node the mapping was introduced on,
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
}
}
}
}
private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedString mappingSource) {
private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedString mappingSource) throws Throwable {
if (!seenMappings.containsKey(new Tuple<>(index, mappingType))) {
seenMappings.put(new Tuple<>(index, mappingType), true);
}
@ -445,6 +455,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
} catch (Throwable e) {
logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index, mappingType, mappingSource);
throw e;
}
return requiresRefresh;
}
@ -544,14 +555,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
} else {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
if (nodes.masterNode() != null) {
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed",
nodes.masterNode()
);
}
sendFailShard(shardRouting, indexMetaData.getUUID(), "master [" + nodes.masterNode() + "] marked shard as started, but shard has not been created, mark shard as failed", null);
}
continue;
}
@ -705,22 +709,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
logger.warn("[{}][{}] failed to create shard", e, shardRouting.index(), shardRouting.id());
try {
indexService.removeShard(shardId, "failed to create [" + ExceptionsHelper.detailedMessage(e) + "]");
} catch (IndexShardMissingException e1) {
// ignore
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
}
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
if (nodes.masterNode() != null) {
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]",
nodes.masterNode()
);
} else {
logger.debug("can't send shard failed for {} as there is no current master", shardRouting.shardId());
}
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
return;
}
}
@ -746,7 +735,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
indexShard.failShard("corrupted preexisting index", e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
handleRecoveryFailure(indexService, shardRouting, true, e);
}
} else {
final IndexShardRoutingTable indexShardRouting = routingTable.index(shardRouting.index()).shard(shardRouting.id());
@ -766,7 +755,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
handleRecoveryFailure(indexService, shardRouting, true, e);
}
});
}
@ -830,31 +819,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, sendShardFailure, e);
handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
}
}
private void handleRecoveryFailure(IndexService indexService, IndexMetaData indexMetaData, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
logger.debug("[{}][{}] removing shard on failed recovery [{}]", shardRouting.index(), shardRouting.shardId().id(), failure.getMessage());
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after recovery failure", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
logger.warn("[{}][{}] sending failed shard after recovery failure", failure, indexService.index().name(), shardRouting.shardId().id());
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to start shard, message [" + detailedMessage(failure) + "]");
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
}
}
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
}
}
@ -888,6 +859,31 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService.hasShard(shardRouting.getId())) {
try {
indexService.removeShard(shardRouting.getId(), message);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to remove shard after failure ([{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}
}
if (sendShardFailure) {
sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
}
}
private void sendFailShard(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexUUID, "shard failure [" + message + "]" + (failure == null ? "" : "[" + detailedMessage(failure) + "]"));
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}
}
private class FailedEngineHandler implements Engine.FailedEngineListener {
@Override
public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
@ -900,34 +896,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
if (shardRouting == null) {
logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]",
logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure,
shardId.index().name(), shardId.id(), reason);
return;
}
final ShardRouting fShardRouting = shardRouting;
final String indexUUID = indexService.indexUUID(); // we know indexService is not null here.
final String failureMessage = "engine failure, message [" + reason + "]" +
(failure == null ? "" : "[" + detailedMessage(failure) + "]");
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (indexService.hasShard(shardId.id())) {
try {
indexService.removeShard(shardId.id(), failureMessage);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason);
}
}
try {
failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version()));
shardStateAction.shardFailed(fShardRouting, indexUUID, failureMessage);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason);
}
failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure);
}
}
});

View File

@ -22,7 +22,11 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -33,6 +37,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.List;
@ -52,6 +57,7 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest {
@ -100,6 +106,30 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
}
}
/**
* Tests that if an *index* structure creation fails on relocation to a new node, the shard
* is not stuck but properly failed.
*/
@Test
public void testIndexShardFailedOnRelocation() throws Throwable {
String node1 = internalCluster().startNode();
client().admin().indices().prepareCreate("index1").setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen("index1");
String node2 = internalCluster().startNode();
internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(new IndexShardStateChangeListener() {
@Override
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
throw new RuntimeException("FAIL");
}
});
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("index1", 0), node1, node2)).get();
ensureGreen("index1");
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<MutableShardRouting> shard = state.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED);
assertThat(shard, hasSize(1));
assertThat(state.nodes().resolveNode(shard.get(0).currentNodeId()).getName(), Matchers.equalTo(node1));
}
@Test
public void testIndexStateShardChanged() throws Throwable {