Ignore replication for noop updates (#46458)
Previously, we ignore replication for noop updates because they do not have sequence numbers. Since #44603, we started assigning sequence numbers to noop updates leading them to be replicated to replicas. This bug occurs only on 8.0 for it requires #41065 and #44603. Closes #46366
This commit is contained in:
parent
7b26a8c041
commit
24c3a1de3c
|
@ -383,54 +383,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return response;
|
||||
}
|
||||
|
||||
|
||||
/** Modes for executing item request on replica depending on corresponding primary execution result */
|
||||
public enum ReplicaItemExecutionMode {
|
||||
|
||||
/**
|
||||
* When primary execution succeeded
|
||||
*/
|
||||
NORMAL,
|
||||
|
||||
/**
|
||||
* When primary execution failed before sequence no was generated
|
||||
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
|
||||
*/
|
||||
NOOP,
|
||||
|
||||
/**
|
||||
* When primary execution failed after sequence no was generated
|
||||
*/
|
||||
FAILURE
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines whether a bulk item request should be executed on the replica.
|
||||
*
|
||||
* @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
|
||||
* {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
|
||||
* {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
|
||||
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
|
||||
*/
|
||||
static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) {
|
||||
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
|
||||
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
|
||||
if (primaryResponse.isFailed()) {
|
||||
return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
|
||||
? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
|
||||
: ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
|
||||
} else {
|
||||
// TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
|
||||
// (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
|
||||
// an operation should be processed or be treated as a noop. This means we could remove this method and the
|
||||
// ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
|
||||
// both failures and indexing operations.
|
||||
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
|
||||
? ReplicaItemExecutionMode.NORMAL // execution successful on primary
|
||||
: ReplicaItemExecutionMode.NOOP; // ignore replication
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
|
||||
final Translog.Location location = performOnReplica(request, replica);
|
||||
|
@ -440,28 +392,23 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
|
||||
Translog.Location location = null;
|
||||
for (int i = 0; i < request.items().length; i++) {
|
||||
BulkItemRequest item = request.items()[i];
|
||||
final BulkItemRequest item = request.items()[i];
|
||||
final BulkItemResponse response = item.getPrimaryResponse();
|
||||
final Engine.Result operationResult;
|
||||
DocWriteRequest<?> docWriteRequest = item.request();
|
||||
switch (replicaItemExecutionMode(item, i)) {
|
||||
case NORMAL:
|
||||
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
|
||||
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
|
||||
assert operationResult != null : "operation result must never be null when primary response has no failure";
|
||||
location = syncOperationResultOrThrow(operationResult, location);
|
||||
break;
|
||||
case NOOP:
|
||||
break;
|
||||
case FAILURE:
|
||||
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
|
||||
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
|
||||
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
|
||||
assert operationResult != null : "operation result must never be null when primary response has no failure";
|
||||
location = syncOperationResultOrThrow(operationResult, location);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
|
||||
if (item.getPrimaryResponse().isFailed()) {
|
||||
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
continue; // ignore replication as we didn't generate a sequence number for this request.
|
||||
}
|
||||
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), response.getFailure().getMessage());
|
||||
} else {
|
||||
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
|
||||
continue; // ignore replication as it's a noop
|
||||
}
|
||||
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
|
||||
}
|
||||
assert operationResult != null : "operation result must never be null when primary response has no failure";
|
||||
location = syncOperationResultOrThrow(operationResult, location);
|
||||
}
|
||||
return location;
|
||||
}
|
||||
|
@ -485,8 +432,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
deleteRequest.type(), deleteRequest.id());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected request operation type on replica: "
|
||||
+ docWriteRequest.opType().getLowercase());
|
||||
assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
|
||||
throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
|
||||
}
|
||||
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
|
||||
// Even though the primary waits on all nodes to ack the mapping changes to the master
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.alias.Alias;
|
|||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
|
@ -32,6 +33,7 @@ import org.elasticsearch.action.update.UpdateRequestBuilder;
|
|||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
@ -57,6 +59,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -618,5 +621,31 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
|||
assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE));
|
||||
}
|
||||
|
||||
public void testNoopUpdate() {
|
||||
String indexName = "test";
|
||||
createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
ensureGreen(indexName);
|
||||
IndexResponse doc = index(indexName, "_doc", "1", Collections.singletonMap("user", "xyz"));
|
||||
assertThat(doc.getShardInfo().getSuccessful(), equalTo(2));
|
||||
final BulkResponse bulkResponse = client().prepareBulk()
|
||||
.add(new UpdateRequest().index(indexName).id("1").detectNoop(true).doc("user", "xyz")) // noop update
|
||||
.add(new UpdateRequest().index(indexName).id("2").docAsUpsert(false).doc("f", "v")) // not_found update
|
||||
.add(new DeleteRequest().index(indexName).id("2")) // not_found delete
|
||||
.get();
|
||||
assertThat(bulkResponse.getItems(), arrayWithSize(3));
|
||||
|
||||
final BulkItemResponse noopUpdate = bulkResponse.getItems()[0];
|
||||
assertThat(noopUpdate.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
|
||||
assertThat(Strings.toString(noopUpdate), noopUpdate.getResponse().getShardInfo().getSuccessful(), equalTo(2));
|
||||
|
||||
final BulkItemResponse notFoundUpdate = bulkResponse.getItems()[1];
|
||||
assertNotNull(notFoundUpdate.getFailure());
|
||||
|
||||
final BulkItemResponse notFoundDelete = bulkResponse.getItems()[2];
|
||||
assertThat(notFoundDelete.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOT_FOUND));
|
||||
assertThat(Strings.toString(notFoundDelete), notFoundDelete.getResponse().getShardInfo().getSuccessful(), equalTo(2));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
@ -58,7 +57,6 @@ import java.util.Collections;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
|
@ -95,47 +93,6 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
|||
.primaryTerm(0, 1).build();
|
||||
}
|
||||
|
||||
public void testShouldExecuteReplicaItem() throws Exception {
|
||||
// Successful index request should be replicated
|
||||
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id")
|
||||
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
|
||||
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
|
||||
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
|
||||
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
|
||||
assertThat(replicaItemExecutionMode(request, 0),
|
||||
equalTo(ReplicaItemExecutionMode.NORMAL));
|
||||
|
||||
// Failed index requests without sequence no should not be replicated
|
||||
writeRequest = new IndexRequest("index", "_doc", "id")
|
||||
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
|
||||
request = new BulkItemRequest(0, writeRequest);
|
||||
request.setPrimaryResponse(
|
||||
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
|
||||
new BulkItemResponse.Failure("index", "type", "id",
|
||||
new IllegalArgumentException("i died"))));
|
||||
assertThat(replicaItemExecutionMode(request, 0),
|
||||
equalTo(ReplicaItemExecutionMode.NOOP));
|
||||
|
||||
// Failed index requests with sequence no should be replicated
|
||||
request = new BulkItemRequest(0, writeRequest);
|
||||
request.setPrimaryResponse(
|
||||
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
|
||||
new BulkItemResponse.Failure("index", "type", "id",
|
||||
new IllegalArgumentException(
|
||||
"i died after sequence no was generated"),
|
||||
1)));
|
||||
assertThat(replicaItemExecutionMode(request, 0),
|
||||
equalTo(ReplicaItemExecutionMode.FAILURE));
|
||||
// NOOP requests should not be replicated
|
||||
DocWriteRequest<UpdateRequest> updateRequest = new UpdateRequest("index", "type", "id");
|
||||
response = new UpdateResponse(shardId, "type", "id", 0, 1, 1, DocWriteResponse.Result.NOOP);
|
||||
request = new BulkItemRequest(0, updateRequest);
|
||||
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
|
||||
response));
|
||||
assertThat(replicaItemExecutionMode(request, 0),
|
||||
equalTo(ReplicaItemExecutionMode.NOOP));
|
||||
}
|
||||
|
||||
public void testExecuteBulkIndexRequest() throws Exception {
|
||||
IndexShard shard = newStartedShard(true);
|
||||
|
||||
|
|
Loading…
Reference in New Issue