From 4f773e2dbb8464c66fe1cf711ba792c28d7be59f Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 19 Apr 2017 01:23:54 -0400 Subject: [PATCH] Replicate write failures (#23314) * Replicate write failures Currently, when a primary write operation fails after generating a sequence number, the failure is not communicated to the replicas. Ideally, every operation which generates a sequence number on the primary should be recorded in all replicas. In this change, a sequence number is associated with a write operation failure. When a failure with an assigned sequence number arrives at a replica, the failure cause and sequence number are recorded in the translog and the sequence number is marked as completed by executing `Engine.noOp` on the replica engine. * use zlong to serialize seq_no * Incorporate feedback * track write failures in translog as a noop in primary * Add tests for replicating write failures. Test that document failures (w/ seq no generated) are recorded as no-ops in the translog for primary and replica shards * Update to master * update shouldExecuteOnReplica comment * rename indexshard noop to markSeqNoAsNoOp * remove redundant conditional * Consolidate possible replica action for bulk item request depending on its primary execution * remove bulk shard result abstraction * fix failure handling logic for bwc * add more tests * minor fix * cleanup * incorporate feedback * incorporate feedback * add assert to remove handling noop primary response when 5.0 nodes are not supported --- .../action/bulk/BulkItemRequest.java | 9 +- .../action/bulk/BulkItemResponse.java | 46 ++++- .../action/bulk/TransportShardBulkAction.java | 189 +++++++++++++----- .../replication/TransportWriteAction.java | 4 +- .../elasticsearch/index/engine/Engine.java | 30 +-- .../index/engine/InternalEngine.java | 30 ++- .../elasticsearch/index/shard/IndexShard.java | 33 ++- .../shard/TranslogRecoveryPerformer.java | 4 +- .../bulk/TransportShardBulkActionTests.java | 65 ++++-- .../index/engine/InternalEngineTests.java | 12 +- .../ESIndexLevelReplicationTestCase.java | 113 +++++------ .../IndexLevelReplicationTests.java | 145 +++++++++++++- .../RecoveryDuringReplicationTests.java | 6 +- .../elasticsearch/backwards/IndexingIT.java | 42 ++-- 14 files changed, 521 insertions(+), 207 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 3023ecb1856..50da1476f49 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -38,7 +38,8 @@ public class BulkItemRequest implements Streamable { } - protected BulkItemRequest(int id, DocWriteRequest request) { + // NOTE: public for testing only + public BulkItemRequest(int id, DocWriteRequest request) { this.id = id; this.request = request; } @@ -56,13 +57,11 @@ public class BulkItemRequest implements Streamable { return request.indices()[0]; } - // NOTE: protected for testing only - protected BulkItemResponse getPrimaryResponse() { + BulkItemResponse getPrimaryResponse() { return primaryResponse; } - // NOTE: protected for testing only - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { + void setPrimaryResponse(BulkItemResponse primaryResponse) { this.primaryResponse = primaryResponse; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java
index 2e2a7f15401..68cede5d251 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -37,6 +37,8 @@ import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -171,17 +173,34 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { private final String id; private final Exception cause; private final RestStatus status; + private final long seqNo; - Failure(String index, String type, String id, Exception cause, RestStatus status) { + /** + * For write failures before operation was assigned a sequence number. + * + * use @{link {@link #Failure(String, String, String, Exception, long)}} + * to record operation sequence no with failure + */ + public Failure(String index, String type, String id, Exception cause) { + this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status) { + this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + /** For write failures after operation was assigned a sequence number. */ + public Failure(String index, String type, String id, Exception cause, long seqNo) { + this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) { this.index = index; this.type = type; this.id = id; this.cause = cause; this.status = status; - } - - public Failure(String index, String type, String id, Exception cause) { - this(index, type, id, cause, ExceptionsHelper.status(cause)); + this.seqNo = seqNo; } /** @@ -193,6 +212,11 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { id = in.readOptionalString(); cause = in.readException(); status = ExceptionsHelper.status(cause); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override @@ -201,6 +225,9 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { out.writeString(getType()); out.writeOptionalString(getId()); out.writeException(getCause()); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(getSeqNo()); + } } @@ -246,6 +273,15 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { return cause; } + /** + * The operation sequence number generated by primary + * NOTE: {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + * indicates sequence number was not generated by primary + */ + public long getSeqNo() { + return seqNo; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(INDEX_FIELD, index); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 697f4c2f993..170f2d30536 100644 --- 
a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -43,7 +44,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -65,13 +65,9 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.index.translog.Translog.Location; -import org.elasticsearch.action.bulk.BulkItemResultHolder; -import org.elasticsearch.action.bulk.BulkItemResponse; import java.io.IOException; import java.util.Map; -import java.util.Objects; import java.util.function.LongSupplier; /** Performs shard-level bulk (index, delete or update) operations */ @@ -113,12 +109,20 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { + return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); + } + + public static WritePrimaryResult performOnPrimary( + BulkShardRequest request, + IndexShard primary, + UpdateHelper updateHelper, + LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); Translog.Location location = null; - final MappingUpdatePerformer mappingUpdater = new ConcreteMappingUpdatePerformer(); for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { location = executeBulkItemRequest(metaData, primary, request, location, requestIndex, - updateHelper, threadPool::absoluteTimeInMillis, mappingUpdater); + updateHelper, nowInMillisSupplier, mappingUpdater); } BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemRequest[] items = request.items(); @@ -129,7 +133,6 @@ public class TransportShardBulkAction extends TransportWriteAction(request, response, location, null, primary, logger); } - private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest, final BulkItemRequest bulkItemRequest, final IndexShard primary, @@ -208,7 +211,8 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + final Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + } + + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (int i 
= 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; - if (shouldExecuteReplicaItem(item, i)) { - DocWriteRequest docWriteRequest = item.request(); - DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); - final Engine.Result operationResult; - try { - switch (docWriteRequest.opType()) { - case CREATE: - case INDEX: - operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); - break; - case DELETE: - operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); - break; - default: - throw new IllegalStateException("Unexpected request operation type on replica: " - + docWriteRequest.opType().getLowercase()); - } - if (operationResult.hasFailure()) { - // check if any transient write operation failures should be bubbled up - Exception failure = operationResult.getFailure(); - assert failure instanceof VersionConflictEngineException - || failure instanceof MapperParsingException - : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + - " failures. got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; + final Engine.Result operationResult; + DocWriteRequest docWriteRequest = item.request(); + try { + switch (replicaItemExecutionMode(item, i)) { + case NORMAL: + final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); + break; + case DELETE: + operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); + break; + default: + throw new IllegalStateException("Unexpected request operation type on replica: " + + docWriteRequest.opType().getLowercase()); } - } else { - location = locationToSync(location, operationResult.getTranslogLocation()); - } - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!TransportActions.isShardNotAvailableException(e)) { - throw e; - } + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + case NOOP: + break; + case FAILURE: + final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; + operationResult = executeFailureNoOpOnReplica(failure, replica); + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + default: + throw new IllegalStateException("illegal replica item execution mode for: " + item.request()); + } + } catch (Exception e) { + // if its not an ignore replica failure, we need to make sure to bubble up the failure + // so we will fail the shard + if (!TransportActions.isShardNotAvailableException(e)) { + throw e; } } } - return new WriteReplicaResult<>(request, location, null, replica, logger); + return location; + } + + /** Syncs operation result to the translog or throws a shard not available failure */ + private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult, + 
final Translog.Location currentLocation) throws Exception { + final Translog.Location location; + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } else { + location = currentLocation; + } + } else { + location = locationToSync(currentLocation, operationResult.getTranslogLocation()); + } + return location; } private static Translog.Location locationToSync(Translog.Location current, @@ -429,7 +504,7 @@ public class TransportShardBulkAction extends TransportWriteAction, + public static class WritePrimaryResult, Response extends ReplicationResponse & WriteResponse> extends PrimaryResult implements RespondingWriteResult { boolean finishedAsyncActions; diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 59655abf289..45b731cd9cf 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -363,7 +363,6 @@ public abstract class Engine implements Closeable { void setTranslogLocation(Translog.Location translogLocation) { if (freeze.get() == null) { - assert failure == null : "failure has to be null to set translog location"; this.translogLocation = translogLocation; } else { throw new IllegalStateException("result is already frozen"); @@ -432,7 +431,7 @@ public abstract class Engine implements Closeable { } - static class NoOpResult extends Result { + public static class NoOpResult extends Result { NoOpResult(long seqNo) { super(Operation.TYPE.NO_OP, 0, seqNo); @@ -1154,24 +1153,31 @@ public abstract class Engine implements Closeable { return reason; } - public NoOp( - final Term uid, - final long seqNo, - final long primaryTerm, - final long version, - final VersionType versionType, - final Origin origin, - final long startTime, - final String reason) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) { + super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime); this.reason = reason; } + @Override + public Term uid() { + throw new UnsupportedOperationException(); + } + @Override public String type() { throw new UnsupportedOperationException(); } + @Override + public long version() { + throw new UnsupportedOperationException(); + } + + @Override + public VersionType versionType() { + throw new UnsupportedOperationException(); + } + @Override String id() { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0bed51e0e24..544b68add13 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -614,10 +614,16 @@ public class InternalEngine extends Engine { indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (indexResult.hasFailure() == false && - index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - 
translog.add(new Translog.Index(index, indexResult)); + if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + final Translog.Location location; + if (indexResult.hasFailure() == false) { + location = translog.add(new Translog.Index(index, indexResult)); + } else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + // if we have document failure, record it as a no-op in the translog with the generated seq_no + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + } else { + location = null; + } indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { @@ -749,7 +755,7 @@ public class InternalEngine extends Engine { * we return a `MATCH_ANY` version to indicate no document was index. The value is * not used anyway */ - return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo()); + return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing); } else { throw ex; } @@ -900,10 +906,16 @@ public class InternalEngine extends Engine { deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (!deleteResult.hasFailure() && - delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - translog.add(new Translog.Delete(delete, deleteResult)); + if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + final Translog.Location location; + if (deleteResult.hasFailure() == false) { + location = translog.add(new Translog.Delete(delete, deleteResult)); + } else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), + delete.primaryTerm(), deleteResult.getFailure().getMessage())); + } else { + location = null; + } deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1dee58ced00..589572fff3f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,12 +569,21 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return result; } + public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) { + verifyReplicationTarget(); + long startTime = System.nanoTime(); + return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); + } + + public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException { + ensureWriteAllowed(noOp); + Engine engine = getEngine(); + return engine.noOp(noOp); + } + public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { verifyPrimary(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, Engine.Operation.Origin.PRIMARY); } @@ -582,15 +591,12 @@ public class IndexShard extends 
AbstractIndexShardComponent implements IndicesCl public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType) { verifyReplicationTarget(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); } - static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, - VersionType versionType, Engine.Operation.Origin origin) { + private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, + VersionType versionType, Engine.Operation.Origin origin) { long startTime = System.nanoTime(); return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } @@ -601,6 +607,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return delete(engine, delete); } + private Term extractUid(String type, String id) { + final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); + final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); + final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); + return MappedFieldType.extractTerm(uidQuery); + } + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index d5aadc1664e..8842cbf3c0b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; import org.elasticsearch.index.mapper.DocumentMapperForType; @@ -31,7 +30,6 @@ import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; @@ -182,7 +180,7 @@ public class TranslogRecoveryPerformer { final String reason = noOp.reason(); logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); final Engine.NoOp engineNoOp = - new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason); + new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); noOp(engine, engineNoOp); break; default: diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java 
b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 355b3978cbf..4016c2cbdef 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -34,14 +34,9 @@ import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -52,15 +47,12 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.action.bulk.TransportShardBulkAction; -import org.elasticsearch.action.bulk.MappingUpdatePerformer; -import org.elasticsearch.action.bulk.BulkItemResultHolder; +import org.mockito.ArgumentCaptor; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; @@ -96,26 +88,38 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); - assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NORMAL)); - // Failed index requests should not be replicated (for now!) 
+ // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); + // Failed index requests with sequence no should be replicated + request = new BulkItemRequest(0, writeRequest); + request.setPrimaryResponse( + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException( + "i died after sequence no was generated"), + 1))); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); } @@ -515,6 +519,35 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { } + public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { + final IndexShard shard = spy(newStartedShard(false)); + BulkItemRequest itemRequest = new BulkItemRequest(0, + new IndexRequest("index", "type") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") + ); + final String failureMessage = "simulated primary failure"; + itemRequest.setPrimaryResponse(new BulkItemResponse(0, + randomFrom( + DocWriteRequest.OpType.CREATE, + DocWriteRequest.OpType.DELETE, + DocWriteRequest.OpType.INDEX + ), + new BulkItemResponse.Failure("index", "type", "1", + new IOException(failureMessage), 1L) + )); + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + itemRequests[0] = itemRequest; + BulkShardRequest bulkShardRequest = new BulkShardRequest( + shard.shardId(), RefreshPolicy.NONE, itemRequests); + TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); + ArgumentCaptor noOp = ArgumentCaptor.forClass(Engine.NoOp.class); + verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture()); + final Engine.NoOp noOpValue = noOp.getValue(); + assertThat(noOpValue.seqNo(), equalTo(1L)); + assertThat(noOpValue.reason(), containsString(failureMessage)); + closeShards(shard); + } + public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception { IndexMetaData metaData = indexMetaData(); logger.info("--> metadata.getIndex(): {}", metaData.getIndex()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 71d754ddfb6..af53c4997fd 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2857,10 +2857,13 @@ public class InternalEngineTests extends ESTestCase { } Engine.IndexResult 
indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); - + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); throwingIndexWriter.get().clearFailure(); indexResult = engine.index(indexForDoc(doc1)); assertNull(indexResult.getFailure()); + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); engine.index(indexForDoc(doc2)); // test failure while deleting @@ -3672,12 +3675,9 @@ public class InternalEngineTests extends ESTestCase { final String reason = randomAlphaOfLength(16); noOpEngine.noOp( new Engine.NoOp( - null, - maxSeqNo + 1, + maxSeqNo + 1, primaryTerm, - 0, - VersionType.INTERNAL, - randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), + randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), System.nanoTime(), reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c35f72d2085..2243a5769b9 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,21 +22,21 @@ package org.elasticsearch.index.replication; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction.ReplicaResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -50,7 +50,6 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -58,6 +57,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import 
org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -77,8 +77,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -147,9 +145,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) - .source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + .source("{}", XContentType.JSON); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; @@ -158,43 +160,29 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase public int appendDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else if (response.isFailed() == false) { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; } - public IndexResponse index(IndexRequest indexRequest) throws Exception { - PlainActionFuture listener = new PlainActionFuture<>(); + public BulkItemResponse index(IndexRequest indexRequest) throws Exception { + PlainActionFuture listener = new PlainActionFuture<>(); final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0].getResponse()), + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), listener::onFailure); BulkItemRequest[] items = new BulkItemRequest[1]; - items[0] = new TestBulkItemRequest(0, indexRequest); + items[0] = new BulkItemRequest(0, indexRequest); BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); new IndexingAction(request, wrapBulkListener, this).execute(); return listener.get(); } - /** BulkItemRequest exposing get/set primary response */ - public class TestBulkItemRequest extends BulkItemRequest { - - TestBulkItemRequest(int id, DocWriteRequest request) { - super(id, request); - } - - @Override - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { - super.setPrimaryResponse(primaryResponse); - } - - @Override - protected BulkItemResponse getPrimaryResponse() { - return super.getPrimaryResponse(); - } - } - public synchronized void startAll() throws IOException { 
startReplicas(replicas.size()); } @@ -442,7 +430,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; class PrimaryRef implements ReplicationOperation.Primary { @@ -539,46 +527,53 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { - final IndexRequest indexRequest = (IndexRequest) request.items()[0].request(); - indexRequest.process(null, request.index()); - final IndexResponse indexResponse = indexOnPrimary(indexRequest, primary); - BulkItemResponse[] itemResponses = new BulkItemResponse[1]; - itemResponses[0] = new BulkItemResponse(0, indexRequest.opType(), indexResponse); - ((ReplicationGroup.TestBulkItemRequest) request.items()[0]).setPrimaryResponse(itemResponses[0]); - return new PrimaryResult(request, new BulkShardResponse(primary.shardId(), itemResponses)); + final TransportWriteAction.WritePrimaryResult result = executeShardBulkOnPrimary(primary, request); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); } @Override - protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws IOException { - final ReplicationGroup.TestBulkItemRequest bulkItemRequest = ((ReplicationGroup.TestBulkItemRequest) request.items()[0]); - final DocWriteResponse primaryResponse = bulkItemRequest.getPrimaryResponse().getResponse(); - indexOnReplica(primaryResponse, ((IndexRequest) bulkItemRequest.request()), replica); + protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); } } + private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { + for (BulkItemRequest itemRequest : request.items()) { + if (itemRequest.request() instanceof IndexRequest) { + ((IndexRequest) itemRequest.request()).process(null, index.getName()); + } + } + final TransportWriteAction.WritePrimaryResult result = + TransportShardBulkAction.performOnPrimary(request, primary, null, + System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); + request.primaryTerm(primary.getPrimaryTerm()); + TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); + return result; + } + + private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception { + final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica); + TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); + } + /** * indexes the given requests on the supplied primary, modifying it for replicas */ - protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { - final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, - new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); - request.primaryTerm(primary.getPrimaryTerm()); - TransportWriteActionTestHelper.performPostWriteActions(primary, request, 
indexResult.getTranslogLocation(), logger); - return new IndexResponse( - primary.shardId(), - request.type(), - request.id(), - indexResult.getSeqNo(), - indexResult.getVersion(), - indexResult.isCreated()); + BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { + final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request); + BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1]; + bulkItemRequests[0] = bulkItemRequest; + final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests); + final TransportWriteAction.WritePrimaryResult result = + executeShardBulkOnPrimary(primary, bulkShardRequest); + return result.replicaRequest(); } /** * indexes the given requests on the supplied replica shard */ - protected void indexOnReplica(DocWriteResponse response, IndexRequest request, IndexShard replica) throws IOException { - final Engine.IndexResult result = executeIndexRequestOnReplica(response, request, replica); - TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); + void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); } class GlobalCheckpointSync extends ReplicationAction future = shards.asyncRecoverReplica(replica, (indexShard, node) - -> new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) { + -> new RecoveryTarget(indexShard, node, recoveryListener, version -> { + }) { @Override public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { super.cleanFiles(totalTranslogOps, sourceMetaData); @@ -113,8 +122,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase shards.startAll(); final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); indexRequest.onRetry(); // force an update of the timestamp - final IndexResponse response = shards.index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + final BulkItemResponse response = shards.index(indexRequest); + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); if (randomBoolean()) { // lets check if that also happens if no translog record is replicated shards.flush(); } @@ -147,7 +156,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase final SeqNoStats shardStats = shard.seqNoStats(); final ShardRouting shardRouting = shard.routingEntry(); logger.debug("seq_no stats for {}: {}", shardRouting, XContentHelper.toString(shardStats, - new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); + new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L)); assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), equalTo(numDocs - 1L)); @@ -158,7 +167,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase public void testConflictingOpsOnReplica() throws Exception { Map mappings = - Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, 
mappings))) { shards.startAll(); IndexShard replica1 = shards.getReplicas().get(0); @@ -180,4 +189,128 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase } } } + + /** + * test document failures (failures after seq_no generation) are added as noop operation to the translog + * for primary and replica shards + */ + public void testDocumentFailureReplication() throws Exception { + final String failureMessage = "simulated document failure"; + final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory = + new ThrowingDocumentFailureEngineFactory(failureMessage); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return throwingDocumentFailureEngineFactory; + }}) { + + // test only primary + shards.startPrimary(); + BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoOpTranslogOperationForDocumentFailure(shards, 1, failureMessage); + shards.assertAllEqual(0); + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) { + shards.addReplica(); + } + shards.startReplicas(nReplica); + response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoOpTranslogOperationForDocumentFailure(shards, 2, failureMessage); + shards.assertAllEqual(0); + } + } + + /** + * test request failures (failures before seq_no generation) are not added as a noop to translog + */ + public void testRequestFailureReplication() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) { + shards.addReplica(); + } + shards.startReplicas(nReplica); + response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + } + } + + /** Throws documentFailure on every indexing operation */ + static class ThrowingDocumentFailureEngineFactory implements EngineFactory { + final String documentFailureMessage; + + ThrowingDocumentFailureEngineFactory(String documentFailureMessage) { + this.documentFailureMessage = documentFailureMessage; + } + + @Override + public Engine newReadWriteEngine(EngineConfig config) { + return InternalEngineTests.createInternalEngine((directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + 
public long addDocument(Iterable doc) throws IOException { + assert documentFailureMessage != null; + throw new IOException(documentFailureMessage); + } + }, null, config); + } + } + + private static void assertNoOpTranslogOperationForDocumentFailure( + Iterable replicationGroup, + int expectedOperation, + String failureMessage) throws IOException { + for (IndexShard indexShard : replicationGroup) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(expectedOperation)); + final Translog.Snapshot snapshot = view.snapshot(); + long expectedSeqNo = 0L; + Translog.Operation op = snapshot.next(); + do { + assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); + assertThat(op.seqNo(), equalTo(expectedSeqNo)); + assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage)); + op = snapshot.next(); + expectedSeqNo++; + } while (op != null); + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 12f749e6819..139c7f500d8 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -24,9 +24,9 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; @@ -168,8 +168,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC for (int i = 0; i < rollbackDocs; i++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i) .source("{}", XContentType.JSON); - final IndexResponse primaryResponse = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(primaryResponse, indexRequest, replica); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); + indexOnReplica(bulkShardRequest, replica); } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index f0be7753067..6ef40a77782 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.anyOf; @@ -76,7 +77,7 @@ public class IndexingIT extends ESRestTestCase { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), - new StringEntity("{\"test\": \"test_" + id + "\"}", ContentType.APPLICATION_JSON))); 
+ new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON))); } return numDocs; } @@ -116,7 +117,7 @@ public class IndexingIT extends ESRestTestCase { .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) .put("index.routing.allocation.include._name", bwcNames); - final String index = "test"; + final String index = "indexversionprop"; final int minUpdates = 5; final int maxUpdates = 10; createIndex(index, settings.build()); @@ -130,7 +131,9 @@ public class IndexingIT extends ESRestTestCase { updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); assertOK(client().performRequest("POST", index + "/_refresh")); - List shards = buildShards(nodes, newNodeClient); + List shards = buildShards(index, nodes, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 1, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc1); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 1); @@ -140,13 +143,15 @@ public class IndexingIT extends ESRestTestCase { logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates); final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); + primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 2, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc2); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 2); } - Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -154,7 +159,7 @@ public class IndexingIT extends ESRestTestCase { logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 3); @@ -167,7 +172,7 @@ public class IndexingIT extends ESRestTestCase { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 4, "_only_nodes:" + 
shard.getNode().getNodeName(), finalVersionForDoc4); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 4); @@ -180,7 +185,7 @@ public class IndexingIT extends ESRestTestCase { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5); @@ -216,7 +221,7 @@ public class IndexingIT extends ESRestTestCase { final int numberOfInitialDocs = 1 + randomInt(5); logger.info("indexing [{}] docs initially", numberOfInitialDocs); numDocs += indexDocs(index, 0, numberOfInitialDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); @@ -227,8 +232,8 @@ public class IndexingIT extends ESRestTestCase { final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5); logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes); numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -237,7 +242,7 @@ public class IndexingIT extends ESRestTestCase { logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); numDocs += numberOfDocsAfterMovingPrimary; - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); /* * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in * the recovery code. 
@@ -255,7 +260,7 @@ public class IndexingIT extends ESRestTestCase { // the number of documents on the primary and on the recovered replica should match the number of indexed documents assertCount(index, "_primary", numDocs); assertCount(index, "_replica", numDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); } } @@ -274,10 +279,11 @@ public class IndexingIT extends ESRestTestCase { assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion)); } - private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + private void assertSeqNoOnShards(String index, Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) + throws Exception { assertBusy(() -> { try { - List shards = buildShards(nodes, client); + List shards = buildShards(index, nodes, client); Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); assertNotNull("failed to find primary shard", primaryShard); final long expectedGlobalCkp; @@ -311,9 +317,9 @@ public class IndexingIT extends ESRestTestCase { }); } - private List buildShards(Nodes nodes, RestClient client) throws IOException { - Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); - List shardStats = ObjectPath.createFromResponse(response).evaluate("indices.test.shards.0"); + private List buildShards(String index, Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards")); + List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); ArrayList shards = new ArrayList<>(); for (Object shard : shardStats) { final String nodeId = ObjectPath.evaluate(shard, "routing.node");
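The replica-side handling added in TransportShardBulkAction above boils down to a per-item dispatch on the primary's response: a failure that carries a primary-assigned sequence number is replicated as a translog no-op, a failure without one is skipped, and a successful no-op result (e.g. a noop update) is likewise skipped. The following is a minimal, self-contained sketch of that dispatch, not the Elasticsearch code itself: ItemOutcome and the UNASSIGNED_SEQ_NO constant here are simplified stand-ins for BulkItemResponse and SequenceNumbersService.UNASSIGNED_SEQ_NO.

// Simplified sketch of the replica item dispatch introduced by this patch.
// ItemOutcome and UNASSIGNED_SEQ_NO are illustrative stand-ins, not Elasticsearch APIs.
public class ReplicaItemDispatchSketch {

    // stand-in for the "no sequence number was assigned" sentinel
    static final long UNASSIGNED_SEQ_NO = -2L;

    enum ReplicaItemExecutionMode { NORMAL, NOOP, FAILURE }

    /** Minimal model of the primary's per-item result. */
    static final class ItemOutcome {
        final boolean failed;      // did the primary operation fail?
        final long seqNo;          // seq no assigned by the primary, or UNASSIGNED_SEQ_NO
        final boolean noopResult;  // succeeded but changed nothing (e.g. a noop update)

        ItemOutcome(boolean failed, long seqNo, boolean noopResult) {
            this.failed = failed;
            this.seqNo = seqNo;
            this.noopResult = noopResult;
        }
    }

    /**
     * Decide how the replica should treat one bulk item, following the rule described in
     * the commit message: failures that were assigned a sequence number must still be
     * sequenced on the replica (as translog no-ops); everything else without a document
     * to replay is skipped.
     */
    static ReplicaItemExecutionMode replicaItemExecutionMode(ItemOutcome primaryOutcome) {
        if (primaryOutcome.failed) {
            return primaryOutcome.seqNo != UNASSIGNED_SEQ_NO
                    ? ReplicaItemExecutionMode.FAILURE  // record the failure as a no-op to fill the seq no gap
                    : ReplicaItemExecutionMode.NOOP;    // nothing was sequenced on the primary; skip
        }
        return primaryOutcome.noopResult
                ? ReplicaItemExecutionMode.NOOP    // nothing to replay
                : ReplicaItemExecutionMode.NORMAL; // replay the indexed/deleted document normally
    }

    public static void main(String[] args) {
        System.out.println(replicaItemExecutionMode(new ItemOutcome(true, 7L, false)));                // FAILURE
        System.out.println(replicaItemExecutionMode(new ItemOutcome(true, UNASSIGNED_SEQ_NO, false))); // NOOP
        System.out.println(replicaItemExecutionMode(new ItemOutcome(false, 8L, false)));               // NORMAL
    }
}

Running main prints FAILURE, NOOP and NORMAL for the three cases, which are the same three modes asserted on in TransportShardBulkActionTests above.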