DATAES-684 - Polishing.

(cherry picked from commit 24751972a8f266281e948eaf5f73f66003d4b896)
This commit is contained in:
Peter-Josef Meisch 2019-11-05 10:05:14 +01:00
parent f82dd229d9
commit 6f0d1ee9e7
3 changed files with 14 additions and 17 deletions

View File

@ -15,8 +15,6 @@
*/ */
package org.springframework.data.elasticsearch.client.reactive; package org.springframework.data.elasticsearch.client.reactive;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -32,6 +30,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
@ -437,8 +437,8 @@ public interface ReactiveElasticsearchClient {
* Execute a {@link BulkRequest} against the {@literal bulk} API. * Execute a {@link BulkRequest} against the {@literal bulk} API.
* *
* @param consumer never {@literal null}. * @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on
* API on elastic.co</a> * elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response. * @return a {@link Mono} emitting the emitting operation response.
*/ */
default Mono<BulkResponse> bulk(Consumer<BulkRequest> consumer) { default Mono<BulkResponse> bulk(Consumer<BulkRequest> consumer) {
@ -452,8 +452,8 @@ public interface ReactiveElasticsearchClient {
* Execute a {@link BulkRequest} against the {@literal bulk} API. * Execute a {@link BulkRequest} against the {@literal bulk} API.
* *
* @param bulkRequest must not be {@literal null}. * @param bulkRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on
* API on elastic.co</a> * elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response. * @return a {@link Mono} emitting the emitting operation response.
*/ */
default Mono<BulkResponse> bulk(BulkRequest bulkRequest) { default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
@ -465,8 +465,8 @@ public interface ReactiveElasticsearchClient {
* *
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param bulkRequest must not be {@literal null}. * @param bulkRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on
* API on elastic.co</a> * elastic.co</a>
* @return a {@link Mono} emitting operation response. * @return a {@link Mono} emitting operation response.
*/ */
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest); Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);

View File

@ -18,7 +18,6 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.elasticsearch.action.bulk.BulkRequest;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.io.IOException; import java.io.IOException;
@ -34,6 +33,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
@ -669,8 +669,7 @@ public class ReactiveElasticsearchClientTests {
bulkRequest.add(requestFirstDoc); bulkRequest.add(requestFirstDoc);
bulkRequest.add(requestSecondDoc); bulkRequest.add(requestSecondDoc);
client.bulk(bulkRequest) client.bulk(bulkRequest).as(StepVerifier::create) //
.as(StepVerifier::create) //
.consumeNextWith(it -> { .consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.OK); assertThat(it.status()).isEqualTo(RestStatus.OK);
assertThat(it.hasFailures()).isFalse(); assertThat(it.hasFailures()).isFalse();
@ -679,8 +678,7 @@ public class ReactiveElasticsearchClientTests {
assertThat(itemResponse.status()).isEqualTo(RestStatus.OK); assertThat(itemResponse.status()).isEqualTo(RestStatus.OK);
assertThat(itemResponse.getVersion()).isEqualTo(2); assertThat(itemResponse.getVersion()).isEqualTo(2);
}); });
}) }).verifyComplete();
.verifyComplete();
} }
private AddToIndexOfType addSourceDocument() { private AddToIndexOfType addSourceDocument() {

View File

@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -31,6 +30,7 @@ import java.util.Collections;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteResponse.Result; import org.elasticsearch.action.DocWriteResponse.Result;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
@ -636,8 +636,7 @@ public class ReactiveElasticsearchClientUnitTests {
final BulkRequest bulkRequest = new BulkRequest(); final BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(updateRequest); bulkRequest.add(updateRequest);
client.bulk(bulkRequest) client.bulk(bulkRequest).as(StepVerifier::create) //
.as(StepVerifier::create) //
.consumeNextWith(bulkResponse -> { .consumeNextWith(bulkResponse -> {
assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK); assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK);