diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 2d069acaab0..5cb6f8a1a47 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.*; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.search.*; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; @@ -65,6 +66,7 @@ import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -122,6 +124,7 @@ public class InternalEngine implements Engine { private volatile SearcherManager searcherManager; private volatile boolean closed = false; + private volatile Closeable storeReference; // flag indicating if a dirty operation has occurred since the last refresh private volatile boolean dirty = false; @@ -198,9 +201,6 @@ public class InternalEngine implements Engine { this.optimizeAutoGenerateId = optimizeAutoGenerateId; this.failEngineOnCorruption = failEngineOnCorruption; this.failedEngineListener = failedEngineListener; - // will be decremented in close() - store.incRef(); - throttle = new IndexThrottle(); } @@ -236,81 +236,91 @@ public class InternalEngine implements Engine { @Override public void addFailedEngineListener(FailedEngineListener listener) { - throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngineImpl. Use InternalEngine."); + throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngine. Use InternalEngineHolder."); } @Override public void start() throws EngineException { store.incRef(); + /* + * This might look weird but it's in-fact needed since if we close + * the engine due to a corruption on IW startup the reference is decremented in the close + * method and this must not happen more than once + */ + final Closeable storeRef = new Closeable() { + private final AtomicBoolean closed = new AtomicBoolean(false); + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + store.decRef(); + } + } + }; + final List closeOnFailure = new ArrayList<>(Arrays.asList(storeRef)); try (InternalLock _ = writeLock.acquire()) { - + IndexWriter indexWriter = this.indexWriter; if (indexWriter != null) { throw new EngineAlreadyStartedException(shardId); } if (closed) { throw new EngineClosedException(shardId); } + storeReference = storeRef; if (logger.isDebugEnabled()) { logger.debug("starting engine"); } try { - this.indexWriter = createWriter(); + indexWriter = createWriter(); + closeOnFailure.add(indexWriter); } catch (IOException e) { maybeFailEngine(e, "start"); - if (this.indexWriter != null) { - try { - IndexWriter pending = indexWriter; - indexWriter = null; - pending.rollback(); - } catch (IOException e1) { - e.addSuppressed(e1); - } - } throw new EngineCreationFailureException(shardId, "failed to create engine", e); } - try { // commit on a just opened writer will commit even if there are no changes done to it // we rely on that for the commit data translog id key - translogIdGenerator.set(Math.max(0, translog.findLargestPresentTranslogId())); - translogIdGenerator.incrementAndGet(); + final long translogId = Math.max(0, translog.findLargestPresentTranslogId()) + 1; boolean mustCommitTranslogId = true; if (Lucene.indexExists(store.directory())) { final Map commitUserData = Lucene.readSegmentInfos(store.directory()).getUserData(); mustCommitTranslogId = !commitUserData.containsKey(Translog.TRANSLOG_ID_KEY); } if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. - indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogIdGenerator.get()))); + indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.commit(); } - translog.newTranslog(translogIdGenerator.get()); - this.searcherManager = buildSearchManager(indexWriter); + + translog.newTranslog(translogId); + final SearcherManager searcherManager = buildSearchManager(indexWriter); + closeOnFailure.add(searcherManager); versionMap.setManager(searcherManager); - readLastCommittedSegmentsInfo(); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + this.searcherManager = searcherManager; + translogIdGenerator.set(translogId); + this.indexWriter = indexWriter; + closeOnFailure.clear(); // all is well } catch (IOException e) { maybeFailEngine(e, "start"); try { - indexWriter.rollback(); - } catch (IOException e1) { - // ignore - } finally { - IOUtils.closeWhileHandlingException(indexWriter); + if (indexWriter != null) { + indexWriter.rollback(); + } + } catch (IOException e1) { // iw is closed below + e.addSuppressed(e1); } throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e); } } finally { - store.decRef(); + if (closeOnFailure.isEmpty() == false) { // release everything we created on a failure + IOUtils.closeWhileHandlingException(closeOnFailure); + } } } @Override public void stop() throws EngineException { - throw new UnsupportedOperationException("stop() is not supported by InternalEngineImpl. Use InternalEngine."); - } - - private void readLastCommittedSegmentsInfo() throws IOException { - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + throw new UnsupportedOperationException("stop() is not supported by InternalEngine. Use InternalEngineHolder."); } @Override @@ -969,7 +979,7 @@ public class InternalEngine implements Engine { // reread the last committed segment infos try (InternalLock _ = readLock.acquire()) { ensureOpen(); - readLastCommittedSegmentsInfo(); + lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); } catch (Throwable e) { if (!closed) { logger.warn("failed to read latest segment infos on flush", e); @@ -1333,8 +1343,8 @@ public class InternalEngine implements Engine { } catch (Throwable e) { logger.warn("failed to rollback writer on close", e); } finally { - store.decRef(); indexWriter = null; + IOUtils.closeWhileHandlingException(storeReference); } } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java index 73faaa4eeb4..c3c5f95a1ad 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java @@ -144,7 +144,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements this.settingsListener = new ApplySettings(); this.indexSettingsService.addListener(this.settingsListener); - + store.incRef(); } @Override @@ -193,10 +193,15 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements if (currentEngine != null) { throw new EngineAlreadyStartedException(shardId); } - InternalEngine newEngine = createEngineImpl(); - newEngine.start(); - boolean success = this.currentEngine.compareAndSet(null, newEngine); - assert success : "engine changes should be done under a synchronize"; + InternalEngine newEngine = createEngine(); + store.incRef(); + try { + newEngine.start(); + boolean success = this.currentEngine.compareAndSet(null, newEngine); + assert success : "engine changes should be done under a synchronize"; + } finally { + store.decRef(); + } } @Override @@ -209,17 +214,24 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements @Override public synchronized void close() throws ElasticsearchException { - closed = true; - InternalEngine currentEngine = this.currentEngine.getAndSet(null); - if (currentEngine != null) { - currentEngine.close(); + if (closed) { + return; + } + closed = true; + try { + InternalEngine currentEngine = this.currentEngine.getAndSet(null); + if (currentEngine != null) { + currentEngine.close(); + } + mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); + mergeScheduler.removeListener(mergeSchedulerListener); + indexSettingsService.removeListener(settingsListener); + } finally { + store.decRef(); } - mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); - mergeScheduler.removeListener(mergeSchedulerListener); - indexSettingsService.removeListener(settingsListener); } - protected InternalEngine createEngineImpl() { + protected InternalEngine createEngine() { return new InternalEngine(shardId, logger, codecService, threadPool, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService, enableGcDeletes, gcDeletesInMillis, @@ -331,12 +343,16 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements // called by the current engine @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) { - for (FailedEngineListener listener : failedEngineListeners) { - try { - listener.onFailedEngine(shardId, reason, failure); - } catch (Exception e) { - logger.warn("exception while notifying engine failure", e); + try { + for (FailedEngineListener listener : failedEngineListeners) { + try { + listener.onFailedEngine(shardId, reason, failure); + } catch (Exception e) { + logger.warn("exception while notifying engine failure", e); + } } + } finally { + close(); // we need to close ourself - we failed all bets are off } } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index c3fcbaf9dc9..48421ad4d6b 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -603,6 +603,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } + /** + * Returns the current reference count. + */ + public int refCount() { + return refCounter.refCount(); + } + private static final class StoreDirectory extends FilterDirectory { private final ESLogger deletesLogger; diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 8df812571f4..3f603652473 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.engine.internal; +import com.carrotsearch.randomizedtesting.annotations.Seed; import com.google.common.base.Predicate; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; @@ -33,8 +34,7 @@ import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.RAMDirectory; -import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -74,7 +74,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogSizeMatcher; import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.test.DummyShardLock; -import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.junit.After; @@ -89,15 +89,20 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; +import static org.elasticsearch.test.ElasticsearchTestCase.terminate; import static org.hamcrest.Matchers.*; /** * */ -public class InternalEngineTests extends ElasticsearchTestCase { +public class InternalEngineTests extends ElasticsearchLuceneTestCase { protected final ShardId shardId = new ShardId(new Index("index"), 1); @@ -119,14 +124,14 @@ public class InternalEngineTests extends ElasticsearchTestCase { public void setUp() throws Exception { super.setUp(); defaultSettings = ImmutableSettings.builder() - .put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean()) + .put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, randomBoolean()) .put(InternalEngineHolder.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, randomBoolean()) .build(); // TODO randomize more settings threadPool = new ThreadPool(getClass().getName()); store = createStore(); store.deleteContent(); - storeReplica = createStoreReplica(); + storeReplica = createStore(); storeReplica.deleteContent(); engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); engine = createEngine(engineSettingsService, store, createTranslog()); @@ -181,26 +186,14 @@ public class InternalEngineTests extends ElasticsearchTestCase { } protected Store createStore() throws IOException { - final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) { - @Override - public Directory[] build() throws IOException { - return new Directory[]{new RAMDirectory()}; - } - - @Override - public long throttleTimeInNanos() { - return 0; - } - }; - return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); + return createStore(newDirectory()); } - protected Store createStoreReplica() throws IOException { - + protected Store createStore(final Directory directory) throws IOException { final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) { @Override public Directory[] build() throws IOException { - return new Directory[]{new RAMDirectory()}; + return new Directory[]{ directory }; } @Override @@ -1423,6 +1416,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { // Get should not find the document getResult = engine.get(new Engine.Get(true, newUid("2"))); assertThat(getResult.exists(), equalTo(false)); + engine.close(); } protected Term newUid(String id) { @@ -1435,6 +1429,61 @@ public class InternalEngineTests extends ElasticsearchTestCase { ShardId shardId = ShardUtils.extractShardId(test.reader()); assertNotNull(shardId); assertEquals(shardId, engine.shardId()); - }; + } + } + + /** + * Random test that throws random exception and ensures all references are + * counted down / released and resources are closed. + */ + @Test + public void testFailStart() throws IOException { + // this test fails if any reader, searcher or directory is not closed - MDW FTW + final int iters = scaledRandomIntBetween(10, 100); + for (int i = 0; i < iters; i++) { + MockDirectoryWrapper wrapper = newMockDirectory(); + wrapper.setFailOnOpenInput(randomBoolean()); + wrapper.setAllowRandomFileNotFoundException(randomBoolean()); + wrapper.setRandomIOExceptionRate(randomDouble()); + wrapper.setRandomIOExceptionRateOnOpen(randomDouble()); + try (Store store = createStore(wrapper)) { + int refCount = store.refCount(); + assertTrue("refCount: "+ store.refCount(), store.refCount() > 0); + Translog translog = createTranslog(); + Settings build = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), build); + Engine holder = createEngine(indexSettingsService, store, translog); + indexSettingsService.refreshSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true).build()); + + assertEquals(store.refCount(), refCount+1); + final int numStarts = scaledRandomIntBetween(1, 5); + for (int j = 0; j < numStarts; j++) { + try { + holder.start(); + assertEquals(store.refCount(), refCount + 2); + break; + } catch (EngineCreationFailureException ex) { + // all is fine + if (ex.getCause() instanceof CorruptIndexException) { + assertEquals(store.refCount(), refCount); + try { + holder.start(); + fail("Engine must have failed on corrupt index"); + } catch (EngineClosedException e) { + // good! + } + break; // failed engine can't start again + } + assertEquals(store.refCount(), refCount + 1); + } + } + translog.close(); + holder.close(); + assertEquals(store.refCount(), refCount); + } + } } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchLuceneTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchLuceneTestCase.java index 46ad4d9a674..5d0255cb77f 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchLuceneTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchLuceneTestCase.java @@ -36,7 +36,8 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; ReproduceInfoPrinter.class }) @ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class}) -@ThreadLeakScope(Scope.NONE) +@ThreadLeakScope(Scope.SUITE) +@ThreadLeakLingering(linger = 5000) // 5 sec lingering @TimeoutSuite(millis = TimeUnits.HOUR) @SuppressCodecs("Lucene3x") @LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java index fb8b3ca6895..9da2f57466f 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java +++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java @@ -89,7 +89,7 @@ public final class MockInternalEngineHolder extends InternalEngineHolder impleme } @Override - protected InternalEngine createEngineImpl() { + protected InternalEngine createEngine() { return new MockInternalEngine(mockContext, shardId, logger, codecService, threadPool, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService, enableGcDeletes, gcDeletesInMillis,