DATAES-569 - Polishing.

Slightly tweak javadoc. Tweak StepVerifier usage. Add missing assertion. Fix generics.

Original pull request: #277.
This commit is contained in:
Mark Paluch 2019-05-07 16:36:11 +02:00
parent 2e709a72b7
commit 7e56c3f9f9
3 changed files with 41 additions and 47 deletions

View File

@ -15,7 +15,6 @@
*/ */
package org.springframework.data.elasticsearch.client.reactive; package org.springframework.data.elasticsearch.client.reactive;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -26,6 +25,7 @@ import java.util.function.Consumer;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@ -45,6 +45,7 @@ import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
@ -248,7 +249,7 @@ public interface ReactiveElasticsearchClient {
/** /**
* Gain access to index related commands. * Gain access to index related commands.
* *
* @return * @return access to index related commands.
*/ */
Indices indices(); Indices indices();
@ -788,8 +789,8 @@ public interface ReactiveElasticsearchClient {
* @param consumer never {@literal null}. * @param consumer never {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist. * does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices Flush
* Flush API on elastic.co</a> * API on elastic.co</a>
*/ */
default Mono<Void> flushIndex(Consumer<FlushRequest> consumer) { default Mono<Void> flushIndex(Consumer<FlushRequest> consumer) {
@ -804,8 +805,8 @@ public interface ReactiveElasticsearchClient {
* @param flushRequest must not be {@literal null}. * @param flushRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist. * does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices Flush
* Flush API on elastic.co</a> * API on elastic.co</a>
*/ */
default Mono<Void> flushIndex(FlushRequest flushRequest) { default Mono<Void> flushIndex(FlushRequest flushRequest) {
return flushIndex(HttpHeaders.EMPTY, flushRequest); return flushIndex(HttpHeaders.EMPTY, flushRequest);
@ -818,8 +819,8 @@ public interface ReactiveElasticsearchClient {
* @param flushRequest must not be {@literal null}. * @param flushRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist. * does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Indices Flush
* Flush API on elastic.co</a> * API on elastic.co</a>
*/ */
Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest); Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest);
} }

View File

@ -94,6 +94,8 @@ import org.springframework.http.HttpMethod;
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0. * (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
* </p> * </p>
* Modified for usage with {@link ReactiveElasticsearchClient}. * Modified for usage with {@link ReactiveElasticsearchClient}.
* <p>
* Only intended for internal use.
* *
* @since 3.2 * @since 3.2
*/ */

View File

@ -42,7 +42,9 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -51,6 +53,7 @@ import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.data.elasticsearch.ElasticsearchVersion; import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.TestUtils;
@ -96,6 +99,8 @@ public class ReactiveElasticsearchClientTests {
syncClient = TestUtils.restHighLevelClient(); syncClient = TestUtils.restHighLevelClient();
client = TestUtils.reactiveClient(); client = TestUtils.reactiveClient();
TestUtils.deleteIndex(INDEX_I, INDEX_II);
} }
@After @After
@ -140,8 +145,10 @@ public class ReactiveElasticsearchClientTests {
@Test // DATAES-519 @Test // DATAES-519
public void getOnNonExistingIndexShouldThrowException() { public void getOnNonExistingIndexShouldThrowException() {
client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")).as(StepVerifier::create) client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")) //
.expectError(ElasticsearchStatusException.class).verify(); .as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class) //
.verify();
} }
@Test // DATAES-488 @Test // DATAES-488
@ -231,13 +238,9 @@ public class ReactiveElasticsearchClientTests {
.add(INDEX_I, TYPE_I, id2); // .add(INDEX_I, TYPE_I, id2); //
client.multiGet(request) // client.multiGet(request) //
.map(GetResult::getId) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeNextWith(it -> { .expectNext(id1, id2) //
assertThat(it.getId()).isEqualTo(id1);
}) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id2);
}) //
.verifyComplete(); .verifyComplete();
} }
@ -263,13 +266,9 @@ public class ReactiveElasticsearchClientTests {
.add(INDEX_II, TYPE_II, id2); .add(INDEX_II, TYPE_II, id2);
client.multiGet(request) // client.multiGet(request) //
.map(GetResult::getId) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeNextWith(it -> { .expectNext(id1, id2) //
assertThat(it.getId()).isEqualTo(id1);
}) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id2);
}) //
.verifyComplete(); .verifyComplete();
} }
@ -320,10 +319,7 @@ public class ReactiveElasticsearchClientTests {
client.index(request) // client.index(request) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeErrorWith(error -> { .verifyError(ElasticsearchStatusException.class);
assertThat(error).isInstanceOf(ElasticsearchStatusException.class);
}) //
.verify();
} }
@Test // DATAES-488 @Test // DATAES-488
@ -372,10 +368,7 @@ public class ReactiveElasticsearchClientTests {
client.update(request) // client.update(request) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeErrorWith(error -> { .verifyError(ElasticsearchStatusException.class);
assertThat(error).isInstanceOf(ElasticsearchStatusException.class);
}) //
.verify();
} }
@Test // DATAES-488 @Test // DATAES-488
@ -447,11 +440,9 @@ public class ReactiveElasticsearchClientTests {
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", id))); .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", id)));
client.deleteBy(request) // client.deleteBy(request) //
.map(BulkByScrollResponse::getDeleted) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeNextWith(it -> { .expectNext(1L) //
assertThat(it.getDeleted()).isEqualTo(1);
}) //
.verifyComplete(); .verifyComplete();
} }
@ -466,10 +457,9 @@ public class ReactiveElasticsearchClientTests {
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", "it-was-not-me"))); .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", "it-was-not-me")));
client.deleteBy(request) // client.deleteBy(request) //
.map(BulkByScrollResponse::getDeleted) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.consumeNextWith(it -> { .expectNext(0L)//
assertThat(it.getDeleted()).isEqualTo(0);
}) //
.verifyComplete(); .verifyComplete();
} }
@ -500,7 +490,8 @@ public class ReactiveElasticsearchClientTests {
request = request.scroll(TimeValue.timeValueMinutes(1)); request = request.scroll(TimeValue.timeValueMinutes(1));
client.scroll(HttpHeaders.EMPTY, request) // client.scroll(HttpHeaders.EMPTY, request) //
.take(73).as(StepVerifier::create) // .take(73) //
.as(StepVerifier::create) //
.expectNextCount(73) // .expectNextCount(73) //
.verifyComplete(); .verifyComplete();
} }
@ -534,7 +525,7 @@ public class ReactiveElasticsearchClientTests {
.as(StepVerifier::create) // .as(StepVerifier::create) //
.verifyComplete(); .verifyComplete();
syncClient.indices().exists(new GetIndexRequest().indices(INDEX_II), RequestOptions.DEFAULT); assertThat(syncClient.indices().exists(new GetIndexRequest().indices(INDEX_I), RequestOptions.DEFAULT)).isTrue();
} }
@Test // DATAES-569 @Test // DATAES-569
@ -663,15 +654,15 @@ public class ReactiveElasticsearchClientTests {
.verifyError(ElasticsearchStatusException.class); .verifyError(ElasticsearchStatusException.class);
} }
AddToIndexOfType addSourceDocument() { private AddToIndexOfType addSourceDocument() {
return add(DOC_SOURCE); return add(DOC_SOURCE);
} }
AddToIndexOfType add(Map source) { private AddToIndexOfType add(Map<String, ? extends Object> source) {
return new AddDocument(source); return new AddDocument(source);
} }
IndexRequest indexRequest(Map source, String index, String type) { private IndexRequest indexRequest(Map source, String index, String type) {
return new IndexRequest(index, type) // return new IndexRequest(index, type) //
.id(UUID.randomUUID().toString()) // .id(UUID.randomUUID().toString()) //
@ -681,7 +672,7 @@ public class ReactiveElasticsearchClientTests {
} }
@SneakyThrows @SneakyThrows
String doIndex(Map source, String index, String type) { private String doIndex(Map<?, ?> source, String index, String type) {
return syncClient.index(indexRequest(source, index, type), RequestOptions.DEFAULT).getId(); return syncClient.index(indexRequest(source, index, type), RequestOptions.DEFAULT).getId();
} }
@ -695,10 +686,10 @@ public class ReactiveElasticsearchClientTests {
class AddDocument implements AddToIndexOfType { class AddDocument implements AddToIndexOfType {
Map source; Map<String, ? extends Object> source;
@Nullable String type; @Nullable String type;
AddDocument(Map source) { AddDocument(Map<String, ? extends Object> source) {
this.source = source; this.source = source;
} }
@ -711,7 +702,7 @@ public class ReactiveElasticsearchClientTests {
@Override @Override
public String to(String index) { public String to(String index) {
return doIndex(new LinkedHashMap(source), index, type); return doIndex(new LinkedHashMap<>(source), index, type);
} }
} }