diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 99e68ab85..b32cf13fb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -58,6 +58,8 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -122,6 +124,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe * * @author Christoph Strobl * @author Mark Paluch + * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -423,6 +426,16 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .publishNext(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest) + */ + @Override + public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { + return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) // + .publishNext(); + } + // --> INDICES /* @@ -742,6 +755,18 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch }; } + static Function bulk() { + + return request -> { + + try { + return RequestConverters.bulk(request); + } catch (IOException e) { + throw new ElasticsearchException("Could not parse request", e); + } + }; + } + // --> INDICES static Function indexExists() { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 48da70294..a30dbc71d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.client.reactive; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -58,6 +60,7 @@ import org.springframework.web.reactive.function.client.WebClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -430,6 +433,44 @@ public interface ReactiveElasticsearchClient { */ Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest); + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param consumer never {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono bulk(Consumer consumer) { + + BulkRequest request = new BulkRequest(); + consumer.accept(request); + return bulk(request); + } + + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param bulkRequest must not be {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono bulk(BulkRequest bulkRequest) { + return bulk(HttpHeaders.EMPTY, bulkRequest); + } + + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param bulkRequest must not be {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + Mono bulk(HttpHeaders headers, BulkRequest bulkRequest); + /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 406957220..d505d1d0c 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -18,10 +18,12 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import lombok.SneakyThrows; +import org.elasticsearch.action.bulk.BulkRequest; import reactor.test.StepVerifier; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -65,7 +67,7 @@ import org.springframework.test.context.junit4.SpringRunner; /** * @author Christoph Strobl * @author Mark Paluch - * @currentRead Fool's Fate - Robin Hobb + * @author Henrique Amaral */ @RunWith(SpringRunner.class) @ContextConfiguration("classpath:infrastructure.xml") @@ -653,6 +655,34 @@ public class ReactiveElasticsearchClientTests { .verifyError(ElasticsearchStatusException.class); } + @Test // DATAES-684 + public void bulkShouldUpdateExistingDocument() { + String idFirstDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I); + String idSecondDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I); + + UpdateRequest requestFirstDoc = new UpdateRequest(INDEX_I, TYPE_I, idFirstDoc) // + .doc(Collections.singletonMap("dutiful", "farseer")); + UpdateRequest requestSecondDoc = new UpdateRequest(INDEX_I, TYPE_I, idSecondDoc) // + .doc(Collections.singletonMap("secondDocUpdate", "secondDocUpdatePartTwo")); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(requestFirstDoc); + bulkRequest.add(requestSecondDoc); + + client.bulk(bulkRequest) + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it.status()).isEqualTo(RestStatus.OK); + assertThat(it.hasFailures()).isFalse(); + + Arrays.stream(it.getItems()).forEach(itemResponse-> { + assertThat(itemResponse.status()).isEqualTo(RestStatus.OK); + assertThat(itemResponse.getVersion()).isEqualTo(2); + }); + }) + .verifyComplete(); + } + private AddToIndexOfType addSourceDocument() { return add(DOC_SOURCE); } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index dab800a52..09373c17b 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -20,6 +20,8 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.rest.RestStatus; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -51,7 +53,7 @@ import org.springframework.util.StreamUtils; /** * @author Christoph Strobl - * @currentRead Golden Fool - Robin Hobb + * @author Henrique Amaral */ public class ReactiveElasticsearchClientUnitTests { @@ -623,4 +625,26 @@ public class ReactiveElasticsearchClientUnitTests { }); } + @Test // DATAES-684 + public void bulkShouldEmitResponseCorrectly() { + + hostProvider.when(HOST) // + .receiveBulkOk(); + + final UpdateRequest updateRequest = new UpdateRequest("twitter", "doc", "1") + .doc(Collections.singletonMap("user", "cstrobl")); + final BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(updateRequest); + + client.bulk(bulkRequest) + .as(StepVerifier::create) // + .consumeNextWith(bulkResponse -> { + + assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK); + assertThat(bulkResponse.hasFailures()).isFalse(); + + }) // + .verifyComplete(); + } + } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index d0a04e2c5..a7947cdad 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -60,6 +60,7 @@ import org.springframework.web.util.UriBuilder; /** * @author Christoph Strobl + * @author Henrique Amaral */ public class ReactiveMockClientTestsUtils { @@ -361,6 +362,12 @@ public class ReactiveMockClientTestsUtils { }); } + default Receive receiveBulkOk() { + + return receiveJsonFromFile("bulk-ok") // + .receive(Receive::ok); + } + } public interface Receive { diff --git a/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json b/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json new file mode 100644 index 000000000..d782733c7 --- /dev/null +++ b/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json @@ -0,0 +1,23 @@ +{ + "took": 30, + "errors": false, + "items": [ + { + "update": { + "_index": "twitter", + "_type": "doc", + "_id": "1", + "_version": 2, + "result": "updated", + "_shards": { + "total": 2, + "successful": 1, + "failed": 0 + }, + "_seq_no": 2, + "_primary_term": 4 + } + } + ] + +} \ No newline at end of file