diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c4c6792bf46..023e659ffab 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -802,7 +802,7 @@ public class InternalEngine extends Engine { location = translog.add(new Translog.Index(index, indexResult)); } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog with the generated seq_no - location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString())); } else { location = null; } @@ -1111,7 +1111,7 @@ public class InternalEngine extends Engine { location = translog.add(new Translog.Delete(delete, deleteResult)); } else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), - delete.primaryTerm(), deleteResult.getFailure().getMessage())); + delete.primaryTerm(), deleteResult.getFailure().toString())); } else { location = null; } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f38550d7041..1d1e423afc1 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; @@ -47,6 +46,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.TestThreadPool; @@ -54,6 +54,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -338,38 +339,73 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase * for primary and replica shards */ public void testDocumentFailureReplication() throws Exception { - final String failureMessage = "simulated document failure"; - final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory = - new ThrowingDocumentFailureEngineFactory(failureMessage); + final IOException indexException = new IOException("simulated indexing failure"); + final IOException deleteException = new IOException("simulated deleting failure"); + final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) -> + new IndexWriter(dir, iwc) { + final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW. + @Override + public long addDocument(Iterable doc) throws IOException { + if (throwAfterIndexedOneDoc.getAndSet(true)) { + throw indexException; + } else { + return super.addDocument(doc); + } + } + @Override + public long deleteDocuments(Term... terms) throws IOException { + throw deleteException; + } + }, null, null, config); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { @Override - protected EngineFactory getEngineFactory(ShardRouting routing) { - return throwingDocumentFailureEngineFactory; - }}) { + protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) { - // test only primary + // start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary. shards.startPrimary(); - BulkItemResponse response = shards.index( - new IndexRequest(index.getName(), "type", "1") - .source("{}", XContentType.JSON) - ); - assertTrue(response.isFailed()); - assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage); - shards.assertAllEqual(0); + long primaryTerm = shards.getPrimary().getPendingPrimaryTerm(); + List expectedTranslogOps = new ArrayList<>(); + BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON)); + assertThat(indexResp.isFailed(), equalTo(false)); + expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1)); + try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + + indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); + expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString())); + + BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); + assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); + expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString())); + shards.assertAllEqual(1); - // add some replicas int nReplica = randomIntBetween(1, 3); for (int i = 0; i < nReplica; i++) { shards.addReplica(); } shards.startReplicas(nReplica); - response = shards.index( - new IndexRequest(index.getName(), "type", "1") - .source("{}", XContentType.JSON) - ); - assertTrue(response.isFailed()); - assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage); - shards.assertAllEqual(0); + for (IndexShard shard : shards) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + } + // unlike previous failures, these two failures replicated directly from the replication channel. + indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); + expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString())); + + deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); + assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); + expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString())); + + for (IndexShard shard : shards) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + } + shards.assertAllEqual(1); } } @@ -541,47 +577,4 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase shards.assertAllEqual(0); } } - - /** Throws documentFailure on every indexing operation */ - static class ThrowingDocumentFailureEngineFactory implements EngineFactory { - final String documentFailureMessage; - - ThrowingDocumentFailureEngineFactory(String documentFailureMessage) { - this.documentFailureMessage = documentFailureMessage; - } - - @Override - public Engine newReadWriteEngine(EngineConfig config) { - return InternalEngineTests.createInternalEngine((directory, writerConfig) -> - new IndexWriter(directory, writerConfig) { - @Override - public long addDocument(Iterable doc) throws IOException { - assert documentFailureMessage != null; - throw new IOException(documentFailureMessage); - } - }, null, null, config); - } - } - - private static void assertNoOpTranslogOperationForDocumentFailure( - Iterable replicationGroup, - int expectedOperation, - long expectedPrimaryTerm, - String failureMessage) throws IOException { - for (IndexShard indexShard : replicationGroup) { - try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(expectedOperation)); - long expectedSeqNo = 0L; - Translog.Operation op = snapshot.next(); - do { - assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); - assertThat(op.seqNo(), equalTo(expectedSeqNo)); - assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm)); - assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage)); - op = snapshot.next(); - expectedSeqNo++; - } while (op != null); - } - } - } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 9b63f0d233e..3f1f5daf514 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -193,14 +194,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } public BulkItemResponse index(IndexRequest indexRequest) throws Exception { + return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy()); + } + + public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception { + return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy()); + } + + private BulkItemResponse executeWriteRequest( + DocWriteRequest writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), - listener::onFailure); + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), + listener::onFailure); BulkItemRequest[] items = new BulkItemRequest[1]; - items[0] = new BulkItemRequest(0, indexRequest); - BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); - new IndexingAction(request, wrapBulkListener, this).execute(); + items[0] = new BulkItemRequest(0, writeRequest); + BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items); + new WriteReplicationAction(request, wrapBulkListener, this).execute(); return listener.get(); } @@ -598,9 +608,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } - class IndexingAction extends ReplicationAction { + class WriteReplicationAction extends ReplicationAction { - IndexingAction(BulkShardRequest request, ActionListener listener, ReplicationGroup replicationGroup) { + WriteReplicationAction(BulkShardRequest request, ActionListener listener, ReplicationGroup replicationGroup) { super(request, listener, replicationGroup, "indexing"); }