diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 90684267f76..853fe6fc994 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1899,7 +1899,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl internalIndexingStats.noopUpdate(type); } - private void checkIndex() throws IOException { + void checkIndex() throws IOException { if (store.tryIncRef()) { try { doCheckIndex(); @@ -1938,29 +1938,25 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } else { // full checkindex - try (CheckIndex checkIndex = new CheckIndex(store.directory())) { - checkIndex.setInfoStream(out); - CheckIndex.Status status = checkIndex.checkIndex(); - out.flush(); - - if (!status.clean) { - if (state == IndexShardState.CLOSED) { - // ignore if closed.... - return; + final CheckIndex.Status status = store.checkIndex(out); + out.flush(); + if (!status.clean) { + if (state == IndexShardState.CLOSED) { + // ignore if closed.... + return; + } + logger.warn("check index [failure]\n{}", os.bytes().utf8ToString()); + if ("fix".equals(checkIndexOnStartup)) { + if (logger.isDebugEnabled()) { + logger.debug("fixing index, writing new segments file ..."); } - logger.warn("check index [failure]\n{}", os.bytes().utf8ToString()); - if ("fix".equals(checkIndexOnStartup)) { - if (logger.isDebugEnabled()) { - logger.debug("fixing index, writing new segments file ..."); - } - checkIndex.exorciseIndex(status); - if (logger.isDebugEnabled()) { - logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName); - } - } else { - // only throw a failure if we are not going to fix the index - throw new IllegalStateException("index check failure but can't fix it"); + store.exorciseIndex(status); + if (logger.isDebugEnabled()) { + logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName); } + } else { + // only throw a failure if we are not going to fix the index + throw new IllegalStateException("index check failure but can't fix it"); } } } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 693791e66b6..7cc85e59e8d 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; @@ -86,6 +87,7 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.PrintStream; import java.nio.file.AccessDeniedException; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -341,6 +343,33 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } + /** + * Checks and returns the status of the existing index in this store. + * + * @param out where infoStream messages should go. See {@link CheckIndex#setInfoStream(PrintStream)} + */ + public CheckIndex.Status checkIndex(PrintStream out) throws IOException { + metadataLock.writeLock().lock(); + try (CheckIndex checkIndex = new CheckIndex(directory)) { + checkIndex.setInfoStream(out); + return checkIndex.checkIndex(); + } finally { + metadataLock.writeLock().unlock(); + } + } + + /** + * Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}. + */ + public void exorciseIndex(CheckIndex.Status status) throws IOException { + metadataLock.writeLock().lock(); + try (CheckIndex checkIndex = new CheckIndex(directory)) { + checkIndex.exorciseIndex(status); + } finally { + metadataLock.writeLock().unlock(); + } + } + public StoreStats stats() throws IOException { ensureOpen(); return statsCache.getOrRefresh(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b0b597f9085..4618d84d4ad 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2449,6 +2449,71 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(newShard); } + /** + * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService + * and checking index concurrently. This should always be possible without any exception. + */ + public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { + final boolean isPrimary = randomBoolean(); + IndexShard indexShard = newStartedShard(isPrimary); + final long numDocs = between(10, 100); + for (long i = 0; i < numDocs; i++) { + indexDoc(indexShard, "doc", Long.toString(i), "{\"foo\" : \"bar\"}"); + if (randomBoolean()) { + indexShard.refresh("test"); + } + } + indexShard.flush(new FlushRequest()); + closeShards(indexShard); + + final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), + isPrimary ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE + ); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData()) + .settings(Settings.builder() + .put(indexShard.indexSettings.getSettings()) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) + .build(); + final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, + null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer()); + + Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); + assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); + AtomicBoolean stop = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + Thread snapshotter = new Thread(() -> { + latch.countDown(); + while (stop.get() == false) { + try { + Store.MetadataSnapshot readMeta = newShard.snapshotStoreMetadata(); + assertThat(readMeta.getNumDocs(), equalTo(numDocs)); + assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0)); + assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0)); + assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size())); + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + snapshotter.start(); + + if (isPrimary) { + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), + getFakeDiscoNode(newShard.routingEntry().currentNodeId()), null)); + } else { + newShard.markAsRecovering("peer", new RecoveryState(newShard.routingEntry(), + getFakeDiscoNode(newShard.routingEntry().currentNodeId()), getFakeDiscoNode(newShard.routingEntry().currentNodeId()))); + } + int iters = iterations(10, 100); + latch.await(); + for (int i = 0; i < iters; i++) { + newShard.checkIndex(); + } + assertTrue(stop.compareAndSet(false, true)); + snapshotter.join(); + closeShards(newShard); + } + class Result { private final int localCheckpoint; private final int maxSeqNo; diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 9bb180c9818..bbfa56a0e55 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -204,16 +204,13 @@ public class CorruptedFileIT extends ESIntegTestCase { if (!Lucene.indexExists(store.directory()) && indexShard.state() == IndexShardState.STARTED) { return; } - try (CheckIndex checkIndex = new CheckIndex(store.directory())) { - BytesStreamOutput os = new BytesStreamOutput(); - PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name()); - checkIndex.setInfoStream(out); - out.flush(); - CheckIndex.Status status = checkIndex.checkIndex(); - if (!status.clean) { - logger.warn("check index [failure]\n{}", os.bytes().utf8ToString()); - throw new IOException("index check failure"); - } + BytesStreamOutput os = new BytesStreamOutput(); + PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name()); + CheckIndex.Status status = store.checkIndex(out); + out.flush(); + if (!status.clean) { + logger.warn("check index [failure]\n{}", os.bytes().utf8ToString()); + throw new IOException("index check failure"); } } catch (Exception e) { exception.add(e); diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index 1a164154df2..59189a14af8 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -1070,4 +1070,5 @@ public class StoreTests extends ESTestCase { } store.close(); } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java index 5d6fe757aa2..e825031a60f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -119,17 +119,14 @@ public class MockFSDirectoryService extends FsDirectoryService { if (!Lucene.indexExists(dir)) { return; } - try (CheckIndex checkIndex = new CheckIndex(dir)) { + try { BytesStreamOutput os = new BytesStreamOutput(); PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name()); - checkIndex.setInfoStream(out); + CheckIndex.Status status = store.checkIndex(out); out.flush(); - CheckIndex.Status status = checkIndex.checkIndex(); if (!status.clean) { ESTestCase.checkIndexFailed = true; - logger.warn("check index [failure] index files={}\n{}", - Arrays.toString(dir.listAll()), - os.bytes().utf8ToString()); + logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString()); throw new IOException("index check failure"); } else { if (logger.isDebugEnabled()) {