Revert "Revert "Ensure we mark store as corrupted if we fail to read the segments info""
This reverts commit 5fbb6a714d
.
This commit is contained in:
parent
4dd4f48a2f
commit
7025a27f78
|
@ -37,6 +37,7 @@ import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
@ -53,7 +54,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
*/
|
*/
|
||||||
public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable {
|
public class IndexShardGateway extends AbstractIndexShardComponent implements Closeable {
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
|
||||||
private final MappingUpdatedAction mappingUpdatedAction;
|
private final MappingUpdatedAction mappingUpdatedAction;
|
||||||
private final IndexService indexService;
|
private final IndexService indexService;
|
||||||
private final IndexShard indexShard;
|
private final IndexShard indexShard;
|
||||||
|
@ -63,10 +63,9 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
|
||||||
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, MappingUpdatedAction mappingUpdatedAction,
|
public IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction,
|
||||||
IndexService indexService, IndexShard indexShard) {
|
IndexService indexService, IndexShard indexShard) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
this.threadPool = threadPool;
|
|
||||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||||
this.indexService = indexService;
|
this.indexService = indexService;
|
||||||
this.indexShard = indexShard;
|
this.indexShard = indexShard;
|
||||||
|
@ -82,16 +81,17 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
|
||||||
long version = -1;
|
long version = -1;
|
||||||
final Map<String, Mapping> typesToUpdate;
|
final Map<String, Mapping> typesToUpdate;
|
||||||
SegmentInfos si = null;
|
SegmentInfos si = null;
|
||||||
indexShard.store().incRef();
|
final Store store = indexShard.store();
|
||||||
|
store.incRef();
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
indexShard.store().failIfCorrupted();
|
store.failIfCorrupted();
|
||||||
try {
|
try {
|
||||||
si = Lucene.readSegmentInfos(indexShard.store().directory());
|
si = store.readLastCommittedSegmentsInfo();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
String files = "_unknown_";
|
String files = "_unknown_";
|
||||||
try {
|
try {
|
||||||
files = Arrays.toString(indexShard.store().directory().listAll());
|
files = Arrays.toString(store.directory().listAll());
|
||||||
} catch (Throwable e1) {
|
} catch (Throwable e1) {
|
||||||
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
|
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
|
||||||
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
|
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
|
||||||
// its a "new index create" API, we have to do something, so better to clean it than use same data
|
// its a "new index create" API, we have to do something, so better to clean it than use same data
|
||||||
logger.trace("cleaning existing shard, shouldn't exists");
|
logger.trace("cleaning existing shard, shouldn't exists");
|
||||||
IndexWriter writer = new IndexWriter(indexShard.store().directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
|
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
|
||||||
writer.close();
|
writer.close();
|
||||||
recoveryState.getTranslog().totalOperations(0);
|
recoveryState.getTranslog().totalOperations(0);
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
|
||||||
try {
|
try {
|
||||||
final RecoveryState.Index index = recoveryState.getIndex();
|
final RecoveryState.Index index = recoveryState.getIndex();
|
||||||
if (si != null) {
|
if (si != null) {
|
||||||
final Directory directory = indexShard.store().directory();
|
final Directory directory = store.directory();
|
||||||
for (String name : Lucene.files(si)) {
|
for (String name : Lucene.files(si)) {
|
||||||
long length = directory.fileLength(name);
|
long length = directory.fileLength(name);
|
||||||
index.addFileDetail(name, length, true);
|
index.addFileDetail(name, length, true);
|
||||||
|
@ -143,7 +143,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
|
||||||
} catch (EngineException e) {
|
} catch (EngineException e) {
|
||||||
throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e);
|
throw new IndexShardGatewayRecoveryException(shardId, "failed to recovery from gateway", e);
|
||||||
} finally {
|
} finally {
|
||||||
indexShard.store().decRef();
|
store.decRef();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -136,7 +136,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
* @throws IOException if the index is corrupted or the segments file is not present
|
* @throws IOException if the index is corrupted or the segments file is not present
|
||||||
*/
|
*/
|
||||||
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
|
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
|
||||||
|
failIfCorrupted();
|
||||||
|
try {
|
||||||
return readSegmentsInfo(null, directory());
|
return readSegmentsInfo(null, directory());
|
||||||
|
} catch (CorruptIndexException ex) {
|
||||||
|
markStoreCorrupted(ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
|
import org.elasticsearch.gateway.GatewayAllocator;
|
||||||
import org.elasticsearch.index.merge.policy.MergePolicyModule;
|
import org.elasticsearch.index.merge.policy.MergePolicyModule;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
@ -513,12 +514,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
|
||||||
* replica.
|
* replica.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11226")
|
|
||||||
public void testReplicaCorruption() throws Exception {
|
public void testReplicaCorruption() throws Exception {
|
||||||
int numDocs = scaledRandomIntBetween(100, 1000);
|
int numDocs = scaledRandomIntBetween(100, 1000);
|
||||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
||||||
|
.put(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS, "one")
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
|
||||||
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
||||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||||
|
|
|
@ -1191,4 +1191,48 @@ public class StoreTest extends ElasticsearchTestCase {
|
||||||
}
|
}
|
||||||
assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId()));
|
assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
|
||||||
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
|
final ShardId shardId = new ShardId(new Index("index"), 1);
|
||||||
|
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
|
||||||
|
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
|
||||||
|
IndexWriter writer = new IndexWriter(store.directory(), iwc);
|
||||||
|
|
||||||
|
int numDocs = 1 + random().nextInt(10);
|
||||||
|
List<Document> docs = new ArrayList<>();
|
||||||
|
for (int i = 0; i < numDocs; i++) {
|
||||||
|
Document doc = new Document();
|
||||||
|
doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
|
||||||
|
docs.add(doc);
|
||||||
|
}
|
||||||
|
for (Document d : docs) {
|
||||||
|
writer.addDocument(d);
|
||||||
|
}
|
||||||
|
writer.commit();
|
||||||
|
writer.close();
|
||||||
|
MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
|
||||||
|
if (leaf != null) {
|
||||||
|
leaf.setPreventDoubleWrite(false); // I do this on purpose
|
||||||
|
}
|
||||||
|
SegmentInfos segmentCommitInfos = store.readLastCommittedSegmentsInfo();
|
||||||
|
try (IndexOutput out = store.directory().createOutput(segmentCommitInfos.getSegmentsFileName(), IOContext.DEFAULT)) {
|
||||||
|
// empty file
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
store.getMetadata();
|
||||||
|
} else {
|
||||||
|
store.readLastCommittedSegmentsInfo();
|
||||||
|
}
|
||||||
|
fail("corrupted segments_N file");
|
||||||
|
} catch (CorruptIndexException ex) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertTrue(store.isMarkedCorrupted());
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue