diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 9df03beb1ab..a19df39d420 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -42,6 +42,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -101,7 +102,7 @@ public abstract class Engine implements Closeable { protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); - protected volatile Exception failedEngine = null; + protected final SetOnce failedEngine = new SetOnce<>(); /* * on lastWriteNanos we use System.nanoTime() to initialize this since: * - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active @@ -377,7 +378,7 @@ public abstract class Engine implements Closeable { protected void ensureOpen() { if (isClosed.get()) { - throw new EngineClosedException(shardId, failedEngine); + throw new EngineClosedException(shardId, failedEngine.get()); } } @@ -670,17 +671,19 @@ public abstract class Engine implements Closeable { if (failEngineLock.tryLock()) { store.incRef(); try { + if (failedEngine.get() != null) { + logger.warn((Supplier) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure); + return; + } + // this must happen before we close IW or Translog such that we can check this state to opt out of failing the engine + // again on any caught AlreadyClosedException + failedEngine.set((failure != null) ? failure : new IllegalStateException(reason)); try { // we just go and close this engine - no way to recover closeNoLock("engine failed on: [" + reason + "]"); } finally { - if (failedEngine != null) { - logger.debug((Supplier) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure); - return; - } logger.warn((Supplier) () -> new ParameterizedMessage("failed engine [{}]", reason), failure); // we must set a failure exception, generate one if not supplied - failedEngine = (failure != null) ? failure : new IllegalStateException(reason); // we first mark the store as corrupted before we notify any listeners // this must happen first otherwise we might try to reallocate so quickly // on the same node that we don't see the corrupted marker file when diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 9f9d2186a83..e598eecc07e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.similarities.Similarity; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; @@ -34,7 +35,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.shard.RefreshListeners; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; @@ -68,7 +68,7 @@ public final class EngineConfig { private final QueryCachingPolicy queryCachingPolicy; private final long maxUnsafeAutoIdTimestamp; @Nullable - private final RefreshListeners refreshListeners; + private final ReferenceManager.RefreshListener refreshListeners; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -112,7 +112,7 @@ public final class EngineConfig { MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners, + TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, long maxUnsafeAutoIdTimestamp) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); @@ -322,9 +322,9 @@ public final class EngineConfig { } /** - * {@linkplain RefreshListeners} instance to configure. + * {@linkplain ReferenceManager.RefreshListener} instance to configure. */ - public RefreshListeners getRefreshListeners() { + public ReferenceManager.RefreshListener getRefreshListeners() { return refreshListeners; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a9136fb0228..e10d364e745 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -82,9 +82,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -/** - * - */ public class InternalEngine extends Engine { /** * When we last pruned expired tombstones from versionMap.deletes: @@ -170,7 +167,6 @@ public class InternalEngine extends Engine { allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); if (engineConfig.getRefreshListeners() != null) { searcherManager.addListener(engineConfig.getRefreshListeners()); - engineConfig.getRefreshListeners().setTranslog(translog); } success = true; } finally { @@ -951,7 +947,7 @@ public class InternalEngine extends Engine { failEngine("already closed by tragic event on the index writer", tragedy); } else if (translog.isOpen() == false && translog.getTragicException() != null) { failEngine("already closed by tragic event on the translog", translog.getTragicException()); - } else { + } else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet? // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error throw new AssertionError("Unexpected AlreadyClosedException", ex); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f87702771b2..4d255c4c9f9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -992,6 +992,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); Engine newEngine = createNewEngine(config); + onNewEngine(newEngine); verifyNotClosed(); if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, @@ -999,7 +1000,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl active.set(true); newEngine.recoverFromTranslog(); } + } + protected void onNewEngine(Engine newEngine) { + refreshListeners.setTranslog(newEngine.getTranslog()); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 45a471e1aa9..11023a6a135 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -114,4 +114,9 @@ public final class ShadowIndexShard extends IndexShard { public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us"); } + + @Override + protected void onNewEngine(Engine newEngine) { + // nothing to do here - the superclass sets the translog on some listeners but we don't have such a thing + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 0082b7a0336..198d9b4cd45 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -59,7 +59,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -112,7 +111,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); - private volatile ScheduledFuture syncScheduler; // this is a concurrent set and is not protected by any of the locks. The main reason // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed) private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); @@ -312,7 +310,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC closeFilesIfNoPendingViews(); } } finally { - FutureUtils.cancel(syncScheduler); logger.debug("translog closed"); } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5f009800996..35c7b7da880 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -43,6 +43,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; @@ -276,7 +277,7 @@ public class InternalEngineTests extends ESTestCase { } protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); InternalEngine internalEngine = new InternalEngine(config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); @@ -284,7 +285,8 @@ public class InternalEngineTests extends ESTestCase { return internalEngine; } - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp) { + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final EngineConfig.OpenMode openMode; @@ -306,7 +308,8 @@ public class InternalEngineTests extends ESTestCase { EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(), mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null, maxUnsafeAutoIdTimestamp); + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, + maxUnsafeAutoIdTimestamp); return config; } @@ -903,7 +906,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); engine.index(new Engine.Index(newUid("1"), doc)); @@ -930,7 +933,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); Engine.Index doc1 = new Engine.Index(newUid("1"), doc); @@ -1158,7 +1161,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); @@ -1592,7 +1595,7 @@ public class InternalEngineTests extends ESTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { engine.config().setEnableGcDeletes(false); // Add document @@ -1728,7 +1731,7 @@ public class InternalEngineTests extends ESTestCase { // expected } // now it should be OK. - EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); + EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); engine = new InternalEngine(config); } @@ -2103,7 +2106,7 @@ public class InternalEngineTests extends ESTestCase { public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); // create { @@ -2368,7 +2371,7 @@ public class InternalEngineTests extends ESTestCase { public void testEngineMaxTimestampIsInitialized() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } @@ -2376,7 +2379,7 @@ public class InternalEngineTests extends ESTestCase { long maxTimestamp = Math.abs(randomLong()); try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - maxTimestamp))) { + maxTimestamp, null))) { assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } } @@ -2435,4 +2438,70 @@ public class InternalEngineTests extends ESTestCase { public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this return engine.getNumIndexVersionsLookups(); } + + public void testFailEngineOnRandomIO() throws IOException, InterruptedException { + MockDirectoryWrapper wrapper = newMockDirectory(); + final Path translogPath = createTempDir("testFailEngineOnRandomIO"); + try (Store store = createStore(wrapper)) { + CyclicBarrier join = new CyclicBarrier(2); + CountDownLatch start = new CountDownLatch(1); + AtomicInteger controller = new AtomicInteger(0); + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + int i = controller.incrementAndGet(); + if (i == 1) { + throw new MockDirectoryWrapper.FakeIOException(); + } else if (i == 2) { + try { + start.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + throw new AlreadyClosedException("boom"); + } + } + }); + InternalEngine internalEngine = new InternalEngine(config); + int docId = 0; + final ParsedDocument doc = testParsedDocument(Integer.toString(docId), Integer.toString(docId), "test", null, docId, -1, + testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + + Engine.Index index = randomAppendOnly(docId, doc, false); + internalEngine.index(index); + Runnable r = () -> { + try { + join.await(); + } catch (Exception e) { + throw new AssertionError(e); + } + try { + internalEngine.refresh("test"); + fail(); + } catch (EngineClosedException ex) { + // we can't guarantee that we are entering the refresh call before it's fully + // closed so we also expecting ECE here + assertTrue(ex.toString(), ex.getCause() instanceof MockDirectoryWrapper.FakeIOException); + } catch (RefreshFailedEngineException | AlreadyClosedException ex) { + // fine + } finally { + start.countDown(); + } + + }; + Thread t = new Thread(r); + Thread t1 = new Thread(r); + t.start(); + t1.start(); + t.join(); + t1.join(); + assertTrue(internalEngine.isClosed.get()); + assertTrue(internalEngine.failedEngine.get() instanceof MockDirectoryWrapper.FakeIOException); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index f3f15b2639c..05147d4a72a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -126,6 +126,7 @@ public class RefreshListenersTests extends ESTestCase { IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); engine = new InternalEngine(config); + listeners.setTranslog(engine.getTranslog()); } @After