Make static Store access shard lock aware (#19416)

We currently have concurrency issue between the static methods on the Store class and store changes that are done via a valid open store. An example of this is the async shard fetch which can reach out to a node while a local shard copy is shutting down (the fetch does check if we have an open shard and tries to use that first, but if the shard is shutting down, it will not be available from IndexService).

Specifically, async shard fetching tries to read metadata from store, concurrently the shard that shuts down commits to lucene, changing the segments_N file. this causes a file not find exception on the shard fetching side. That one in turns makes the master think the shard is unusable. In tests this can cause the shard assignment to be delayed (up to 1m) which fails tests. See https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+java9-periodic/570 for details.

This is one of the things #18938 caused to bubble up.
This commit is contained in:
Boaz Leskes 2016-07-18 11:22:58 +02:00 committed by GitHub
parent dd95dc7a0f
commit 9ededa46bc
7 changed files with 73 additions and 49 deletions

View File

@ -633,6 +633,15 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
};
}
/**
* A functional interface that people can use to reference {@link #shardLock(ShardId, long)}
*/
@FunctionalInterface
public interface ShardLocker {
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException;
}
/**
* Returns all currently lock shards.
*

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -65,18 +66,19 @@ public class TransportNodesListGatewayStartedShards extends
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
private final NodeEnvironment nodeEnv;
private final IndicesService indicesService;
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeEnvironment env) {
NodeEnvironment env, IndicesService indicesService) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShards.class);
this.nodeEnv = env;
this.indicesService = indicesService;
}
@Override
@ -127,21 +129,24 @@ public class TransportNodesListGatewayStartedShards extends
throw e;
}
ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
if (indicesService.getShardOrNull(shardId) == null) {
// we don't have an open shard on the store, validate the files on disk are openable
ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
}
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);

View File

@ -68,6 +68,7 @@ import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
@ -88,6 +89,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.CRC32;
@ -403,8 +405,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* @throws IOException if the index we try to read is corrupted
*/
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
ESLogger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
return new MetadataSnapshot(null, dir, logger);
} catch (IndexNotFoundException ex) {
@ -420,9 +424,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* can be successfully opened. This includes reading the segment infos and possible
* corruption markers.
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId) throws IOException {
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker) throws IOException {
try {
tryOpenIndex(indexLocation, shardId, logger);
tryOpenIndex(indexLocation, shardId, shardLocker, logger);
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
@ -435,8 +439,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
public static void tryOpenIndex(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, ESLogger logger) throws IOException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);

View File

@ -150,7 +150,12 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
if (shardPath == null) {
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
}
return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, logger));
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
// 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not
// reuse local resources.
return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId,
nodeEnv::shardLock, logger));
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {

View File

@ -442,7 +442,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
*/
@TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,"
+ "indices.recovery:TRACE,indices.cluster:TRACE")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/19416")
public void testAckedIndexing() throws Exception {
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;

View File

@ -18,27 +18,6 @@
*/
package org.elasticsearch.index.shard;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
@ -134,6 +113,27 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -253,7 +253,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardPath shardPath = ShardPath.loadShardPath(logger, env, shard.shardId(), test.getIndexSettings());
assertNotNull(shardPath);
// but index can't be opened for a failed shard
assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId()), equalTo(false));
assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(), env::shardLock),
equalTo(false));
}
ShardStateMetaData getShardStateMetadata(IndexShard shard) {

View File

@ -1000,14 +1000,14 @@ public class StoreTests extends ESTestCase {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
assertFalse(Store.canOpenIndex(logger, tempDir,shardId));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir, shardId));
assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
@ -1022,7 +1022,7 @@ public class StoreTests extends ESTestCase {
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
store.close();
}