From 0f1c779d2c2aa3ddc482cb813f82f8e230729822 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 19 Feb 2015 11:46:10 +0100 Subject: [PATCH] Engine: close snapshots before recovery counter #9760 was a fix for translog leaking due to measing a delete flag. This is not needed here as we have a better solution to not loose the flag. This commit takes the changes from 1x in order to keep the code base similar and enjoy the extra tests. Closes #9760 --- .../index/engine/InternalEngine.java | 11 ++- .../index/translog/Translog.java | 4 +- .../index/translog/fs/FsTranslog.java | 12 ++-- .../index/engine/InternalEngineTests.java | 67 ++++++++++--------- 4 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ce35298d69a..d05ed4512d5 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -28,7 +28,6 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.routing.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; @@ -778,7 +777,7 @@ public class InternalEngine extends Engine { recoveryHandler.phase1(phase1Snapshot); } catch (Throwable e) { maybeFailEngine("recovery phase 1", e); - Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); + Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries); throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e)); } @@ -787,14 +786,14 @@ public class InternalEngine extends Engine { phase2Snapshot = translog.snapshot(); } catch (Throwable e) { maybeFailEngine("snapshot recovery", e); - Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); + Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries); throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e)); } try { recoveryHandler.phase2(phase2Snapshot); } catch (Throwable e) { maybeFailEngine("recovery phase 2", e); - Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot); + Releasables.closeWhileHandlingException(phase1Snapshot, phase2Snapshot, onGoingRecoveries); throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e)); } @@ -810,8 +809,8 @@ public class InternalEngine extends Engine { maybeFailEngine("recovery phase 3", e); throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e)); } finally { - Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot, - phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here? + Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot, + onGoingRecoveries, writeLock); // hmm why can't we use try-with here? } } diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 268434a4479..951584e52b3 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -121,8 +121,10 @@ public interface Translog extends IndexShardComponent, Closeable, Accountable { /** * Clears unreferenced transaction logs. + * + * @return the number of clean up files */ - void clearUnreferenced(); + int clearUnreferenced(); /** * Sync's the translog. diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 1a64c3a3d70..5346994bbdd 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -43,12 +43,7 @@ import org.elasticsearch.index.translog.TranslogStreams; import java.io.IOException; import java.nio.channels.ClosedChannelException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.OpenOption; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.*; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; @@ -200,8 +195,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override - public void clearUnreferenced() { + public int clearUnreferenced() { rwl.writeLock().lock(); + int deleted = 0; try { for (Path location : locations) { try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) { @@ -210,6 +206,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog try { logger.trace("delete unreferenced translog file: " + file); Files.delete(file); + deleted++; } catch (Exception ex) { logger.debug("failed to delete " + file, ex); } @@ -222,6 +219,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } finally { rwl.writeLock().unlock(); } + return deleted; } @Override diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6d6d6ab43dd..6b96d058f11 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.engine; -import com.google.common.base.Predicate; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -35,7 +34,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; -import org.elasticsearch.ExceptionsHelper; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -57,7 +56,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; @@ -84,19 +82,18 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import static com.carrotsearch.randomizedtesting.RandomizedTest.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; import static org.elasticsearch.test.ElasticsearchTestCase.terminate; import static org.hamcrest.Matchers.*; public class InternalEngineTests extends ElasticsearchLuceneTestCase { + public static final String TRANSLOG_PRIMARY_LOCATION = "work/fs-translog/primary"; + public static final String TRANSLOG_REPLICA_LOCATION = "work/fs-translog/replica"; protected final ShardId shardId = new ShardId(new Index("index"), 1); protected ThreadPool threadPool; @@ -107,8 +104,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { protected Translog translog; protected Translog replicaTranslog; - protected Engine engine; - protected Engine replicaEngine; + protected InternalEngine engine; + protected InternalEngine replicaEngine; private Settings defaultSettings; private int indexConcurrency; @@ -118,6 +115,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { @Before public void setUp() throws Exception { super.setUp(); + // clean up shared directory + IOUtils.rm(Paths.get(TRANSLOG_PRIMARY_LOCATION)); + IOUtils.rm(Paths.get(TRANSLOG_REPLICA_LOCATION)); CodecService codecService = new CodecService(shardId.index()); indexConcurrency = randomIntBetween(1, 20); String name = Codec.getDefault().getName(); @@ -142,21 +142,21 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { Lucene.cleanLuceneIndex(storeReplica.directory()); translog = createTranslog(); engine = createEngine(store, translog); - LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig(); + LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); - assertEquals(((InternalEngine)engine).config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); if (randomBoolean()) { - ((InternalEngine)engine).config().setEnableGcDeletes(false); + engine.config().setEnableGcDeletes(false); } replicaTranslog = createTranslogReplica(); replicaEngine = createEngine(storeReplica, replicaTranslog); - currentIndexWriterConfig = ((InternalEngine)replicaEngine).getCurrentIndexWriterConfig(); + currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); - assertEquals(((InternalEngine)replicaEngine).config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); if (randomBoolean()) { - ((InternalEngine)engine).config().setEnableGcDeletes(false); + engine.config().setEnableGcDeletes(false); } } @@ -193,14 +193,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { } protected Store createStore() throws IOException { - return createStore(newDirectory()); + return createStore(newDirectory()); } protected Store createStore(final Directory directory) throws IOException { final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) { @Override public Directory[] build() throws IOException { - return new Directory[]{ directory }; + return new Directory[]{directory}; } @Override @@ -212,11 +212,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { } protected Translog createTranslog() throws IOException { - return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/primary")); + return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get(TRANSLOG_PRIMARY_LOCATION)); } protected Translog createTranslogReplica() throws IOException { - return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/replica")); + return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get(TRANSLOG_REPLICA_LOCATION)); } protected IndexDeletionPolicy createIndexDeletionPolicy() { @@ -248,7 +248,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { IndexWriterConfig iwc = newIndexWriterConfig(); EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService , null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider, - iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() { + iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() { @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test @@ -306,7 +306,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); - ((InternalEngine)engine).config().setCompoundOnFlush(false); + ((InternalEngine) engine).config().setCompoundOnFlush(false); ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); @@ -354,7 +354,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); assertThat(segments.get(1).isCompound(), equalTo(false)); - ((InternalEngine)engine).config().setCompoundOnFlush(true); + ((InternalEngine) engine).config().setCompoundOnFlush(true); ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); engine.create(new Engine.Create(null, newUid("4"), doc4)); engine.refresh("test"); @@ -381,11 +381,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(2).getDeletedDocs(), equalTo(0)); assertThat(segments.get(2).isCompound(), equalTo(true)); } - + public void testVerboseSegments() throws Exception { List segments = engine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); - + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false); engine.create(new Engine.Create(null, newUid("1"), doc)); engine.refresh("test"); @@ -393,7 +393,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { segments = engine.segments(true); assertThat(segments.size(), equalTo(1)); assertThat(segments.get(0).ramTree, notNullValue()); - + ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false); engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.refresh("test"); @@ -406,7 +406,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(0).ramTree, notNullValue()); assertThat(segments.get(1).ramTree, notNullValue()); assertThat(segments.get(2).ramTree, notNullValue()); - + } @@ -416,7 +416,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)); IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider); - + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); @@ -438,7 +438,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { for (Segment segment : segments) { assertThat(segment.getMergeId(), nullValue()); } - + index = new Engine.Index(null, newUid("4"), doc); engine.index(index); engine.flush(); @@ -763,8 +763,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { }); // post recovery should flush the translog MatcherAssert.assertThat(translog.snapshot(), TranslogSizeMatcher.translogSize(0)); + // and we should not leak files + assertThat("there are unreferenced translog files left", translog.clearUnreferenced(), equalTo(0)); engine.flush(); + + assertThat("there are unreferenced translog files left, post flush", translog.clearUnreferenced(), equalTo(0)); + engine.close(); } @@ -1351,11 +1356,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { Store store = createStore(); - - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); Engine engine = new InternalEngine(config(indexSettingsService, store, createTranslog(), createMergeScheduler(indexSettingsService))); - ((InternalEngine)engine).config().setEnableGcDeletes(false); + engine.config().setEnableGcDeletes(false); // Add document Document document = testDocument(); @@ -1441,7 +1444,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { wrapper.setRandomIOExceptionRateOnOpen(randomDouble()); try (Store store = createStore(wrapper)) { int refCount = store.refCount(); - assertTrue("refCount: "+ store.refCount(), store.refCount() > 0); + assertTrue("refCount: " + store.refCount(), store.refCount() > 0); Translog translog = createTranslog(); InternalEngine holder; try { @@ -1451,7 +1454,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { continue; } holder.config().setFailEngineOnCorruption(true); - assertEquals(store.refCount(), refCount+1); + assertEquals(store.refCount(), refCount + 1); final int numStarts = scaledRandomIntBetween(1, 5); for (int j = 0; j < numStarts; j++) { try {