diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 3c74acb41..0163e01c6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.TaskId; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; @@ -116,6 +117,8 @@ public class RequestConverters { parameters.withTimeout(deleteRequest.timeout()); parameters.withVersion(deleteRequest.version()); parameters.withVersionType(deleteRequest.versionType()); + parameters.withIfSeqNo(deleteRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm()); parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards()); return request; @@ -315,6 +318,8 @@ public class RequestConverters { parameters.withTimeout(indexRequest.timeout()); parameters.withVersion(indexRequest.version()); parameters.withVersionType(indexRequest.versionType()); + parameters.withIfSeqNo(indexRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm()); parameters.withPipeline(indexRequest.getPipeline()); parameters.withRefreshPolicy(indexRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(indexRequest.waitForActiveShards()); @@ -917,6 +922,20 @@ public class RequestConverters { return this; } + Params withIfSeqNo(long seqNo) { + if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + return putParam("if_seq_no", Long.toString(seqNo)); + } + return this; + } + + Params withIfPrimaryTerm(long primaryTerm) { + if (primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + return putParam("if_primary_term", Long.toString(primaryTerm)); + } + return this; + } + Params withWaitForActiveShards(ActiveShardCount activeShardCount) { return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT); } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/util/RequestConvertersTest.java b/src/test/java/org/springframework/data/elasticsearch/client/util/RequestConvertersTest.java new file mode 100644 index 000000000..de56af80b --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/client/util/RequestConvertersTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.util; + +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import java.util.HashMap; + +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Request; +import org.junit.Test; + +/** + * Unit tests for {@link RequestConverters}. + * + * @author Roman Puchkovskiy + */ +public class RequestConvertersTest { + @Test // DATAES-652 + public void shouldNotAddIfSeqNoAndIfPrimaryTermToResultIfInputDoesNotcontainThemWhenConvertingIndexRequest() { + IndexRequest request = createMinimalIndexRequest(); + + Request result = RequestConverters.index(request); + + assertThat(result.getParameters(), not(hasKey("if_seq_no"))); + assertThat(result.getParameters(), not(hasKey("if_primary_term"))); + } + + private IndexRequest createMinimalIndexRequest() { + IndexRequest request = new IndexRequest("the-index", "the-type", "id"); + request.source(new HashMap() { + { + put("test", "test"); + } + }); + return request; + } + + @Test // DATAES-652 + public void shouldAddIfSeqNoAndIfPrimaryTermToResultIfInputcontainsThemWhenConvertingIndexRequest() { + IndexRequest request = createMinimalIndexRequest(); + request.setIfSeqNo(3); + request.setIfPrimaryTerm(4); + + Request result = RequestConverters.index(request); + + assertThat(result.getParameters(), hasEntry("if_seq_no", "3")); + assertThat(result.getParameters(), hasEntry("if_primary_term", "4")); + } + + @Test // DATAES-652 + public void shouldNotAddIfSeqNoAndIfPrimaryTermToResultIfInputDoesNotcontainThemWhenConvertingDeleteRequest() { + DeleteRequest request = createMinimalDeleteRequest(); + + Request result = RequestConverters.delete(request); + + assertThat(result.getParameters(), not(hasKey("if_seq_no"))); + assertThat(result.getParameters(), not(hasKey("if_primary_term"))); + } + + private DeleteRequest createMinimalDeleteRequest() { + return new DeleteRequest("the-index", "the-type", "id"); + } + + @Test // DATAES-652 + public void shouldAddIfSeqNoAndIfPrimaryTermToResultIfInputcontainsThemWhenConvertingDeleteRequest() { + DeleteRequest request = createMinimalDeleteRequest(); + request.setIfSeqNo(3); + request.setIfPrimaryTerm(4); + + Request result = RequestConverters.delete(request); + + assertThat(result.getParameters(), hasEntry("if_seq_no", "3")); + assertThat(result.getParameters(), hasEntry("if_primary_term", "4")); + } +}