diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 987aa36585b..3023ecb1856 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,7 +19,9 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -31,13 +33,12 @@ public class BulkItemRequest implements Streamable { private int id; private DocWriteRequest request; private volatile BulkItemResponse primaryResponse; - private volatile boolean ignoreOnReplica; BulkItemRequest() { } - public BulkItemRequest(int id, DocWriteRequest request) { + protected BulkItemRequest(int id, DocWriteRequest request) { this.id = id; this.request = request; } @@ -55,25 +56,16 @@ public class BulkItemRequest implements Streamable { return request.indices()[0]; } - BulkItemResponse getPrimaryResponse() { + // NOTE: protected for testing only + protected BulkItemResponse getPrimaryResponse() { return primaryResponse; } - void setPrimaryResponse(BulkItemResponse primaryResponse) { + // NOTE: protected for testing only + protected void setPrimaryResponse(BulkItemResponse primaryResponse) { this.primaryResponse = primaryResponse; } - /** - * Marks this request to be ignored and *not* execute on a replica. - */ - void setIgnoreOnReplica() { - this.ignoreOnReplica = true; - } - - boolean isIgnoreOnReplica() { - return ignoreOnReplica; - } - public static BulkItemRequest readBulkItem(StreamInput in) throws IOException { BulkItemRequest item = new BulkItemRequest(); item.readFrom(in); @@ -87,14 +79,37 @@ public class BulkItemRequest implements Streamable { if (in.readBoolean()) { primaryResponse = BulkItemResponse.readBulkItem(in); } - ignoreOnReplica = in.readBoolean(); + if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported + boolean ignoreOnReplica = in.readBoolean(); + if (ignoreOnReplica == false && primaryResponse != null) { + assert primaryResponse.isFailed() == false : "expected no failure on the primary response"; + } + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - DocWriteRequest.writeDocumentRequest(out, request); + if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported + // old nodes expect updated version and version type on the request + if (primaryResponse != null) { + request.version(primaryResponse.getVersion()); + request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); + DocWriteRequest.writeDocumentRequest(out, request); + } else { + DocWriteRequest.writeDocumentRequest(out, request); + } + } else { + DocWriteRequest.writeDocumentRequest(out, request); + } out.writeOptionalStreamable(primaryResponse); - out.writeBoolean(ignoreOnReplica); + if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported + if (primaryResponse != null) { + out.writeBoolean(primaryResponse.isFailed() + || primaryResponse.getResponse().getResult() == DocWriteResponse.Result.NOOP); + } else { + out.writeBoolean(false); + } + } } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index c270c51ea38..8e2dde7db63 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -36,7 +36,7 @@ public class BulkShardRequest extends ReplicatedWriteRequest { public BulkShardRequest() { } - BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { + public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { super(shardId); this.items = items; setRefreshPolicy(refreshPolicy); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index b51ce624800..aa368c13fb8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -36,7 +36,8 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo BulkShardResponse() { } - BulkShardResponse(ShardId shardId, BulkItemResponse[] responses) { + // NOTE: public for testing only + public BulkShardResponse(ShardId shardId, BulkItemResponse[] responses) { this.shardId = shardId; this.responses = responses; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index fc580dd3880..46bd825d0c7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -104,14 +104,10 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); - - long[] preVersions = new long[request.items().length]; - VersionType[] preVersionTypes = new VersionType[request.items().length]; Translog.Location location = null; for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { - location = executeBulkItemRequest(metaData, primary, request, preVersions, preVersionTypes, location, requestIndex); + location = executeBulkItemRequest(metaData, primary, request, location, requestIndex); } - BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemRequest[] items = request.items(); for (int i = 0; i < items.length; i++) { @@ -124,110 +120,73 @@ public class TransportShardBulkAction extends TransportWriteAction) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); - } else { - logger.debug((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); - } - // if its a conflict failure, and we already executed the request on a primary (and we execute it - // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) - // then just use the response we got from the successful execution - if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) { - replicaRequest.setIgnoreOnReplica(); - replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(), - new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure))); - } - } - assert replicaRequest.getPrimaryResponse() != null; - assert preVersionTypes[requestIndex] != null; - } catch (Exception e) { - // rethrow the failure if we are going to retry on primary and let parent failure to handle it - if (retryPrimaryException(e)) { - // restore updated versions... - for (int j = 0; j < requestIndex; j++) { - DocWriteRequest docWriteRequest = request.items()[j].request(); - docWriteRequest.version(preVersions[j]); - docWriteRequest.versionType(preVersionTypes[j]); - } - } - throw e; + final DocWriteRequest.OpType opType = itemRequest.opType(); + final Engine.Result operationResult; + final DocWriteResponse response; + final BulkItemRequest replicaRequest; + switch (itemRequest.opType()) { + case CREATE: + case INDEX: + final IndexRequest indexRequest = (IndexRequest) itemRequest; + Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction); + response = indexResult.hasFailure() ? null : + new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(), + indexResult.getVersion(), indexResult.isCreated()); + operationResult = indexResult; + replicaRequest = request.items()[requestIndex]; + break; + case UPDATE: + UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest), + primary, metaData, request, requestIndex); + operationResult = updateResultHolder.operationResult; + response = updateResultHolder.response; + replicaRequest = updateResultHolder.replicaRequest; + break; + case DELETE: + final DeleteRequest deleteRequest = (DeleteRequest) itemRequest; + Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary); + response = deleteResult.hasFailure() ? null : + new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(), + deleteResult.getVersion(), deleteResult.isFound()); + operationResult = deleteResult; + replicaRequest = request.items()[requestIndex]; + break; + default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); } + + // update the bulk item request because update request execution can mutate the bulk item request + request.items()[requestIndex] = replicaRequest; + if (operationResult == null) { // in case of noop update operation + assert response.getResult() == DocWriteResponse.Result.NOOP + : "only noop update can have null operation"; + replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response)); + } else if (operationResult.hasFailure() == false) { + location = locationToSync(location, operationResult.getTranslogLocation()); + BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response); + replicaRequest.setPrimaryResponse(primaryResponse); + // set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though. + primaryResponse.getResponse().setShardInfo(new ShardInfo()); + } else { + DocWriteRequest docWriteRequest = replicaRequest.request(); + Exception failure = operationResult.getFailure(); + if (isConflictException(failure)) { + logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); + } else { + logger.debug((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); + } + // if its a conflict failure, and we already executed the request on a primary (and we execute it + // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) + // then just use the response we got from the successful execution + if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) { + replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(), + new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure))); + } + } + assert replicaRequest.getPrimaryResponse() != null; return location; } @@ -281,25 +240,10 @@ public class TransportShardBulkAction extends TransportWriteAction> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; - private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; - /** * Constructor for deserialization. */ @@ -66,32 +62,11 @@ public abstract class ReplicatedWriteRequest public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); - if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - seqNo = in.readZLong(); - } else { - seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; - } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeZLong(seqNo); - } - } - - /** - * Returns the sequence number for this operation. The sequence number is assigned while the operation - * is performed on the primary shard. - */ - public long getSeqNo() { - return seqNo; - } - - /** sets the sequence number for this operation. should only be called on the primary shard */ - public void setSeqNo(long seqNo) { - this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 36b8ab6574c..f6b452502a5 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -391,6 +391,14 @@ public abstract class Engine implements Closeable { this.created = created; } + /** + * use in case of index operation failed before getting to internal engine + * (e.g while preparing operation or updating mappings) + * */ + public IndexResult(Exception failure, long version) { + this(failure, version, SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + public IndexResult(Exception failure, long version, long seqNo) { super(Operation.TYPE.INDEX, failure, version, seqNo); this.created = false; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bcfee5026ce..a1ae385237a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -478,6 +478,20 @@ public class InternalEngine extends Engine { return false; } + private boolean assertVersionType(final Engine.Operation operation) { + if (operation.origin() == Operation.Origin.REPLICA || + operation.origin() == Operation.Origin.PEER_RECOVERY || + operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + // ensure that replica operation has expected version type for replication + // ensure that versionTypeForReplicationAndRecovery is idempotent + assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery() + : "unexpected version type in request from [" + operation.origin().name() + "] " + + "found [" + operation.versionType().name() + "] " + + "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]"; + } + return true; + } + private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) && origin == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { // legacy support @@ -499,6 +513,7 @@ public class InternalEngine extends Engine { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertSequenceNumber(index.origin(), index.seqNo()); + assert assertVersionType(index); final Translog.Location location; long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid()); @@ -692,6 +707,7 @@ public class InternalEngine extends Engine { public DeleteResult delete(Delete delete) throws IOException { DeleteResult result; try (ReleasableLock ignored = readLock.acquire()) { + assert assertVersionType(delete); ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: result = innerDelete(delete); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5b4a6e10dda..6c7d2dd810f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3114,7 +3114,7 @@ public class InternalEngineTests extends ESTestCase { public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException { final long v = Versions.MATCH_ANY; - final VersionType t = VersionType.INTERNAL; + final VersionType t = VersionType.EXTERNAL; final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; final int docs = randomIntBetween(1, 32); InternalEngine initialEngine = null; diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 0a0cddf5d81..96f86f124dc 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,8 +22,17 @@ package org.elasticsearch.index.replication; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkItemRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -157,10 +166,34 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase public IndexResponse index(IndexRequest indexRequest) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - new IndexingAction(indexRequest, listener, this).execute(); + final ActionListener wrapBulkListener = ActionListener.wrap( + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0].getResponse()), + listener::onFailure); + BulkItemRequest[] items = new BulkItemRequest[1]; + items[0] = new TestBulkItemRequest(0, indexRequest); + BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); + new IndexingAction(request, wrapBulkListener, this).execute(); return listener.get(); } + /** BulkItemRequest exposing get/set primary response */ + public class TestBulkItemRequest extends BulkItemRequest { + + TestBulkItemRequest(int id, DocWriteRequest request) { + super(id, request); + } + + @Override + protected void setPrimaryResponse(BulkItemResponse primaryResponse) { + super.setPrimaryResponse(primaryResponse); + } + + @Override + protected BulkItemResponse getPrimaryResponse() { + return super.getPrimaryResponse(); + } + } + public synchronized void startAll() throws IOException { startReplicas(replicas.size()); } @@ -486,22 +519,28 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } - class IndexingAction extends ReplicationAction { + class IndexingAction extends ReplicationAction { - IndexingAction(IndexRequest request, ActionListener listener, ReplicationGroup replicationGroup) { + IndexingAction(BulkShardRequest request, ActionListener listener, ReplicationGroup replicationGroup) { super(request, listener, replicationGroup, "indexing"); - request.process(null, request.index()); } @Override - protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception { - IndexResponse response = indexOnPrimary(request, primary); - return new PrimaryResult(request, response); + protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { + final IndexRequest indexRequest = (IndexRequest) request.items()[0].request(); + indexRequest.process(null, request.index()); + final IndexResponse indexResponse = indexOnPrimary(indexRequest, primary); + BulkItemResponse[] itemResponses = new BulkItemResponse[1]; + itemResponses[0] = new BulkItemResponse(0, indexRequest.opType(), indexResponse); + ((ReplicationGroup.TestBulkItemRequest) request.items()[0]).setPrimaryResponse(itemResponses[0]); + return new PrimaryResult(request, new BulkShardResponse(primary.shardId(), itemResponses)); } @Override - protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException { - indexOnReplica(request, replica); + protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws IOException { + final ReplicationGroup.TestBulkItemRequest bulkItemRequest = ((ReplicationGroup.TestBulkItemRequest) request.items()[0]); + final DocWriteResponse primaryResponse = bulkItemRequest.getPrimaryResponse().getResponse(); + indexOnReplica(primaryResponse, ((IndexRequest) bulkItemRequest.request()), replica); } } @@ -511,14 +550,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, null); - if (indexResult.hasFailure() == false) { - // update the version on request so it will happen on the replicas - final long version = indexResult.getVersion(); - request.version(version); - request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.setSeqNo(indexResult.getSeqNo()); - assert request.versionType().validateVersionForWrites(request.version()); - } request.primaryTerm(primary.getPrimaryTerm()); TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger); return new IndexResponse( @@ -533,8 +564,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase /** * indexes the given requests on the supplied replica shard */ - protected void indexOnReplica(IndexRequest request, IndexShard replica) throws IOException { - final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica); + protected void indexOnReplica(DocWriteResponse response, IndexRequest request, IndexShard replica) throws IOException { + final Engine.IndexResult result = executeIndexRequestOnReplica(response, request, replica); TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 3928f78c3c5..97e224f04a4 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -26,6 +26,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; @@ -166,9 +167,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC logger.info("--> indexing {} rollback docs", rollbackDocs); for (int i = 0; i < rollbackDocs; i++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i) - .source("{}", XContentType.JSON); - indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(indexRequest, replica); + .source("{}", XContentType.JSON); + final IndexResponse primaryResponse = indexOnPrimary(indexRequest, oldPrimary); + indexOnReplica(primaryResponse, indexRequest, replica); } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 10b69fb297b..5e2655ff64c 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -82,7 +82,7 @@ public class IndexingIT extends ESRestTestCase { new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON))); } - protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), @@ -91,6 +91,116 @@ public class IndexingIT extends ESRestTestCase { return numDocs; } + /** + * Indexes a document in index with docId then concurrently updates the same document + * nUpdates times + * + * @return the document version after updates + */ + private int indexDocWithConcurrentUpdates(String index, final int docId, int nUpdates) throws IOException, InterruptedException { + indexDocs(index, docId, 1); + Thread[] indexThreads = new Thread[nUpdates]; + for (int i = 0; i < nUpdates; i++) { + indexThreads[i] = new Thread(() -> { + try { + indexDocs(index, docId, 1); + } catch (IOException e) { + throw new AssertionError("failed while indexing [" + e.getMessage() + "]"); + } + }); + indexThreads[i].start(); + } + for (Thread indexThread : indexThreads) { + indexThread.join(); + } + return nUpdates + 1; + } + + public void testIndexVersionPropagation() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered: {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", bwcNames); + final String index = "test"; + final int minUpdates = 5; + final int maxUpdates = 10; + createIndex(index, settings.build()); + try (RestClient newNodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + int nUpdates = randomIntBetween(minUpdates, maxUpdates); + logger.info("indexing docs with [{}] concurrent updates initially", nUpdates); + final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates); + logger.info("allowing shards on all nodes"); + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + List shards = buildShards(nodes, newNodeClient); + for (Shard shard : shards) { + assertVersion(index, 1, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc1); + assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 1); + } + + nUpdates = randomIntBetween(minUpdates, maxUpdates); + logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates); + final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates); + assertOK(client().performRequest("POST", index + "/_refresh")); + shards = buildShards(nodes, newNodeClient); + for (Shard shard : shards) { + assertVersion(index, 2, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc2); + assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 2); + } + + Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); + ensureGreen(); + nUpdates = randomIntBetween(minUpdates, maxUpdates); + logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); + final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); + assertOK(client().performRequest("POST", index + "/_refresh")); + shards = buildShards(nodes, newNodeClient); + for (Shard shard : shards) { + assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3); + assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 3); + } + + logger.info("setting number of replicas to 0"); + updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0)); + ensureGreen(); + nUpdates = randomIntBetween(minUpdates, maxUpdates); + logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); + final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); + assertOK(client().performRequest("POST", index + "/_refresh")); + shards = buildShards(nodes, newNodeClient); + for (Shard shard : shards) { + assertVersion(index, 4, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc4); + assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 4); + } + + logger.info("setting number of replicas to 1"); + updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); + ensureGreen(); + nUpdates = randomIntBetween(minUpdates, maxUpdates); + logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); + final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); + assertOK(client().performRequest("POST", index + "/_refresh")); + shards = buildShards(nodes, newNodeClient); + for (Shard shard : shards) { + assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5); + assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5); + } + // the number of documents on the primary and on the recovered replica should match the number of indexed documents + assertCount(index, "_primary", 5); + assertCount(index, "_replica", 5); + } + } + public void testSeqNoCheckpoints() throws Exception { Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); @@ -166,6 +276,14 @@ public class IndexingIT extends ESRestTestCase { assertThat(actualCount, equalTo(expectedCount)); } + private void assertVersion(final String index, final int docId, final String preference, final int expectedVersion) throws IOException { + final Response response = client().performRequest("GET", index + "/test/" + docId, + Collections.singletonMap("preference", preference)); + assertOK(response); + final int actualVersion = Integer.parseInt(objectPath(response).evaluate("_version").toString()); + assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion)); + } + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { assertBusy(() -> { try {