If closing a shard while resetting engine, IndexEventListener.afterIndexShardClosed would be called while there is still an active IndexWriter on the shard. For integration tests, this leads to an exception during check index called from MockFSIndexStore .Listener. Fixed. Relates to #38561
This commit is contained in:
parent
990be1f806
commit
c6abe74dd6
|
@ -32,6 +32,7 @@ import org.apache.lucene.search.ReferenceManager;
|
|||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
@ -3067,42 +3068,61 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final TranslogStats translogStats = translogStats();
|
||||
// flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
|
||||
flush(new FlushRequest().waitIfOngoing(true));
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||
final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity());
|
||||
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
|
||||
}
|
||||
|
||||
Engine newEngine = null;
|
||||
try {
|
||||
SetOnce<Engine> newEngineReference = new SetOnce<>();
|
||||
final long globalCheckpoint = getGlobalCheckpoint();
|
||||
synchronized (mutex) {
|
||||
assert currentEngineReference.get() instanceof ReadOnlyEngine : "another write engine is running";
|
||||
verifyNotClosed();
|
||||
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
|
||||
newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
|
||||
onNewEngine(newEngine);
|
||||
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
|
||||
// acquireXXXCommit and close works.
|
||||
final Engine readOnlyEngine =
|
||||
new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
|
||||
@Override
|
||||
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
|
||||
synchronized (mutex) {
|
||||
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
|
||||
return newEngineReference.get().acquireLastIndexCommit(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexCommitRef acquireSafeIndexCommit() {
|
||||
synchronized (mutex) {
|
||||
return newEngineReference.get().acquireSafeIndexCommit();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert Thread.holdsLock(mutex);
|
||||
|
||||
Engine newEngine = newEngineReference.get();
|
||||
if (newEngine == currentEngineReference.get()) {
|
||||
// we successfully installed the new engine so do not close it.
|
||||
newEngine = null;
|
||||
}
|
||||
IOUtils.close(super::close, newEngine);
|
||||
}
|
||||
};
|
||||
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
|
||||
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
|
||||
onNewEngine(newEngineReference.get());
|
||||
}
|
||||
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
|
||||
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
|
||||
// TODO: add a dedicate recovery stats for the reset translog
|
||||
});
|
||||
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
|
||||
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
IOUtils.close(currentEngineReference.getAndSet(newEngine));
|
||||
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
|
||||
// We set active because we are now writing operations to the engine; this way,
|
||||
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
|
||||
active.set(true);
|
||||
newEngine = null;
|
||||
}
|
||||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
|
||||
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
|
||||
onSettingsChanged();
|
||||
} finally {
|
||||
IOUtils.close(newEngine);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -125,6 +125,7 @@ import org.elasticsearch.test.CorruptionUtils;
|
|||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.FieldMaskingReader;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
@ -3697,6 +3698,122 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShard(shard, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside
|
||||
* resetEngineToGlobalCheckpoint can lead to check index failure in integration tests.
|
||||
*/
|
||||
public void testCloseShardWhileResettingEngine() throws Exception {
|
||||
CountDownLatch readyToCloseLatch = new CountDownLatch(1);
|
||||
CountDownLatch closeDoneLatch = new CountDownLatch(1);
|
||||
IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
|
||||
@Override
|
||||
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
|
||||
long recoverUpToSeqNo) throws IOException {
|
||||
readyToCloseLatch.countDown();
|
||||
try {
|
||||
closeDoneLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
|
||||
}
|
||||
});
|
||||
|
||||
Thread closeShardThread = new Thread(() -> {
|
||||
try {
|
||||
readyToCloseLatch.await();
|
||||
shard.close("testing", false);
|
||||
// in integration tests, this is done as a listener on IndexService.
|
||||
MockFSDirectoryService.checkIndex(logger, shard.store(), shard.shardId);
|
||||
} catch (InterruptedException | IOException e) {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
closeDoneLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
closeShardThread.start();
|
||||
|
||||
final CountDownLatch engineResetLatch = new CountDownLatch(1);
|
||||
shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getGlobalCheckpoint(), 0L,
|
||||
ActionListener.wrap(r -> {
|
||||
try (Releasable dummy = r) {
|
||||
shard.resetEngineToGlobalCheckpoint();
|
||||
} finally {
|
||||
engineResetLatch.countDown();
|
||||
}
|
||||
}, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
|
||||
|
||||
engineResetLatch.await();
|
||||
|
||||
closeShardThread.join();
|
||||
|
||||
// close store.
|
||||
closeShard(shard, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. While engine is inside
|
||||
* resetEngineToGlobalCheckpoint snapshot metadata could fail
|
||||
*/
|
||||
public void testSnapshotWhileResettingEngine() throws Exception {
|
||||
CountDownLatch readyToSnapshotLatch = new CountDownLatch(1);
|
||||
CountDownLatch snapshotDoneLatch = new CountDownLatch(1);
|
||||
IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
|
||||
@Override
|
||||
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
|
||||
long recoverUpToSeqNo) throws IOException {
|
||||
InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
|
||||
readyToSnapshotLatch.countDown();
|
||||
try {
|
||||
snapshotDoneLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
return internalEngine;
|
||||
}
|
||||
});
|
||||
|
||||
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
|
||||
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
|
||||
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
|
||||
|
||||
Thread snapshotThread = new Thread(() -> {
|
||||
try {
|
||||
readyToSnapshotLatch.await();
|
||||
shard.snapshotStoreMetadata();
|
||||
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) {
|
||||
shard.store().getMetadata(indexCommitRef.getIndexCommit());
|
||||
}
|
||||
try (Engine.IndexCommitRef indexCommitRef = shard.acquireSafeIndexCommit()) {
|
||||
shard.store().getMetadata(indexCommitRef.getIndexCommit());
|
||||
}
|
||||
} catch (InterruptedException | IOException e) {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
snapshotDoneLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
snapshotThread.start();
|
||||
|
||||
final CountDownLatch engineResetLatch = new CountDownLatch(1);
|
||||
shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getGlobalCheckpoint(), 0L,
|
||||
ActionListener.wrap(r -> {
|
||||
try (Releasable dummy = r) {
|
||||
shard.resetEngineToGlobalCheckpoint();
|
||||
} finally {
|
||||
engineResetLatch.countDown();
|
||||
}
|
||||
}, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
|
||||
|
||||
engineResetLatch.await();
|
||||
|
||||
snapshotThread.join();
|
||||
|
||||
closeShard(shard, false);
|
||||
}
|
||||
|
||||
public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
|
||||
final IndexShard replica = newStartedShard(false);
|
||||
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
|
||||
|
|
Loading…
Reference in New Issue