Do not create engine under IndexShard#mutex (#45263)

Today we create new engines under IndexShard#mutex. This is not ideal
because it can block cluster state updates, which also execute under
the same mutex. We can avoid this problem by creating new engines under
a separate mutex.

Closes #43699
Nhat Nguyen 2019-08-26 17:16:29 -04:00
parent 1b4d5b37c5
commit f2e8b17696
4 changed files with 167 additions and 65 deletions
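
For illustration, here is a minimal, self-contained sketch of the pattern this change introduces. The field names mirror the patch, but Engine is a stand-in stub and EngineOpenSketch, createEngine and the closed flag are hypothetical, not the real IndexShard API: the expensive engine construction runs under a dedicated engineMutex, and only the cheap publication of the reference takes the shard-level mutex, closing the new engine if the shard was concurrently closed.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

class EngineOpenSketch {
    interface Engine extends Closeable {}                 // stub for the real Engine

    private final Object engineMutex = new Object();      // lock ordering: engineMutex -> mutex
    private final Object mutex = new Object();            // also taken by cluster state updates
    private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
    private volatile boolean closed = false;

    void openEngine() throws IOException {
        synchronized (engineMutex) {
            // Slow work (opening Lucene, replaying the translog) happens here,
            // without holding the shard mutex, so cluster state updates can proceed.
            final Engine newEngine = createEngine();
            synchronized (mutex) {                        // short critical section: publish only
                try {
                    if (closed) {
                        throw new IOException("shard closed");
                    }
                    currentEngineReference.set(newEngine);
                } finally {
                    if (currentEngineReference.get() != newEngine) {
                        newEngine.close();                // lost the race with close(); do not leak
                    }
                }
            }
        }
    }

    private Engine createEngine() {
        return () -> {};                                  // stand-in for engineFactory.newReadWriteEngine(config)
    }
}

The same publish-or-close step appears in innerOpenEngineAndTranslog and resetEngineToGlobalCheckpoint below.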

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -213,8 +213,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
// ensure happens-before relation between addRefreshListener() and postRecovery()
private final Object postRecoveryMutex = new Object();
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;
private final IndexingOperationListener indexingOperationListeners;
@@ -1192,20 +1195,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
engine = getEngineOrNull();
if (engine == null) {
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
synchronized (mutex) {
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
}
if (indexCommit == null) {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
@@ -1334,23 +1340,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
public IndexShard postRecovery(String reason)
throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (postRecoveryMutex) {
// we need to refresh again to expose all operations that were indexed until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
// and before moving this shard to POST_RECOVERY state (i.e., allowing reads from this shard).
getEngine().refresh("post_recovery");
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
}
return this;
}
/**
@@ -1583,6 +1590,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@@ -1595,16 +1603,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
synchronized (engineMutex) {
// we must create a new engine under engineMutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
synchronized (mutex) {
try {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
} finally {
if (currentEngineReference.get() != newEngine) {
newEngine.close();
}
}
}
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1627,6 +1643,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}
@@ -2673,7 +2690,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
Sort indexSort = indexSortSupplier.get();
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
if (this.warmer != null) {
this.warmer.warm(reader);
}
};
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
@@ -3235,10 +3258,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (isReadAllowed()) {
readAllowed = true;
} else {
// check again under mutex. this is important to create a happens before relationship
// check again under postRecoveryMutex. This is important to create a happens-before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (mutex) {
synchronized (postRecoveryMutex) {
readAllowed = isReadAllowed();
}
}
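
A hedged aside on why the re-check under postRecoveryMutex suffices, as a simplified, self-contained sketch (the string states and the refreshed flag stand in for the real shard machinery): the recovery thread performs the refresh and the state change while holding the monitor, so a listener thread that acquires the same monitor and observes POST_RECOVERY is guaranteed to also observe the refresh that preceded it.

class PostRecoveryVisibilitySketch {
    private final Object postRecoveryMutex = new Object();
    private boolean refreshed = false;      // stands in for the "post_recovery" refresh
    private String state = "RECOVERING";    // guarded by postRecoveryMutex

    void postRecovery() {
        synchronized (postRecoveryMutex) {
            refreshed = true;               // refresh first, under the mutex...
            state = "POST_RECOVERY";        // ...then expose the shard for reads
        }
    }

    boolean isReadAllowed() {
        synchronized (postRecoveryMutex) {  // same monitor as postRecovery()
            if ("POST_RECOVERY".equals(state)) {
                assert refreshed;           // visible: the refresh happened-before this read
                return true;
            }
            return false;
        }
    }
}
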
@@ -3303,6 +3326,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
sync(); // persist the global checkpoint to disk
@@ -3314,15 +3338,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
SetOnce<Engine> newEngineReference = new SetOnce<>();
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
synchronized (mutex) {
verifyNotClosed();
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
synchronized (engineMutex) {
// we must create both the new read-only engine and the new read-write engine under engineMutex so that snapshotStoreMetadata,
// acquireXXXCommit and close work correctly.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (mutex) {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
return newEngineReference.get().acquireLastIndexCommit(false);
}
@@ -3330,7 +3356,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
@Override
public IndexCommitRef acquireSafeIndexCommit() {
synchronized (mutex) {
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
return newEngineReference.get().acquireSafeIndexCommit();
}
}
@@ -3347,9 +3376,28 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
IOUtils.close(super::close, newEngine);
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
synchronized (mutex) {
try {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
} finally {
if (currentEngineReference.get() != readOnlyEngine) {
readOnlyEngine.close();
}
}
}
final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
synchronized (mutex) {
try {
verifyNotClosed();
newEngineReference.set(newReadWriteEngine);
onNewEngine(newReadWriteEngine);
} finally {
if (newEngineReference.get() != newReadWriteEngine) {
newReadWriteEngine.close(); // shard was closed
}
}
}
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
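
To round out the hunk above, a compact sketch of the delegation trick in resetEngineToGlobalCheckpoint, using stand-in interfaces rather than the real Engine API (IndexCommitRef, readOnlyView and setNewEngine are illustrative): while the translog is replayed into the new read-write engine, readers keep using a read-only view whose commit-acquisition calls are forwarded, under engineMutex, to the replacement engine.

import java.io.Closeable;
import java.io.IOException;

class EngineResetSketch {
    interface IndexCommitRef extends Closeable {}                   // stub
    interface Engine extends Closeable {
        IndexCommitRef acquireLastIndexCommit() throws IOException;
    }

    private final Object engineMutex = new Object();
    private Engine newEngine;   // the read-write replacement; guarded by engineMutex

    void setNewEngine(Engine engine) {
        synchronized (engineMutex) {
            newEngine = engine;
        }
    }

    // Read-only view published while the replacement engine replays the translog.
    Engine readOnlyView(Engine current) {
        return new Engine() {
            @Override
            public IndexCommitRef acquireLastIndexCommit() throws IOException {
                synchronized (engineMutex) {
                    if (newEngine == null) {
                        // the patch throws Lucene's AlreadyClosedException here
                        throw new IllegalStateException("engine was closed");
                    }
                    return newEngine.acquireLastIndexCommit();      // forward to the replacement
                }
            }

            @Override
            public void close() throws IOException {
                current.close();                                    // closing the view closes the old engine
            }
        };
    }
}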

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -79,10 +79,12 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.DeleteResult;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -4138,4 +4140,39 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
closeShards(readonlyShard);
}
public void testCloseShardWhileEngineIsWarming() throws Exception {
CountDownLatch warmerStarted = new CountDownLatch(1);
CountDownLatch warmerBlocking = new CountDownLatch(1);
IndexShard shard = newShard(true, Settings.EMPTY, config -> {
Engine.Warmer warmer = reader -> {
try {
warmerStarted.countDown();
warmerBlocking.await();
config.getWarmer().warm(reader);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
};
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
return new InternalEngine(configWithWarmer);
});
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
recoveryThread.start();
try {
warmerStarted.await();
shard.close("testing", false);
assertThat(shard.state, equalTo(IndexShardState.CLOSED));
} finally {
warmerBlocking.countDown();
}
recoveryThread.join();
shard.store().close();
}
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -1131,34 +1131,38 @@ public abstract class EngineTestCase extends ESTestCase {
}
public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException {
if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) {
return;
if (engine instanceof InternalEngine) {
try {
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader());
}
} catch (AlreadyClosedException ignored) {
// engine was closed
}
}
try {
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader());
Set<Long> seqNos = new HashSet<>();
for (LeafReaderContext leaf : reader.leaves()) {
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
assertTrue(seqNoDocValues.advanceExact(docId));
long seqNo = seqNoDocValues.longValue();
assertThat(seqNo, greaterThanOrEqualTo(0L));
if (primaryTermDocValues.advanceExact(docId)) {
if (seqNos.add(seqNo) == false) {
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
leaf.reader().document(docId, idFieldVisitor);
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
}
}
}
public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings indexSettings,
DirectoryReader reader) throws IOException {
Set<Long> seqNos = new HashSet<>();
final DirectoryReader wrappedReader = indexSettings.isSoftDeleteEnabled() ? Lucene.wrapAllDocsLive(reader) : reader;
for (LeafReaderContext leaf : wrappedReader.leaves()) {
NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
assertTrue(seqNoDocValues.advanceExact(docId));
long seqNo = seqNoDocValues.longValue();
assertThat(seqNo, greaterThanOrEqualTo(0L));
if (primaryTermDocValues.advanceExact(docId)) {
if (seqNos.add(seqNo) == false) {
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
leaf.reader().document(docId, idFieldVisitor);
throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId());
}
}
}
} catch (AlreadyClosedException ignored) {
}
}

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -375,7 +375,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
indexSettings.getSettings(), "index");
mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY);
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
final Engine.Warmer warmer = reader -> {};
final Engine.Warmer warmer = createTestWarmer(indexSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
indexShard = new IndexShard(
@@ -860,4 +860,17 @@ public abstract class IndexShardTestCase extends ESTestCase {
public static ReplicationTracker getReplicationTracker(IndexShard indexShard) {
return indexShard.getReplicationTracker();
}
public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
return reader -> {
// This isn't a real warmer, but it sometimes verifies the content of the reader.
if (randomBoolean()) {
try {
EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(indexSettings, reader);
} catch (IOException e) {
throw new AssertionError(e);
}
}
};
}
}