DATAES-652 - Send if_seq_no and if_primary_term parameters when index…

...ing via ReactiveElasticsearchClient

Original PR: #322
This commit is contained in:
Roman Puchkovskiy 2019-09-20 14:05:06 +04:00 committed by Peter-Josef Meisch
parent 642d95b5d1
commit 8d044fe3d1
2 changed files with 112 additions and 0 deletions

View File

@ -81,6 +81,7 @@ import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskId;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
@ -116,6 +117,8 @@ public class RequestConverters {
parameters.withTimeout(deleteRequest.timeout());
parameters.withVersion(deleteRequest.version());
parameters.withVersionType(deleteRequest.versionType());
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
return request;
@ -315,6 +318,8 @@ public class RequestConverters {
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withIfSeqNo(indexRequest.ifSeqNo());
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
@ -917,6 +922,20 @@ public class RequestConverters {
return this;
}
Params withIfSeqNo(long seqNo) {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
return putParam("if_seq_no", Long.toString(seqNo));
}
return this;
}
Params withIfPrimaryTerm(long primaryTerm) {
if (primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
return putParam("if_primary_term", Long.toString(primaryTerm));
}
return this;
}
Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.util;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import java.util.HashMap;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Request;
import org.junit.Test;
/**
* Unit tests for {@link RequestConverters}.
*
* @author Roman Puchkovskiy
*/
public class RequestConvertersTest {
@Test // DATAES-652
public void shouldNotAddIfSeqNoAndIfPrimaryTermToResultIfInputDoesNotcontainThemWhenConvertingIndexRequest() {
IndexRequest request = createMinimalIndexRequest();
Request result = RequestConverters.index(request);
assertThat(result.getParameters(), not(hasKey("if_seq_no")));
assertThat(result.getParameters(), not(hasKey("if_primary_term")));
}
private IndexRequest createMinimalIndexRequest() {
IndexRequest request = new IndexRequest("the-index", "the-type", "id");
request.source(new HashMap<String, String>() {
{
put("test", "test");
}
});
return request;
}
@Test // DATAES-652
public void shouldAddIfSeqNoAndIfPrimaryTermToResultIfInputcontainsThemWhenConvertingIndexRequest() {
IndexRequest request = createMinimalIndexRequest();
request.setIfSeqNo(3);
request.setIfPrimaryTerm(4);
Request result = RequestConverters.index(request);
assertThat(result.getParameters(), hasEntry("if_seq_no", "3"));
assertThat(result.getParameters(), hasEntry("if_primary_term", "4"));
}
@Test // DATAES-652
public void shouldNotAddIfSeqNoAndIfPrimaryTermToResultIfInputDoesNotcontainThemWhenConvertingDeleteRequest() {
DeleteRequest request = createMinimalDeleteRequest();
Request result = RequestConverters.delete(request);
assertThat(result.getParameters(), not(hasKey("if_seq_no")));
assertThat(result.getParameters(), not(hasKey("if_primary_term")));
}
private DeleteRequest createMinimalDeleteRequest() {
return new DeleteRequest("the-index", "the-type", "id");
}
@Test // DATAES-652
public void shouldAddIfSeqNoAndIfPrimaryTermToResultIfInputcontainsThemWhenConvertingDeleteRequest() {
DeleteRequest request = createMinimalDeleteRequest();
request.setIfSeqNo(3);
request.setIfPrimaryTerm(4);
Request result = RequestConverters.delete(request);
assertThat(result.getParameters(), hasEntry("if_seq_no", "3"));
assertThat(result.getParameters(), hasEntry("if_primary_term", "4"));
}
}