Check index under the store metadata lock (#27768)

Today when we get a metadata snapshot directly from a store directory, 
we acquire a metadata lock, then acquire an IndexWriter lock. However,
we create a CheckIndex in IndexShard without acquiring the metadata lock 
first. This causes a recovery failed because the IndexWriter lock can be
still held by method snapshotStoreMetadata. This commit makes sure to
create a CheckIndex under the metadata lock.

Closes #24481
Closes #27731
Relates #24787
This commit is contained in:
Nhat Nguyen 2017-12-20 11:26:06 -05:00 committed by GitHub
parent 4cbbe3ed93
commit 54b6885844
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 123 additions and 38 deletions

View File

@ -1899,7 +1899,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
internalIndexingStats.noopUpdate(type);
}
private void checkIndex() throws IOException {
void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
doCheckIndex();
@ -1938,29 +1938,25 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
} else {
// full checkindex
try (CheckIndex checkIndex = new CheckIndex(store.directory())) {
checkIndex.setInfoStream(out);
CheckIndex.Status status = checkIndex.checkIndex();
out.flush();
if (!status.clean) {
if (state == IndexShardState.CLOSED) {
// ignore if closed....
return;
final CheckIndex.Status status = store.checkIndex(out);
out.flush();
if (!status.clean) {
if (state == IndexShardState.CLOSED) {
// ignore if closed....
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
checkIndex.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
store.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
@ -86,6 +87,7 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@ -341,6 +343,33 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
/**
* Checks and returns the status of the existing index in this store.
*
* @param out where infoStream messages should go. See {@link CheckIndex#setInfoStream(PrintStream)}
*/
public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(out);
return checkIndex.checkIndex();
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
*/
public void exorciseIndex(CheckIndex.Status status) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.exorciseIndex(status);
} finally {
metadataLock.writeLock().unlock();
}
}
public StoreStats stats() throws IOException {
ensureOpen();
return statsCache.getOrRefresh();

View File

@ -2449,6 +2449,71 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
/**
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
* and checking index concurrently. This should always be possible without any exception.
*/
public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
final boolean isPrimary = randomBoolean();
IndexShard indexShard = newStartedShard(isPrimary);
final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "doc", Long.toString(i), "{\"foo\" : \"bar\"}");
if (randomBoolean()) {
indexShard.refresh("test");
}
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);
final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
isPrimary ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
);
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
AtomicBoolean stop = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
Thread snapshotter = new Thread(() -> {
latch.countDown();
while (stop.get() == false) {
try {
Store.MetadataSnapshot readMeta = newShard.snapshotStoreMetadata();
assertThat(readMeta.getNumDocs(), equalTo(numDocs));
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0));
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0));
assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size()));
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
snapshotter.start();
if (isPrimary) {
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(),
getFakeDiscoNode(newShard.routingEntry().currentNodeId()), null));
} else {
newShard.markAsRecovering("peer", new RecoveryState(newShard.routingEntry(),
getFakeDiscoNode(newShard.routingEntry().currentNodeId()), getFakeDiscoNode(newShard.routingEntry().currentNodeId())));
}
int iters = iterations(10, 100);
latch.await();
for (int i = 0; i < iters; i++) {
newShard.checkIndex();
}
assertTrue(stop.compareAndSet(false, true));
snapshotter.join();
closeShards(newShard);
}
class Result {
private final int localCheckpoint;
private final int maxSeqNo;

View File

@ -204,16 +204,13 @@ public class CorruptedFileIT extends ESIntegTestCase {
if (!Lucene.indexExists(store.directory()) && indexShard.state() == IndexShardState.STARTED) {
return;
}
try (CheckIndex checkIndex = new CheckIndex(store.directory())) {
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
checkIndex.setInfoStream(out);
out.flush();
CheckIndex.Status status = checkIndex.checkIndex();
if (!status.clean) {
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw new IOException("index check failure");
}
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
CheckIndex.Status status = store.checkIndex(out);
out.flush();
if (!status.clean) {
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
throw new IOException("index check failure");
}
} catch (Exception e) {
exception.add(e);

View File

@ -1070,4 +1070,5 @@ public class StoreTests extends ESTestCase {
}
store.close();
}
}

View File

@ -119,17 +119,14 @@ public class MockFSDirectoryService extends FsDirectoryService {
if (!Lucene.indexExists(dir)) {
return;
}
try (CheckIndex checkIndex = new CheckIndex(dir)) {
try {
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
checkIndex.setInfoStream(out);
CheckIndex.Status status = store.checkIndex(out);
out.flush();
CheckIndex.Status status = checkIndex.checkIndex();
if (!status.clean) {
ESTestCase.checkIndexFailed = true;
logger.warn("check index [failure] index files={}\n{}",
Arrays.toString(dir.listAll()),
os.bytes().utf8ToString());
logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
throw new IOException("index check failure");
} else {
if (logger.isDebugEnabled()) {