mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-04-01 04:48:28 +00:00
if_seq_no
and if_primary_term
parameters aren't wired correctly in REST Client's CRUD API (#38411)
This commit is contained in:
parent
df4eb0485d
commit
12657fda44
@ -108,6 +108,8 @@ final class RequestConverters {
|
|||||||
parameters.withTimeout(deleteRequest.timeout());
|
parameters.withTimeout(deleteRequest.timeout());
|
||||||
parameters.withVersion(deleteRequest.version());
|
parameters.withVersion(deleteRequest.version());
|
||||||
parameters.withVersionType(deleteRequest.versionType());
|
parameters.withVersionType(deleteRequest.versionType());
|
||||||
|
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
|
||||||
|
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
|
||||||
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
|
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
|
||||||
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
|
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
|
||||||
return request;
|
return request;
|
||||||
@ -191,6 +193,11 @@ final class RequestConverters {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||||
|
metadata.field("if_seq_no", action.ifSeqNo());
|
||||||
|
metadata.field("if_primary_term", action.ifPrimaryTerm());
|
||||||
|
}
|
||||||
|
|
||||||
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
|
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
|
||||||
IndexRequest indexRequest = (IndexRequest) action;
|
IndexRequest indexRequest = (IndexRequest) action;
|
||||||
if (Strings.hasLength(indexRequest.getPipeline())) {
|
if (Strings.hasLength(indexRequest.getPipeline())) {
|
||||||
@ -319,6 +326,8 @@ final class RequestConverters {
|
|||||||
parameters.withTimeout(indexRequest.timeout());
|
parameters.withTimeout(indexRequest.timeout());
|
||||||
parameters.withVersion(indexRequest.version());
|
parameters.withVersion(indexRequest.version());
|
||||||
parameters.withVersionType(indexRequest.versionType());
|
parameters.withVersionType(indexRequest.versionType());
|
||||||
|
parameters.withIfSeqNo(indexRequest.ifSeqNo());
|
||||||
|
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
|
||||||
parameters.withPipeline(indexRequest.getPipeline());
|
parameters.withPipeline(indexRequest.getPipeline());
|
||||||
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
|
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
|
||||||
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
|
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
|
||||||
|
@ -104,11 +104,13 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||||||
{
|
{
|
||||||
// Testing deletion
|
// Testing deletion
|
||||||
String docId = "id";
|
String docId = "id";
|
||||||
highLevelClient().index(
|
IndexResponse indexResponse = highLevelClient().index(
|
||||||
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
|
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
|
||||||
|
assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L));
|
||||||
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
|
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
deleteRequest.version(1L);
|
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
|
||||||
|
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
|
||||||
}
|
}
|
||||||
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
|
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
|
||||||
assertEquals("index", deleteResponse.getIndex());
|
assertEquals("index", deleteResponse.getIndex());
|
||||||
@ -131,12 +133,13 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||||||
String docId = "version_conflict";
|
String docId = "version_conflict";
|
||||||
highLevelClient().index(
|
highLevelClient().index(
|
||||||
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
|
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
|
||||||
DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2);
|
DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2);
|
||||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||||
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
|
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
|
||||||
assertEquals(RestStatus.CONFLICT, exception.status());
|
assertEquals(RestStatus.CONFLICT, exception.status());
|
||||||
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " +
|
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " +
|
||||||
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
|
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
|
||||||
|
exception.getMessage());
|
||||||
assertEquals("index", exception.getMetadata("es.index").get(0));
|
assertEquals("index", exception.getMetadata("es.index").get(0));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@ -519,13 +522,14 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||||||
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
|
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
|
||||||
IndexRequest wrongRequest = new IndexRequest("index").id("id");
|
IndexRequest wrongRequest = new IndexRequest("index").id("id");
|
||||||
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
|
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
|
||||||
wrongRequest.version(5L);
|
wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L);
|
||||||
|
|
||||||
execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
|
execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
|
||||||
});
|
});
|
||||||
assertEquals(RestStatus.CONFLICT, exception.status());
|
assertEquals(RestStatus.CONFLICT, exception.status());
|
||||||
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " +
|
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " +
|
||||||
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
|
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
|
||||||
|
exception.getMessage());
|
||||||
assertEquals("index", exception.getMetadata("es.index").get(0));
|
assertEquals("index", exception.getMetadata("es.index").get(0));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@ -820,7 +824,8 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||||||
if (opType == DocWriteRequest.OpType.INDEX) {
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
|
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
|
||||||
if (erroneous) {
|
if (erroneous) {
|
||||||
indexRequest.version(12L);
|
indexRequest.setIfSeqNo(12L);
|
||||||
|
indexRequest.setIfPrimaryTerm(12L);
|
||||||
}
|
}
|
||||||
bulkRequest.add(indexRequest);
|
bulkRequest.add(indexRequest);
|
||||||
|
|
||||||
@ -1130,7 +1135,8 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||||||
if (opType == DocWriteRequest.OpType.INDEX) {
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
|
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
|
||||||
if (erroneous) {
|
if (erroneous) {
|
||||||
indexRequest.version(12L);
|
indexRequest.setIfSeqNo(12L);
|
||||||
|
indexRequest.setIfPrimaryTerm(12L);
|
||||||
}
|
}
|
||||||
processor.add(indexRequest);
|
processor.add(indexRequest);
|
||||||
|
|
||||||
|
@ -281,6 +281,7 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
|
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
|
||||||
setRandomVersion(deleteRequest, expectedParams);
|
setRandomVersion(deleteRequest, expectedParams);
|
||||||
setRandomVersionType(deleteRequest::versionType, expectedParams);
|
setRandomVersionType(deleteRequest::versionType, expectedParams);
|
||||||
|
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);
|
||||||
|
|
||||||
if (frequently()) {
|
if (frequently()) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
@ -631,6 +632,7 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
} else {
|
} else {
|
||||||
setRandomVersion(indexRequest, expectedParams);
|
setRandomVersion(indexRequest, expectedParams);
|
||||||
setRandomVersionType(indexRequest::versionType, expectedParams);
|
setRandomVersionType(indexRequest::versionType, expectedParams);
|
||||||
|
setRandomIfSeqNoAndTerm(indexRequest, expectedParams);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (frequently()) {
|
if (frequently()) {
|
||||||
@ -768,6 +770,7 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
|
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
|
||||||
setRandomVersion(updateRequest, expectedParams);
|
setRandomVersion(updateRequest, expectedParams);
|
||||||
setRandomVersionType(updateRequest::versionType, expectedParams);
|
setRandomVersionType(updateRequest::versionType, expectedParams);
|
||||||
|
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
int retryOnConflict = randomIntBetween(0, 5);
|
int retryOnConflict = randomIntBetween(0, 5);
|
||||||
updateRequest.retryOnConflict(retryOnConflict);
|
updateRequest.retryOnConflict(retryOnConflict);
|
||||||
@ -798,6 +801,7 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
|
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
|
||||||
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
|
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
|
||||||
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
|
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
|
||||||
|
assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest);
|
||||||
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
|
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
|
||||||
if (updateRequest.doc() != null) {
|
if (updateRequest.doc() != null) {
|
||||||
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
|
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
|
||||||
@ -811,6 +815,22 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void assertIfSeqNoAndTerm(DocWriteRequest<?>request, DocWriteRequest<?> parsedRequest) {
|
||||||
|
assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo());
|
||||||
|
assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setRandomIfSeqNoAndTerm(DocWriteRequest<?> request, Map<String, String> expectedParams) {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
final long seqNo = randomNonNegativeLong();
|
||||||
|
request.setIfSeqNo(seqNo);
|
||||||
|
expectedParams.put("if_seq_no", Long.toString(seqNo));
|
||||||
|
final long primaryTerm = randomLongBetween(1, 200);
|
||||||
|
request.setIfPrimaryTerm(primaryTerm);
|
||||||
|
expectedParams.put("if_primary_term", Long.toString(primaryTerm));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testUpdateWithType() throws IOException {
|
public void testUpdateWithType() throws IOException {
|
||||||
String index = randomAlphaOfLengthBetween(3, 10);
|
String index = randomAlphaOfLengthBetween(3, 10);
|
||||||
String type = randomAlphaOfLengthBetween(3, 10);
|
String type = randomAlphaOfLengthBetween(3, 10);
|
||||||
@ -891,12 +911,17 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
docWriteRequest.routing(randomAlphaOfLength(10));
|
docWriteRequest.routing(randomAlphaOfLength(10));
|
||||||
}
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
docWriteRequest.version(randomNonNegativeLong());
|
docWriteRequest.version(randomNonNegativeLong());
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
docWriteRequest.versionType(randomFrom(VersionType.values()));
|
docWriteRequest.versionType(randomFrom(VersionType.values()));
|
||||||
}
|
}
|
||||||
|
} else if (randomBoolean()) {
|
||||||
|
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
|
||||||
|
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
|
||||||
|
}
|
||||||
bulkRequest.add(docWriteRequest);
|
bulkRequest.add(docWriteRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -925,6 +950,8 @@ public class RequestConvertersTests extends ESTestCase {
|
|||||||
assertEquals(originalRequest.routing(), parsedRequest.routing());
|
assertEquals(originalRequest.routing(), parsedRequest.routing());
|
||||||
assertEquals(originalRequest.version(), parsedRequest.version());
|
assertEquals(originalRequest.version(), parsedRequest.version());
|
||||||
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
|
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
|
||||||
|
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
|
||||||
|
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
|
||||||
|
|
||||||
DocWriteRequest.OpType opType = originalRequest.opType();
|
DocWriteRequest.OpType opType = originalRequest.opType();
|
||||||
if (opType == DocWriteRequest.OpType.INDEX) {
|
if (opType == DocWriteRequest.OpType.INDEX) {
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
---
|
||||||
|
"Compare And Swap Sequence Numbers":
|
||||||
|
|
||||||
|
- skip:
|
||||||
|
version: " - 6.99.99"
|
||||||
|
reason: typeless API are add in 7.0.0
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
- match: { _version: 1}
|
||||||
|
- set: { _seq_no: seqno }
|
||||||
|
- set: { _primary_term: primary_term }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
body:
|
||||||
|
- index:
|
||||||
|
_index: test_1
|
||||||
|
_id: 1
|
||||||
|
if_seq_no: 10000
|
||||||
|
if_primary_term: $primary_term
|
||||||
|
- foo: bar2
|
||||||
|
|
||||||
|
- match: { errors: true }
|
||||||
|
- match: { items.0.index.status: 409 }
|
||||||
|
- match: { items.0.index.error.type: version_conflict_engine_exception }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
body:
|
||||||
|
- index:
|
||||||
|
_index: test_1
|
||||||
|
_id: 1
|
||||||
|
if_seq_no: $seqno
|
||||||
|
if_primary_term: $primary_term
|
||||||
|
- foo: bar2
|
||||||
|
|
||||||
|
- match: { errors: false}
|
||||||
|
- match: { items.0.index.status: 200 }
|
@ -0,0 +1,45 @@
|
|||||||
|
---
|
||||||
|
"Compare And Swap Sequence Numbers":
|
||||||
|
|
||||||
|
- skip:
|
||||||
|
version: " - 6.6.99"
|
||||||
|
reason: cas operations with sequence numbers was added in 6.7
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: _doc
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
- match: { _version: 1}
|
||||||
|
- set: { _seq_no: seqno }
|
||||||
|
- set: { _primary_term: primary_term }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
body:
|
||||||
|
- index:
|
||||||
|
_index: test_1
|
||||||
|
_type: _doc
|
||||||
|
_id: 1
|
||||||
|
if_seq_no: 10000
|
||||||
|
if_primary_term: $primary_term
|
||||||
|
- foo: bar2
|
||||||
|
|
||||||
|
- match: { errors: true }
|
||||||
|
- match: { items.0.index.status: 409 }
|
||||||
|
- match: { items.0.index.error.type: version_conflict_engine_exception }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
body:
|
||||||
|
- index:
|
||||||
|
_index: test_1
|
||||||
|
_type: _doc
|
||||||
|
_id: 1
|
||||||
|
if_seq_no: $seqno
|
||||||
|
if_primary_term: $primary_term
|
||||||
|
- foo: bar2
|
||||||
|
|
||||||
|
- match: { errors: false}
|
||||||
|
- match: { items.0.index.status: 200 }
|
@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
- skip:
|
- skip:
|
||||||
version: " - 6.99.99"
|
version: " - 6.99.99"
|
||||||
reason: cas ops are introduced in 7.0.0
|
reason: typesless api was introduces in 7.0
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
index:
|
index:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user