ShardBulkAction ignore primary response on primary (#38901)

Previously, if a version conflict occurred and a previous primary
response was present, the original primary response would be used both
for sending to the replica and back to the client. This was done in the past as an
attempt to fix issues with conflicts after relocations, where a bulk request
would encounter a closed shard halfway through and thus have to retry
on the new primary. The retry could then hit a version conflict caused by its own earlier update.

With sequence numbers, this leads to an issue, since if a primary is
demoted (network partitions), it will send along the original response
in the request. In case of a conflict on the new primary, the old
response is sent to the replica. That data could be stale, leading to
inconsistency between primary and replica.

Relocations now do an explicit hand-off from the old to the new primary and
ensure that no operations are active while doing this. The above workaround is
thus no longer necessary. This change removes the special handling of conflicts
and ignores primary responses when executing shard bulk requests on the
primary.
This commit is contained in:
Henning Andersen 2019-02-15 10:10:27 +01:00 committed by Henning Andersen
parent d55e52223f
commit a211e51343
4 changed files with 56 additions and 21 deletions

View File

@ -172,11 +172,6 @@ class BulkPrimaryExecutionContext {
return getCurrentItem().index();
}
/**
 * Returns the primary response that a previous primary execution attached to the
 * current bulk item, or {@code null} if none was set.
 * <p>
 * NOTE(review): a non-null value indicates this item was already executed on another
 * primary (e.g. before a relocation or demotion) — presumably the caller decides
 * whether that stale response may be reused; confirm against call sites.
 */
public BulkItemResponse getPreviousPrimaryResponse() {
return getCurrentItem().getPrimaryResponse();
}
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
public Translog.Location getLocationToSync() {
assert hasMoreOperationsToExecute() == false;

View File

@ -261,16 +261,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
}
final BulkItemResponse primaryResponse;
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the failed execution
if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
primaryResponse = context.getPreviousPrimaryResponse();
} else {
primaryResponse = executionResult;
}
context.markAsCompleted(primaryResponse);
context.markAsCompleted(executionResult);
} else {
context.markAsCompleted(executionResult);
}

View File

@ -144,6 +144,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
UpdateHelper updateHelper = null;
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -169,6 +171,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
items[0] = primaryRequest;
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper,
threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {});
@ -271,6 +275,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);
randomlySetIgnoredPrimaryResponse(items[0]);
// Pretend the mappings haven't made it to the node yet
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
AtomicInteger updateCalled = new AtomicInteger();
@ -326,6 +332,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
boolean errorOnWait = randomBoolean();
randomlySetIgnoredPrimaryResponse(items[0]);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
@ -365,6 +373,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
randomlySetIgnoredPrimaryResponse(items[0]);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@ -405,6 +415,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
location = context.getLocationToSync();
randomlySetIgnoredPrimaryResponse(items[0]);
context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@ -459,6 +471,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(), () -> {});
@ -503,6 +517,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -552,6 +567,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -598,6 +614,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -643,6 +660,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -676,6 +694,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
BulkShardRequest bulkShardRequest =
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
randomlySetIgnoredPrimaryResponse(primaryRequest);
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@ -809,6 +828,14 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
assertThat(response.getSeqNo(), equalTo(13L));
}
/**
 * With 50% probability, pre-populates {@code primaryRequest} with a stale primary
 * response. The primary execution under test must ignore any such pre-existing
 * response, which the calling tests verify.
 */
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
    if (randomBoolean() == false) {
        return;
    }
    // attach a response up front and thereby check that it is ignored for the primary
    final IndexResponse staleResponse = new IndexResponse(null, "_doc",
        "ignore-primary-response-on-primary", 42, 42, 42, false);
    primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, staleResponse));
}
/**
* Fake IndexResult that has a settable translog location
*/

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -37,6 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
@ -75,6 +77,18 @@ import static org.hamcrest.Matchers.not;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
/** The kind of write conflict that concurrently indexed documents may provoke during the disruption test. */
private enum ConflictMode {
    none,
    external,
    create;

    /** Picks one of the modes uniformly at random. */
    static ConflictMode randomMode() {
        final ConflictMode[] modes = values();
        final int choice = randomInt(modes.length - 1);
        return modes[choice];
    }
}
/**
* Test that we do not lose documents whose indexing requests were successful, under a randomly selected disruption scheme.
* We also collect &amp; report the types of indexing failures that occur.
@ -111,7 +125,9 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
logger.info("starting indexers");
final ConflictMode conflictMode = ConflictMode.randomMode();
logger.info("starting indexers using conflict mode " + conflictMode);
try {
for (final String node : nodes) {
final Semaphore semaphore = new Semaphore(0);
@ -131,11 +147,17 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
id = Integer.toString(idGenerator.incrementAndGet());
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
IndexResponse response =
client.prepareIndex("test", "type", id)
.setSource("{}", XContentType.JSON)
.setTimeout(timeout)
.get(timeout);
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
.setSource("{}", XContentType.JSON)
.setTimeout(timeout);
if (conflictMode == ConflictMode.external) {
indexRequestBuilder.setVersion(randomIntBetween(1,10)).setVersionType(VersionType.EXTERNAL);
} else if (conflictMode == ConflictMode.create) {
indexRequestBuilder.setCreate(true);
}
IndexResponse response = indexRequestBuilder.get(timeout);
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
ackedDocs.put(id, node);
logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);