From 7fb44a3ab63cfa502b2120fe9b1958b9bfa9630c Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 26 Oct 2016 20:21:54 -0400 Subject: [PATCH] add tests --- .../TransportWriteActionTests.java | 54 ++++++++- .../index/engine/InternalEngineTests.java | 110 +++++++++++++++--- .../index/shard/RefreshListenersTests.java | 10 +- 3 files changed, 149 insertions(+), 25 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 66251c92abf..712d1ebe993 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -70,6 +70,7 @@ public class TransportWriteActionTests extends ESTestCase { CapturingActionListener listener = new CapturingActionListener<>(); responder.accept(result, listener); assertNotNull(listener.response); + assertNull(listener.failure); verify(indexShard, never()).refresh(any()); verify(indexShard, never()).addRefreshListener(any(), any()); } @@ -91,6 +92,7 @@ public class TransportWriteActionTests extends ESTestCase { CapturingActionListener listener = new CapturingActionListener<>(); responder.accept(result, listener); assertNotNull(listener.response); + assertNull(listener.failure); responseChecker.accept(listener.response); verify(indexShard).refresh("refresh_flag_index"); verify(indexShard, never()).addRefreshListener(any(), any()); @@ -124,15 +126,46 @@ public class TransportWriteActionTests extends ESTestCase { boolean forcedRefresh = randomBoolean(); refreshListener.getValue().accept(forcedRefresh); assertNotNull(listener.response); + assertNull(listener.failure); resultChecker.accept(listener.response, forcedRefresh); } + public void testDocumentFailureInShardOperationOnPrimary() throws Exception { + handleDocumentFailure(new TestAction(true, true), TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond); + } + + public void testDocumentFailureInShardOperationOnReplica() throws Exception { + handleDocumentFailure(new TestAction(randomBoolean(), true), TestAction::shardOperationOnReplica, + TestAction.WriteReplicaResult::respond); + } + + private void handleDocumentFailure(TestAction testAction, + ThrowingTriFunction action, + BiConsumer> responder) + throws Exception { + TestRequest request = new TestRequest(); + Result result = action.apply(testAction, request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + responder.accept(result, listener); + assertNull(listener.response); + assertNotNull(listener.failure); + } + private class TestAction extends TransportWriteAction { + + private final boolean withDocumentFailureOnPrimary; + private final boolean withDocumentFailureOnReplica; + protected TestAction() { + this(false, false); + } + protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; + this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @Override @@ -142,12 +175,24 @@ public class TransportWriteActionTests extends ESTestCase { @Override protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception { - return new WritePrimaryResult(request, new TestResponse(), location, null, primary); + final WritePrimaryResult primaryResult; + if (withDocumentFailureOnPrimary) { + primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary); + } else { + primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary); + } + return primaryResult; } @Override protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { - return new WriteReplicaResult(request, location, null, replica); + final WriteReplicaResult replicaResult; + if (withDocumentFailureOnReplica) { + replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica); + } else { + replicaResult = new WriteReplicaResult(request, location, null, replica); + } + return replicaResult; } } @@ -168,6 +213,7 @@ public class TransportWriteActionTests extends ESTestCase { private static class CapturingActionListener implements ActionListener { private R response; + private Exception failure; @Override public void onResponse(R response) { @@ -175,8 +221,8 @@ public class TransportWriteActionTests extends ESTestCase { } @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); + public void onFailure(Exception failure) { + this.failure = failure; } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a4df773983a..be4b7bbdefe 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -31,8 +31,11 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.TextField; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; @@ -106,7 +109,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; @@ -130,6 +132,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; @@ -279,12 +282,21 @@ public class InternalEngineTests extends ESTestCase { } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy()); + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, null); + + } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier indexWriterSupplier) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); - InternalEngine internalEngine = new InternalEngine(config); + InternalEngine internalEngine = new InternalEngine(config) { + @Override + IndexWriter createWriter(boolean create) throws IOException { + return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create); + } + }; if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); } @@ -339,7 +351,7 @@ public class InternalEngineTests extends ESTestCase { ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null); Engine.Index second = new Engine.Index(newUid("2"), doc2); Engine.IndexResult secondResult = engine.index(second); - assertThat(secondResult.getLocation(), greaterThan(firstResult.getLocation())); + assertThat(secondResult.getTranslogLocation(), greaterThan(firstResult.getTranslogLocation())); engine.refresh("test"); segments = engine.segments(false); @@ -2134,6 +2146,72 @@ public class InternalEngineTests extends ESTestCase { } } + public void testCheckDocumentFailure() throws Exception { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + Exception documentFailure = engine.checkIfDocumentFailureOrThrow(new Engine.Index(newUid("1"), doc), new IOException("simulated document failure")); + assertThat(documentFailure, instanceOf(IOException.class)); + try { + engine.checkIfDocumentFailureOrThrow(new Engine.Index(newUid("1"), doc), new CorruptIndexException("simulated environment failure", "")); + fail("expected exception to be thrown"); + } catch (ElasticsearchException envirnomentException) { + assertThat(envirnomentException.getShardId(), equalTo(engine.shardId)); + assertThat(envirnomentException.getCause().getMessage(), containsString("simulated environment failure")); + } + } + + private static class ThrowingIndexWriter extends IndexWriter { + private boolean throwDocumentFailure; + + public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { + super(d, conf); + } + + @Override + public long addDocument(Iterable doc) throws IOException { + if (throwDocumentFailure) { + throw new IOException("simulated"); + } else { + return super.addDocument(doc); + } + } + + @Override + public long deleteDocuments(Term... terms) throws IOException { + if (throwDocumentFailure) { + throw new IOException("simulated"); + } else { + return super.deleteDocuments(terms); + } + } + + public void setThrowDocumentFailure(boolean throwDocumentFailure) { + this.throwDocumentFailure = throwDocumentFailure; + } + } + + public void testHandleDocumentFailure() throws Exception { + try (Store store = createStore()) { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig()); + try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) { + // test document failure while indexing + throwingIndexWriter.setThrowDocumentFailure(true); + Engine.IndexResult indexResult = engine.index(randomAppendOnly(1, doc, false)); + assertNotNull(indexResult.getFailure()); + + throwingIndexWriter.setThrowDocumentFailure(false); + indexResult = engine.index(randomAppendOnly(1, doc, false)); + assertNull(indexResult.getFailure()); + + // test document failure while deleting + throwingIndexWriter.setThrowDocumentFailure(true); + Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "", newUid("1"))); + assertNotNull(deleteResult.getFailure()); + } + } + + } + public void testDocStats() throws IOException { final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below for (int i = 0; i < numDocs; i++) { @@ -2169,22 +2247,22 @@ public class InternalEngineTests extends ESTestCase { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(indexResult.getLocation()); + assertNotNull(indexResult.getTranslogLocation()); Engine.IndexResult retryResult = engine.index(retry); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retryResult.getLocation()); - assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0); + assertNotNull(retryResult.getTranslogLocation()); + assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retryResult.getLocation()); + assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retryResult.getLocation()); - assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0); + assertNotNull(retryResult.getTranslogLocation()); + assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } engine.refresh("test"); @@ -2196,16 +2274,16 @@ public class InternalEngineTests extends ESTestCase { retry = randomAppendOnly(1, doc, true); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(indexResult.getLocation()); + assertNotNull(indexResult.getTranslogLocation()); Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getLocation()); - assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0); + assertNotNull(retryResult.getTranslogLocation()); + assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getLocation()); + assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(retryResult.getLocation()); - assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0); + assertNotNull(retryResult.getTranslogLocation()); + assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } engine.refresh("test"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index f0f53d9fdc9..94c0407f700 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -144,7 +144,7 @@ public class RefreshListenersTests extends ESTestCase { for (int i = 0; i < maxListeners; i++) { DummyRefreshListener listener = new DummyRefreshListener(); nonForcedListeners.add(listener); - listeners.addOrNotify(index.getLocation(), listener); + listeners.addOrNotify(index.getTranslogLocation(), listener); assertTrue(listeners.refreshNeeded()); } @@ -155,7 +155,7 @@ public class RefreshListenersTests extends ESTestCase { // Add one more listener which should cause a refresh. DummyRefreshListener forcingListener = new DummyRefreshListener(); - listeners.addOrNotify(index.getLocation(), forcingListener); + listeners.addOrNotify(index.getTranslogLocation(), forcingListener); assertTrue("Forced listener wasn't forced?", forcingListener.forcedRefresh.get()); forcingListener.assertNoError(); @@ -178,7 +178,7 @@ public class RefreshListenersTests extends ESTestCase { } DummyRefreshListener listener = new DummyRefreshListener(); - assertTrue(listeners.addOrNotify(index.getLocation(), listener)); + assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener)); assertFalse(listener.forcedRefresh.get()); listener.assertNoError(); } @@ -200,7 +200,7 @@ public class RefreshListenersTests extends ESTestCase { for (int i = 0; i < 1000; i++) { Engine.IndexResult index = index("1"); DummyRefreshListener listener = new DummyRefreshListener(); - boolean immediate = listeners.addOrNotify(index.getLocation(), listener); + boolean immediate = listeners.addOrNotify(index.getTranslogLocation(), listener); if (immediate) { assertNotNull(listener.forcedRefresh.get()); } else { @@ -238,7 +238,7 @@ public class RefreshListenersTests extends ESTestCase { assertEquals(iteration, index.getVersion()); DummyRefreshListener listener = new DummyRefreshListener(); - listeners.addOrNotify(index.getLocation(), listener); + listeners.addOrNotify(index.getTranslogLocation(), listener); assertBusy(() -> assertNotNull("listener never called", listener.forcedRefresh.get())); if (threadCount < maxListeners) { assertFalse(listener.forcedRefresh.get());