Check if the index can be opened and is not corrupted on state listing

We fetch the state version to find the right shard to be started as
the primary. This can return a valid shard state even if the shard is
corrupted and can't even be opened. This commit adds best effort detection
for this scenario and returns an invalid version for the shard if it's corrupted

Closes #11226
This commit is contained in:
Simon Willnauer 2015-05-20 22:19:30 +02:00
parent ea3c5d5820
commit 4cb21d02a4
3 changed files with 67 additions and 3 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.gateway;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
@ -30,15 +29,15 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -108,6 +107,11 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
logger.trace("{} loading local shard state info", shardId);
ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
if (shardStateMetaData != null) {
final IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name()); // it's a mystery why this is sometimes null
if (metaData != null && canOpenIndex(request.getShardId(), metaData) == false) {
logger.trace("{} can't open index for shard", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), -1);
}
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
if (indexUUID.equals(shardStateMetaData.indexUUID) == false
@ -125,6 +129,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
}
}
private boolean canOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException {
// try and see if we an list unallocated
if (metaData == null) {
return false;
}
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings());
if (shardPath == null) {
return false;
}
return Store.canOpenIndex(logger, shardPath.resolveIndex());
}
@Override
protected boolean accumulateExceptions() {
return true;

View File

@ -382,6 +382,22 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return MetadataSnapshot.EMPTY;
}
/**
* Returns <code>true</code> iff the given location contains an index an the index
* can be successfully opened. This includes reading the segment infos and possible
* corruption markers.
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, new ShardId("", 1));
Lucene.readSegmentInfos(dir);
return true;
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
}
}
/**
* The returned IndexOutput might validate the files checksum if the file has been written with a newer lucene version
* and the metadata holds the necessary information to detect that it was been written by Lucene 4.8 or newer. If it has only

View File

@ -56,6 +56,7 @@ import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -1235,4 +1236,35 @@ public class StoreTest extends ElasticsearchTestCase {
assertTrue(store.isMarkedCorrupted());
store.close();
}
public void testCanOpenIndex() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
assertFalse(Store.canOpenIndex(logger, tempDir));
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir));
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new DirectoryService(shardId, ImmutableSettings.EMPTY) {
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir));
store.close();
}
}