Hide AlreadyClosedException on IndexCommit release (#57986)

Today `InternalEngine#releaseIndexCommit` fails with an
`AlreadyClosedException` if the engine is closed before the index commit is
released. This can happen if, for example, a node leaves and rejoins the
cluster and acquires an index commit for replica shard allocation concurrently
with shutting the shard down.

There's no need to fail the operation like this: if the engine is shut down
then we will clean up the unreferenced files when it's restarted (or if it's
allocated elsewhere) so we can suppress an `AlreadyClosedException` in this
case. This commit does so.

Fixes #57797
This commit is contained in:
David Turner 2020-06-11 15:39:35 +01:00
parent 9b52a250f8
commit f950c121bb
2 changed files with 20 additions and 9 deletions

View File

@ -2037,10 +2037,13 @@ public class InternalEngine extends Engine {
private void releaseIndexCommit(IndexCommit snapshot) throws IOException { private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit. // Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) { if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen(); try {
// Here we don't have to trim translog because snapshotting an index commit // Here we don't have to trim translog because snapshotting an index commit
// does not lock translog or prevents unreferenced files from trimming. // does not lock translog or prevents unreferenced files from trimming.
indexWriter.deleteUnusedFiles(); indexWriter.deleteUnusedFiles();
} catch (AlreadyClosedException ignored) {
// That's ok, we'll clean up unused files the next time it's opened.
}
} }
} }

View File

@ -4985,6 +4985,8 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(engine, store); IOUtils.close(engine, store);
store = createStore(); store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Engine.IndexCommitRef snapshot;
final boolean closeSnapshotBeforeEngine = randomBoolean();
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
int numDocs = between(1, 20); int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
@ -4995,7 +4997,6 @@ public class InternalEngineTests extends EngineTestCase {
} }
final boolean flushFirst = randomBoolean(); final boolean flushFirst = randomBoolean();
final boolean safeCommit = randomBoolean(); final boolean safeCommit = randomBoolean();
final Engine.IndexCommitRef snapshot;
if (safeCommit) { if (safeCommit) {
snapshot = engine.acquireSafeIndexCommit(); snapshot = engine.acquireSafeIndexCommit();
} else { } else {
@ -5012,6 +5013,8 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0)); assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0));
} }
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
if (closeSnapshotBeforeEngine) {
snapshot.close(); snapshot.close();
// check it's clean up // check it's clean up
engine.flush(true, true); engine.flush(true, true);
@ -5019,6 +5022,11 @@ public class InternalEngineTests extends EngineTestCase {
} }
} }
if (closeSnapshotBeforeEngine == false) {
snapshot.close(); // shouldn't throw AlreadyClosedException
}
}
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store); IOUtils.close(engine, store);
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",