Make document write requests immutable (#23038)
* Make document write requests immutable Previously, write requests were mutated at the transport level to update request version, version type and sequence no before replication. Now that all write requests go through the shard bulk transport action, we can use the primary response stored in item level bulk requests to pass the updated version, sequence no. to replicas. * incorporate feedback * minor cleanup * Add bwc test to ensure correct index version propagates to replica * Fix bwc for propagating write operation versions * Add assertion on replica request version type * fix tests using internal version type for replica op * Fix assertions to assert version type in replica and recovery * add bwc tests for version checks in concurrent indexing * incorporate feedback
This commit is contained in:
parent
38d25a0369
commit
148be11f26
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -31,13 +33,12 @@ public class BulkItemRequest implements Streamable {
|
|||
private int id;
|
||||
private DocWriteRequest request;
|
||||
private volatile BulkItemResponse primaryResponse;
|
||||
private volatile boolean ignoreOnReplica;
|
||||
|
||||
BulkItemRequest() {
|
||||
|
||||
}
|
||||
|
||||
public BulkItemRequest(int id, DocWriteRequest request) {
|
||||
protected BulkItemRequest(int id, DocWriteRequest request) {
|
||||
this.id = id;
|
||||
this.request = request;
|
||||
}
|
||||
|
@ -55,25 +56,16 @@ public class BulkItemRequest implements Streamable {
|
|||
return request.indices()[0];
|
||||
}
|
||||
|
||||
BulkItemResponse getPrimaryResponse() {
|
||||
// NOTE: protected for testing only
|
||||
protected BulkItemResponse getPrimaryResponse() {
|
||||
return primaryResponse;
|
||||
}
|
||||
|
||||
void setPrimaryResponse(BulkItemResponse primaryResponse) {
|
||||
// NOTE: protected for testing only
|
||||
protected void setPrimaryResponse(BulkItemResponse primaryResponse) {
|
||||
this.primaryResponse = primaryResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks this request to be ignored and *not* execute on a replica.
|
||||
*/
|
||||
void setIgnoreOnReplica() {
|
||||
this.ignoreOnReplica = true;
|
||||
}
|
||||
|
||||
boolean isIgnoreOnReplica() {
|
||||
return ignoreOnReplica;
|
||||
}
|
||||
|
||||
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
|
||||
BulkItemRequest item = new BulkItemRequest();
|
||||
item.readFrom(in);
|
||||
|
@ -87,14 +79,37 @@ public class BulkItemRequest implements Streamable {
|
|||
if (in.readBoolean()) {
|
||||
primaryResponse = BulkItemResponse.readBulkItem(in);
|
||||
}
|
||||
ignoreOnReplica = in.readBoolean();
|
||||
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported
|
||||
boolean ignoreOnReplica = in.readBoolean();
|
||||
if (ignoreOnReplica == false && primaryResponse != null) {
|
||||
assert primaryResponse.isFailed() == false : "expected no failure on the primary response";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(id);
|
||||
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported
|
||||
// old nodes expect updated version and version type on the request
|
||||
if (primaryResponse != null) {
|
||||
request.version(primaryResponse.getVersion());
|
||||
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
|
||||
DocWriteRequest.writeDocumentRequest(out, request);
|
||||
} else {
|
||||
DocWriteRequest.writeDocumentRequest(out, request);
|
||||
}
|
||||
} else {
|
||||
DocWriteRequest.writeDocumentRequest(out, request);
|
||||
}
|
||||
out.writeOptionalStreamable(primaryResponse);
|
||||
out.writeBoolean(ignoreOnReplica);
|
||||
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // TODO remove once backported
|
||||
if (primaryResponse != null) {
|
||||
out.writeBoolean(primaryResponse.isFailed()
|
||||
|| primaryResponse.getResponse().getResult() == DocWriteResponse.Result.NOOP);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
|
|||
public BulkShardRequest() {
|
||||
}
|
||||
|
||||
BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
|
||||
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
|
||||
super(shardId);
|
||||
this.items = items;
|
||||
setRefreshPolicy(refreshPolicy);
|
||||
|
|
|
@ -36,7 +36,8 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo
|
|||
BulkShardResponse() {
|
||||
}
|
||||
|
||||
BulkShardResponse(ShardId shardId, BulkItemResponse[] responses) {
|
||||
// NOTE: public for testing only
|
||||
public BulkShardResponse(ShardId shardId, BulkItemResponse[] responses) {
|
||||
this.shardId = shardId;
|
||||
this.responses = responses;
|
||||
}
|
||||
|
|
|
@ -104,14 +104,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
|
||||
BulkShardRequest request, IndexShard primary) throws Exception {
|
||||
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
|
||||
|
||||
long[] preVersions = new long[request.items().length];
|
||||
VersionType[] preVersionTypes = new VersionType[request.items().length];
|
||||
Translog.Location location = null;
|
||||
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
||||
location = executeBulkItemRequest(metaData, primary, request, preVersions, preVersionTypes, location, requestIndex);
|
||||
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex);
|
||||
}
|
||||
|
||||
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
||||
BulkItemRequest[] items = request.items();
|
||||
for (int i = 0; i < items.length; i++) {
|
||||
|
@ -124,14 +120,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
/** Executes bulk item requests and handles request execution exceptions */
|
||||
private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary,
|
||||
BulkShardRequest request,
|
||||
long[] preVersions, VersionType[] preVersionTypes,
|
||||
Translog.Location location, int requestIndex) throws Exception {
|
||||
final DocWriteRequest itemRequest = request.items()[requestIndex].request();
|
||||
preVersions[requestIndex] = itemRequest.version();
|
||||
preVersionTypes[requestIndex] = itemRequest.versionType();
|
||||
DocWriteRequest.OpType opType = itemRequest.opType();
|
||||
try {
|
||||
// execute item request
|
||||
final DocWriteRequest.OpType opType = itemRequest.opType();
|
||||
final Engine.Result operationResult;
|
||||
final DocWriteResponse response;
|
||||
final BulkItemRequest replicaRequest;
|
||||
|
@ -140,18 +131,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
case INDEX:
|
||||
final IndexRequest indexRequest = (IndexRequest) itemRequest;
|
||||
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
if (indexResult.hasFailure()) {
|
||||
response = null;
|
||||
} else {
|
||||
// update the version on request so it will happen on the replicas
|
||||
final long version = indexResult.getVersion();
|
||||
indexRequest.version(version);
|
||||
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
|
||||
indexRequest.setSeqNo(indexResult.getSeqNo());
|
||||
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
|
||||
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
|
||||
response = indexResult.hasFailure() ? null :
|
||||
new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
|
||||
indexResult.getVersion(), indexResult.isCreated());
|
||||
}
|
||||
operationResult = indexResult;
|
||||
replicaRequest = request.items()[requestIndex];
|
||||
break;
|
||||
|
@ -165,17 +147,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
case DELETE:
|
||||
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
|
||||
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
|
||||
if (deleteResult.hasFailure()) {
|
||||
response = null;
|
||||
} else {
|
||||
// update the request with the version so it will go to the replicas
|
||||
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
|
||||
deleteRequest.version(deleteResult.getVersion());
|
||||
deleteRequest.setSeqNo(deleteResult.getSeqNo());
|
||||
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
|
||||
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
|
||||
response = deleteResult.hasFailure() ? null :
|
||||
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
|
||||
deleteResult.getVersion(), deleteResult.isFound());
|
||||
}
|
||||
operationResult = deleteResult;
|
||||
replicaRequest = request.items()[requestIndex];
|
||||
break;
|
||||
|
@ -187,7 +161,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (operationResult == null) { // in case of noop update operation
|
||||
assert response.getResult() == DocWriteResponse.Result.NOOP
|
||||
: "only noop update can have null operation";
|
||||
replicaRequest.setIgnoreOnReplica();
|
||||
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
|
||||
} else if (operationResult.hasFailure() == false) {
|
||||
location = locationToSync(location, operationResult.getTranslogLocation());
|
||||
|
@ -209,25 +182,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||
// then just use the response we got from the successful execution
|
||||
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
|
||||
replicaRequest.setIgnoreOnReplica();
|
||||
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
|
||||
}
|
||||
}
|
||||
assert replicaRequest.getPrimaryResponse() != null;
|
||||
assert preVersionTypes[requestIndex] != null;
|
||||
} catch (Exception e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
if (retryPrimaryException(e)) {
|
||||
// restore updated versions...
|
||||
for (int j = 0; j < requestIndex; j++) {
|
||||
DocWriteRequest docWriteRequest = request.items()[j].request();
|
||||
docWriteRequest.version(preVersions[j]);
|
||||
docWriteRequest.versionType(preVersionTypes[j]);
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
return location;
|
||||
}
|
||||
|
||||
|
@ -281,25 +240,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
||||
indexRequest.process(mappingMd, request.index());
|
||||
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
if (updateOperationResult.hasFailure() == false) {
|
||||
// update the version on request so it will happen on the replicas
|
||||
final long version = updateOperationResult.getVersion();
|
||||
indexRequest.version(version);
|
||||
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
|
||||
indexRequest.setSeqNo(updateOperationResult.getSeqNo());
|
||||
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
|
||||
}
|
||||
break;
|
||||
case DELETED:
|
||||
DeleteRequest deleteRequest = translate.action();
|
||||
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
|
||||
if (updateOperationResult.hasFailure() == false) {
|
||||
// update the request with the version so it will go to the replicas
|
||||
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
|
||||
deleteRequest.version(updateOperationResult.getVersion());
|
||||
deleteRequest.setSeqNo(updateOperationResult.getSeqNo());
|
||||
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
|
||||
}
|
||||
break;
|
||||
case NOOP:
|
||||
primary.noopUpdate(updateRequest.type());
|
||||
|
@ -348,10 +292,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
|
||||
break;
|
||||
}
|
||||
assert (replicaRequest.request() instanceof IndexRequest
|
||||
&& ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
|
||||
(replicaRequest.request() instanceof DeleteRequest
|
||||
&& ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
|
||||
assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
// successful operation
|
||||
break; // out of retry loop
|
||||
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
|
||||
|
@ -367,20 +308,20 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
Translog.Location location = null;
|
||||
for (int i = 0; i < request.items().length; i++) {
|
||||
BulkItemRequest item = request.items()[i];
|
||||
if (item.isIgnoreOnReplica() == false) {
|
||||
assert item.getPrimaryResponse() != null : "expected primary response to be set for item [" + i + "] request ["+ item.request() +"]";
|
||||
if (item.getPrimaryResponse().isFailed() == false &&
|
||||
item.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP) {
|
||||
DocWriteRequest docWriteRequest = item.request();
|
||||
// ensure request version is updated for replica operation during request execution in the primary
|
||||
assert docWriteRequest.versionType() == docWriteRequest.versionType().versionTypeForReplicationAndRecovery()
|
||||
: "unexpected version in replica " + docWriteRequest.version();
|
||||
DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
|
||||
final Engine.Result operationResult;
|
||||
try {
|
||||
switch (docWriteRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
|
||||
operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica);
|
||||
break;
|
||||
case DELETE:
|
||||
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
|
||||
operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected request operation type on replica: "
|
||||
|
@ -426,17 +367,21 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
* Execute the given {@link IndexRequest} on a replica shard, throwing a
|
||||
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
|
||||
*/
|
||||
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) throws IOException {
|
||||
public static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request, IndexShard replica) throws IOException {
|
||||
final ShardId shardId = replica.shardId();
|
||||
SourceToParse sourceToParse =
|
||||
SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source(),
|
||||
request.getContentType()).routing(request.routing()).parent(request.parent());
|
||||
|
||||
final Engine.Index operation;
|
||||
final long version = primaryResponse.getVersion();
|
||||
final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
|
||||
assert versionType.validateVersionForWrites(version);
|
||||
final long seqNo = primaryResponse.getSeqNo();
|
||||
try {
|
||||
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
operation = replica.prepareIndexOnReplica(sourceToParse, seqNo, version, versionType, request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
} catch (MapperParsingException e) {
|
||||
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
|
||||
return new Engine.IndexResult(e, version, seqNo);
|
||||
}
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
if (update != null) {
|
||||
|
@ -446,7 +391,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
}
|
||||
|
||||
/** Utility method to prepare an index operation on primary shards */
|
||||
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
|
||||
private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
|
||||
SourceToParse sourceToParse =
|
||||
SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source(),
|
||||
request.getContentType()).routing(request.routing()).parent(request.parent());
|
||||
|
@ -460,7 +405,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
try {
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
final ShardId shardId = primary.shardId();
|
||||
|
@ -471,12 +416,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// throws IAE on conflicts merging dynamic mappings
|
||||
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
try {
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
if (update != null) {
|
||||
|
@ -487,14 +432,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return primary.index(operation);
|
||||
}
|
||||
|
||||
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
|
||||
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
|
||||
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
|
||||
return primary.delete(delete);
|
||||
}
|
||||
|
||||
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) throws IOException {
|
||||
private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request, IndexShard replica) throws IOException {
|
||||
final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
|
||||
final long version = primaryResponse.getVersion();
|
||||
assert versionType.validateVersionForWrites(version);
|
||||
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
|
||||
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
|
||||
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
|
||||
return replica.delete(delete);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,12 @@
|
|||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -38,8 +36,6 @@ import java.io.IOException;
|
|||
public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>> extends ReplicationRequest<R> implements WriteRequest<R> {
|
||||
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
|
||||
|
||||
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
|
||||
/**
|
||||
* Constructor for deserialization.
|
||||
*/
|
||||
|
@ -66,32 +62,11 @@ public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
refreshPolicy = RefreshPolicy.readFrom(in);
|
||||
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
|
||||
seqNo = in.readZLong();
|
||||
} else {
|
||||
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
refreshPolicy.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
|
||||
out.writeZLong(seqNo);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the sequence number for this operation. The sequence number is assigned while the operation
|
||||
* is performed on the primary shard.
|
||||
*/
|
||||
public long getSeqNo() {
|
||||
return seqNo;
|
||||
}
|
||||
|
||||
/** sets the sequence number for this operation. should only be called on the primary shard */
|
||||
public void setSeqNo(long seqNo) {
|
||||
this.seqNo = seqNo;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -391,6 +391,14 @@ public abstract class Engine implements Closeable {
|
|||
this.created = created;
|
||||
}
|
||||
|
||||
/**
|
||||
* use in case of index operation failed before getting to internal engine
|
||||
* (e.g while preparing operation or updating mappings)
|
||||
* */
|
||||
public IndexResult(Exception failure, long version) {
|
||||
this(failure, version, SequenceNumbersService.UNASSIGNED_SEQ_NO);
|
||||
}
|
||||
|
||||
public IndexResult(Exception failure, long version, long seqNo) {
|
||||
super(Operation.TYPE.INDEX, failure, version, seqNo);
|
||||
this.created = false;
|
||||
|
|
|
@ -478,6 +478,20 @@ public class InternalEngine extends Engine {
|
|||
return false;
|
||||
}
|
||||
|
||||
private boolean assertVersionType(final Engine.Operation operation) {
|
||||
if (operation.origin() == Operation.Origin.REPLICA ||
|
||||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
|
||||
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
// ensure that replica operation has expected version type for replication
|
||||
// ensure that versionTypeForReplicationAndRecovery is idempotent
|
||||
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
|
||||
: "unexpected version type in request from [" + operation.origin().name() + "] " +
|
||||
"found [" + operation.versionType().name() + "] " +
|
||||
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) && origin == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
// legacy support
|
||||
|
@ -499,6 +513,7 @@ public class InternalEngine extends Engine {
|
|||
try (ReleasableLock releasableLock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
assert assertSequenceNumber(index.origin(), index.seqNo());
|
||||
assert assertVersionType(index);
|
||||
final Translog.Location location;
|
||||
long seqNo = index.seqNo();
|
||||
try (Releasable ignored = acquireLock(index.uid());
|
||||
|
@ -692,6 +707,7 @@ public class InternalEngine extends Engine {
|
|||
public DeleteResult delete(Delete delete) throws IOException {
|
||||
DeleteResult result;
|
||||
try (ReleasableLock ignored = readLock.acquire()) {
|
||||
assert assertVersionType(delete);
|
||||
ensureOpen();
|
||||
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
|
||||
result = innerDelete(delete);
|
||||
|
|
|
@ -3114,7 +3114,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
|
||||
public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException {
|
||||
final long v = Versions.MATCH_ANY;
|
||||
final VersionType t = VersionType.INTERNAL;
|
||||
final VersionType t = VersionType.EXTERNAL;
|
||||
final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
|
||||
final int docs = randomIntBetween(1, 32);
|
||||
InternalEngine initialEngine = null;
|
||||
|
|
|
@ -22,8 +22,17 @@ package org.elasticsearch.index.replication;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||
import org.elasticsearch.action.bulk.BulkShardResponse;
|
||||
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
|
@ -157,10 +166,34 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
public IndexResponse index(IndexRequest indexRequest) throws Exception {
|
||||
PlainActionFuture<IndexResponse> listener = new PlainActionFuture<>();
|
||||
new IndexingAction(indexRequest, listener, this).execute();
|
||||
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
|
||||
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0].getResponse()),
|
||||
listener::onFailure);
|
||||
BulkItemRequest[] items = new BulkItemRequest[1];
|
||||
items[0] = new TestBulkItemRequest(0, indexRequest);
|
||||
BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items);
|
||||
new IndexingAction(request, wrapBulkListener, this).execute();
|
||||
return listener.get();
|
||||
}
|
||||
|
||||
/** BulkItemRequest exposing get/set primary response */
|
||||
public class TestBulkItemRequest extends BulkItemRequest {
|
||||
|
||||
TestBulkItemRequest(int id, DocWriteRequest request) {
|
||||
super(id, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setPrimaryResponse(BulkItemResponse primaryResponse) {
|
||||
super.setPrimaryResponse(primaryResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BulkItemResponse getPrimaryResponse() {
|
||||
return super.getPrimaryResponse();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void startAll() throws IOException {
|
||||
startReplicas(replicas.size());
|
||||
}
|
||||
|
@ -486,22 +519,28 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
}
|
||||
|
||||
class IndexingAction extends ReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
|
||||
class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
|
||||
|
||||
IndexingAction(IndexRequest request, ActionListener<IndexResponse> listener, ReplicationGroup replicationGroup) {
|
||||
IndexingAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
|
||||
super(request, listener, replicationGroup, "indexing");
|
||||
request.process(null, request.index());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
|
||||
IndexResponse response = indexOnPrimary(request, primary);
|
||||
return new PrimaryResult(request, response);
|
||||
protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception {
|
||||
final IndexRequest indexRequest = (IndexRequest) request.items()[0].request();
|
||||
indexRequest.process(null, request.index());
|
||||
final IndexResponse indexResponse = indexOnPrimary(indexRequest, primary);
|
||||
BulkItemResponse[] itemResponses = new BulkItemResponse[1];
|
||||
itemResponses[0] = new BulkItemResponse(0, indexRequest.opType(), indexResponse);
|
||||
((ReplicationGroup.TestBulkItemRequest) request.items()[0]).setPrimaryResponse(itemResponses[0]);
|
||||
return new PrimaryResult(request, new BulkShardResponse(primary.shardId(), itemResponses));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException {
|
||||
indexOnReplica(request, replica);
|
||||
protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws IOException {
|
||||
final ReplicationGroup.TestBulkItemRequest bulkItemRequest = ((ReplicationGroup.TestBulkItemRequest) request.items()[0]);
|
||||
final DocWriteResponse primaryResponse = bulkItemRequest.getPrimaryResponse().getResponse();
|
||||
indexOnReplica(primaryResponse, ((IndexRequest) bulkItemRequest.request()), replica);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,14 +550,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
|
||||
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
|
||||
null);
|
||||
if (indexResult.hasFailure() == false) {
|
||||
// update the version on request so it will happen on the replicas
|
||||
final long version = indexResult.getVersion();
|
||||
request.version(version);
|
||||
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
|
||||
request.setSeqNo(indexResult.getSeqNo());
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
}
|
||||
request.primaryTerm(primary.getPrimaryTerm());
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
|
||||
return new IndexResponse(
|
||||
|
@ -533,8 +564,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
/**
|
||||
* indexes the given requests on the supplied replica shard
|
||||
*/
|
||||
protected void indexOnReplica(IndexRequest request, IndexShard replica) throws IOException {
|
||||
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
|
||||
protected void indexOnReplica(DocWriteResponse response, IndexRequest request, IndexShard replica) throws IOException {
|
||||
final Engine.IndexResult result = executeIndexRequestOnReplica(response, request, replica);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
@ -167,8 +168,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
for (int i = 0; i < rollbackDocs; i++) {
|
||||
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i)
|
||||
.source("{}", XContentType.JSON);
|
||||
indexOnPrimary(indexRequest, oldPrimary);
|
||||
indexOnReplica(indexRequest, replica);
|
||||
final IndexResponse primaryResponse = indexOnPrimary(indexRequest, oldPrimary);
|
||||
indexOnReplica(primaryResponse, indexRequest, replica);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
oldPrimary.flush(new FlushRequest(index.getName()));
|
||||
|
|
|
@ -82,7 +82,7 @@ public class IndexingIT extends ESRestTestCase {
|
|||
new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
|
||||
}
|
||||
|
||||
protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
|
||||
private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
final int id = idStart + i;
|
||||
assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(),
|
||||
|
@ -91,6 +91,116 @@ public class IndexingIT extends ESRestTestCase {
|
|||
return numDocs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indexes a document in <code>index</code> with <code>docId</code> then concurrently updates the same document
|
||||
* <code>nUpdates</code> times
|
||||
*
|
||||
* @return the document version after updates
|
||||
*/
|
||||
private int indexDocWithConcurrentUpdates(String index, final int docId, int nUpdates) throws IOException, InterruptedException {
|
||||
indexDocs(index, docId, 1);
|
||||
Thread[] indexThreads = new Thread[nUpdates];
|
||||
for (int i = 0; i < nUpdates; i++) {
|
||||
indexThreads[i] = new Thread(() -> {
|
||||
try {
|
||||
indexDocs(index, docId, 1);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("failed while indexing [" + e.getMessage() + "]");
|
||||
}
|
||||
});
|
||||
indexThreads[i].start();
|
||||
}
|
||||
for (Thread indexThread : indexThreads) {
|
||||
indexThread.join();
|
||||
}
|
||||
return nUpdates + 1;
|
||||
}
|
||||
|
||||
public void testIndexVersionPropagation() throws Exception {
|
||||
Nodes nodes = buildNodeAndVersions();
|
||||
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
|
||||
logger.info("cluster discovered: {}", nodes.toString());
|
||||
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
|
||||
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
|
||||
.put("index.routing.allocation.include._name", bwcNames);
|
||||
final String index = "test";
|
||||
final int minUpdates = 5;
|
||||
final int maxUpdates = 10;
|
||||
createIndex(index, settings.build());
|
||||
try (RestClient newNodeClient = buildClient(restClientSettings(),
|
||||
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
|
||||
|
||||
int nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
||||
logger.info("indexing docs with [{}] concurrent updates initially", nUpdates);
|
||||
final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates);
|
||||
logger.info("allowing shards on all nodes");
|
||||
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
|
||||
ensureGreen();
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
List<Shard> shards = buildShards(nodes, newNodeClient);
|
||||
for (Shard shard : shards) {
|
||||
assertVersion(index, 1, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc1);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 1);
|
||||
}
|
||||
|
||||
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
||||
logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates);
|
||||
final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates);
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
shards = buildShards(nodes, newNodeClient);
|
||||
for (Shard shard : shards) {
|
||||
assertVersion(index, 2, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc2);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 2);
|
||||
}
|
||||
|
||||
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
|
||||
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
|
||||
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
|
||||
ensureGreen();
|
||||
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
||||
logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates);
|
||||
final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates);
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
shards = buildShards(nodes, newNodeClient);
|
||||
for (Shard shard : shards) {
|
||||
assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 3);
|
||||
}
|
||||
|
||||
logger.info("setting number of replicas to 0");
|
||||
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
|
||||
ensureGreen();
|
||||
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
||||
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates);
|
||||
final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates);
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
shards = buildShards(nodes, newNodeClient);
|
||||
for (Shard shard : shards) {
|
||||
assertVersion(index, 4, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc4);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 4);
|
||||
}
|
||||
|
||||
logger.info("setting number of replicas to 1");
|
||||
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
|
||||
ensureGreen();
|
||||
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
||||
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates);
|
||||
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates);
|
||||
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||
shards = buildShards(nodes, newNodeClient);
|
||||
for (Shard shard : shards) {
|
||||
assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5);
|
||||
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5);
|
||||
}
|
||||
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
|
||||
assertCount(index, "_primary", 5);
|
||||
assertCount(index, "_replica", 5);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSeqNoCheckpoints() throws Exception {
|
||||
Nodes nodes = buildNodeAndVersions();
|
||||
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
|
||||
|
@ -166,6 +276,14 @@ public class IndexingIT extends ESRestTestCase {
|
|||
assertThat(actualCount, equalTo(expectedCount));
|
||||
}
|
||||
|
||||
private void assertVersion(final String index, final int docId, final String preference, final int expectedVersion) throws IOException {
|
||||
final Response response = client().performRequest("GET", index + "/test/" + docId,
|
||||
Collections.singletonMap("preference", preference));
|
||||
assertOK(response);
|
||||
final int actualVersion = Integer.parseInt(objectPath(response).evaluate("_version").toString());
|
||||
assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion));
|
||||
}
|
||||
|
||||
private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue