Take refresh IOExceptions into account when catching ACE in InternalEngine (#20546)

Since #19975 we are aggressively failing with AssertionError when we catch an ACE
inside the InternalEngine. We treat everything that is neither a tragic even on
the IndexWriter or the Translog as a bug and throw an AssertionError. Yet, if the
engine hits an IOException on refresh of some sort and the IW doesn't realize it since
it's not fully under it's control we fail he engine but neither IW nor Translog are marked
as failed by tragic event while they are already closed.
This change takes the `failedEngine` exception into account and if it's set we know
that the engine failed by some other even than a tragic one and can continue.

This change also uses the `ReferenceManager#RefreshListener` interface in the engine rather
than it's concrete implementation.

Relates to #19975
This commit is contained in:
Simon Willnauer 2016-09-19 15:15:51 +02:00 committed by GitHub
parent a530399ae5
commit 817d55cf93
8 changed files with 106 additions and 31 deletions

View File

@ -42,6 +42,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables; import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -101,7 +102,7 @@ public abstract class Engine implements Closeable {
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
protected volatile Exception failedEngine = null; protected final SetOnce<Exception> failedEngine = new SetOnce<>();
/* /*
* on <tt>lastWriteNanos</tt> we use System.nanoTime() to initialize this since: * on <tt>lastWriteNanos</tt> we use System.nanoTime() to initialize this since:
* - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active * - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active
@ -377,7 +378,7 @@ public abstract class Engine implements Closeable {
protected void ensureOpen() { protected void ensureOpen() {
if (isClosed.get()) { if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine); throw new EngineClosedException(shardId, failedEngine.get());
} }
} }
@ -670,17 +671,19 @@ public abstract class Engine implements Closeable {
if (failEngineLock.tryLock()) { if (failEngineLock.tryLock()) {
store.incRef(); store.incRef();
try { try {
if (failedEngine.get() != null) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
return;
}
// this must happen before we close IW or Translog such that we can check this state to opt out of failing the engine
// again on any caught AlreadyClosedException
failedEngine.set((failure != null) ? failure : new IllegalStateException(reason));
try { try {
// we just go and close this engine - no way to recover // we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]"); closeNoLock("engine failed on: [" + reason + "]");
} finally { } finally {
if (failedEngine != null) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
return;
}
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed engine [{}]", reason), failure); logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed engine [{}]", reason), failure);
// we must set a failure exception, generate one if not supplied // we must set a failure exception, generate one if not supplied
failedEngine = (failure != null) ? failure : new IllegalStateException(reason);
// we first mark the store as corrupted before we notify any listeners // we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly // this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when // on the same node that we don't see the corrupted marker file when

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -34,7 +35,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.RefreshListeners;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -68,7 +68,7 @@ public final class EngineConfig {
private final QueryCachingPolicy queryCachingPolicy; private final QueryCachingPolicy queryCachingPolicy;
private final long maxUnsafeAutoIdTimestamp; private final long maxUnsafeAutoIdTimestamp;
@Nullable @Nullable
private final RefreshListeners refreshListeners; private final ReferenceManager.RefreshListener refreshListeners;
/** /**
* Index setting to change the low level lucene codec used for writing new segments. * Index setting to change the low level lucene codec used for writing new segments.
@ -112,7 +112,7 @@ public final class EngineConfig {
MergePolicy mergePolicy, Analyzer analyzer, MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener, Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners, TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
long maxUnsafeAutoIdTimestamp) { long maxUnsafeAutoIdTimestamp) {
if (openMode == null) { if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null"); throw new IllegalArgumentException("openMode must not be null");
@ -322,9 +322,9 @@ public final class EngineConfig {
} }
/** /**
* {@linkplain RefreshListeners} instance to configure. * {@linkplain ReferenceManager.RefreshListener} instance to configure.
*/ */
public RefreshListeners getRefreshListeners() { public ReferenceManager.RefreshListener getRefreshListeners() {
return refreshListeners; return refreshListeners;
} }

View File

@ -82,9 +82,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
/**
*
*/
public class InternalEngine extends Engine { public class InternalEngine extends Engine {
/** /**
* When we last pruned expired tombstones from versionMap.deletes: * When we last pruned expired tombstones from versionMap.deletes:
@ -170,7 +167,6 @@ public class InternalEngine extends Engine {
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) { if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners()); searcherManager.addListener(engineConfig.getRefreshListeners());
engineConfig.getRefreshListeners().setTranslog(translog);
} }
success = true; success = true;
} finally { } finally {
@ -951,7 +947,7 @@ public class InternalEngine extends Engine {
failEngine("already closed by tragic event on the index writer", tragedy); failEngine("already closed by tragic event on the index writer", tragedy);
} else if (translog.isOpen() == false && translog.getTragicException() != null) { } else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException()); failEngine("already closed by tragic event on the translog", translog.getTragicException());
} else { } else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex); throw new AssertionError("Unexpected AlreadyClosedException", ex);

View File

@ -992,6 +992,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// but we need to make sure we don't loose deletes until we are done recovering // but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false); config.setEnableGcDeletes(false);
Engine newEngine = createNewEngine(config); Engine newEngine = createNewEngine(config);
onNewEngine(newEngine);
verifyNotClosed(); verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
@ -999,7 +1000,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
active.set(true); active.set(true);
newEngine.recoverFromTranslog(); newEngine.recoverFromTranslog();
} }
}
protected void onNewEngine(Engine newEngine) {
refreshListeners.setTranslog(newEngine.getTranslog());
} }
/** /**

View File

@ -114,4 +114,9 @@ public final class ShadowIndexShard extends IndexShard {
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us"); throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us");
} }
@Override
protected void onNewEngine(Engine newEngine) {
// nothing to do here - the superclass sets the translog on some listeners but we don't have such a thing
}
} }

View File

@ -59,7 +59,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -112,7 +111,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// the list of translog readers is guaranteed to be in order of translog generation // the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>(); private final List<TranslogReader> readers = new ArrayList<>();
private volatile ScheduledFuture<?> syncScheduler;
// this is a concurrent set and is not protected by any of the locks. The main reason // this is a concurrent set and is not protected by any of the locks. The main reason
// is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed) // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed)
private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet(); private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
@ -312,7 +310,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
closeFilesIfNoPendingViews(); closeFilesIfNoPendingViews();
} }
} finally { } finally {
FutureUtils.cancel(syncScheduler);
logger.debug("translog closed"); logger.debug("translog closed");
} }
} }

View File

@ -43,6 +43,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.TotalHitCountCollector;
@ -276,7 +277,7 @@ public class InternalEngineTests extends ESTestCase {
} }
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
InternalEngine internalEngine = new InternalEngine(config); InternalEngine internalEngine = new InternalEngine(config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog(); internalEngine.recoverFromTranslog();
@ -284,7 +285,8 @@ public class InternalEngineTests extends ESTestCase {
return internalEngine; return internalEngine;
} }
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp) { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
IndexWriterConfig iwc = newIndexWriterConfig(); IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final EngineConfig.OpenMode openMode; final EngineConfig.OpenMode openMode;
@ -306,7 +308,8 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(), EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null, maxUnsafeAutoIdTimestamp); IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
maxUnsafeAutoIdTimestamp);
return config; return config;
} }
@ -903,7 +906,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException { public void testSyncedFlush() throws IOException {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc)); engine.index(new Engine.Index(newUid("1"), doc));
@ -930,7 +933,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
try (Store store = createStore(); try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index doc1 = new Engine.Index(newUid("1"), doc); Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
@ -1158,7 +1161,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException { public void testForceMerge() throws IOException {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { // use log MP here we test some behavior in ESMP new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100); int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
@ -1592,7 +1595,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception { public void testEnableGcDeletes() throws Exception {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
engine.config().setEnableGcDeletes(false); engine.config().setEnableGcDeletes(false);
// Add document // Add document
@ -1728,7 +1731,7 @@ public class InternalEngineTests extends ESTestCase {
// expected // expected
} }
// now it should be OK. // now it should be OK.
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
engine = new InternalEngine(config); engine = new InternalEngine(config);
} }
@ -2103,7 +2106,7 @@ public class InternalEngineTests extends ESTestCase {
public void testCurrentTranslogIDisCommitted() throws IOException { public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) { try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
// create // create
{ {
@ -2368,7 +2371,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEngineMaxTimestampIsInitialized() throws IOException { public void testEngineMaxTimestampIsInitialized() throws IOException {
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
} }
@ -2376,7 +2379,7 @@ public class InternalEngineTests extends ESTestCase {
long maxTimestamp = Math.abs(randomLong()); long maxTimestamp = Math.abs(randomLong());
try (Store store = createStore(); try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
maxTimestamp))) { maxTimestamp, null))) {
assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
} }
} }
@ -2435,4 +2438,70 @@ public class InternalEngineTests extends ESTestCase {
public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this
return engine.getNumIndexVersionsLookups(); return engine.getNumIndexVersionsLookups();
} }
public void testFailEngineOnRandomIO() throws IOException, InterruptedException {
MockDirectoryWrapper wrapper = newMockDirectory();
final Path translogPath = createTempDir("testFailEngineOnRandomIO");
try (Store store = createStore(wrapper)) {
CyclicBarrier join = new CyclicBarrier(2);
CountDownLatch start = new CountDownLatch(1);
AtomicInteger controller = new AtomicInteger(0);
EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
int i = controller.incrementAndGet();
if (i == 1) {
throw new MockDirectoryWrapper.FakeIOException();
} else if (i == 2) {
try {
start.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
throw new AlreadyClosedException("boom");
}
}
});
InternalEngine internalEngine = new InternalEngine(config);
int docId = 0;
final ParsedDocument doc = testParsedDocument(Integer.toString(docId), Integer.toString(docId), "test", null, docId, -1,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index index = randomAppendOnly(docId, doc, false);
internalEngine.index(index);
Runnable r = () -> {
try {
join.await();
} catch (Exception e) {
throw new AssertionError(e);
}
try {
internalEngine.refresh("test");
fail();
} catch (EngineClosedException ex) {
// we can't guarantee that we are entering the refresh call before it's fully
// closed so we also expecting ECE here
assertTrue(ex.toString(), ex.getCause() instanceof MockDirectoryWrapper.FakeIOException);
} catch (RefreshFailedEngineException | AlreadyClosedException ex) {
// fine
} finally {
start.countDown();
}
};
Thread t = new Thread(r);
Thread t1 = new Thread(r);
t.start();
t1.start();
t.join();
t1.join();
assertTrue(internalEngine.isClosed.get());
assertTrue(internalEngine.failedEngine.get() instanceof MockDirectoryWrapper.FakeIOException);
}
}
} }

View File

@ -126,6 +126,7 @@ public class RefreshListenersTests extends ESTestCase {
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
engine = new InternalEngine(config); engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
} }
@After @After