From a211e51343999cd132377682f7febb54be2eb22f Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Fri, 15 Feb 2019 10:10:27 +0100 Subject: [PATCH] ShardBulkAction ignore primary response on primary (#38901) Previously, if a version conflict occurred and a previous primary response was present, the original primary response would be used both for sending to replica and back to client. This was done in the past as an attempt to fix issues with conflicts after relocations where a bulk request would experience a closed shard halfway through and thus have to retry on the new primary. It could then fail on its own update. With sequence numbers, this leads to an issue, since if a primary is demoted (network partitions), it will send along the original response in the request. In case of a conflict on the new primary, the old response is sent to the replica. That data could be stale, leading to inconsistency between primary and replica. Relocations now do an explicit hand-off from old to new primary and ensure that no operations are active while doing this. The above is thus no longer necessary. This change removes the special handling of conflicts and ignores primary responses when executing shard bulk requests on the primary. 
--- .../bulk/BulkPrimaryExecutionContext.java | 5 --- .../action/bulk/TransportShardBulkAction.java | 11 +----- .../bulk/TransportShardBulkActionTests.java | 27 +++++++++++++++ .../discovery/ClusterDisruptionIT.java | 34 +++++++++++++++---- 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java index 5f61d90d500..65452f9a75d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -172,11 +172,6 @@ class BulkPrimaryExecutionContext { return getCurrentItem().index(); } - /** returns any primary response that was set by a previous primary */ - public BulkItemResponse getPreviousPrimaryResponse() { - return getCurrentItem().getPrimaryResponse(); - } - /** returns a translog location that is needed to be synced in order to persist all operations executed so far */ public Translog.Location getLocationToSync() { assert hasMoreOperationsToExecute() == false; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3f4a0b6042a..f182c298581 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -261,16 +261,7 @@ public class TransportShardBulkAction extends TransportWriteAction {}); @@ -271,6 +275,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(mappingUpdate); + randomlySetIgnoredPrimaryResponse(items[0]); + // Pretend the mappings haven't made it to the node yet 
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); @@ -326,6 +332,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { boolean errorOnWait = randomBoolean(); + randomlySetIgnoredPrimaryResponse(items[0]); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), @@ -365,6 +373,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; + randomlySetIgnoredPrimaryResponse(items[0]); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); @@ -405,6 +415,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { location = context.getLocationToSync(); + randomlySetIgnoredPrimaryResponse(items[0]); + context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); @@ -459,6 +471,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); @@ 
-503,6 +517,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, @@ -552,6 +567,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, @@ -598,6 +614,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, @@ -643,6 +660,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, @@ -676,6 +694,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + randomlySetIgnoredPrimaryResponse(primaryRequest); 
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, @@ -809,6 +828,14 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { assertThat(response.getSeqNo(), equalTo(13L)); } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { + if (randomBoolean()) { + // add a response to the request and thereby check that it is ignored for the primary. + primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc", + "ignore-primary-response-on-primary", 42, 42, 42, false))); + } + } + /** * Fake IndexResult that has a settable translog location */ diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index d8262dc4f57..9fd08511446 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -37,6 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import 
org.elasticsearch.test.disruption.NetworkDisruption; @@ -75,6 +77,18 @@ import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class ClusterDisruptionIT extends AbstractDisruptionTestCase { + private enum ConflictMode { + none, + external, + create; + + + static ConflictMode randomMode() { + ConflictMode[] values = values(); + return values[randomInt(values.length-1)]; + } + } + /** * Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme * We also collect & report the type of indexing failures that occur. @@ -111,7 +125,9 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { final AtomicReference countDownLatchRef = new AtomicReference<>(); final List exceptedExceptions = new CopyOnWriteArrayList<>(); - logger.info("starting indexers"); + final ConflictMode conflictMode = ConflictMode.randomMode(); + + logger.info("starting indexers using conflict mode " + conflictMode); try { for (final String node : nodes) { final Semaphore semaphore = new Semaphore(0); @@ -131,11 +147,17 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { id = Integer.toString(idGenerator.incrementAndGet()); int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries); logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard); - IndexResponse response = - client.prepareIndex("test", "type", id) - .setSource("{}", XContentType.JSON) - .setTimeout(timeout) - .get(timeout); + IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id) + .setSource("{}", XContentType.JSON) + .setTimeout(timeout); + + if (conflictMode == ConflictMode.external) { + indexRequestBuilder.setVersion(randomIntBetween(1,10)).setVersionType(VersionType.EXTERNAL); + } else if (conflictMode == ConflictMode.create) { + 
indexRequestBuilder.setCreate(true); + } + + IndexResponse response = indexRequestBuilder.get(timeout); assertThat(response.getResult(), isOneOf(CREATED, UPDATED)); ackedDocs.put(id, node); logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);