mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-31 09:12:11 +00:00
DATAES-653 - Make it easier to use a custom request converter when extending DefaultReactiveElasticsearchClient.
Original PR: #411
This commit is contained in:
parent
2ec61ab4ff
commit
0b378601d9
@ -79,7 +79,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.core.CountRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
@ -94,14 +93,12 @@ import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.data.elasticsearch.ElasticsearchException;
|
||||
import org.springframework.data.elasticsearch.client.ClientConfiguration;
|
||||
import org.springframework.data.elasticsearch.client.ClientLogger;
|
||||
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
|
||||
import org.springframework.data.elasticsearch.client.NoReachableHostException;
|
||||
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
|
||||
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
|
||||
import org.springframework.data.elasticsearch.client.util.RequestConverters;
|
||||
import org.springframework.data.util.Lazy;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
@ -128,6 +125,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
|
||||
* @author Peter-Josef Meisch
|
||||
* @author Huw Ayling-Miller
|
||||
* @author Henrique Amaral
|
||||
* @author Roman Puchkovskiy
|
||||
* @since 3.2
|
||||
* @see ClientConfiguration
|
||||
* @see ReactiveRestClients
|
||||
@ -135,6 +133,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
|
||||
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
|
||||
|
||||
private final HostProvider hostProvider;
|
||||
|
||||
private final RequestCreator requestCreator;
|
||||
|
||||
/**
|
||||
* Create a new {@link DefaultReactiveElasticsearchClient} using the given {@link HostProvider} to obtain server
|
||||
@ -143,10 +143,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
* @param hostProvider must not be {@literal null}.
|
||||
*/
|
||||
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
|
||||
this(hostProvider, new DefaultRequestCreator());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link DefaultReactiveElasticsearchClient} using the given {@link HostProvider} to obtain server
|
||||
* connections and the given {@link RequestCreator}.
|
||||
*
|
||||
* @param hostProvider must not be {@literal null}.
|
||||
* @param requestCreator must not be {@literal null}.
|
||||
*/
|
||||
public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) {
|
||||
|
||||
Assert.notNull(hostProvider, "HostProvider must not be null");
|
||||
Assert.notNull(requestCreator, "RequestCreator must not be null");
|
||||
|
||||
this.hostProvider = hostProvider;
|
||||
this.requestCreator = requestCreator;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -177,14 +190,30 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
* @return new instance of {@link DefaultReactiveElasticsearchClient}.
|
||||
*/
|
||||
public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) {
|
||||
return create(clientConfiguration, new DefaultRequestCreator());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link DefaultReactiveElasticsearchClient} given {@link ClientConfiguration} and
|
||||
* {@link RequestCreator}. <br />
|
||||
* <strong>NOTE</strong> If the cluster requires authentication be sure to provide the according {@link HttpHeaders}
|
||||
* correctly.
|
||||
*
|
||||
* @param clientConfiguration Client configuration. Must not be {@literal null}.
|
||||
* @param requestCreator Request creator. Must not be {@literal null}.
|
||||
* @return new instance of {@link DefaultReactiveElasticsearchClient}.
|
||||
*/
|
||||
public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration,
|
||||
RequestCreator requestCreator) {
|
||||
|
||||
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null");
|
||||
Assert.notNull(requestCreator, "RequestCreator must not be null");
|
||||
|
||||
WebClientProvider provider = getWebClientProvider(clientConfiguration);
|
||||
|
||||
HostProvider hostProvider = HostProvider.provider(provider,
|
||||
clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0]));
|
||||
return new DefaultReactiveElasticsearchClient(hostProvider);
|
||||
return new DefaultReactiveElasticsearchClient(hostProvider, requestCreator);
|
||||
}
|
||||
|
||||
private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
|
||||
@ -248,7 +277,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Boolean> ping(HttpHeaders headers) {
|
||||
|
||||
return sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers) //
|
||||
return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
|
||||
.map(response -> response.statusCode().is2xxSuccessful()) //
|
||||
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
|
||||
}
|
||||
@ -260,7 +289,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<MainResponse> info(HttpHeaders headers) {
|
||||
|
||||
return sendRequest(new MainRequest(), RequestCreator.info(), MainResponse.class, headers) //
|
||||
return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) //
|
||||
.next();
|
||||
}
|
||||
|
||||
@ -271,7 +300,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
|
||||
|
||||
return sendRequest(getRequest, RequestCreator.get(), GetResponse.class, headers) //
|
||||
return sendRequest(getRequest, requestCreator.get(), GetResponse.class, headers) //
|
||||
.filter(GetResponse::isExists) //
|
||||
.map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
|
||||
.next();
|
||||
@ -284,7 +313,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
|
||||
|
||||
return sendRequest(multiGetRequest, RequestCreator.multiGet(), MultiGetResponse.class, headers)
|
||||
return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers)
|
||||
.map(MultiGetResponse::getResponses) //
|
||||
.flatMap(Flux::fromArray) //
|
||||
.filter(it -> !it.isFailed() && it.getResponse().isExists()) //
|
||||
@ -298,7 +327,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
|
||||
|
||||
return sendRequest(getRequest, RequestCreator.exists(), RawActionResponse.class, headers) //
|
||||
return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
|
||||
.map(response -> response.statusCode().is2xxSuccessful()) //
|
||||
.next();
|
||||
}
|
||||
@ -309,7 +338,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
*/
|
||||
@Override
|
||||
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
|
||||
return sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext();
|
||||
return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).publishNext();
|
||||
}
|
||||
|
||||
/*
|
||||
@ -327,7 +356,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
*/
|
||||
@Override
|
||||
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
|
||||
return sendRequest(updateRequest, RequestCreator.update(), UpdateResponse.class, headers).publishNext();
|
||||
return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).publishNext();
|
||||
}
|
||||
|
||||
/*
|
||||
@ -337,7 +366,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
|
||||
|
||||
return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers) //
|
||||
return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
|
||||
.publishNext();
|
||||
}
|
||||
|
||||
@ -347,7 +376,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
*/
|
||||
@Override
|
||||
public Mono<Long> count(HttpHeaders headers, SearchRequest searchRequest) {
|
||||
return sendRequest(searchRequest, RequestCreator.search(), SearchResponse.class, headers) //
|
||||
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
|
||||
.map(SearchResponse::getHits) //
|
||||
.map(searchHits -> searchHits.getTotalHits().value) //
|
||||
.next();
|
||||
@ -360,7 +389,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
|
||||
|
||||
return sendRequest(searchRequest, RequestCreator.search(), SearchResponse.class, headers) //
|
||||
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
|
||||
.map(SearchResponse::getHits) //
|
||||
.flatMap(Flux::fromIterable);
|
||||
}
|
||||
@ -387,11 +416,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
Flux<SearchResponse> exchange = outbound.startWith(searchRequest).flatMap(it -> {
|
||||
|
||||
if (it instanceof SearchRequest) {
|
||||
return sendRequest((SearchRequest) it, RequestCreator.search(), SearchResponse.class, headers);
|
||||
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
|
||||
} else if (it instanceof SearchScrollRequest) {
|
||||
return sendRequest((SearchScrollRequest) it, RequestCreator.scroll(), SearchResponse.class, headers);
|
||||
return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
|
||||
} else if (it instanceof ClearScrollRequest) {
|
||||
return sendRequest((ClearScrollRequest) it, RequestCreator.clearScroll(), ClearScrollResponse.class, headers)
|
||||
return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
|
||||
.flatMap(discard -> Flux.empty());
|
||||
}
|
||||
|
||||
@ -444,7 +473,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
clearScrollRequest.scrollIds(state.getScrollIds());
|
||||
|
||||
// just send the request, resources get cleaned up anyways after scrollTimeout has been reached.
|
||||
return sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers);
|
||||
return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -454,7 +483,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
|
||||
|
||||
return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
|
||||
return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
|
||||
.publishNext();
|
||||
}
|
||||
|
||||
@ -464,7 +493,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
*/
|
||||
@Override
|
||||
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
|
||||
return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) //
|
||||
return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) //
|
||||
.publishNext();
|
||||
}
|
||||
|
||||
@ -477,7 +506,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
|
||||
|
||||
return sendRequest(request, RequestCreator.indexExists(), RawActionResponse.class, headers) //
|
||||
return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) //
|
||||
.map(response -> response.statusCode().is2xxSuccessful()) //
|
||||
.next();
|
||||
}
|
||||
@ -489,7 +518,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
|
||||
|
||||
return sendRequest(request, RequestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
|
||||
return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -500,7 +529,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
|
||||
|
||||
return sendRequest(createIndexRequest, RequestCreator.indexCreate(), AcknowledgedResponse.class, headers) //
|
||||
return sendRequest(createIndexRequest, requestCreator.indexCreate(), AcknowledgedResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -511,7 +540,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> openIndex(HttpHeaders headers, OpenIndexRequest request) {
|
||||
|
||||
return sendRequest(request, RequestCreator.indexOpen(), AcknowledgedResponse.class, headers) //
|
||||
return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -522,7 +551,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) {
|
||||
|
||||
return sendRequest(closeIndexRequest, RequestCreator.indexClose(), AcknowledgedResponse.class, headers) //
|
||||
return sendRequest(closeIndexRequest, requestCreator.indexClose(), AcknowledgedResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -533,7 +562,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) {
|
||||
|
||||
return sendRequest(refreshRequest, RequestCreator.indexRefresh(), RefreshResponse.class, headers) //
|
||||
return sendRequest(refreshRequest, requestCreator.indexRefresh(), RefreshResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -544,7 +573,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
|
||||
|
||||
return sendRequest(putMappingRequest, RequestCreator.putMapping(), AcknowledgedResponse.class, headers) //
|
||||
return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -555,7 +584,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
@Override
|
||||
public Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest) {
|
||||
|
||||
return sendRequest(flushRequest, RequestCreator.flushIndex(), FlushResponse.class, headers) //
|
||||
return sendRequest(flushRequest, requestCreator.flushIndex(), FlushResponse.class, headers) //
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -648,7 +677,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
body::get);
|
||||
|
||||
requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
|
||||
requestBodySpec.body(Mono.fromSupplier(body::get), String.class);
|
||||
requestBodySpec.body(Mono.fromSupplier(body), String.class);
|
||||
} else {
|
||||
ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
|
||||
}
|
||||
@ -728,109 +757,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
request.getMethod(), request.getEndpoint(), response.statusCode().value())));
|
||||
}
|
||||
|
||||
static class RequestCreator {
|
||||
|
||||
static Function<SearchRequest, Request> search() {
|
||||
return RequestConverters::search;
|
||||
}
|
||||
|
||||
static Function<SearchScrollRequest, Request> scroll() {
|
||||
return RequestConverters::searchScroll;
|
||||
}
|
||||
|
||||
static Function<ClearScrollRequest, Request> clearScroll() {
|
||||
return RequestConverters::clearScroll;
|
||||
}
|
||||
|
||||
static Function<IndexRequest, Request> index() {
|
||||
return RequestConverters::index;
|
||||
}
|
||||
|
||||
static Function<GetRequest, Request> get() {
|
||||
return RequestConverters::get;
|
||||
}
|
||||
|
||||
static Function<MainRequest, Request> ping() {
|
||||
return (request) -> RequestConverters.ping();
|
||||
}
|
||||
|
||||
static Function<MainRequest, Request> info() {
|
||||
return (request) -> RequestConverters.info();
|
||||
}
|
||||
|
||||
static Function<MultiGetRequest, Request> multiGet() {
|
||||
return RequestConverters::multiGet;
|
||||
}
|
||||
|
||||
static Function<GetRequest, Request> exists() {
|
||||
return RequestConverters::exists;
|
||||
}
|
||||
|
||||
static Function<UpdateRequest, Request> update() {
|
||||
return RequestConverters::update;
|
||||
}
|
||||
|
||||
static Function<DeleteRequest, Request> delete() {
|
||||
return RequestConverters::delete;
|
||||
}
|
||||
|
||||
static Function<DeleteByQueryRequest, Request> deleteByQuery() {
|
||||
|
||||
return request -> RequestConverters.deleteByQuery(request);
|
||||
}
|
||||
|
||||
static Function<BulkRequest, Request> bulk() {
|
||||
|
||||
return request -> {
|
||||
|
||||
try {
|
||||
return RequestConverters.bulk(request);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("Could not parse request", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// --> INDICES
|
||||
|
||||
static Function<GetIndexRequest, Request> indexExists() {
|
||||
return RequestConverters::indexExists;
|
||||
}
|
||||
|
||||
static Function<DeleteIndexRequest, Request> indexDelete() {
|
||||
return RequestConverters::indexDelete;
|
||||
}
|
||||
|
||||
static Function<CreateIndexRequest, Request> indexCreate() {
|
||||
return RequestConverters::indexCreate;
|
||||
}
|
||||
|
||||
static Function<OpenIndexRequest, Request> indexOpen() {
|
||||
return RequestConverters::indexOpen;
|
||||
}
|
||||
|
||||
static Function<CloseIndexRequest, Request> indexClose() {
|
||||
return RequestConverters::indexClose;
|
||||
}
|
||||
|
||||
static Function<RefreshRequest, Request> indexRefresh() {
|
||||
return RequestConverters::indexRefresh;
|
||||
}
|
||||
|
||||
static Function<PutMappingRequest, Request> putMapping() {
|
||||
return RequestConverters::putMapping;
|
||||
}
|
||||
|
||||
static Function<FlushRequest, Request> flushIndex() {
|
||||
return RequestConverters::flushIndex;
|
||||
}
|
||||
|
||||
static Function<CountRequest, Request> count() {
|
||||
return RequestConverters::count;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Reactive client {@link ReactiveElasticsearchClient.Status} implementation.
|
||||
*
|
||||
|
@ -0,0 +1,8 @@
|
||||
package org.springframework.data.elasticsearch.client.reactive;
|
||||
|
||||
/**
|
||||
* @author Roman Puchkovskiy
|
||||
* @since 4.0
|
||||
*/
|
||||
class DefaultRequestCreator implements RequestCreator {
|
||||
}
|
@ -24,6 +24,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Roman Puchkovskiy
|
||||
* @since 3.2
|
||||
*/
|
||||
public final class ReactiveRestClients {
|
||||
@ -33,6 +34,8 @@ public final class ReactiveRestClients {
|
||||
/**
|
||||
* Start here to create a new client tailored to your needs.
|
||||
*
|
||||
* @param clientConfiguration client configuration to use for building {@link ReactiveElasticsearchClient};
|
||||
* must not be {@literal null}.
|
||||
* @return new instance of {@link ReactiveElasticsearchClient}.
|
||||
*/
|
||||
public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) {
|
||||
@ -41,4 +44,21 @@ public final class ReactiveRestClients {
|
||||
|
||||
return DefaultReactiveElasticsearchClient.create(clientConfiguration);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start here to create a new client tailored to your needs.
|
||||
*
|
||||
* @param clientConfiguration client configuration to use for building {@link ReactiveElasticsearchClient};
|
||||
* must not be {@literal null}.
|
||||
* @param requestCreator request creator to use in the client; must not be {@literal null}.
|
||||
* @return new instance of {@link ReactiveElasticsearchClient}.
|
||||
*/
|
||||
public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration,
|
||||
RequestCreator requestCreator) {
|
||||
|
||||
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
|
||||
Assert.notNull(requestCreator, "RequestCreator must not be null!");
|
||||
|
||||
return DefaultReactiveElasticsearchClient.create(clientConfiguration, requestCreator);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,134 @@
|
||||
package org.springframework.data.elasticsearch.client.reactive;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.main.MainRequest;
|
||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.core.CountRequest;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.springframework.data.elasticsearch.ElasticsearchException;
|
||||
import org.springframework.data.elasticsearch.client.util.RequestConverters;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* @author Roman Puchkovskiy
|
||||
* @since 4.0
|
||||
*/
|
||||
public interface RequestCreator {
|
||||
|
||||
default Function<SearchRequest, Request> search() {
|
||||
return RequestConverters::search;
|
||||
}
|
||||
|
||||
default Function<SearchScrollRequest, Request> scroll() {
|
||||
return RequestConverters::searchScroll;
|
||||
}
|
||||
|
||||
default Function<ClearScrollRequest, Request> clearScroll() {
|
||||
return RequestConverters::clearScroll;
|
||||
}
|
||||
|
||||
default Function<IndexRequest, Request> index() {
|
||||
return RequestConverters::index;
|
||||
}
|
||||
|
||||
default Function<GetRequest, Request> get() {
|
||||
return RequestConverters::get;
|
||||
}
|
||||
|
||||
default Function<MainRequest, Request> ping() {
|
||||
return (request) -> RequestConverters.ping();
|
||||
}
|
||||
|
||||
default Function<MainRequest, Request> info() {
|
||||
return (request) -> RequestConverters.info();
|
||||
}
|
||||
|
||||
default Function<MultiGetRequest, Request> multiGet() {
|
||||
return RequestConverters::multiGet;
|
||||
}
|
||||
|
||||
default Function<GetRequest, Request> exists() {
|
||||
return RequestConverters::exists;
|
||||
}
|
||||
|
||||
default Function<UpdateRequest, Request> update() {
|
||||
return RequestConverters::update;
|
||||
}
|
||||
|
||||
default Function<DeleteRequest, Request> delete() {
|
||||
return RequestConverters::delete;
|
||||
}
|
||||
|
||||
default Function<DeleteByQueryRequest, Request> deleteByQuery() {
|
||||
return RequestConverters::deleteByQuery;
|
||||
}
|
||||
|
||||
default Function<BulkRequest, Request> bulk() {
|
||||
|
||||
return request -> {
|
||||
|
||||
try {
|
||||
return RequestConverters.bulk(request);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("Could not parse request", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// --> INDICES
|
||||
|
||||
default Function<GetIndexRequest, Request> indexExists() {
|
||||
return RequestConverters::indexExists;
|
||||
}
|
||||
|
||||
default Function<DeleteIndexRequest, Request> indexDelete() {
|
||||
return RequestConverters::indexDelete;
|
||||
}
|
||||
|
||||
default Function<CreateIndexRequest, Request> indexCreate() {
|
||||
return RequestConverters::indexCreate;
|
||||
}
|
||||
|
||||
default Function<OpenIndexRequest, Request> indexOpen() {
|
||||
return RequestConverters::indexOpen;
|
||||
}
|
||||
|
||||
default Function<CloseIndexRequest, Request> indexClose() {
|
||||
return RequestConverters::indexClose;
|
||||
}
|
||||
|
||||
default Function<RefreshRequest, Request> indexRefresh() {
|
||||
return RequestConverters::indexRefresh;
|
||||
}
|
||||
|
||||
default Function<PutMappingRequest, Request> putMapping() {
|
||||
return RequestConverters::putMapping;
|
||||
}
|
||||
|
||||
default Function<FlushRequest, Request> flushIndex() {
|
||||
return RequestConverters::flushIndex;
|
||||
}
|
||||
|
||||
default Function<CountRequest, Request> count() {
|
||||
return RequestConverters::count;
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user