Add primary term to doc write response

This commit adds the primary term to the doc write response.

Relates #24171
This commit is contained in:
Jason Tedor 2017-04-19 14:44:22 -04:00 committed by GitHub
parent e82d8007e3
commit 4796557a30
21 changed files with 197 additions and 119 deletions

View File

@ -57,6 +57,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
private static final String _ID = "_id";
private static final String _VERSION = "_version";
private static final String _SEQ_NO = "_seq_no";
private static final String _PRIMARY_TERM = "_primary_term";
private static final String RESULT = "result";
private static final String FORCED_REFRESH = "forced_refresh";
@ -116,14 +117,16 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
private String type;
private long version;
private long seqNo;
private long primaryTerm;
private boolean forcedRefresh;
protected Result result;
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version, Result result) {
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.result = result;
}
@ -182,6 +185,15 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
return seqNo;
}
/**
* The primary term for this change.
*
* @return the primary term
*/
public long getPrimaryTerm() {
return primaryTerm;
}
/**
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
@ -251,8 +263,10 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
primaryTerm = 0;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
@ -267,6 +281,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
out.writeBoolean(forcedRefresh);
result.writeTo(out);
@ -293,6 +308,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
builder.field(_SHARDS, shardInfo);
if (getSeqNo() >= 0) {
builder.field(_SEQ_NO, getSeqNo());
builder.field(_PRIMARY_TERM, getPrimaryTerm());
}
return builder;
}
@ -333,6 +349,8 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
context.setForcedRefresh(parser.booleanValue());
} else if (_SEQ_NO.equals(currentFieldName)) {
context.setSeqNo(parser.longValue());
} else if (_PRIMARY_TERM.equals(currentFieldName)) {
context.setPrimaryTerm(parser.longValue());
} else {
throwUnknownField(currentFieldName, parser.getTokenLocation());
}
@ -362,6 +380,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
protected boolean forcedRefresh;
protected ShardInfo shardInfo = null;
protected Long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
protected Long primaryTerm = 0L;
public ShardId getShardId() {
return shardId;
@ -407,6 +426,10 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
this.seqNo = seqNo;
}
public void setPrimaryTerm(Long primaryTerm) {
this.primaryTerm = primaryTerm;
}
public abstract DocWriteResponse build();
}
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
@ -142,7 +143,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
} else {
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated());
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
}
}
@ -155,7 +156,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
} else {
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound());
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
}
}
@ -276,7 +277,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillis,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.Result updateOperationResult = null;
Engine.Result result = null;
UpdateResponse updateResponse = null;
BulkItemRequest replicaRequest = request.items()[requestIndex];
int maxAttempts = updateRequest.retryOnConflict();
@ -288,7 +289,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
break; // out of retry loop
}
// execute translated update request
@ -298,34 +299,46 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, request.index());
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
break;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
if (updateOperationResult == null) {
if (result == null) {
// this is a noop operation
updateResponse = translate.action();
break; // out of retry loop
} else if (updateOperationResult.hasFailure() == false) {
} else if (result.hasFailure() == false) {
// enrich update response and
// set translated update (index/delete) request for replica execution in bulk items
switch (updateOperationResult.getOperationType()) {
switch (result.getOperationType()) {
case INDEX:
assert result instanceof Engine.IndexResult : result.getClass();
IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
updateIndexRequest.type(), updateIndexRequest.id(), updateOperationResult.getSeqNo(),
updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated());
final IndexResponse indexResponse = new IndexResponse(
primary.shardId(),
updateIndexRequest.type(),
updateIndexRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.IndexResult) result).isCreated());
BytesReference indexSourceAsBytes = updateIndexRequest.source();
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(),
indexResponse.getVersion(), indexResponse.getResult());
updateResponse = new UpdateResponse(
indexResponse.getShardInfo(),
indexResponse.getShardId(),
indexResponse.getType(),
indexResponse.getId(),
indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(),
indexResponse.getVersion(),
indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
@ -337,29 +350,46 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
break;
case DELETE:
assert result instanceof Engine.DeleteResult : result.getClass();
DeleteRequest updateDeleteRequest = translate.action();
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
updateDeleteRequest.type(), updateDeleteRequest.id(), updateOperationResult.getSeqNo(),
updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound());
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(),
deleteResponse.getVersion(), deleteResponse.getResult());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
translate.updateSourceContentType(), null));
DeleteResponse deleteResponse = new DeleteResponse(
primary.shardId(),
updateDeleteRequest.type(),
updateDeleteRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.DeleteResult) result).isFound());
updateResponse = new UpdateResponse(
deleteResponse.getShardInfo(),
deleteResponse.getShardId(),
deleteResponse.getType(),
deleteResponse.getId(),
deleteResponse.getSeqNo(),
deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(),
deleteResponse.getResult());
final GetResult getResult = updateHelper.extractGetResult(
updateRequest,
request.index(),
deleteResponse.getVersion(),
translate.updatedSourceAsMap(),
translate.updateSourceContentType(),
null);
updateResponse.setGetResult(getResult);
// set translated request as replica request
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
} else if (result.getFailure() instanceof VersionConflictEngineException == false) {
// not a version conflict exception
break; // out of retry loop
}
}
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
}
/** Modes for executing item request on replica depending on corresponding primary execution result */
@ -513,8 +543,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, primaryResponse.getVersion(),
primaryResponse.getSeqNo());
return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();

View File

@ -42,8 +42,8 @@ public class DeleteResponse extends DocWriteResponse {
public DeleteResponse() {
}
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) {
super(shardId, type, id, seqNo, version, found ? Result.DELETED : Result.NOT_FOUND);
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) {
super(shardId, type, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
}
@Override
@ -112,7 +112,7 @@ public class DeleteResponse extends DocWriteResponse {
@Override
public DeleteResponse build() {
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, version, found);
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, primaryTerm, version, found);
deleteResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
deleteResponse.setShardInfo(shardInfo);

View File

@ -43,8 +43,8 @@ public class IndexResponse extends DocWriteResponse {
public IndexResponse() {
}
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {
super(shardId, type, id, seqNo, version, created ? Result.CREATED : Result.UPDATED);
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) {
super(shardId, type, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED);
}
@Override
@ -62,6 +62,7 @@ public class IndexResponse extends DocWriteResponse {
builder.append(",version=").append(getVersion());
builder.append(",result=").append(getResult().getLowercase());
builder.append(",seqNo=").append(getSeqNo());
builder.append(",primaryTerm=").append(getPrimaryTerm());
builder.append(",shards=").append(Strings.toString(getShardInfo()));
return builder.append("]").toString();
}
@ -114,7 +115,7 @@ public class IndexResponse extends DocWriteResponse {
@Override
public IndexResponse build() {
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, version, created);
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, primaryTerm, version, created);
indexResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
indexResponse.setShardInfo(shardInfo);

View File

@ -179,7 +179,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final BytesReference upsertSourceBytes = upsertRequest.source();
bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
@ -200,7 +200,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final BytesReference indexSourceBytes = indexRequest.source();
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
@ -211,7 +211,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
DeleteRequest deleteRequest = result.action();
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);

View File

@ -47,11 +47,12 @@ public class UpdateResponse extends DocWriteResponse {
* For example: update script with operation set to none
*/
public UpdateResponse(ShardId shardId, String type, String id, long version, Result result) {
this(new ShardInfo(0, 0), shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, result);
this(new ShardInfo(0, 0), shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, result);
}
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long seqNo, long version, Result result) {
super(shardId, type, id, seqNo, version, result);
public UpdateResponse(
ShardInfo shardInfo, ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
super(shardId, type, id, seqNo, primaryTerm, version, result);
setShardInfo(shardInfo);
}
@ -106,6 +107,8 @@ public class UpdateResponse extends DocWriteResponse {
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",seqNo=").append(getSeqNo());
builder.append(",primaryTerm=").append(getPrimaryTerm());
builder.append(",result=").append(getResult().getLowercase());
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
@ -154,7 +157,7 @@ public class UpdateResponse extends DocWriteResponse {
public UpdateResponse build() {
UpdateResponse update;
if (shardInfo != null && seqNo != null) {
update = new UpdateResponse(shardInfo, shardId, type, id, seqNo, version, result);
update = new UpdateResponse(shardInfo, shardId, type, id, seqNo, primaryTerm, version, result);
} else {
update = new UpdateResponse(shardId, type, id, version, result);
}

View File

@ -611,8 +611,8 @@ public class InternalEngine extends Engine {
} else if (plan.indexIntoLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
plan.currentNotFoundOrDeleted);
indexResult = new IndexResult(
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
@ -704,10 +704,9 @@ public class InternalEngine extends Engine {
}
if (index.versionType().isVersionConflictForWrites(
currentVersion, index.version(), currentNotFoundOrDeleted)) {
plan = IndexingStrategy.skipDueToVersionConflict(
new VersionConflictEngineException(shardId, index, currentVersion,
currentNotFoundOrDeleted),
currentNotFoundOrDeleted, currentVersion);
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
seqNoService().generateSeqNo(),
@ -828,12 +827,11 @@ public class InternalEngine extends Engine {
return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null);
}
static IndexingStrategy skipDueToVersionConflict(VersionConflictEngineException e,
boolean currentNotFoundOrDeleted,
long currentVersion) {
return new IndexingStrategy(currentNotFoundOrDeleted, false,
false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND,
new IndexResult(e, currentVersion));
static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
}
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
@ -903,8 +901,8 @@ public class InternalEngine extends Engine {
} else if (plan.deleteFromLucene) {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion,
plan.currentlyDeleted == false);
deleteResult = new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
@ -982,9 +980,8 @@ public class InternalEngine extends Engine {
}
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
plan = DeletionStrategy.skipDueToVersionConflict(
new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted),
currentVersion, currentlyDeleted);
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(currentlyDeleted,
seqNoService().generateSeqNo(),
@ -1009,8 +1006,8 @@ public class InternalEngine extends Engine {
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure
return new DeleteResult(ex, plan.versionOfDeletion, plan.versionOfDeletion,
plan.currentlyDeleted == false);
return new DeleteResult(
ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} else {
throw ex;
}
@ -1040,26 +1037,20 @@ public class InternalEngine extends Engine {
Optional.empty() : Optional.of(earlyResultOnPreflightError);
}
static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e,
long currentVersion, boolean currentlyDeleted) {
return new DeletionStrategy(false, currentlyDeleted,
SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND,
new DeleteResult(e, currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO,
currentlyDeleted == false));
static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
}
static DeletionStrategy processNormally(boolean currentlyDeleted,
long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion,
null);
static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
}
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted,
long seqNoOfDeletion,
long versionOfDeletion) {
return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion,
null);
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) {
return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null);
}
}

View File

@ -43,6 +43,7 @@ public class DocWriteResponseTests extends ESTestCase {
"type",
"id",
SequenceNumbersService.UNASSIGNED_SEQ_NO,
17,
0,
Result.CREATED) {};
assertEquals("/index/type/id", response.getLocation(null));
@ -56,6 +57,7 @@ public class DocWriteResponseTests extends ESTestCase {
"type",
"",
SequenceNumbersService.UNASSIGNED_SEQ_NO,
17,
0,
Result.CREATED) {};
assertEquals("/index/type/%E2%9D%A4", response.getLocation(null));
@ -69,6 +71,7 @@ public class DocWriteResponseTests extends ESTestCase {
"type",
"a b",
SequenceNumbersService.UNASSIGNED_SEQ_NO,
17,
0,
Result.CREATED) {};
assertEquals("/index/type/a+b", response.getLocation(null));
@ -86,6 +89,7 @@ public class DocWriteResponseTests extends ESTestCase {
"type",
"id",
SequenceNumbersService.UNASSIGNED_SEQ_NO,
17,
0,
Result.CREATED) {
// DocWriteResponse is abstract so we have to sneak a subclass in here to test it.

View File

@ -117,7 +117,7 @@ public class BulkRequestModifierTests extends ESTestCase {
for (DocWriteRequest actionRequest : bulkRequest.requests()) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(),
indexRequest.id(), 1, 1, true);
indexRequest.id(), 1, 17, 1, true);
originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse));
}
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));

View File

@ -85,7 +85,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
// Successful index request should be replicated
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
assertThat(replicaItemExecutionMode(request, 0),
@ -471,7 +471,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
boolean created = randomBoolean();
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation);
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, created);
BulkItemResultHolder goodResults =
new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
@ -509,10 +509,12 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
equalTo(original));
boolean created = randomBoolean();
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, created);
Translog.Location newLocation = new Translog.Location(1, 1, 1);
Engine.IndexResult indexResult = new IndexResultWithLocation(randomNonNegativeLong(),
randomNonNegativeLong(), created, newLocation);
final long version = randomNonNegativeLong();
final long seqNo = randomNonNegativeLong();
final long primaryTerm = randomIntBetween(1, 16);
Engine.IndexResult indexResult = new IndexResultWithLocation(version, seqNo, primaryTerm, created, newLocation);
results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
equalTo(newLocation));
@ -614,8 +616,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
public class IndexResultWithLocation extends Engine.IndexResult {
private final Translog.Location location;
public IndexResultWithLocation(long version, long seqNo, boolean created,
Translog.Location newLocation) {
public IndexResultWithLocation(long version, long seqNo, long primaryTerm, boolean created, Translog.Location newLocation) {
super(version, seqNo, created);
this.location = newLocation;
}
@ -630,8 +631,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(false);
DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id",
1, 1, randomBoolean());
DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, randomBoolean());
IndexRequest request = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "field", "value");
@ -652,8 +652,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
private final Translog.Location location;
protected FakeResult(long version, long seqNo, boolean created,
Translog.Location location) {
protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) {
super(version, seqNo, created);
this.location = location;
}

View File

@ -290,10 +290,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
default:
throw new RuntimeException("Bad scenario");
}
responses[i] = new BulkItemResponse(
i,
opType,
new IndexResponse(shardId, "type", "id" + i, randomInt(20), randomInt(), createdResponse));
final int seqNo = randomInt(20);
final int primaryTerm = randomIntBetween(1, 16);
final IndexResponse response =
new IndexResponse(shardId, "type", "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
responses[i] = new BulkItemResponse(i, opType, response);
}
new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
@ -799,6 +800,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
index.type(),
index.id(),
randomInt(20),
randomIntBetween(1, 16),
randomIntBetween(0, Integer.MAX_VALUE),
true);
} else if (item instanceof UpdateRequest) {
@ -813,6 +815,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
delete.type(),
delete.id(),
randomInt(20),
randomIntBetween(1, 16),
randomIntBetween(0, Integer.MAX_VALUE),
true);
} else {

View File

@ -40,13 +40,13 @@ public class DeleteResponseTests extends ESTestCase {
public void testToXContent() {
{
DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 5, true);
DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 17, 5, true);
String output = Strings.toString(response);
assertEquals("{\"found\":true,\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":5,\"result\":\"deleted\"," +
"\"_shards\":null,\"_seq_no\":3}", output);
"\"_shards\":null,\"_seq_no\":3,\"_primary_term\":17}", output);
}
{
DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 7, true);
DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 0, 7, true);
response.setForcedRefresh(true);
response.setShardInfo(new ReplicationResponse.ShardInfo(10, 5));
String output = Strings.toString(response);
@ -89,17 +89,19 @@ public class DeleteResponseTests extends ESTestCase {
String type = randomAlphaOfLength(5);
String id = randomAlphaOfLength(5);
long seqNo = randomFrom(SequenceNumbersService.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), (long) randomIntBetween(0, 10000));
long primaryTerm = seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 10000);
long version = randomBoolean() ? randomNonNegativeLong() : randomIntBetween(0, 10000);
boolean found = randomBoolean();
boolean forcedRefresh = randomBoolean();
Tuple<ReplicationResponse.ShardInfo, ReplicationResponse.ShardInfo> shardInfos = RandomObjects.randomShardInfo(random());
DeleteResponse actual = new DeleteResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, version, found);
DeleteResponse actual = new DeleteResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, primaryTerm, version, found);
actual.setForcedRefresh(forcedRefresh);
actual.setShardInfo(shardInfos.v1());
DeleteResponse expected = new DeleteResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, version, found);
DeleteResponse expected =
new DeleteResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, primaryTerm, version, found);
expected.setForcedRefresh(forcedRefresh);
expected.setShardInfo(shardInfos.v2());

View File

@ -135,7 +135,7 @@ public class IndexRequestTests extends ESTestCase {
String id = randomAlphaOfLengthBetween(3, 10);
long version = randomLong();
boolean created = randomBoolean();
IndexResponse indexResponse = new IndexResponse(shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, created);
IndexResponse indexResponse = new IndexResponse(shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, created);
int total = randomIntBetween(1, 10);
int successful = randomIntBetween(1, 10);
ReplicationResponse.ShardInfo shardInfo = new ReplicationResponse.ShardInfo(total, successful);
@ -156,6 +156,7 @@ public class IndexRequestTests extends ESTestCase {
assertEquals("IndexResponse[index=" + shardId.getIndexName() + ",type=" + type + ",id="+ id +
",version=" + version + ",result=" + (created ? "created" : "updated") +
",seqNo=" + SequenceNumbersService.UNASSIGNED_SEQ_NO +
",primaryTerm=" + 0 +
",shards={\"total\":" + total + ",\"successful\":" + successful + ",\"failed\":0}]",
indexResponse.toString());
}

View File

@ -41,13 +41,13 @@ public class IndexResponseTests extends ESTestCase {
public void testToXContent() {
{
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 5, true);
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 17, 5, true);
String output = Strings.toString(indexResponse);
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":5,\"result\":\"created\",\"_shards\":null," +
"\"_seq_no\":3,\"created\":true}", output);
"\"_seq_no\":3,\"_primary_term\":17,\"created\":true}", output);
}
{
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 7, true);
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 17, 7, true);
indexResponse.setForcedRefresh(true);
indexResponse.setShardInfo(new ReplicationResponse.ShardInfo(10, 5));
String output = Strings.toString(indexResponse);
@ -102,17 +102,19 @@ public class IndexResponseTests extends ESTestCase {
String type = randomAlphaOfLength(5);
String id = randomAlphaOfLength(5);
long seqNo = randomFrom(SequenceNumbersService.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), (long) randomIntBetween(0, 10000));
long primaryTerm = seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 10000);
long version = randomBoolean() ? randomNonNegativeLong() : randomIntBetween(0, 10000);
boolean created = randomBoolean();
boolean forcedRefresh = randomBoolean();
Tuple<ReplicationResponse.ShardInfo, ReplicationResponse.ShardInfo> shardInfos = RandomObjects.randomShardInfo(random());
IndexResponse actual = new IndexResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, version, created);
IndexResponse actual = new IndexResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, primaryTerm, version, created);
actual.setForcedRefresh(forcedRefresh);
actual.setShardInfo(shardInfos.v1());
IndexResponse expected = new IndexResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, version, created);
IndexResponse expected =
new IndexResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, primaryTerm, version, created);
expected.setForcedRefresh(forcedRefresh);
expected.setShardInfo(shardInfos.v2());

View File

@ -57,10 +57,10 @@ public class UpdateResponseTests extends ESTestCase {
}
{
UpdateResponse updateResponse = new UpdateResponse(new ReplicationResponse.ShardInfo(10, 6),
new ShardId("index", "index_uuid", 1), "type", "id", 3, 1, DELETED);
new ShardId("index", "index_uuid", 1), "type", "id", 3, 17, 1, DELETED);
String output = Strings.toString(updateResponse);
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"result\":\"deleted\"," +
"\"_shards\":{\"total\":10,\"successful\":6,\"failed\":0},\"_seq_no\":3}", output);
"\"_shards\":{\"total\":10,\"successful\":6,\"failed\":0},\"_seq_no\":3,\"_primary_term\":17}", output);
}
{
BytesReference source = new BytesArray("{\"title\":\"Book title\",\"isbn\":\"ABC-123\"}");
@ -69,12 +69,12 @@ public class UpdateResponseTests extends ESTestCase {
fields.put("isbn", new GetField("isbn", Collections.singletonList("ABC-123")));
UpdateResponse updateResponse = new UpdateResponse(new ReplicationResponse.ShardInfo(3, 2),
new ShardId("books", "books_uuid", 2), "book", "1", 7, 2, UPDATED);
new ShardId("books", "books_uuid", 2), "book", "1", 7, 17, 2, UPDATED);
updateResponse.setGetResult(new GetResult("books", "book", "1", 2, true, source, fields));
String output = Strings.toString(updateResponse);
assertEquals("{\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"1\",\"_version\":2,\"result\":\"updated\"," +
"\"_shards\":{\"total\":3,\"successful\":2,\"failed\":0},\"_seq_no\":7,\"get\":{\"found\":true," +
"\"_shards\":{\"total\":3,\"successful\":2,\"failed\":0},\"_seq_no\":7,\"_primary_term\":17,\"get\":{\"found\":true," +
"\"_source\":{\"title\":\"Book title\",\"isbn\":\"ABC-123\"},\"fields\":{\"isbn\":[\"ABC-123\"],\"title\":[\"Book " +
"title\"]}}}", output);
}
@ -128,6 +128,7 @@ public class UpdateResponseTests extends ESTestCase {
// We also want small number values (randomNonNegativeLong() tend to generate high numbers)
// in order to catch some conversion error that happen between int/long after parsing.
Long seqNo = randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, 10_000), null);
long primaryTerm = seqNo == null ? 0 : randomIntBetween(1, 16);
ShardId actualShardId = new ShardId(index, indexUUid, shardId);
ShardId expectedShardId = new ShardId(index, INDEX_UUID_NA_VALUE, -1);
@ -136,8 +137,8 @@ public class UpdateResponseTests extends ESTestCase {
if (seqNo != null) {
Tuple<ReplicationResponse.ShardInfo, ReplicationResponse.ShardInfo> shardInfos = RandomObjects.randomShardInfo(random());
actual = new UpdateResponse(shardInfos.v1(), actualShardId, type, id, seqNo, version, result);
expected = new UpdateResponse(shardInfos.v2(), expectedShardId, type, id, seqNo, version, result);
actual = new UpdateResponse(shardInfos.v1(), actualShardId, type, id, seqNo, primaryTerm, version, result);
expected = new UpdateResponse(shardInfos.v2(), expectedShardId, type, id, seqNo, primaryTerm, version, result);
} else {
actual = new UpdateResponse(actualShardId, type, id, version, result);
expected = new UpdateResponse(expectedShardId, type, id, version, result);

View File

@ -2055,6 +2055,7 @@ public class TranslogTests extends ESTestCase {
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers";
long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong();
long primaryTerm = randomSeqNum == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 16);
long randomPrimaryTerm = randomBoolean() ? 0 : randomNonNegativeLong();
seqID.seqNo.setLongValue(randomSeqNum);
seqID.seqNoDocValue.setLongValue(randomSeqNum);

View File

@ -104,7 +104,8 @@ The result of this bulk operation is:
},
"created": true,
"status": 201,
"_seq_no" : 0
"_seq_no" : 0,
"_primary_term": 1
}
},
{
@ -121,7 +122,8 @@ The result of this bulk operation is:
"failed": 0
},
"status": 404,
"_seq_no" : 1
"_seq_no" : 1,
"_primary_term" : 2
}
},
{
@ -138,7 +140,8 @@ The result of this bulk operation is:
},
"created": true,
"status": 201,
"_seq_no" : 2
"_seq_no" : 2,
"_primary_term" : 3
}
},
{
@ -154,13 +157,23 @@ The result of this bulk operation is:
"failed": 0
},
"status": 200,
"_seq_no" : 3
"_seq_no" : 3,
"_primary_term" : 4
}
}
]
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 30/"took": $body.took/ s/"index_uuid": .../"index_uuid": $body.items.3.update.error.index_uuid/ s/"_seq_no" : 0/"_seq_no" : $body.items.0.index._seq_no/ s/"_seq_no" : 1/"_seq_no" : $body.items.1.delete._seq_no/ s/"_seq_no" : 2/"_seq_no" : $body.items.2.create._seq_no/ s/"_seq_no" : 3/"_seq_no" : $body.items.3.update._seq_no/]
// TESTRESPONSE[s/"took": 30/"took": $body.took/]
// TESTRESPONSE[s/"index_uuid": .../"index_uuid": $body.items.3.update.error.index_uuid/]
// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body.items.0.index._seq_no/]
// TESTRESPONSE[s/"_primary_term" : 1/"_primary_term" : $body.items.0.index._primary_term/]
// TESTRESPONSE[s/"_seq_no" : 1/"_seq_no" : $body.items.1.delete._seq_no/]
// TESTRESPONSE[s/"_primary_term" : 2/"_primary_term" : $body.items.1.delete._primary_term/]
// TESTRESPONSE[s/"_seq_no" : 2/"_seq_no" : $body.items.2.create._seq_no/]
// TESTRESPONSE[s/"_primary_term" : 3/"_primary_term" : $body.items.2.create._primary_term/]
// TESTRESPONSE[s/"_seq_no" : 3/"_seq_no" : $body.items.3.update._seq_no/]
// TESTRESPONSE[s/"_primary_term" : 4/"_primary_term" : $body.items.3.update._primary_term/]
The endpoints are `/_bulk`, `/{index}/_bulk`, and `{index}/{type}/_bulk`.
When the index or the index/type are provided, they will be used by

View File

@ -32,6 +32,7 @@ The result of the above index operation is:
"_version" : 1,
"created" : true,
"_seq_no" : 0,
"_primary_term" : 1,
"result" : created
}
--------------------------------------------------
@ -230,6 +231,7 @@ The result of the above index operation is:
"_version" : 1,
"created" : true,
"_seq_no" : 0,
"_primary_term" : 1,
"result": "created"
}
--------------------------------------------------

View File

@ -323,10 +323,11 @@ And the response:
"failed" : 0
},
"created" : true,
"_seq_no" : 0
"_seq_no" : 0,
"_primary_term" : 1
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body._seq_no/]
// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
From the above, we can see that a new customer document was successfully created inside the customer index and the external type. The document also has an internal id of 1 which we specified at index time.

View File

@ -906,7 +906,8 @@ PUT /myindex/type/1?pipeline=monthlyindex
"failed" : 0
},
"created" : true,
"_seq_no" : 0
"_seq_no" : 0,
"_primary_term" : 1
}
--------------------------------------------------
// TESTRESPONSE

View File

@ -182,7 +182,8 @@ Index response:
},
"created": true,
"result": "created",
"_seq_no" : 1
"_seq_no" : 1,
"_primary_term" : 1
}
--------------------------------------------------
// TESTRESPONSE