Avoid a race condition while waiting for pre warm to finish on SearchableSnapshotDirectoryTests (#60906)
Backport of #60885. Closes #60813
This commit is contained in:
parent
f30f1f04e2
commit
2a4fd8329b
|
@ -399,7 +399,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
|
||||
private void prewarmCache() {
|
||||
if (prewarmCache == false) {
|
||||
recoveryState.preWarmFinished();
|
||||
recoveryState.setPreWarmComplete();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -407,7 +407,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
final Executor executor = prewarmExecutor();
|
||||
|
||||
final GroupedActionListener<Void> completionListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(voids -> recoveryState.preWarmFinished(), e -> {}), // Ignore pre-warm errors
|
||||
ActionListener.wrap(voids -> recoveryState.setPreWarmComplete(), e -> {}), // Ignore pre-warm errors
|
||||
snapshot().totalFileCount()
|
||||
);
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ import java.util.HashSet;
|
|||
import java.util.Set;
|
||||
|
||||
public final class SearchableSnapshotRecoveryState extends RecoveryState {
|
||||
private boolean preWarmFinished;
|
||||
private boolean preWarmComplete;
|
||||
|
||||
public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
|
||||
super(shardRouting, targetNode, sourceNode, new Index());
|
||||
|
@ -29,7 +29,7 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState {
|
|||
|
||||
// Pre-warm is still running, hold the state transition
|
||||
// until the pre-warm process finishes
|
||||
if (preWarmFinished == false && stage == Stage.DONE) {
|
||||
if (preWarmComplete == false && stage == Stage.DONE) {
|
||||
validateCurrentStage(Stage.FINALIZE);
|
||||
return this;
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState {
|
|||
return super.setStage(stage);
|
||||
}
|
||||
|
||||
public synchronized void preWarmFinished() {
|
||||
public synchronized void setPreWarmComplete() {
|
||||
// For small shards it's possible that the
|
||||
// cache is pre-warmed before the stage has transitioned
|
||||
// to FINALIZE, so the transition to the final state is delayed until
|
||||
|
@ -48,7 +48,11 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState {
|
|||
|
||||
SearchableSnapshotRecoveryState.Index index = (Index) getIndex();
|
||||
index.stopTimer();
|
||||
preWarmFinished = true;
|
||||
preWarmComplete = true;
|
||||
}
|
||||
|
||||
public synchronized boolean isPreWarmComplete() {
|
||||
return preWarmComplete;
|
||||
}
|
||||
|
||||
public synchronized void ignoreFile(String name) {
|
||||
|
|
|
@ -117,8 +117,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
|
@ -778,9 +776,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
|
|||
try {
|
||||
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
|
||||
testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
|
||||
assertExecutorIsIdle(snapshotDirectory.prewarmExecutor());
|
||||
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.FINALIZE));
|
||||
assertBusy(() -> assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.FINALIZE)));
|
||||
// All pre-warm tasks failed
|
||||
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
|
||||
});
|
||||
|
@ -824,7 +820,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
|
|||
.putList(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.getKey(), fileTypesExcludedFromCaching)
|
||||
.build();
|
||||
testDirectories(true, true, recoveryState, settings, (directory, snapshotDirectory) -> {
|
||||
assertExecutorIsIdle(snapshotDirectory.prewarmExecutor());
|
||||
assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete()));
|
||||
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
|
||||
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
|
||||
|
@ -838,7 +834,7 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
|
|||
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
|
||||
|
||||
testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
|
||||
assertExecutorIsIdle(snapshotDirectory.prewarmExecutor());
|
||||
assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete()));
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
|
||||
|
||||
List<BlobStoreIndexShardSnapshot.FileInfo> filesWithEqualContent = snapshotDirectory.snapshot()
|
||||
|
@ -890,14 +886,6 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
|
|||
assertThat("Sum of file sizes mismatch, got: " + files, files.values().stream().mapToLong(Long::longValue).sum(), matchSizeOfFiles);
|
||||
}
|
||||
|
||||
private void assertExecutorIsIdle(Executor executor) throws Exception {
|
||||
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
|
||||
assertBusy(() -> {
|
||||
assertThat(threadPoolExecutor.getActiveCount(), equalTo(0));
|
||||
assertThat(threadPoolExecutor.getQueue().size(), equalTo(0));
|
||||
});
|
||||
}
|
||||
|
||||
private static IndexSettings newIndexSettings() {
|
||||
return IndexSettingsModule.newIndexSettings(
|
||||
"_index",
|
||||
|
|
|
@ -41,7 +41,7 @@ public class SearchableSnapshotsRecoveryStateTests extends ESTestCase {
|
|||
public void testStageTransitionsToDoneOncePreWarmingHasFinished() {
|
||||
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.INIT));
|
||||
recoveryState.preWarmFinished();
|
||||
recoveryState.setPreWarmComplete();
|
||||
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.INIT));
|
||||
|
||||
|
@ -59,7 +59,7 @@ public class SearchableSnapshotsRecoveryStateTests extends ESTestCase {
|
|||
recoveryState.getIndex().setFileDetailsComplete();
|
||||
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
|
||||
|
||||
recoveryState.preWarmFinished();
|
||||
recoveryState.setPreWarmComplete();
|
||||
|
||||
recoveryState.setStage(RecoveryState.Stage.DONE);
|
||||
|
||||
|
@ -75,7 +75,7 @@ public class SearchableSnapshotsRecoveryStateTests extends ESTestCase {
|
|||
recoveryState.getIndex().setFileDetailsComplete();
|
||||
recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
|
||||
|
||||
recoveryState.preWarmFinished();
|
||||
recoveryState.setPreWarmComplete();
|
||||
|
||||
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
|
||||
|
||||
|
@ -114,7 +114,7 @@ public class SearchableSnapshotsRecoveryStateTests extends ESTestCase {
|
|||
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
|
||||
assertThat(recoveryState.getIndex().stopTime(), equalTo(0L));
|
||||
|
||||
recoveryState.preWarmFinished();
|
||||
recoveryState.setPreWarmComplete();
|
||||
|
||||
assertThat(recoveryState.getIndex().stopTime(), greaterThan(0L));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue