Bubble up exception when processing NoOp (#39338)

Today we do not bubble up exceptions when processing NoOps but always
treat them as document-level failures. This incorrect treatment causes
the assert_no_failure being tripped in peer-recovery if IndexWriter was
closed exceptionally before.

Closes #38898
This commit is contained in:
Nhat Nguyen 2019-02-25 15:31:48 -05:00
parent e9dda75834
commit 575eed8582
4 changed files with 19 additions and 16 deletions

View File

@ -400,7 +400,7 @@ public abstract class Engine implements Closeable {
*/ */
public abstract DeleteResult delete(Delete delete) throws IOException; public abstract DeleteResult delete(Delete delete) throws IOException;
public abstract NoOpResult noOp(NoOp noOp); public abstract NoOpResult noOp(NoOp noOp) throws IOException;
/** /**
* Base class for index and delete operation results * Base class for index and delete operation results

View File

@ -1452,13 +1452,19 @@ public class InternalEngine extends Engine {
} }
@Override @Override
public NoOpResult noOp(final NoOp noOp) { public NoOpResult noOp(final NoOp noOp) throws IOException {
NoOpResult noOpResult; final NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) { try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
markSeqNoAsSeen(noOp.seqNo()); markSeqNoAsSeen(noOp.seqNo());
noOpResult = innerNoOp(noOp); noOpResult = innerNoOp(noOp);
} catch (final Exception e) { } catch (final Exception e) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e); try {
maybeFailEngine("noop", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
} }
return noOpResult; return noOpResult;
} }

View File

@ -807,7 +807,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return noOp(engine, noOp); return noOp(engine, noOp);
} }
private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) throws IOException {
active.set(true); active.set(true);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("noop (seq# [{}])", noOp.seqNo()); logger.trace("noop (seq# [{}])", noOp.seqNo());

View File

@ -3232,7 +3232,7 @@ public class InternalEngineTests extends EngineTestCase {
final ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_1, null);
AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>(); AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>();
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
(directory, iwc) -> { (directory, iwc) -> {
throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc)); throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
return throwingIndexWriter.get(); return throwingIndexWriter.get();
@ -3297,16 +3297,13 @@ public class InternalEngineTests extends EngineTestCase {
engine.close(); engine.close();
} }
// now the engine is closed check we respond correctly // now the engine is closed check we respond correctly
try { expectThrows(AlreadyClosedException.class, () -> engine.index(indexForDoc(doc1)));
if (randomBoolean()) { expectThrows(AlreadyClosedException.class,
engine.index(indexForDoc(doc1)); () -> engine.delete(new Engine.Delete("test", "", newUid(doc1), primaryTerm.get())));
} else { expectThrows(AlreadyClosedException.class, () -> engine.noOp(
engine.delete(new Engine.Delete("test", "", newUid(doc1), primaryTerm.get())); new Engine.NoOp(engine.getLocalCheckpointTracker().generateSeqNo(),
} engine.config().getPrimaryTermSupplier().getAsLong(),
fail("engine should be closed"); randomFrom(Engine.Operation.Origin.values()), randomNonNegativeLong(), "test")));
} catch (Exception e) {
assertThat(e, instanceOf(AlreadyClosedException.class));
}
} }
} }
} }