From 817d55cf938f028823a330d3291df845894ec126 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 19 Sep 2016 15:15:51 +0200 Subject: [PATCH] Take refresh IOExceptions into account when catching ACE in InternalEngine (#20546) Since #19975 we are aggressively failing with AssertionError when we catch an ACE inside the InternalEngine. We treat everything that is neither a tragic even on the IndexWriter or the Translog as a bug and throw an AssertionError. Yet, if the engine hits an IOException on refresh of some sort and the IW doesn't realize it since it's not fully under it's control we fail he engine but neither IW nor Translog are marked as failed by tragic event while they are already closed. This change takes the `failedEngine` exception into account and if it's set we know that the engine failed by some other even than a tragic one and can continue. This change also uses the `ReferenceManager#RefreshListener` interface in the engine rather than it's concrete implementation. Relates to #19975 --- .../elasticsearch/index/engine/Engine.java | 17 ++-- .../index/engine/EngineConfig.java | 10 +- .../index/engine/InternalEngine.java | 6 +- .../elasticsearch/index/shard/IndexShard.java | 4 + .../index/shard/ShadowIndexShard.java | 5 + .../index/translog/Translog.java | 3 - .../index/engine/InternalEngineTests.java | 91 ++++++++++++++++--- .../index/shard/RefreshListenersTests.java | 1 + 8 files changed, 106 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 9df03beb1ab..a19df39d420 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -42,6 +42,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -101,7 +102,7 @@ public abstract class Engine implements Closeable { protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); - protected volatile Exception failedEngine = null; + protected final SetOnce failedEngine = new SetOnce<>(); /* * on lastWriteNanos we use System.nanoTime() to initialize this since: * - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active @@ -377,7 +378,7 @@ public abstract class Engine implements Closeable { protected void ensureOpen() { if (isClosed.get()) { - throw new EngineClosedException(shardId, failedEngine); + throw new EngineClosedException(shardId, failedEngine.get()); } } @@ -670,17 +671,19 @@ public abstract class Engine implements Closeable { if (failEngineLock.tryLock()) { store.incRef(); try { + if (failedEngine.get() != null) { + logger.warn((Supplier) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure); + return; + } + // this must happen before we close IW or Translog such that we can check this state to opt out of failing the engine + // again on any caught AlreadyClosedException + failedEngine.set((failure != null) ? failure : new IllegalStateException(reason)); try { // we just go and close this engine - no way to recover closeNoLock("engine failed on: [" + reason + "]"); } finally { - if (failedEngine != null) { - logger.debug((Supplier) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure); - return; - } logger.warn((Supplier) () -> new ParameterizedMessage("failed engine [{}]", reason), failure); // we must set a failure exception, generate one if not supplied - failedEngine = (failure != null) ? failure : new IllegalStateException(reason); // we first mark the store as corrupted before we notify any listeners // this must happen first otherwise we might try to reallocate so quickly // on the same node that we don't see the corrupted marker file when diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 9f9d2186a83..e598eecc07e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.similarities.Similarity; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; @@ -34,7 +35,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.shard.RefreshListeners; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; @@ -68,7 +68,7 @@ public final class EngineConfig { private final QueryCachingPolicy queryCachingPolicy; private final long maxUnsafeAutoIdTimestamp; @Nullable - private final RefreshListeners refreshListeners; + private final ReferenceManager.RefreshListener refreshListeners; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -112,7 +112,7 @@ public final class EngineConfig { MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners, + TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, long maxUnsafeAutoIdTimestamp) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); @@ -322,9 +322,9 @@ public final class EngineConfig { } /** - * {@linkplain RefreshListeners} instance to configure. + * {@linkplain ReferenceManager.RefreshListener} instance to configure. */ - public RefreshListeners getRefreshListeners() { + public ReferenceManager.RefreshListener getRefreshListeners() { return refreshListeners; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a9136fb0228..e10d364e745 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -82,9 +82,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -/** - * - */ public class InternalEngine extends Engine { /** * When we last pruned expired tombstones from versionMap.deletes: @@ -170,7 +167,6 @@ public class InternalEngine extends Engine { allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); if (engineConfig.getRefreshListeners() != null) { searcherManager.addListener(engineConfig.getRefreshListeners()); - engineConfig.getRefreshListeners().setTranslog(translog); } success = true; } finally { @@ -951,7 +947,7 @@ public class InternalEngine extends Engine { failEngine("already closed by tragic event on the index writer", tragedy); } else if (translog.isOpen() == false && translog.getTragicException() != null) { failEngine("already closed by tragic event on the translog", translog.getTragicException()); - } else { + } else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet? // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error throw new AssertionError("Unexpected AlreadyClosedException", ex); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f87702771b2..4d255c4c9f9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -992,6 +992,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); Engine newEngine = createNewEngine(config); + onNewEngine(newEngine); verifyNotClosed(); if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, @@ -999,7 +1000,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl active.set(true); newEngine.recoverFromTranslog(); } + } + protected void onNewEngine(Engine newEngine) { + refreshListeners.setTranslog(newEngine.getTranslog()); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 45a471e1aa9..11023a6a135 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -114,4 +114,9 @@ public final class ShadowIndexShard extends IndexShard { public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us"); } + + @Override + protected void onNewEngine(Engine newEngine) { + // nothing to do here - the superclass sets the translog on some listeners but we don't have such a thing + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 0082b7a0336..198d9b4cd45 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -59,7 +59,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -112,7 +111,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); - private volatile ScheduledFuture syncScheduler; // this is a concurrent set and is not protected by any of the locks. The main reason // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed) private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); @@ -312,7 +310,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC closeFilesIfNoPendingViews(); } } finally { - FutureUtils.cancel(syncScheduler); logger.debug("translog closed"); } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5f009800996..35c7b7da880 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -43,6 +43,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; @@ -276,7 +277,7 @@ public class InternalEngineTests extends ESTestCase { } protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); InternalEngine internalEngine = new InternalEngine(config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); @@ -284,7 +285,8 @@ public class InternalEngineTests extends ESTestCase { return internalEngine; } - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp) { + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); final EngineConfig.OpenMode openMode; @@ -306,7 +308,8 @@ public class InternalEngineTests extends ESTestCase { EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(), mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null, maxUnsafeAutoIdTimestamp); + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, + maxUnsafeAutoIdTimestamp); return config; } @@ -903,7 +906,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); engine.index(new Engine.Index(newUid("1"), doc)); @@ -930,7 +933,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); Engine.Index doc1 = new Engine.Index(newUid("1"), doc); @@ -1158,7 +1161,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); @@ -1592,7 +1595,7 @@ public class InternalEngineTests extends ESTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { engine.config().setEnableGcDeletes(false); // Add document @@ -1728,7 +1731,7 @@ public class InternalEngineTests extends ESTestCase { // expected } // now it should be OK. - EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); + EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); engine = new InternalEngine(config); } @@ -2103,7 +2106,7 @@ public class InternalEngineTests extends ESTestCase { public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); // create { @@ -2368,7 +2371,7 @@ public class InternalEngineTests extends ESTestCase { public void testEngineMaxTimestampIsInitialized() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } @@ -2376,7 +2379,7 @@ public class InternalEngineTests extends ESTestCase { long maxTimestamp = Math.abs(randomLong()); try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - maxTimestamp))) { + maxTimestamp, null))) { assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } } @@ -2435,4 +2438,70 @@ public class InternalEngineTests extends ESTestCase { public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this return engine.getNumIndexVersionsLookups(); } + + public void testFailEngineOnRandomIO() throws IOException, InterruptedException { + MockDirectoryWrapper wrapper = newMockDirectory(); + final Path translogPath = createTempDir("testFailEngineOnRandomIO"); + try (Store store = createStore(wrapper)) { + CyclicBarrier join = new CyclicBarrier(2); + CountDownLatch start = new CountDownLatch(1); + AtomicInteger controller = new AtomicInteger(0); + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + int i = controller.incrementAndGet(); + if (i == 1) { + throw new MockDirectoryWrapper.FakeIOException(); + } else if (i == 2) { + try { + start.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + throw new AlreadyClosedException("boom"); + } + } + }); + InternalEngine internalEngine = new InternalEngine(config); + int docId = 0; + final ParsedDocument doc = testParsedDocument(Integer.toString(docId), Integer.toString(docId), "test", null, docId, -1, + testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + + Engine.Index index = randomAppendOnly(docId, doc, false); + internalEngine.index(index); + Runnable r = () -> { + try { + join.await(); + } catch (Exception e) { + throw new AssertionError(e); + } + try { + internalEngine.refresh("test"); + fail(); + } catch (EngineClosedException ex) { + // we can't guarantee that we are entering the refresh call before it's fully + // closed so we also expecting ECE here + assertTrue(ex.toString(), ex.getCause() instanceof MockDirectoryWrapper.FakeIOException); + } catch (RefreshFailedEngineException | AlreadyClosedException ex) { + // fine + } finally { + start.countDown(); + } + + }; + Thread t = new Thread(r); + Thread t1 = new Thread(r); + t.start(); + t1.start(); + t.join(); + t1.join(); + assertTrue(internalEngine.isClosed.get()); + assertTrue(internalEngine.failedEngine.get() instanceof MockDirectoryWrapper.FakeIOException); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index f3f15b2639c..05147d4a72a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -126,6 +126,7 @@ public class RefreshListenersTests extends ESTestCase { IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); engine = new InternalEngine(config); + listeners.setTranslog(engine.getTranslog()); } @After