Recovery: allow to recover into a folder containing a corrupted shard

At the moment, we are very strict when handling data folders containing corrupted shards and will fail any recovery attempt into it. Typically this wouldn't be a problem as the shard will be assigned to another node (which we try first anyway when a shard fails). However, it has been proven to be too strict for smaller clusters which may not have an extra node available (either because of allocation filtering, disk space issues etc.). This commit changes the behavior to force a full recovery. Once all the new files are verified we remove the old corrupted data and start the shard.

This also fixes a small issue where the shard state file wasn't deleted on an engine failure (we had a protection against deleting the state file on an active shard, but in this case the shard is still active but will be removed). The state deletion is also moved to before the failure handlers are called, to avoid race conditions when calling the master (it will potentially try to read it when allocating the shard)

Closes #10558
This commit is contained in:
Boaz Leskes 2015-04-09 13:31:43 +02:00
parent 1b35854768
commit 8e302f1172
4 changed files with 50 additions and 40 deletions

View File

@ -240,7 +240,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.config = new EngineConfig(shardId,
indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),
threadPool,indexingService,indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener);
logger.debug("state: [CREATED]");
@ -448,7 +448,7 @@ public class IndexShard extends AbstractIndexShardComponent {
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
} catch (Throwable t) {
if (docMapper.v2() || (t instanceof MapperParsingException && ((MapperParsingException)t).isMappingsModified())) {
if (docMapper.v2() || (t instanceof MapperParsingException && ((MapperParsingException) t).isMappingsModified())) {
throw new WriteFailureException(t, docMapper.v1().type());
} else {
throw t;
@ -481,7 +481,7 @@ public class IndexShard extends AbstractIndexShardComponent {
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
} catch (Throwable t) {
if (docMapper.v2() || (t instanceof MapperParsingException && ((MapperParsingException)t).isMappingsModified())) {
if (docMapper.v2() || (t instanceof MapperParsingException && ((MapperParsingException) t).isMappingsModified())) {
throw new WriteFailureException(t, docMapper.v1().type());
} else {
throw t;
@ -1008,11 +1008,12 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
*
* @throws IOException if the delete fails
*/
public void deleteShardState() throws IOException {
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new ElasticsearchIllegalStateException("Can't delete shard state on a active shard");
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new ElasticsearchIllegalStateException("Can't delete shard state on an active shard");
}
MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
}
@ -1120,7 +1121,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
}
private void checkIndex() throws IndexShardException {
if (store.tryIncRef()) {
try {
@ -1140,12 +1141,12 @@ public class IndexShard extends AbstractIndexShardComponent {
}
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
if ("checksum".equalsIgnoreCase(checkIndexOnStartup)) {
// physical verification only: verify all checksums for the latest commit
boolean corrupt = false;
MetadataSnapshot metadata = store.getMetadata();
for (Map.Entry<String,StoreFileMetaData> entry : metadata.asMap().entrySet()) {
for (Map.Entry<String, StoreFileMetaData> entry : metadata.asMap().entrySet()) {
try {
Store.checkIntegrity(entry.getValue(), store.directory());
out.println("checksum passed: " + entry.getKey());
@ -1166,7 +1167,7 @@ public class IndexShard extends AbstractIndexShardComponent {
checkIndex.setInfoStream(out);
CheckIndex.Status status = checkIndex.checkIndex();
out.flush();
if (!status.clean) {
if (state == IndexShardState.CLOSED) {
// ignore if closed....
@ -1188,7 +1189,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("check index [success]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
}
@ -1215,6 +1216,11 @@ public class IndexShard extends AbstractIndexShardComponent {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
try {
// delete the shard state so this folder will not be reused
MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
} catch (IOException e) {
logger.warn("failed to delete shard state", e);
} finally {
for (Engine.FailedEngineListener listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
@ -1222,12 +1228,6 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.warn("exception while notifying engine failure", e);
}
}
} finally {
try {
deleteShardState();
} catch (IOException e) {
logger.warn("failed to delete shard state", e);
}
}
}
}
@ -1287,8 +1287,8 @@ public class IndexShard extends AbstractIndexShardComponent {
private String getIndexUUID() {
assert indexSettings.get(IndexMetaData.SETTING_UUID) != null
|| indexSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).before(Version.V_0_90_6):
"version: " + indexSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null) + " uuid: " + indexSettings.get(IndexMetaData.SETTING_UUID) ;
|| indexSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).before(Version.V_0_90_6) :
"version: " + indexSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null) + " uuid: " + indexSettings.get(IndexMetaData.SETTING_UUID);
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
}
}

View File

@ -171,6 +171,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
*/
public MetadataSnapshot getMetadataOrEmpty() throws IOException {
try {
@ -188,6 +190,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws IndexNotFoundException if no index / valid commit-point can be found in this store
@ -202,6 +206,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws IndexNotFoundException if the commit point can't be found in this store
@ -538,7 +544,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @throws ElasticsearchIllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
failIfCorrupted();
metadataLock.writeLock().lock();
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock

View File

@ -156,11 +156,15 @@ public class RecoveryTarget extends AbstractComponent {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryStatus);
final Map<String, StoreFileMetaData> existingFiles;
Map<String, StoreFileMetaData> existingFiles;
try {
existingFiles = recoveryStatus.store().getMetadataOrEmpty().asMap();
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
existingFiles = Store.MetadataSnapshot.EMPTY.asMap();
} catch (Exception e) {
logger.debug("error while listing local files, recovery as if there are none", e);
// this will be logged as warning later on...
logger.trace("unexpected error while listing local files, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;

View File

@ -66,6 +66,7 @@ import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.index.merge.NoMergePolicyProvider;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
@ -108,20 +109,19 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
* Tests that we can actually recover from a corruption on the primary given that we have replica shards around.
*/
@Test
@TestLogging("indices.recovery:TRACE")
public void testCorruptFileAndRecover() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
while (cluster().numDataNodes() < 4) {
/**
* We need 4 nodes since if we have 2 replicas and only 3 nodes we can't get into green state since
* the corrupted node will never be used reallocate a replica since it's marked as corrupted
*/
internalCluster().startNode(ImmutableSettings.builder().put("node.data", true).put("node.client", false).put("node.master", false));
// have enough space for 3 copies
internalCluster().ensureAtLeastNumDataNodes(3);
if (cluster().numDataNodes() == 3) {
logger.info("--> cluster has [3] data nodes, corrupted primary will be overwritten");
}
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(3));
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
@ -142,7 +142,8 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertHitCount(countResponse, numDocs);
final int numShards = numShards("test");
ShardRouting corruptedShardRouting = corruptRandomFile();
ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
logger.info("--> {} corrupted", corruptedShardRouting);
enableAllocation("test");
/*
* we corrupted the primary shard - now lets make sure we never recover from it successfully
@ -224,7 +225,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
@Test
public void testCorruptPrimaryNoReplica() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
@ -245,7 +246,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
ShardRouting shardRouting = corruptRandomFile();
ShardRouting shardRouting = corruptRandomPrimaryFile();
/*
* we corrupted the primary shard - now lets make sure we never recover from it successfully
*/
@ -356,7 +357,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
@Test
public void testCorruptionOnNetworkLayer() throws ExecutionException, InterruptedException {
int numDocs = scaledRandomIntBetween(100, 1000);
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
internalCluster().ensureAtLeastNumDataNodes(2);
if (cluster().numDataNodes() < 3) {
internalCluster().startNode(ImmutableSettings.builder().put("node.data", true).put("node.client", false).put("node.master", false));
}
@ -455,7 +456,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
@Test
public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
@ -476,7 +477,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
ShardRouting shardRouting = corruptRandomFile(false);
ShardRouting shardRouting = corruptRandomPrimaryFile(false);
// we don't corrupt segments.gen since S/R doesn't snapshot this file
// the other problem here why we can't corrupt segments.X files is that the snapshot flushes again before
// it snapshots and that will write a new segments.X+1 file
@ -508,11 +509,11 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
private ShardRouting corruptRandomFile() throws IOException {
return corruptRandomFile(true);
private ShardRouting corruptRandomPrimaryFile() throws IOException {
return corruptRandomPrimaryFile(true);
}
private ShardRouting corruptRandomFile(final boolean includePerCommitFiles) throws IOException {
private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFiles) throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
List<ShardIterator> iterators = Lists.newArrayList(shardIterators);
@ -554,12 +555,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();
// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);
// rewrite
raf.position(filePointer);
raf.write(bb);