Use Sequence number powered OCC for processing updates (#37308)
Updates perform realtime get, perform the requested update and then index the document again using optimistic concurrency control. This PR changes the logic to use sequence numbers instead of versioning. Note that the current versioning logic isn't suffering from the same problem as external OCC requests because the get and indexing is always done on the same primary. Relates #36148 Relates #10708
This commit is contained in:
parent
19a7e0f4eb
commit
d21df2a17a
|
@ -163,19 +163,6 @@ public class UpdateHelper {
|
||||||
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Calculate the version to use for the update request, using either the existing version if internal versioning is used, or the get
|
|
||||||
* result document's version if the version type is "FORCE".
|
|
||||||
*/
|
|
||||||
static long calculateUpdateVersion(UpdateRequest request, GetResult getResult) {
|
|
||||||
if (request.versionType() != VersionType.INTERNAL) {
|
|
||||||
assert request.versionType() == VersionType.FORCE;
|
|
||||||
return request.version(); // remember, match_any is excluded by the conflict test
|
|
||||||
} else {
|
|
||||||
return getResult.getVersion();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
|
* Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
|
||||||
*/
|
*/
|
||||||
|
@ -195,7 +182,6 @@ public class UpdateHelper {
|
||||||
* containing a new {@code IndexRequest} to be executed on the primary and replicas.
|
* containing a new {@code IndexRequest} to be executed on the primary and replicas.
|
||||||
*/
|
*/
|
||||||
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
|
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
|
||||||
final long updateVersion = calculateUpdateVersion(request, getResult);
|
|
||||||
final IndexRequest currentRequest = request.doc();
|
final IndexRequest currentRequest = request.doc();
|
||||||
final String routing = calculateRouting(getResult, currentRequest);
|
final String routing = calculateRouting(getResult, currentRequest);
|
||||||
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||||
|
@ -215,7 +201,8 @@ public class UpdateHelper {
|
||||||
} else {
|
} else {
|
||||||
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
|
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
|
||||||
.type(request.type()).id(request.id()).routing(routing)
|
.type(request.type()).id(request.id()).routing(routing)
|
||||||
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
|
.source(updatedSourceAsMap, updateSourceContentType)
|
||||||
|
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
|
||||||
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
||||||
.setRefreshPolicy(request.getRefreshPolicy());
|
.setRefreshPolicy(request.getRefreshPolicy());
|
||||||
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
||||||
|
@ -228,7 +215,6 @@ public class UpdateHelper {
|
||||||
* primary and replicas.
|
* primary and replicas.
|
||||||
*/
|
*/
|
||||||
Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) {
|
Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) {
|
||||||
final long updateVersion = calculateUpdateVersion(request, getResult);
|
|
||||||
final IndexRequest currentRequest = request.doc();
|
final IndexRequest currentRequest = request.doc();
|
||||||
final String routing = calculateRouting(getResult, currentRequest);
|
final String routing = calculateRouting(getResult, currentRequest);
|
||||||
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||||
|
@ -256,14 +242,16 @@ public class UpdateHelper {
|
||||||
case INDEX:
|
case INDEX:
|
||||||
final IndexRequest indexRequest = Requests.indexRequest(request.index())
|
final IndexRequest indexRequest = Requests.indexRequest(request.index())
|
||||||
.type(request.type()).id(request.id()).routing(routing)
|
.type(request.type()).id(request.id()).routing(routing)
|
||||||
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
|
.source(updatedSourceAsMap, updateSourceContentType)
|
||||||
|
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
|
||||||
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
||||||
.setRefreshPolicy(request.getRefreshPolicy());
|
.setRefreshPolicy(request.getRefreshPolicy());
|
||||||
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
||||||
case DELETE:
|
case DELETE:
|
||||||
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
|
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
|
||||||
.type(request.type()).id(request.id()).routing(routing)
|
.type(request.type()).id(request.id()).routing(routing)
|
||||||
.version(updateVersion).versionType(request.versionType()).waitForActiveShards(request.waitForActiveShards())
|
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
|
||||||
|
.waitForActiveShards(request.waitForActiveShards())
|
||||||
.timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy());
|
.timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy());
|
||||||
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
|
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.VersionType;
|
|
||||||
import org.elasticsearch.index.get.GetResult;
|
import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
|
@ -570,24 +569,6 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
assertThat(UpdateHelper.calculateRouting(getResult, indexRequest), equalTo("routing1"));
|
assertThat(UpdateHelper.calculateRouting(getResult, indexRequest), equalTo("routing1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecated") // VersionType.FORCE is deprecated
|
|
||||||
public void testCalculateUpdateVersion() throws Exception {
|
|
||||||
long randomVersion = randomIntBetween(0, 100);
|
|
||||||
GetResult getResult = new GetResult("test", "type", "1", 0, 1, randomVersion, true, new BytesArray("{}"), null);
|
|
||||||
|
|
||||||
UpdateRequest request = new UpdateRequest("test", "type1", "1");
|
|
||||||
long version = UpdateHelper.calculateUpdateVersion(request, getResult);
|
|
||||||
|
|
||||||
// Use the get result's version
|
|
||||||
assertThat(version, equalTo(randomVersion));
|
|
||||||
|
|
||||||
request = new UpdateRequest("test", "type1", "1").versionType(VersionType.FORCE).version(1337);
|
|
||||||
version = UpdateHelper.calculateUpdateVersion(request, getResult);
|
|
||||||
|
|
||||||
// Use the forced update request version
|
|
||||||
assertThat(version, equalTo(1337L));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testNoopDetection() throws Exception {
|
public void testNoopDetection() throws Exception {
|
||||||
ShardId shardId = new ShardId("test", "", 0);
|
ShardId shardId = new ShardId("test", "", 0);
|
||||||
GetResult getResult = new GetResult("test", "type", "1", 0, 1, 0, true,
|
GetResult getResult = new GetResult("test", "type", "1", 0, 1, 0, true,
|
||||||
|
|
Loading…
Reference in New Issue