mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-30 16:52:11 +00:00
parent
dd3d01eab6
commit
f08c34ec5d
@ -53,3 +53,10 @@ Previously the reactive code initialized this to `IMMEDIATE`, now reactive and n
|
||||
==== delete methods that take a Query
|
||||
|
||||
The reactive methods previously returned a `Mono<Long>` with the number of deleted documents, the non reactive versions were void. They now return a `Mono<ByQueryResponse>` which contains much more detailed information about the deleted documents and errors that might have occurred.
|
||||
|
||||
==== multiget methods
|
||||
|
||||
The implementations of _multiget_ previousl only returned the found entities in a `List<T>` for non-reactive implementations and in a `Flux<T>` for reactive implementations. If the request contained ids that were not found, the information that these are missing was not available. The user needed to compare the returned ids to the requested ones to find
|
||||
which ones were missing.
|
||||
|
||||
Now the `multiget` methods return a `MultiGetItem` for every requested id. This contains information about failures (like non existing indices) and the information if the item existed (then it is contained in the `MultiGetItem) or not.
|
||||
|
@ -22,6 +22,7 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter;
|
||||
import io.netty.handler.ssl.JdkSslContext;
|
||||
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||
import io.netty.handler.timeout.WriteTimeoutHandler;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
@ -330,18 +331,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
.next();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest)
|
||||
*/
|
||||
@Override
|
||||
public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
|
||||
public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
|
||||
|
||||
return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers)
|
||||
.map(MultiGetResponse::getResponses) //
|
||||
.flatMap(Flux::fromArray) //
|
||||
.filter(it -> !it.isFailed() && it.getResponse().isExists()) //
|
||||
.map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
|
||||
.flatMap(Flux::fromArray); //
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -15,11 +15,6 @@
|
||||
*/
|
||||
package org.springframework.data.elasticsearch.client.reactive;
|
||||
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetMappingsRequest;
|
||||
import org.elasticsearch.client.indices.GetMappingsResponse;
|
||||
import org.elasticsearch.client.indices.PutMappingRequest;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -42,6 +37,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
@ -51,13 +47,7 @@ import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.GetAliasesResponse;
|
||||
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
|
||||
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
|
||||
import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
|
||||
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
|
||||
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.client.indices.*;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
@ -167,9 +157,9 @@ public interface ReactiveElasticsearchClient {
|
||||
* @param consumer never {@literal null}.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
|
||||
* elastic.co</a>
|
||||
* @return the {@link Flux} emitting the {@link GetResult result}.
|
||||
* @return the {@link Flux} emitting the {@link MultiGetItemResponse result}.
|
||||
*/
|
||||
default Flux<GetResult> multiGet(Consumer<MultiGetRequest> consumer) {
|
||||
default Flux<MultiGetItemResponse> multiGet(Consumer<MultiGetRequest> consumer) {
|
||||
|
||||
MultiGetRequest request = new MultiGetRequest();
|
||||
consumer.accept(request);
|
||||
@ -183,9 +173,9 @@ public interface ReactiveElasticsearchClient {
|
||||
* @param multiGetRequest must not be {@literal null}.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
|
||||
* elastic.co</a>
|
||||
* @return the {@link Flux} emitting the {@link GetResult result}.
|
||||
* @return the {@link Flux} emitting the {@link MultiGetItemResponse result}.
|
||||
*/
|
||||
default Flux<GetResult> multiGet(MultiGetRequest multiGetRequest) {
|
||||
default Flux<MultiGetItemResponse> multiGet(MultiGetRequest multiGetRequest) {
|
||||
return multiGet(HttpHeaders.EMPTY, multiGetRequest);
|
||||
}
|
||||
|
||||
@ -197,9 +187,9 @@ public interface ReactiveElasticsearchClient {
|
||||
* @param multiGetRequest must not be {@literal null}.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
|
||||
* elastic.co</a>
|
||||
* @return the {@link Flux} emitting the {@link GetResult result}.
|
||||
* @return the {@link Flux} emitting the {@link MultiGetItemResponse result}.
|
||||
*/
|
||||
Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest);
|
||||
Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest);
|
||||
|
||||
/**
|
||||
* Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise.
|
||||
@ -748,31 +738,32 @@ public interface ReactiveElasticsearchClient {
|
||||
interface Indices {
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param consumer never {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @deprecated since 4.2
|
||||
* @deprecated since 4.2
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> existsIndex(Consumer<org.elasticsearch.action.admin.indices.get.GetIndexRequest> consumer) {
|
||||
|
||||
org.elasticsearch.action.admin.indices.get.GetIndexRequest request =
|
||||
new org.elasticsearch.action.admin.indices.get.GetIndexRequest();
|
||||
org.elasticsearch.action.admin.indices.get.GetIndexRequest request = new org.elasticsearch.action.admin.indices.get.GetIndexRequest();
|
||||
consumer.accept(request);
|
||||
return existsIndex(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @deprecated since 4.2, use {@link #existsIndex(GetIndexRequest)}
|
||||
* @deprecated since 4.2, use {@link #existsIndex(GetIndexRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> existsIndex(org.elasticsearch.action.admin.indices.get.GetIndexRequest getIndexRequest) {
|
||||
@ -780,42 +771,44 @@ public interface ReactiveElasticsearchClient {
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.get.GetIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @deprecated since 4.2, use {@link #existsIndex(HttpHeaders, GetIndexRequest)}
|
||||
* @deprecated since 4.2, use {@link #existsIndex(HttpHeaders, GetIndexRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
Mono<Boolean> existsIndex(HttpHeaders headers, org.elasticsearch.action.admin.indices.get.GetIndexRequest getIndexRequest);
|
||||
Mono<Boolean> existsIndex(HttpHeaders headers,
|
||||
org.elasticsearch.action.admin.indices.get.GetIndexRequest getIndexRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link GetIndexRequest} against the {@literal indices} API.
|
||||
*
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @since 4.2
|
||||
*/
|
||||
default Mono<Boolean> existsIndex(GetIndexRequest getIndexRequest) {
|
||||
return existsIndex(HttpHeaders.EMPTY, getIndexRequest);
|
||||
}
|
||||
/**
|
||||
* Execute the given {@link GetIndexRequest} against the {@literal indices} API.
|
||||
*
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @since 4.2
|
||||
*/
|
||||
default Mono<Boolean> existsIndex(GetIndexRequest getIndexRequest) {
|
||||
return existsIndex(HttpHeaders.EMPTY, getIndexRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link GetIndexRequest} against the {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @since 4.2
|
||||
*/
|
||||
Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest getIndexRequest);
|
||||
/**
|
||||
* Execute the given {@link GetIndexRequest} against the {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param getIndexRequest must not be {@literal null}.
|
||||
* @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise.
|
||||
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html"> Indices
|
||||
* Exists API on elastic.co</a>
|
||||
* @since 4.2
|
||||
*/
|
||||
Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest getIndexRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link DeleteIndexRequest} against the {@literal indices} API.
|
||||
@ -859,7 +852,8 @@ public interface ReactiveElasticsearchClient {
|
||||
Mono<Boolean> deleteIndex(HttpHeaders headers, DeleteIndexRequest deleteIndexRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param consumer never {@literal null}.
|
||||
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if
|
||||
@ -869,16 +863,17 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> createIndex(Consumer<org.elasticsearch.action.admin.indices.create.CreateIndexRequest> consumer) {
|
||||
default Mono<Boolean> createIndex(
|
||||
Consumer<org.elasticsearch.action.admin.indices.create.CreateIndexRequest> consumer) {
|
||||
|
||||
org.elasticsearch.action.admin.indices.create.CreateIndexRequest request =
|
||||
new org.elasticsearch.action.admin.indices.create.CreateIndexRequest();
|
||||
org.elasticsearch.action.admin.indices.create.CreateIndexRequest request = new org.elasticsearch.action.admin.indices.create.CreateIndexRequest();
|
||||
consumer.accept(request);
|
||||
return createIndex(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param createIndexRequest must not be {@literal null}.
|
||||
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if
|
||||
@ -888,12 +883,14 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2, use {@link #createIndex(CreateIndexRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> createIndex(org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest) {
|
||||
default Mono<Boolean> createIndex(
|
||||
org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest) {
|
||||
return createIndex(HttpHeaders.EMPTY, createIndexRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.create.CreateIndexRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param createIndexRequest must not be {@literal null}.
|
||||
@ -904,7 +901,8 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2, use {@link #createIndex(HttpHeaders, CreateIndexRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
Mono<Boolean> createIndex(HttpHeaders headers, org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest);
|
||||
Mono<Boolean> createIndex(HttpHeaders headers,
|
||||
org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link CreateIndexRequest} against the {@literal indices} API.
|
||||
@ -1057,7 +1055,8 @@ public interface ReactiveElasticsearchClient {
|
||||
Mono<Void> refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param consumer never {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1067,12 +1066,14 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.1, use {@link #putMapping(Consumer)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> updateMapping(Consumer<org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest> consumer) {
|
||||
default Mono<Boolean> updateMapping(
|
||||
Consumer<org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest> consumer) {
|
||||
return putMapping(consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param putMappingRequest must not be {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1082,12 +1083,14 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.1, use {@link #putMapping(PutMappingRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> updateMapping(org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
default Mono<Boolean> updateMapping(
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
return putMapping(putMappingRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param putMappingRequest must not be {@literal null}.
|
||||
@ -1098,12 +1101,14 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.1, use {@link #putMapping(HttpHeaders, PutMappingRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> updateMapping(HttpHeaders headers, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
default Mono<Boolean> updateMapping(HttpHeaders headers,
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
return putMapping(headers, putMappingRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param consumer never {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1113,16 +1118,17 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> putMapping(Consumer<org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest> consumer) {
|
||||
default Mono<Boolean> putMapping(
|
||||
Consumer<org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest> consumer) {
|
||||
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest request =
|
||||
new org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest();
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest request = new org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest();
|
||||
consumer.accept(request);
|
||||
return putMapping(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param putMappingRequest must not be {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1132,12 +1138,14 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2, use {@link #putMapping(PutMappingRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<Boolean> putMapping(org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
default Mono<Boolean> putMapping(
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest) {
|
||||
return putMapping(HttpHeaders.EMPTY, putMappingRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param putMappingRequest must not be {@literal null}.
|
||||
@ -1148,7 +1156,8 @@ public interface ReactiveElasticsearchClient {
|
||||
* @deprecated since 4.2, use {@link #putMapping(HttpHeaders, PutMappingRequest)}
|
||||
*/
|
||||
@Deprecated
|
||||
Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest);
|
||||
Mono<Boolean> putMapping(HttpHeaders headers,
|
||||
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link PutMappingRequest} against the {@literal indices} API.
|
||||
@ -1263,7 +1272,8 @@ public interface ReactiveElasticsearchClient {
|
||||
Mono<GetSettingsResponse> getSettings(HttpHeaders headers, GetSettingsRequest getSettingsRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param consumer never {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1275,16 +1285,16 @@ public interface ReactiveElasticsearchClient {
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse> getMapping(
|
||||
Consumer<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest> consumer) {
|
||||
Consumer<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest> consumer) {
|
||||
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest request =
|
||||
new org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest();
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest request = new org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest();
|
||||
consumer.accept(request);
|
||||
return getMapping(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param getMappingsRequest must not be {@literal null}.
|
||||
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
|
||||
@ -1296,12 +1306,13 @@ public interface ReactiveElasticsearchClient {
|
||||
*/
|
||||
@Deprecated
|
||||
default Mono<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse> getMapping(
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingsRequest) {
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingsRequest) {
|
||||
return getMapping(HttpHeaders.EMPTY, getMappingsRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the {@literal indices} API.
|
||||
* Execute the given {@link org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest} against the
|
||||
* {@literal indices} API.
|
||||
*
|
||||
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
|
||||
* @param getMappingsRequest must not be {@literal null}.
|
||||
@ -1314,7 +1325,7 @@ public interface ReactiveElasticsearchClient {
|
||||
*/
|
||||
@Deprecated
|
||||
Mono<org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse> getMapping(HttpHeaders headers,
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingsRequest);
|
||||
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingsRequest);
|
||||
|
||||
/**
|
||||
* Execute the given {@link GetMappingsRequest} against the {@literal indices} API.
|
||||
|
@ -250,7 +250,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<T> multiGet(Query query, Class<T> clazz) {
|
||||
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
|
||||
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
|
||||
}
|
||||
|
||||
|
@ -120,10 +120,10 @@ public interface DocumentOperations {
|
||||
*
|
||||
* @param query the query defining the ids of the objects to get
|
||||
* @param clazz the type of the object to be returned
|
||||
* @return list of objects, contains null values for ids that are not found
|
||||
* @return list of {@link MultiGetItem}s
|
||||
* @since 4.1
|
||||
*/
|
||||
<T> List<T> multiGet(Query query, Class<T> clazz);
|
||||
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Execute a multiGet against elasticsearch for the given ids.
|
||||
@ -131,9 +131,9 @@ public interface DocumentOperations {
|
||||
* @param query the query defining the ids of the objects to get
|
||||
* @param clazz the type of the object to be returned
|
||||
* @param index the index(es) from which the objects are read.
|
||||
* @return list of objects, contains null values for ids that are not found
|
||||
* @return list of {@link MultiGetItem}s
|
||||
*/
|
||||
<T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
|
||||
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
|
||||
|
||||
/**
|
||||
* Check if an entity with given {@literal id} exists.
|
||||
|
@ -168,7 +168,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
|
||||
Assert.notNull(index, "index must not be null");
|
||||
Assert.notEmpty(query.getIds(), "No Id defined for Query");
|
||||
@ -177,7 +177,10 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
|
||||
MultiGetResponse result = execute(client -> client.mget(request, RequestOptions.DEFAULT));
|
||||
|
||||
DocumentCallback<T> callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
|
||||
return DocumentAdapters.from(result).stream().map(callback::doWith).collect(Collectors.toList());
|
||||
return DocumentAdapters.from(result).stream() //
|
||||
.map(multiGetItem -> MultiGetItem.of( //
|
||||
multiGetItem.isFailed() ? null : callback.doWith(multiGetItem.getItem()), multiGetItem.getFailure())) //
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -42,7 +42,6 @@ import org.elasticsearch.search.suggest.SuggestBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
|
||||
import org.springframework.data.elasticsearch.core.document.Document;
|
||||
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
|
||||
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
|
||||
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
||||
@ -190,7 +189,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
|
||||
Assert.notNull(index, "index must not be null");
|
||||
Assert.notEmpty(query.getIds(), "No Ids defined for Query");
|
||||
@ -198,8 +197,11 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
|
||||
MultiGetRequestBuilder builder = requestFactory.multiGetRequestBuilder(client, query, clazz, index);
|
||||
|
||||
DocumentCallback<T> callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
|
||||
List<Document> documents = DocumentAdapters.from(builder.execute().actionGet());
|
||||
return documents.stream().map(callback::doWith).collect(Collectors.toList());
|
||||
|
||||
return DocumentAdapters.from(builder.execute().actionGet()).stream() //
|
||||
.map(multiGetItem -> MultiGetItem.of(multiGetItem.isFailed() ? null : callback.doWith(multiGetItem.getItem()),
|
||||
multiGetItem.getFailure()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright 2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.elasticsearch.core;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Response object for items returned from multiget requests, encapsulating the returned data and potential error
|
||||
* information.
|
||||
*
|
||||
* @param <T> the entity type
|
||||
* @author Peter-Josef Meisch
|
||||
* @since 4.2
|
||||
*/
|
||||
public class MultiGetItem<T> {
|
||||
@Nullable private final T item;
|
||||
@Nullable private final Failure failure;
|
||||
|
||||
private MultiGetItem(@Nullable T item, @Nullable Failure failure) {
|
||||
this.item = item;
|
||||
this.failure = failure;
|
||||
}
|
||||
|
||||
public static <T> MultiGetItem<T> of(@Nullable T item, @Nullable Failure failure) {
|
||||
return new MultiGetItem<>(item, failure);
|
||||
}
|
||||
|
||||
public boolean hasItem() {
|
||||
return item != null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public T getItem() {
|
||||
return item;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return failure != null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Failure getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
public static class Failure {
|
||||
@Nullable private final String index;
|
||||
@Nullable private final String type;
|
||||
@Nullable private final String id;
|
||||
@Nullable private final Exception exception;
|
||||
|
||||
private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception exception) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
public static Failure of(String index, String type, String id, Exception exception) {
|
||||
return new Failure(index, type, id, exception);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Exception getException() {
|
||||
return exception;
|
||||
}
|
||||
}
|
||||
}
|
@ -147,10 +147,10 @@ public interface ReactiveDocumentOperations {
|
||||
*
|
||||
* @param query the query defining the ids of the objects to get
|
||||
* @param clazz the type of the object to be returned, used to determine the index
|
||||
* @return flux with list of nullable objects
|
||||
* @return flux with list of {@link MultiGetItem}s that contain the entities
|
||||
* @since 4.1
|
||||
*/
|
||||
<T> Flux<T> multiGet(Query query, Class<T> clazz);
|
||||
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);
|
||||
|
||||
/**
|
||||
* Execute a multiGet against elasticsearch for the given ids.
|
||||
@ -158,10 +158,10 @@ public interface ReactiveDocumentOperations {
|
||||
* @param query the query defining the ids of the objects to get
|
||||
* @param clazz the type of the object to be returned
|
||||
* @param index the index(es) from which the objects are read.
|
||||
* @return flux with list of nullable objects
|
||||
* @return flux with list of {@link MultiGetItem}s that contain the entities
|
||||
* @since 4.0
|
||||
*/
|
||||
<T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
|
||||
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
|
||||
|
||||
/**
|
||||
* Bulk update all objects. Will do update.
|
||||
|
@ -71,10 +71,10 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
|
||||
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
||||
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
|
||||
import org.springframework.data.elasticsearch.core.query.BulkOptions;
|
||||
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
|
||||
import org.springframework.data.elasticsearch.core.query.IndexQuery;
|
||||
import org.springframework.data.elasticsearch.core.query.Query;
|
||||
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
|
||||
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
|
||||
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
|
||||
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
|
||||
import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver;
|
||||
@ -298,12 +298,12 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> multiGet(Query query, Class<T> clazz) {
|
||||
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
|
||||
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
|
||||
|
||||
Assert.notNull(index, "Index must not be null");
|
||||
Assert.notNull(clazz, "Class must not be null");
|
||||
@ -314,7 +314,12 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
|
||||
|
||||
MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index);
|
||||
return Flux.from(execute(client -> client.multiGet(request))) //
|
||||
.concatMap(result -> callback.toEntity(DocumentAdapters.from(result)));
|
||||
.map(DocumentAdapters::from) //
|
||||
.flatMap(multiGetItem -> multiGetItem.isFailed() //
|
||||
? Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())) //
|
||||
: callback.toEntity(multiGetItem.getItem())
|
||||
.map((T item) -> MultiGetItem.of(item, multiGetItem.getFailure())) //
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -27,6 +27,8 @@ import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
|
||||
import org.elasticsearch.client.indices.IndexTemplateMetadata;
|
||||
@ -213,67 +215,77 @@ public class ResponseConverter {
|
||||
|
||||
// endregion
|
||||
|
||||
//region templates
|
||||
@Nullable
|
||||
public static TemplateData getTemplateData(GetIndexTemplatesResponse getIndexTemplatesResponse, String templateName) {
|
||||
for (IndexTemplateMetadata indexTemplateMetadata : getIndexTemplatesResponse.getIndexTemplates()) {
|
||||
// region templates
|
||||
@Nullable
|
||||
public static TemplateData getTemplateData(GetIndexTemplatesResponse getIndexTemplatesResponse, String templateName) {
|
||||
for (IndexTemplateMetadata indexTemplateMetadata : getIndexTemplatesResponse.getIndexTemplates()) {
|
||||
|
||||
if (indexTemplateMetadata.name().equals(templateName)) {
|
||||
if (indexTemplateMetadata.name().equals(templateName)) {
|
||||
|
||||
Document settings = Document.create();
|
||||
Settings templateSettings = indexTemplateMetadata.settings();
|
||||
templateSettings.keySet().forEach(key -> settings.put(key, templateSettings.get(key)));
|
||||
Document settings = Document.create();
|
||||
Settings templateSettings = indexTemplateMetadata.settings();
|
||||
templateSettings.keySet().forEach(key -> settings.put(key, templateSettings.get(key)));
|
||||
|
||||
Map<String, AliasData> aliases = new LinkedHashMap<>();
|
||||
Map<String, AliasData> aliases = new LinkedHashMap<>();
|
||||
|
||||
ImmutableOpenMap<String, AliasMetadata> aliasesResponse = indexTemplateMetadata.aliases();
|
||||
Iterator<String> keysIt = aliasesResponse.keysIt();
|
||||
while (keysIt.hasNext()) {
|
||||
String key = keysIt.next();
|
||||
aliases.put(key, ResponseConverter.toAliasData(aliasesResponse.get(key)));
|
||||
}
|
||||
ImmutableOpenMap<String, AliasMetadata> aliasesResponse = indexTemplateMetadata.aliases();
|
||||
Iterator<String> keysIt = aliasesResponse.keysIt();
|
||||
while (keysIt.hasNext()) {
|
||||
String key = keysIt.next();
|
||||
aliases.put(key, ResponseConverter.toAliasData(aliasesResponse.get(key)));
|
||||
}
|
||||
|
||||
return TemplateData.builder()
|
||||
.withIndexPatterns(indexTemplateMetadata.patterns().toArray(new String[0])) //
|
||||
.withSettings(settings) //
|
||||
.withMapping(Document.from(indexTemplateMetadata.mappings().getSourceAsMap())) //
|
||||
.withAliases(aliases) //
|
||||
.withOrder(indexTemplateMetadata.order()) //
|
||||
.withVersion(indexTemplateMetadata.version()).build();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
//endregion
|
||||
return TemplateData.builder().withIndexPatterns(indexTemplateMetadata.patterns().toArray(new String[0])) //
|
||||
.withSettings(settings) //
|
||||
.withMapping(Document.from(indexTemplateMetadata.mappings().getSourceAsMap())) //
|
||||
.withAliases(aliases) //
|
||||
.withOrder(indexTemplateMetadata.order()) //
|
||||
.withVersion(indexTemplateMetadata.version()).build();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
// endregion
|
||||
|
||||
//region settings
|
||||
/**
|
||||
* extract the index settings information for a given index
|
||||
*
|
||||
* @param response the Elasticsearch response
|
||||
* @param indexName the index name
|
||||
* @return settings as {@link Document}
|
||||
*/
|
||||
public static Document fromSettingsResponse(GetSettingsResponse response, String indexName) {
|
||||
// region settings
|
||||
/**
|
||||
* extract the index settings information for a given index
|
||||
*
|
||||
* @param response the Elasticsearch response
|
||||
* @param indexName the index name
|
||||
* @return settings as {@link Document}
|
||||
*/
|
||||
public static Document fromSettingsResponse(GetSettingsResponse response, String indexName) {
|
||||
|
||||
Document settings = Document.create();
|
||||
Document settings = Document.create();
|
||||
|
||||
if (!response.getIndexToDefaultSettings().isEmpty()) {
|
||||
Settings defaultSettings = response.getIndexToDefaultSettings().get(indexName);
|
||||
for (String key : defaultSettings.keySet()) {
|
||||
settings.put(key, defaultSettings.get(key));
|
||||
}
|
||||
}
|
||||
if (!response.getIndexToDefaultSettings().isEmpty()) {
|
||||
Settings defaultSettings = response.getIndexToDefaultSettings().get(indexName);
|
||||
for (String key : defaultSettings.keySet()) {
|
||||
settings.put(key, defaultSettings.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
if (!response.getIndexToSettings().isEmpty()) {
|
||||
Settings customSettings = response.getIndexToSettings().get(indexName);
|
||||
for (String key : customSettings.keySet()) {
|
||||
settings.put(key, customSettings.get(key));
|
||||
}
|
||||
}
|
||||
if (!response.getIndexToSettings().isEmpty()) {
|
||||
Settings customSettings = response.getIndexToSettings().get(indexName);
|
||||
for (String key : customSettings.keySet()) {
|
||||
settings.put(key, customSettings.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
return settings;
|
||||
}
|
||||
//endregion
|
||||
return settings;
|
||||
}
|
||||
// endregion
|
||||
|
||||
// region multiget
|
||||
|
||||
@Nullable
|
||||
public static MultiGetItem.Failure getFailure(MultiGetItemResponse itemResponse) {
|
||||
|
||||
MultiGetResponse.Failure responseFailure = itemResponse.getFailure();
|
||||
return responseFailure != null ? MultiGetItem.Failure.of(responseFailure.getIndex(), responseFailure.getType(),
|
||||
responseFailure.getId(), responseFailure.getFailure()) : null;
|
||||
}
|
||||
// endregion
|
||||
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.document.DocumentField;
|
||||
@ -39,6 +40,8 @@ import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.springframework.data.elasticsearch.core.MultiGetItem;
|
||||
import org.springframework.data.elasticsearch.core.ResponseConverter;
|
||||
import org.springframework.data.mapping.MappingException;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
@ -126,21 +129,32 @@ public class DocumentAdapters {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a List of {@link Document}s from {@link MultiGetResponse}.
|
||||
* Creates a List of {@link MultiGetItem<Document>}s from {@link MultiGetResponse}.
|
||||
*
|
||||
* @param source the source {@link MultiGetResponse}, not {@literal null}.
|
||||
* @return a list of Documents, contains null values for not found Documents.
|
||||
*/
|
||||
public static List<Document> from(MultiGetResponse source) {
|
||||
public static List<MultiGetItem<Document>> from(MultiGetResponse source) {
|
||||
|
||||
Assert.notNull(source, "MultiGetResponse must not be null");
|
||||
|
||||
// noinspection ReturnOfNull
|
||||
return Arrays.stream(source.getResponses()) //
|
||||
.map(itemResponse -> itemResponse.isFailed() ? null : DocumentAdapters.from(itemResponse.getResponse())) //
|
||||
.map(DocumentAdapters::from) //
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link MultiGetItem<Document>} from a {@link MultiGetItemResponse}.
|
||||
*
|
||||
* @param itemResponse the response, must not be {@literal null}
|
||||
* @return the MultiGetItem
|
||||
*/
|
||||
public static MultiGetItem<Document> from(MultiGetItemResponse itemResponse) {
|
||||
|
||||
MultiGetItem.Failure failure = ResponseConverter.getFailure(itemResponse);
|
||||
return MultiGetItem.of(itemResponse.isFailed() ? null : DocumentAdapters.from(itemResponse.getResponse()), failure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link SearchDocument} from {@link SearchHit}.
|
||||
* <p>
|
||||
|
@ -34,6 +34,7 @@ import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate;
|
||||
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
||||
import org.springframework.data.elasticsearch.core.IndexOperations;
|
||||
import org.springframework.data.elasticsearch.core.MultiGetItem;
|
||||
import org.springframework.data.elasticsearch.core.RefreshPolicy;
|
||||
import org.springframework.data.elasticsearch.core.SearchHit;
|
||||
import org.springframework.data.elasticsearch.core.SearchHitSupport;
|
||||
@ -154,13 +155,14 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
|
||||
return result;
|
||||
}
|
||||
|
||||
List<T> multiGetEntities = execute(operations -> operations.multiGet(idQuery, entityClass, getIndexCoordinates()));
|
||||
List<MultiGetItem<T>> multiGetItems = execute(
|
||||
operations -> operations.multiGet(idQuery, entityClass, getIndexCoordinates()));
|
||||
|
||||
if (multiGetEntities != null) {
|
||||
multiGetEntities.forEach(entity -> {
|
||||
if (multiGetItems != null) {
|
||||
multiGetItems.forEach(multiGetItem -> {
|
||||
|
||||
if (entity != null) {
|
||||
result.add(entity);
|
||||
if (multiGetItem.hasItem()) {
|
||||
result.add(multiGetItem.getItem());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.elasticsearch.core.MultiGetItem;
|
||||
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
|
||||
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
|
||||
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
|
||||
@ -161,9 +162,10 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
|
||||
.collectList() //
|
||||
.map(ids -> new NativeSearchQueryBuilder().withIds(ids).build()) //
|
||||
.flatMapMany(query -> {
|
||||
|
||||
IndexCoordinates index = entityInformation.getIndexCoordinates();
|
||||
return operations.multiGet(query, entityInformation.getJavaType(), index);
|
||||
return operations.multiGet(query, entityInformation.getJavaType(), index) //
|
||||
.filter(MultiGetItem::hasItem) //
|
||||
.map(MultiGetItem::getItem);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -18,10 +18,6 @@ package org.springframework.data.elasticsearch.client.reactive;
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetMappingsRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
@ -45,8 +41,10 @@ import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||
import org.elasticsearch.client.indices.GetMappingsRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
@ -197,7 +195,7 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldReturnAllDocumentsFromSameCollection() {
|
||||
|
||||
String id1 = addSourceDocument().to(INDEX_I);
|
||||
@ -209,12 +207,18 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
|
||||
client.multiGet(request) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo(id1)) //
|
||||
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo(id2)) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.getResponse().isExists()).isTrue(); //
|
||||
assertThat(it.getId()).isEqualTo(id1); //
|
||||
}) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.getResponse().isExists()).isTrue(); //
|
||||
assertThat(it.getId()).isEqualTo(id2); //
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldReturnAllExistingDocumentsFromSameCollection() {
|
||||
|
||||
String id1 = addSourceDocument().to(INDEX_I);
|
||||
@ -226,12 +230,20 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
|
||||
client.multiGet(request) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo(id1)) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getResponse().isExists()).isTrue(); //
|
||||
assertThat(it.getId()).isEqualTo(id1); //
|
||||
}) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getResponse().isExists()).isFalse(); //
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
public void multiGetShouldSkipNonExistingDocuments() {
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldNotSkipNonExistingDocuments() {
|
||||
|
||||
String id1 = addSourceDocument().to(INDEX_I);
|
||||
String id2 = addSourceDocument().to(INDEX_I);
|
||||
@ -242,9 +254,21 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.add(INDEX_I, id2); //
|
||||
|
||||
client.multiGet(request) //
|
||||
.map(GetResult::getId) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(id1, id2) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getResponse().isExists()).isTrue(); //
|
||||
assertThat(it.getId()).isEqualTo(id1); //
|
||||
}) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getResponse().isExists()).isFalse();
|
||||
}) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getResponse().isExists()).isTrue(); //
|
||||
assertThat(it.getId()).isEqualTo(id2); //
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@ -257,10 +281,12 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
client.multiGet(new MultiGetRequest() //
|
||||
.add(INDEX_II, id1).add(INDEX_II, id2)) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> assertThat(it.isFailed()).isTrue()) //
|
||||
.consumeNextWith(it -> assertThat(it.isFailed()).isTrue()) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldReturnAllExistingDocumentsFromDifferentCollection() {
|
||||
|
||||
String id1 = addSourceDocument().to(INDEX_I);
|
||||
@ -271,10 +297,16 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.add(INDEX_II, id2);
|
||||
|
||||
client.multiGet(request) //
|
||||
.map(GetResult::getId) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(id1, id2) //
|
||||
.verifyComplete();
|
||||
.consumeNextWith(it -> { //
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getId()).isEqualTo(id1); //
|
||||
}) //
|
||||
.consumeNextWith(it -> { //
|
||||
assertThat(it.isFailed()).isFalse(); //
|
||||
assertThat(it.getId()).isEqualTo(id2); //
|
||||
}) //
|
||||
.verifyComplete(); //
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@ -587,27 +619,27 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void indexExistsShouldReturnTrueIfExists() {
|
||||
@Test // #1658
|
||||
public void indexExistsShouldReturnTrueIfExists() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().existsIndex(new GetIndexRequest(INDEX_I)) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
client.indices().existsIndex(new GetIndexRequest(INDEX_I)) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void indexExistsShouldReturnFalseIfNotExists() {
|
||||
@Test // #1658
|
||||
public void indexExistsShouldReturnFalseIfNotExists() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().existsIndex(new GetIndexRequest(INDEX_II)) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(false) //
|
||||
.verifyComplete();
|
||||
}
|
||||
client.indices().existsIndex(new GetIndexRequest(INDEX_II)) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(false) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-569, DATAES-678
|
||||
public void createIndex() {
|
||||
@ -633,54 +665,47 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void createIndex_() {
|
||||
@Test // #1658
|
||||
public void createIndex_() {
|
||||
|
||||
client.indices().createIndex(new CreateIndexRequest(INDEX_I))
|
||||
.as(StepVerifier::create)
|
||||
.expectNext(true)
|
||||
.verifyComplete();
|
||||
client.indices().createIndex(new CreateIndexRequest(INDEX_I)).as(StepVerifier::create).expectNext(true)
|
||||
.verifyComplete();
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).exists() //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).exists() //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void createExistingIndexErrors_() {
|
||||
@Test // #1658
|
||||
public void createExistingIndexErrors_() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().createIndex(new CreateIndexRequest(INDEX_I)) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
client.indices().createIndex(new CreateIndexRequest(INDEX_I)) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void getIndex() {
|
||||
@Test // #1658
|
||||
public void getIndex() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().getIndex(new GetIndexRequest(INDEX_I))
|
||||
.as(StepVerifier::create)
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.getIndices().length).isOne();
|
||||
assertThat(it.getIndices()[0]).isEqualTo(INDEX_I);
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
client.indices().getIndex(new GetIndexRequest(INDEX_I)).as(StepVerifier::create).consumeNextWith(it -> {
|
||||
assertThat(it.getIndices().length).isOne();
|
||||
assertThat(it.getIndices()[0]).isEqualTo(INDEX_I);
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1658
|
||||
public void getIndexError() {
|
||||
@Test // #1658
|
||||
public void getIndexError() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().getIndex(new GetIndexRequest(INDEX_II))
|
||||
.as(StepVerifier::create)
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
client.indices().getIndex(new GetIndexRequest(INDEX_II)).as(StepVerifier::create)
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // DATAES-569, DATAES-678
|
||||
public void deleteExistingIndex() {
|
||||
@ -761,81 +786,80 @@ public class ReactiveElasticsearchClientIntegrationTests {
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // #1640
|
||||
void putMapping() {
|
||||
@Test // #1640
|
||||
void putMapping() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest =
|
||||
new org.elasticsearch.client.indices.PutMappingRequest(INDEX_I).source(jsonMap);
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest = new org.elasticsearch.client.indices.PutMappingRequest(
|
||||
INDEX_I).source(jsonMap);
|
||||
|
||||
client.indices().putMapping(putMappingRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
client.indices().putMapping(putMappingRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNext(true) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1640
|
||||
void putMappingError() {
|
||||
@Test // #1640
|
||||
void putMappingError() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest =
|
||||
new org.elasticsearch.client.indices.PutMappingRequest(INDEX_II).source(jsonMap);
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest = new org.elasticsearch.client.indices.PutMappingRequest(
|
||||
INDEX_II).source(jsonMap);
|
||||
|
||||
client.indices().putMapping(putMappingRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
client.indices().putMapping(putMappingRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // #1640
|
||||
void getMapping() {
|
||||
@Test // #1640
|
||||
void getMapping() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest =
|
||||
new org.elasticsearch.client.indices.PutMappingRequest(INDEX_I).source(jsonMap);
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest = new org.elasticsearch.client.indices.PutMappingRequest(
|
||||
INDEX_I).source(jsonMap);
|
||||
|
||||
client.indices().putMapping(putMappingRequest).block();
|
||||
client.indices().putMapping(putMappingRequest).block();
|
||||
|
||||
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(INDEX_I);
|
||||
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(INDEX_I);
|
||||
|
||||
client.indices().getMapping(getMappingsRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.mappings().get(INDEX_I).getSourceAsMap()).isEqualTo(jsonMap);
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
client.indices().getMapping(getMappingsRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> {
|
||||
assertThat(it.mappings().get(INDEX_I).getSourceAsMap()).isEqualTo(jsonMap);
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #1640
|
||||
void getMappingError() {
|
||||
@Test // #1640
|
||||
void getMappingError() {
|
||||
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
operations.indexOps(IndexCoordinates.of(INDEX_I)).create().block();
|
||||
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
Map<String, Object> jsonMap = Collections.singletonMap("properties",
|
||||
Collections.singletonMap("message", Collections.singletonMap("type", "text")));
|
||||
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest =
|
||||
new org.elasticsearch.client.indices.PutMappingRequest(INDEX_I).source(jsonMap);
|
||||
org.elasticsearch.client.indices.PutMappingRequest putMappingRequest = new org.elasticsearch.client.indices.PutMappingRequest(
|
||||
INDEX_I).source(jsonMap);
|
||||
|
||||
client.indices().putMapping(putMappingRequest).block();
|
||||
client.indices().putMapping(putMappingRequest).block();
|
||||
|
||||
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(INDEX_II);
|
||||
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(INDEX_II);
|
||||
|
||||
client.indices().getMapping(getMappingsRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
client.indices().getMapping(getMappingsRequest) //
|
||||
.as(StepVerifier::create) //
|
||||
.verifyError(ElasticsearchStatusException.class);
|
||||
}
|
||||
|
||||
@Test // DATAES-569
|
||||
public void updateMapping() {
|
||||
|
@ -242,30 +242,30 @@ public class ReactiveElasticsearchClientUnitTests {
|
||||
assertThat(uri.getRawPath()).isEqualTo("/_mget");
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldReturnExistingDocuments() {
|
||||
|
||||
hostProvider.when(HOST) //
|
||||
.receiveJsonFromFile("multi-get-ok-2-hits");
|
||||
|
||||
client.multiGet(new MultiGetRequest().add("twitter", "_doc", "1").add("twitter", "_doc", "2")) //
|
||||
client.multiGet(new MultiGetRequest().add("twitter", "1").add("twitter", "2")) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(result -> {
|
||||
|
||||
assertThat(result.isExists()).isTrue();
|
||||
assertThat(result.isFailed()).isFalse();
|
||||
assertThat(result.getIndex()).isEqualTo("twitter");
|
||||
assertThat(result.getId()).isEqualTo("1");
|
||||
assertThat(result.getSource()) //
|
||||
assertThat(result.getResponse().getSource()) //
|
||||
.containsEntry("user", "kimchy") //
|
||||
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
|
||||
.containsKey("post_date");
|
||||
}) //
|
||||
.consumeNextWith(result -> {
|
||||
|
||||
assertThat(result.isExists()).isTrue();
|
||||
assertThat(result.isFailed()).isFalse();
|
||||
assertThat(result.getIndex()).isEqualTo("twitter");
|
||||
assertThat(result.getId()).isEqualTo("2");
|
||||
assertThat(result.getSource()) //
|
||||
assertThat(result.getResponse().getSource()) //
|
||||
.containsEntry("user", "kimchy") //
|
||||
.containsEntry("message", "Another tweet, will it be indexed?") //
|
||||
.containsKey("post_date");
|
||||
@ -273,33 +273,44 @@ public class ReactiveElasticsearchClientUnitTests {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-488
|
||||
@Test // DATAES-488, #1678
|
||||
public void multiGetShouldWorkForNonExistingDocuments() {
|
||||
|
||||
hostProvider.when(HOST) //
|
||||
.receiveJsonFromFile("multi-get-ok-2-hits-1-unavailable");
|
||||
|
||||
client.multiGet(new MultiGetRequest().add("twitter", "_doc", "1").add("twitter", "_doc", "2")) //
|
||||
client.multiGet(new MultiGetRequest() //
|
||||
.add("twitter", "1") //
|
||||
.add("twitter", "2") //
|
||||
.add("twitter", "3") //
|
||||
) //
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(result -> {
|
||||
|
||||
assertThat(result.isExists()).isTrue();
|
||||
assertThat(result.isFailed()).isFalse();
|
||||
assertThat(result.getIndex()).isEqualTo("twitter");
|
||||
assertThat(result.getId()).isEqualTo("1");
|
||||
assertThat(result.getSource()) //
|
||||
assertThat(result.getResponse().isExists()).isTrue();
|
||||
assertThat(result.getResponse().getSource()) //
|
||||
.containsEntry("user", "kimchy") //
|
||||
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
|
||||
.containsKey("post_date");
|
||||
}) //
|
||||
.consumeNextWith(result -> {
|
||||
|
||||
assertThat(result.isExists()).isTrue();
|
||||
assertThat(result.getIndex()).isEqualTo("twitter");
|
||||
assertThat(result.getId()).isEqualTo("3");
|
||||
assertThat(result.getSource()) //
|
||||
.containsEntry("user", "elastic") //
|
||||
.containsEntry("message", "Building the site, should be kewl") //
|
||||
.containsKey("post_date");
|
||||
assertThat(result.isFailed()).isFalse();
|
||||
assertThat(result.getResponse().isExists()).isFalse();
|
||||
}) //
|
||||
.consumeNextWith(result -> {
|
||||
|
||||
assertThat(result.isFailed()).isFalse();
|
||||
assertThat(result.getIndex()).isEqualTo("twitter");
|
||||
assertThat(result.getId()).isEqualTo("3");
|
||||
assertThat(result.getResponse().isExists()).isTrue();
|
||||
assertThat(result.getResponse().getSource()) //
|
||||
.containsEntry("user", "elastic") //
|
||||
.containsEntry("message", "Building the site, should be kewl") //
|
||||
.containsKey("post_date");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
@ -234,17 +234,17 @@ abstract class AbstractElasticsearchTemplateCallbackTests {
|
||||
assertThat(result.firstname).isEqualTo("after-convert");
|
||||
}
|
||||
|
||||
@Test // DATAES-772
|
||||
@Test // DATAES-772, #1678
|
||||
void multiGetShouldInvokeAfterConvertCallback() {
|
||||
|
||||
template.setEntityCallbacks(EntityCallbacks.create(afterConvertCallback));
|
||||
|
||||
List<Person> results = template.multiGet(queryForTwo(), Person.class, index);
|
||||
List<MultiGetItem<Person>> results = template.multiGet(queryForTwo(), Person.class, index);
|
||||
|
||||
verify(afterConvertCallback, times(2)).onAfterConvert(eq(new Person("init", "luke")), eq(lukeDocument()),
|
||||
eq(index));
|
||||
assertThat(results.get(0).firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(1).firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(0).getItem().firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(1).getItem().firstname).isEqualTo("after-convert");
|
||||
}
|
||||
|
||||
private Query queryForTwo() {
|
||||
|
@ -237,7 +237,7 @@ public abstract class ElasticsearchTemplateTests {
|
||||
assertThat(sampleEntity1).isEqualTo(sampleEntity);
|
||||
}
|
||||
|
||||
@Test // DATAES-52
|
||||
@Test // DATAES-52, #1678
|
||||
public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
|
||||
|
||||
// given
|
||||
@ -258,15 +258,15 @@ public abstract class ElasticsearchTemplateTests {
|
||||
|
||||
// when
|
||||
NativeSearchQuery query = new NativeSearchQueryBuilder().withIds(Arrays.asList(documentId, documentId2)).build();
|
||||
List<SampleEntity> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
List<MultiGetItem<SampleEntity>> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
|
||||
// then
|
||||
assertThat(sampleEntities).hasSize(2);
|
||||
assertThat(sampleEntities.get(0)).isEqualTo(sampleEntity1);
|
||||
assertThat(sampleEntities.get(1)).isEqualTo(sampleEntity2);
|
||||
assertThat(sampleEntities.get(0).getItem()).isEqualTo(sampleEntity1);
|
||||
assertThat(sampleEntities.get(1).getItem()).isEqualTo(sampleEntity2);
|
||||
}
|
||||
|
||||
@Test // DATAES-791
|
||||
@Test // DATAES-791, #1678
|
||||
public void shouldReturnNullObjectForNotExistingIdUsingMultiGet() {
|
||||
|
||||
// given
|
||||
@ -290,16 +290,30 @@ public abstract class ElasticsearchTemplateTests {
|
||||
assertThat(idsToSearch).hasSize(3);
|
||||
|
||||
NativeSearchQuery query = new NativeSearchQueryBuilder().withIds(idsToSearch).build();
|
||||
List<SampleEntity> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
List<MultiGetItem<SampleEntity>> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
|
||||
// then
|
||||
assertThat(sampleEntities).hasSize(3);
|
||||
assertThat(sampleEntities.get(0)).isEqualTo(sampleEntity1);
|
||||
assertThat(sampleEntities.get(1)).isNull();
|
||||
assertThat(sampleEntities.get(2)).isEqualTo(sampleEntity2);
|
||||
assertThat(sampleEntities.get(0).getItem()).isEqualTo(sampleEntity1);
|
||||
assertThat(sampleEntities.get(1).getItem()).isNull();
|
||||
assertThat(sampleEntities.get(2).getItem()).isEqualTo(sampleEntity2);
|
||||
}
|
||||
|
||||
@Test // DATAES-52
|
||||
@Test // #1678
|
||||
@DisplayName("should return failure in multiget result")
|
||||
void shouldReturnFailureInMultigetResult() {
|
||||
|
||||
NativeSearchQuery query = new NativeSearchQueryBuilder().withIds(Arrays.asList("42")).build();
|
||||
List<MultiGetItem<SampleEntity>> sampleEntities = operations.multiGet(query, SampleEntity.class,
|
||||
IndexCoordinates.of("not-existing-index"));
|
||||
|
||||
// then
|
||||
assertThat(sampleEntities).hasSize(1);
|
||||
assertThat(sampleEntities.get(0).isFailed()).isTrue();
|
||||
assertThat(sampleEntities.get(0).getFailure()).isNotNull();
|
||||
}
|
||||
|
||||
@Test // DATAES-52, #1678
|
||||
public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() {
|
||||
|
||||
// given
|
||||
@ -321,7 +335,7 @@ public abstract class ElasticsearchTemplateTests {
|
||||
// when
|
||||
NativeSearchQuery query = new NativeSearchQueryBuilder().withIds(Arrays.asList(documentId, documentId2))
|
||||
.withFields("message", "type").build();
|
||||
List<SampleEntity> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
List<MultiGetItem<SampleEntity>> sampleEntities = operations.multiGet(query, SampleEntity.class, index);
|
||||
|
||||
// then
|
||||
assertThat(sampleEntities).hasSize(2);
|
||||
@ -3263,9 +3277,9 @@ public abstract class ElasticsearchTemplateTests {
|
||||
OptimisticEntity saved = operations.save(original);
|
||||
operations.indexOps(OptimisticEntity.class).refresh();
|
||||
|
||||
List<OptimisticEntity> retrievedList = operations.multiGet(queryForOne(saved.getId()), OptimisticEntity.class,
|
||||
operations.getIndexCoordinatesFor(OptimisticEntity.class));
|
||||
OptimisticEntity retrieved = retrievedList.get(0);
|
||||
List<MultiGetItem<OptimisticEntity>> retrievedList = operations.multiGet(queryForOne(saved.getId()),
|
||||
OptimisticEntity.class, operations.getIndexCoordinatesFor(OptimisticEntity.class));
|
||||
OptimisticEntity retrieved = retrievedList.get(0).getItem();
|
||||
|
||||
assertThatSeqNoPrimaryTermIsFilled(retrieved);
|
||||
}
|
||||
|
@ -21,6 +21,8 @@ import static org.mockito.Mockito.*;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -85,6 +87,8 @@ public class ReactiveElasticsearchTemplateCallbackTests {
|
||||
@Mock private BulkItemResponse bulkItemResponse;
|
||||
@Mock private DocWriteResponse docWriteResponse;
|
||||
@Mock private GetResult getResult;
|
||||
@Mock private GetResponse getResponse;
|
||||
@Mock private MultiGetItemResponse multiGetItemResponse;
|
||||
@Mock private org.elasticsearch.search.SearchHit searchHit;
|
||||
@Mock private org.elasticsearch.action.search.SearchResponse searchResponse;
|
||||
|
||||
@ -109,8 +113,6 @@ public class ReactiveElasticsearchTemplateCallbackTests {
|
||||
doReturn(docWriteResponse).when(bulkItemResponse).getResponse();
|
||||
doReturn("response-id").when(docWriteResponse).getId();
|
||||
|
||||
when(client.multiGet(any(MultiGetRequest.class))).thenReturn(Flux.just(getResult, getResult));
|
||||
|
||||
doReturn(true).when(getResult).isExists();
|
||||
doReturn(false).when(getResult).isSourceEmpty();
|
||||
doReturn(new HashMap<String, Object>() {
|
||||
@ -120,6 +122,16 @@ public class ReactiveElasticsearchTemplateCallbackTests {
|
||||
}
|
||||
}).when(getResult).getSource();
|
||||
|
||||
doReturn(true).when(getResponse).isExists();
|
||||
doReturn(new HashMap<String, Object>() {
|
||||
{
|
||||
put("id", "init");
|
||||
put("firstname", "luke");
|
||||
}
|
||||
}).when(getResponse).getSourceAsMap();
|
||||
doReturn(getResponse).when(multiGetItemResponse).getResponse();
|
||||
when(client.multiGet(any(MultiGetRequest.class))).thenReturn(Flux.just(multiGetItemResponse, multiGetItemResponse));
|
||||
|
||||
doReturn(Mono.just(getResult)).when(client).get(any(GetRequest.class));
|
||||
|
||||
when(client.search(any(SearchRequest.class))).thenReturn(Flux.just(searchHit, searchHit));
|
||||
@ -224,18 +236,18 @@ public class ReactiveElasticsearchTemplateCallbackTests {
|
||||
assertThat(saved.get(1).firstname).isEqualTo("after-save");
|
||||
}
|
||||
|
||||
@Test // DATAES-772
|
||||
@Test // DATAES-772, #1678
|
||||
void multiGetShouldInvokeAfterConvertCallbacks() {
|
||||
|
||||
template.setEntityCallbacks(ReactiveEntityCallbacks.create(afterConvertCallback));
|
||||
|
||||
List<Person> results = template.multiGet(pagedQueryForTwo(), Person.class, index).timeout(Duration.ofSeconds(1))
|
||||
List<MultiGetItem<Person>> results = template.multiGet(pagedQueryForTwo(), Person.class, index).timeout(Duration.ofSeconds(1))
|
||||
.toStream().collect(Collectors.toList());
|
||||
|
||||
verify(afterConvertCallback, times(2)).onAfterConvert(eq(new Person("init", "luke")), eq(lukeDocument()),
|
||||
eq(index));
|
||||
assertThat(results.get(0).firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(1).firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(0).getItem().firstname).isEqualTo("after-convert");
|
||||
assertThat(results.get(1).getItem().firstname).isEqualTo("after-convert");
|
||||
}
|
||||
|
||||
@Test // DATAES-772
|
||||
|
@ -772,7 +772,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-623
|
||||
@Test // DATAES-623, #1678
|
||||
public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
|
||||
SampleEntity entity1 = randomEntity("test message 1");
|
||||
entity1.rate = 1;
|
||||
@ -786,7 +786,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
.build();
|
||||
|
||||
template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
|
||||
.as(StepVerifier::create) //
|
||||
.map(MultiGetItem::getItem).as(StepVerifier::create) //
|
||||
.expectNext(entity1, entity2) //
|
||||
.verifyComplete();
|
||||
}
|
||||
@ -811,7 +811,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-623
|
||||
@Test // DATAES-623. #1678
|
||||
public void shouldDoBulkUpdate() {
|
||||
SampleEntity entity1 = randomEntity("test message 1");
|
||||
entity1.rate = 1;
|
||||
@ -841,6 +841,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
.withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
|
||||
.build();
|
||||
template.multiGet(getQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
|
||||
.map(MultiGetItem::getItem) //
|
||||
.as(StepVerifier::create) //
|
||||
.expectNextMatches(entity -> entity.getMessage().equals("updated 1")) //
|
||||
.expectNextMatches(entity -> entity.getMessage().equals("updated 2")) //
|
||||
@ -891,7 +892,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
assertThat(retrieved.seqNoPrimaryTerm.getPrimaryTerm()).isPositive();
|
||||
}
|
||||
|
||||
@Test // DATAES-799
|
||||
@Test // DATAES-799, #1678
|
||||
void multiGetShouldReturnSeqNoPrimaryTerm() {
|
||||
OptimisticEntity original = new OptimisticEntity();
|
||||
original.setMessage("It's fine");
|
||||
@ -899,8 +900,10 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
|
||||
|
||||
template
|
||||
.multiGet(multiGetQueryForOne(saved.getId()), OptimisticEntity.class,
|
||||
template.getIndexCoordinatesFor(OptimisticEntity.class))
|
||||
.as(StepVerifier::create).assertNext(this::assertThatSeqNoPrimaryTermIsFilled).verifyComplete();
|
||||
template.getIndexCoordinatesFor(OptimisticEntity.class)) //
|
||||
.map(MultiGetItem::getItem) //
|
||||
.as(StepVerifier::create) //
|
||||
.assertNext(this::assertThatSeqNoPrimaryTermIsFilled).verifyComplete();
|
||||
}
|
||||
|
||||
private Query multiGetQueryForOne(String id) {
|
||||
|
@ -81,17 +81,17 @@ public class SourceFilterIntegrationTests {
|
||||
assertThat(entity.getField3()).isNull();
|
||||
}
|
||||
|
||||
@Test // #1659
|
||||
@Test // #1659, #1678
|
||||
@DisplayName("should only return requested fields on multiget")
|
||||
void shouldOnlyReturnRequestedFieldsOnGMultiGet() {
|
||||
|
||||
Query query = new NativeSearchQueryBuilder().withIds(Collections.singleton("42")).build();
|
||||
query.addFields("field2");
|
||||
|
||||
List<Entity> entities = operations.multiGet(query, Entity.class);
|
||||
List<MultiGetItem<Entity>> entities = operations.multiGet(query, Entity.class);
|
||||
|
||||
assertThat(entities).hasSize(1);
|
||||
Entity entity = entities.get(0);
|
||||
Entity entity = entities.get(0).getItem();
|
||||
assertThat(entity.getField1()).isNull();
|
||||
assertThat(entity.getField2()).isEqualTo("two");
|
||||
assertThat(entity.getField3()).isNull();
|
||||
@ -123,7 +123,7 @@ public class SourceFilterIntegrationTests {
|
||||
assertThat(entity.getField3()).isNotNull();
|
||||
}
|
||||
|
||||
@Test // #1659
|
||||
@Test // #1659, #1678
|
||||
@DisplayName("should not return excluded fields from SourceFilter on multiget")
|
||||
void shouldNotReturnExcludedFieldsFromSourceFilterOnMultiGet() {
|
||||
|
||||
@ -140,10 +140,10 @@ public class SourceFilterIntegrationTests {
|
||||
}
|
||||
});
|
||||
|
||||
List<Entity> entities = operations.multiGet(query, Entity.class);
|
||||
List<MultiGetItem<Entity>> entities = operations.multiGet(query, Entity.class);
|
||||
|
||||
assertThat(entities).hasSize(1);
|
||||
Entity entity = entities.get(0);
|
||||
Entity entity = entities.get(0).getItem();
|
||||
assertThat(entity.getField1()).isNotNull();
|
||||
assertThat(entity.getField2()).isNull();
|
||||
assertThat(entity.getField3()).isNotNull();
|
||||
@ -175,7 +175,7 @@ public class SourceFilterIntegrationTests {
|
||||
assertThat(entity.getField3()).isNull();
|
||||
}
|
||||
|
||||
@Test // #1659
|
||||
@Test // #1659, #1678
|
||||
@DisplayName("should only return included fields from SourceFilter on multiget")
|
||||
void shouldOnlyReturnIncludedFieldsFromSourceFilterOnMultiGet() {
|
||||
|
||||
@ -192,10 +192,10 @@ public class SourceFilterIntegrationTests {
|
||||
}
|
||||
});
|
||||
|
||||
List<Entity> entities = operations.multiGet(query, Entity.class);
|
||||
List<MultiGetItem<Entity>> entities = operations.multiGet(query, Entity.class);
|
||||
|
||||
assertThat(entities).hasSize(1);
|
||||
Entity entity = entities.get(0);
|
||||
Entity entity = entities.get(0).getItem();
|
||||
assertThat(entity.getField1()).isNull();
|
||||
assertThat(entity.getField2()).isNotNull();
|
||||
assertThat(entity.getField3()).isNull();
|
||||
|
Loading…
x
Reference in New Issue
Block a user