[RECOVERY] Wipe shard state before switching recovered files live

Today we leave the shard state behind even if a recovery is half finished
this causes in rare conditions shards to be recovered and promoted as
primaries that have never been fully recovered.

Closes #10053
This commit is contained in:
Simon Willnauer 2015-03-19 17:05:16 -07:00
parent 0f2d2d0495
commit 93fedcbb88
3 changed files with 55 additions and 5 deletions

View File

@ -57,6 +57,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
@ -1005,6 +1006,17 @@ public class IndexShard extends AbstractIndexShardComponent {
return flushOnClose; return flushOnClose;
} }
/**
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
* @throws IOException if the delete fails
*/
public void deleteShardState() throws IOException {
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new ElasticsearchIllegalStateException("Can't delete shard state on a active shard");
}
MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener { private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override @Override
public void onRefreshSettings(Settings settings) { public void onRefreshSettings(Settings settings) {
@ -1202,11 +1214,19 @@ public class IndexShard extends AbstractIndexShardComponent {
// called by the current engine // called by the current engine
@Override @Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
for (Engine.FailedEngineListener listener : delegates) { try {
for (Engine.FailedEngineListener listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
}
} finally {
try { try {
listener.onFailedEngine(shardId, reason, failure); deleteShardState();
} catch (Exception e) { } catch (IOException e) {
logger.warn("exception while notifying engine failure", e); logger.warn("failed to delete shard state", e);
} }
} }
} }

View File

@ -386,6 +386,7 @@ public class RecoveryTarget extends AbstractComponent {
// first, we go and move files that were created with the recovery id suffix to // first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas // the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes... // to recover from in case of a full cluster shutdown just when this code executes...
recoveryStatus.indexShard().deleteShardState(); // we have to delete it first since even if we fail to rename the shard might be invalid
recoveryStatus.renameAllTempFiles(); recoveryStatus.renameAllTempFiles();
final Store store = recoveryStatus.store(); final Store store = recoveryStatus.store();
// now write checksums // now write checksums

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.MutableShardRouting;
@ -30,6 +31,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -123,7 +125,6 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1), false); shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1), false);
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId)); shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard))); assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
@ -135,6 +136,34 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId)); shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
}
public void testDeleteShardState() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
IndexService test = indicesService.indexService("test");
IndexShard shard = test.shard(0);
try {
shard.deleteShardState();
fail("shard is active metadata delete must fail");
} catch (ElasticsearchIllegalStateException ex) {
// fine - only delete if non-active
}
ShardRouting routing = shard.routingEntry();
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version()+1);
shard.updateRoutingEntry(routing, true);
shard.deleteShardState();
assertNull("no shard state expected after delete on initializing", ShardStateMetaData.load(logger, shard.shardId, env.shardPaths(shard.shardId)));
} }