add tests
This commit is contained in:
parent
fa3ee6b996
commit
7fb44a3ab6
|
@ -70,6 +70,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNotNull(listener.response);
|
||||
assertNull(listener.failure);
|
||||
verify(indexShard, never()).refresh(any());
|
||||
verify(indexShard, never()).addRefreshListener(any(), any());
|
||||
}
|
||||
|
@ -91,6 +92,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNotNull(listener.response);
|
||||
assertNull(listener.failure);
|
||||
responseChecker.accept(listener.response);
|
||||
verify(indexShard).refresh("refresh_flag_index");
|
||||
verify(indexShard, never()).addRefreshListener(any(), any());
|
||||
|
@ -124,15 +126,46 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
boolean forcedRefresh = randomBoolean();
|
||||
refreshListener.getValue().accept(forcedRefresh);
|
||||
assertNotNull(listener.response);
|
||||
assertNull(listener.failure);
|
||||
resultChecker.accept(listener.response, forcedRefresh);
|
||||
}
|
||||
|
||||
public void testDocumentFailureInShardOperationOnPrimary() throws Exception {
|
||||
handleDocumentFailure(new TestAction(true, true), TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond);
|
||||
}
|
||||
|
||||
public void testDocumentFailureInShardOperationOnReplica() throws Exception {
|
||||
handleDocumentFailure(new TestAction(randomBoolean(), true), TestAction::shardOperationOnReplica,
|
||||
TestAction.WriteReplicaResult::respond);
|
||||
}
|
||||
|
||||
private <Result, Response> void handleDocumentFailure(TestAction testAction,
|
||||
ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
|
||||
BiConsumer<Result, CapturingActionListener<Response>> responder)
|
||||
throws Exception {
|
||||
TestRequest request = new TestRequest();
|
||||
Result result = action.apply(testAction, request, indexShard);
|
||||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNull(listener.response);
|
||||
assertNotNull(listener.failure);
|
||||
}
|
||||
|
||||
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
|
||||
|
||||
private final boolean withDocumentFailureOnPrimary;
|
||||
private final boolean withDocumentFailureOnReplica;
|
||||
|
||||
protected TestAction() {
|
||||
this(false, false);
|
||||
}
|
||||
protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
|
||||
super(Settings.EMPTY, "test",
|
||||
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null,
|
||||
null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
|
||||
TestRequest::new, ThreadPool.Names.SAME);
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,12 +175,24 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception {
|
||||
return new WritePrimaryResult(request, new TestResponse(), location, null, primary);
|
||||
final WritePrimaryResult primaryResult;
|
||||
if (withDocumentFailureOnPrimary) {
|
||||
primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary);
|
||||
} else {
|
||||
primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary);
|
||||
}
|
||||
return primaryResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
|
||||
return new WriteReplicaResult(request, location, null, replica);
|
||||
final WriteReplicaResult replicaResult;
|
||||
if (withDocumentFailureOnReplica) {
|
||||
replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica);
|
||||
} else {
|
||||
replicaResult = new WriteReplicaResult(request, location, null, replica);
|
||||
}
|
||||
return replicaResult;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,6 +213,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
|
||||
private static class CapturingActionListener<R> implements ActionListener<R> {
|
||||
private R response;
|
||||
private Exception failure;
|
||||
|
||||
@Override
|
||||
public void onResponse(R response) {
|
||||
|
@ -175,8 +221,8 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
public void onFailure(Exception failure) {
|
||||
this.failure = failure;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,8 +31,11 @@ import org.apache.lucene.codecs.Codec;
|
|||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
|
||||
import org.apache.lucene.index.LiveIndexWriterConfig;
|
||||
import org.apache.lucene.index.LogByteSizeMergePolicy;
|
||||
|
@ -106,7 +109,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -130,6 +132,7 @@ import java.util.concurrent.CyclicBarrier;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
||||
|
@ -279,12 +282,21 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
|
||||
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
|
||||
return createEngine(defaultSettings, store, translogPath, newMergePolicy());
|
||||
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
|
||||
return createEngine(indexSettings, store, translogPath, mergePolicy, null);
|
||||
|
||||
}
|
||||
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier<IndexWriter> indexWriterSupplier) throws IOException {
|
||||
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
|
||||
InternalEngine internalEngine = new InternalEngine(config);
|
||||
InternalEngine internalEngine = new InternalEngine(config) {
|
||||
@Override
|
||||
IndexWriter createWriter(boolean create) throws IOException {
|
||||
return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create);
|
||||
}
|
||||
};
|
||||
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
||||
internalEngine.recoverFromTranslog();
|
||||
}
|
||||
|
@ -339,7 +351,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
|
||||
Engine.Index second = new Engine.Index(newUid("2"), doc2);
|
||||
Engine.IndexResult secondResult = engine.index(second);
|
||||
assertThat(secondResult.getLocation(), greaterThan(firstResult.getLocation()));
|
||||
assertThat(secondResult.getTranslogLocation(), greaterThan(firstResult.getTranslogLocation()));
|
||||
engine.refresh("test");
|
||||
|
||||
segments = engine.segments(false);
|
||||
|
@ -2134,6 +2146,72 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCheckDocumentFailure() throws Exception {
|
||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
|
||||
Exception documentFailure = engine.checkIfDocumentFailureOrThrow(new Engine.Index(newUid("1"), doc), new IOException("simulated document failure"));
|
||||
assertThat(documentFailure, instanceOf(IOException.class));
|
||||
try {
|
||||
engine.checkIfDocumentFailureOrThrow(new Engine.Index(newUid("1"), doc), new CorruptIndexException("simulated environment failure", ""));
|
||||
fail("expected exception to be thrown");
|
||||
} catch (ElasticsearchException envirnomentException) {
|
||||
assertThat(envirnomentException.getShardId(), equalTo(engine.shardId));
|
||||
assertThat(envirnomentException.getCause().getMessage(), containsString("simulated environment failure"));
|
||||
}
|
||||
}
|
||||
|
||||
private static class ThrowingIndexWriter extends IndexWriter {
|
||||
private boolean throwDocumentFailure;
|
||||
|
||||
public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
|
||||
super(d, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
|
||||
if (throwDocumentFailure) {
|
||||
throw new IOException("simulated");
|
||||
} else {
|
||||
return super.addDocument(doc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long deleteDocuments(Term... terms) throws IOException {
|
||||
if (throwDocumentFailure) {
|
||||
throw new IOException("simulated");
|
||||
} else {
|
||||
return super.deleteDocuments(terms);
|
||||
}
|
||||
}
|
||||
|
||||
public void setThrowDocumentFailure(boolean throwDocumentFailure) {
|
||||
this.throwDocumentFailure = throwDocumentFailure;
|
||||
}
|
||||
}
|
||||
|
||||
public void testHandleDocumentFailure() throws Exception {
|
||||
try (Store store = createStore()) {
|
||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
|
||||
ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig());
|
||||
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) {
|
||||
// test document failure while indexing
|
||||
throwingIndexWriter.setThrowDocumentFailure(true);
|
||||
Engine.IndexResult indexResult = engine.index(randomAppendOnly(1, doc, false));
|
||||
assertNotNull(indexResult.getFailure());
|
||||
|
||||
throwingIndexWriter.setThrowDocumentFailure(false);
|
||||
indexResult = engine.index(randomAppendOnly(1, doc, false));
|
||||
assertNull(indexResult.getFailure());
|
||||
|
||||
// test document failure while deleting
|
||||
throwingIndexWriter.setThrowDocumentFailure(true);
|
||||
Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "", newUid("1")));
|
||||
assertNotNull(deleteResult.getFailure());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testDocStats() throws IOException {
|
||||
final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
@ -2169,22 +2247,22 @@ public class InternalEngineTests extends ESTestCase {
|
|||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertFalse(engine.indexWriterHasDeletions());
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(indexResult.getLocation());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0);
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertTrue(engine.indexWriterHasDeletions());
|
||||
assertEquals(0, engine.getNumVersionLookups());
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0);
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
|
||||
}
|
||||
|
||||
engine.refresh("test");
|
||||
|
@ -2196,16 +2274,16 @@ public class InternalEngineTests extends ESTestCase {
|
|||
retry = randomAppendOnly(1, doc, true);
|
||||
if (randomBoolean()) {
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertNotNull(indexResult.getLocation());
|
||||
assertNotNull(indexResult.getTranslogLocation());
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0);
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
|
||||
} else {
|
||||
Engine.IndexResult retryResult = engine.index(retry);
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
Engine.IndexResult indexResult = engine.index(operation);
|
||||
assertNotNull(retryResult.getLocation());
|
||||
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0);
|
||||
assertNotNull(retryResult.getTranslogLocation());
|
||||
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
|
||||
}
|
||||
|
||||
engine.refresh("test");
|
||||
|
|
|
@ -144,7 +144,7 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
for (int i = 0; i < maxListeners; i++) {
|
||||
DummyRefreshListener listener = new DummyRefreshListener();
|
||||
nonForcedListeners.add(listener);
|
||||
listeners.addOrNotify(index.getLocation(), listener);
|
||||
listeners.addOrNotify(index.getTranslogLocation(), listener);
|
||||
assertTrue(listeners.refreshNeeded());
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
|
||||
// Add one more listener which should cause a refresh.
|
||||
DummyRefreshListener forcingListener = new DummyRefreshListener();
|
||||
listeners.addOrNotify(index.getLocation(), forcingListener);
|
||||
listeners.addOrNotify(index.getTranslogLocation(), forcingListener);
|
||||
assertTrue("Forced listener wasn't forced?", forcingListener.forcedRefresh.get());
|
||||
forcingListener.assertNoError();
|
||||
|
||||
|
@ -178,7 +178,7 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
}
|
||||
|
||||
DummyRefreshListener listener = new DummyRefreshListener();
|
||||
assertTrue(listeners.addOrNotify(index.getLocation(), listener));
|
||||
assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
|
||||
assertFalse(listener.forcedRefresh.get());
|
||||
listener.assertNoError();
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
for (int i = 0; i < 1000; i++) {
|
||||
Engine.IndexResult index = index("1");
|
||||
DummyRefreshListener listener = new DummyRefreshListener();
|
||||
boolean immediate = listeners.addOrNotify(index.getLocation(), listener);
|
||||
boolean immediate = listeners.addOrNotify(index.getTranslogLocation(), listener);
|
||||
if (immediate) {
|
||||
assertNotNull(listener.forcedRefresh.get());
|
||||
} else {
|
||||
|
@ -238,7 +238,7 @@ public class RefreshListenersTests extends ESTestCase {
|
|||
assertEquals(iteration, index.getVersion());
|
||||
|
||||
DummyRefreshListener listener = new DummyRefreshListener();
|
||||
listeners.addOrNotify(index.getLocation(), listener);
|
||||
listeners.addOrNotify(index.getTranslogLocation(), listener);
|
||||
assertBusy(() -> assertNotNull("listener never called", listener.forcedRefresh.get()));
|
||||
if (threadCount < maxListeners) {
|
||||
assertFalse(listener.forcedRefresh.get());
|
||||
|
|
Loading…
Reference in New Issue