Ensure we mark store as corrupted if we fail to read the segments info

This commit also reenables CorruptedFileTest#testReplicaCorruption which had
a missing `GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS: "one"` setting.

Closes #11226
This commit is contained in:
Simon Willnauer 2015-05-19 16:03:25 +02:00
parent 2e94753c95
commit 70b4bb6e0e
4 changed files with 63 additions and 12 deletions

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -53,7 +54,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable { public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable {
private final ThreadPool threadPool;
private final MappingUpdatedAction mappingUpdatedAction; private final MappingUpdatedAction mappingUpdatedAction;
private final IndexService indexService; private final IndexService indexService;
private final IndexShard indexShard; private final IndexShard indexShard;
@ -63,10 +63,9 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
@Inject @Inject
public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, MappingUpdatedAction mappingUpdatedAction, public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction,
IndexService indexService, IndexShard indexShard) { IndexService indexService, IndexShard indexShard) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.threadPool = threadPool;
this.mappingUpdatedAction = mappingUpdatedAction; this.mappingUpdatedAction = mappingUpdatedAction;
this.indexService = indexService; this.indexService = indexService;
this.indexShard = indexShard; this.indexShard = indexShard;
@ -82,16 +81,17 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
long version = -1; long version = -1;
final Map<String, Mapping> typesToUpdate; final Map<String, Mapping> typesToUpdate;
SegmentInfos si = null; SegmentInfos si = null;
indexShard.store().incRef(); final Store store = indexShard.store();
store.incRef();
try { try {
try { try {
indexShard.store().failIfCorrupted(); store.failIfCorrupted();
try { try {
si = Lucene.readSegmentInfos(indexShard.store().directory()); si = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) { } catch (Throwable e) {
String files = "_unknown_"; String files = "_unknown_";
try { try {
files = Arrays.toString(indexShard.store().directory().listAll()); files = Arrays.toString(store.directory().listAll());
} catch (Throwable e1) { } catch (Throwable e1) {
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")"; files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
} }
@ -106,7 +106,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data // its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists"); logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(indexShard.store().directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close(); writer.close();
recoveryState.getTranslog().totalOperations(0); recoveryState.getTranslog().totalOperations(0);
} }
@ -120,7 +120,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
try { try {
final RecoveryState.Index index = recoveryState.getIndex(); final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) { if (si != null) {
final Directory directory = indexShard.store().directory(); final Directory directory = store.directory();
for (String name : Lucene.files(si)) { for (String name : Lucene.files(si)) {
long length = directory.fileLength(name); long length = directory.fileLength(name);
index.addFileDetail(name, length, true); index.addFileDetail(name, length, true);
@ -143,7 +143,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
} catch (EngineException e) { } catch (EngineException e) {
throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e); throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e);
} finally { } finally {
indexShard.store().decRef(); store.decRef();
} }
} }

View File

@ -136,7 +136,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @throws IOException if the index is corrupted or the segments file is not present * @throws IOException if the index is corrupted or the segments file is not present
*/ */
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException { public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
return readSegmentsInfo(null, directory()); failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;
}
} }
/** /**

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -513,12 +514,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
* replica. * replica.
*/ */
@Test @Test
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11226")
public void testReplicaCorruption() throws Exception { public void testReplicaCorruption() throws Exception {
int numDocs = scaledRandomIntBetween(100, 1000); int numDocs = scaledRandomIntBetween(100, 1000);
internalCluster().ensureAtLeastNumDataNodes(2); internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class) .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose

View File

@ -1191,4 +1191,48 @@ public class StoreTest extends ElasticsearchTestCase {
} }
assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId()));
} }
public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
int numDocs = 1 + random().nextInt(10);
List<Document> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
Document doc = new Document();
doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
docs.add(doc);
}
for (Document d : docs) {
writer.addDocument(d);
}
writer.commit();
writer.close();
MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
if (leaf != null) {
leaf.setPreventDoubleWrite(false); // I do this on purpose
}
SegmentInfos segmentCommitInfos = store.readLastCommittedSegmentsInfo();
try (IndexOutput out = store.directory().createOutput(segmentCommitInfos.getSegmentsFileName(), IOContext.DEFAULT)) {
// empty file
}
try {
if (randomBoolean()) {
store.getMetadata();
} else {
store.readLastCommittedSegmentsInfo();
}
fail("corrupted segments_N file");
} catch (CorruptIndexException ex) {
// expected
}
assertTrue(store.isMarkedCorrupted());
store.close();
}
} }