diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index a69272b2e43..eb52365217f 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; @@ -53,7 +54,6 @@ import java.util.concurrent.atomic.AtomicReference; */ public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable { - private final ThreadPool threadPool; private final MappingUpdatedAction mappingUpdatedAction; private final IndexService indexService; private final IndexShard indexShard; @@ -63,10 +63,9 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl @Inject - public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, MappingUpdatedAction mappingUpdatedAction, + public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction, IndexService indexService, IndexShard indexShard) { super(shardId, indexSettings); - this.threadPool = threadPool; this.mappingUpdatedAction = mappingUpdatedAction; this.indexService = indexService; this.indexShard = indexShard; @@ -82,16 +81,17 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl long version = -1; final Map typesToUpdate; SegmentInfos si = null; - indexShard.store().incRef(); + final Store store = indexShard.store(); + store.incRef(); try { try { - indexShard.store().failIfCorrupted(); + store.failIfCorrupted(); try { - si = Lucene.readSegmentInfos(indexShard.store().directory()); + si = store.readLastCommittedSegmentsInfo(); } catch (Throwable e) { String files = "_unknown_"; try { - files = Arrays.toString(indexShard.store().directory().listAll()); + files = Arrays.toString(store.directory().listAll()); } catch (Throwable e1) { files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")"; } @@ -106,7 +106,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) // its a "new index create" API, we have to do something, so better to clean it than use same data logger.trace("cleaning existing shard, shouldn't exists"); - IndexWriter writer = new IndexWriter(indexShard.store().directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); + IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE)); writer.close(); recoveryState.getTranslog().totalOperations(0); } @@ -120,7 +120,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl try { final RecoveryState.Index index = recoveryState.getIndex(); if (si != null) { - final Directory directory = indexShard.store().directory(); + final Directory directory = store.directory(); for (String name : Lucene.files(si)) { long length = directory.fileLength(name); index.addFileDetail(name, length, true); @@ -143,7 +143,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl } catch (EngineException e) { throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e); } finally { - indexShard.store().decRef(); + store.decRef(); } } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index edcd930b8d6..6bdcd726cce 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -136,7 +136,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * @throws IOException if the index is corrupted or the segments file is not present */ public SegmentInfos readLastCommittedSegmentsInfo() throws IOException { - return readSegmentsInfo(null, directory()); + failIfCorrupted(); + try { + return readSegmentsInfo(null, directory()); + } catch (CorruptIndexException ex) { + markStoreCorrupted(ex); + throw ex; + } } /** diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index 08a85363890..78754fcbd0b 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShard; @@ -513,12 +514,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { * replica. */ @Test - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11226") public void testReplicaCorruption() throws Exception { int numDocs = scaledRandomIntBetween(100, 1000); internalCluster().ensureAtLeastNumDataNodes(2); assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one") .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class) .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index f17862150f6..9aee409b4b9 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -1191,4 +1191,48 @@ public class StoreTest extends ElasticsearchTestCase { } assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); } + + public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { + IndexWriterConfig iwc = newIndexWriterConfig(); + final ShardId shardId = new ShardId(new Index("index"), 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId)); + IndexWriter writer = new IndexWriter(store.directory(), iwc); + + int numDocs = 1 + random().nextInt(10); + List docs = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); + docs.add(doc); + } + for (Document d : docs) { + writer.addDocument(d); + } + writer.commit(); + writer.close(); + MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class); + if (leaf != null) { + leaf.setPreventDoubleWrite(false); // I do this on purpose + } + SegmentInfos segmentCommitInfos = store.readLastCommittedSegmentsInfo(); + try (IndexOutput out = store.directory().createOutput(segmentCommitInfos.getSegmentsFileName(), IOContext.DEFAULT)) { + // empty file + } + + try { + if (randomBoolean()) { + store.getMetadata(); + } else { + store.readLastCommittedSegmentsInfo(); + } + fail("corrupted segments_N file"); + } catch (CorruptIndexException ex) { + // expected + } + assertTrue(store.isMarkedCorrupted()); + store.close(); + } }