diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java index 1f08cedcb..9a9b41e85 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java @@ -93,7 +93,7 @@ public abstract class ClientLogger { public static void logRawResponse(String logId, HttpStatus statusCode) { if (isEnabled()) { - WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode); + WIRE_LOGGER.trace("[{}] Received raw response: {}", logId, statusCode); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index cc5094da4..99e68ab85 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -48,6 +48,16 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -63,6 +73,7 @@ import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Request; @@ -80,13 +91,13 @@ import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.reactivestreams.Publisher; - import org.springframework.data.elasticsearch.ElasticsearchException; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.util.RequestConverters; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; @@ -115,7 +126,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe * @see ClientConfiguration * @see ReactiveRestClients */ -public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient { +public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices { private final HostProvider hostProvider; @@ -279,6 +290,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices() + */ + @Override + public Indices indices() { + return this; + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest) @@ -403,6 +423,97 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .publishNext(); } + // --> INDICES + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#existsIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.get.GetIndexRequest) + */ + @Override + public Mono existsIndex(HttpHeaders headers, GetIndexRequest request) { + + return sendRequest(request, RequestCreator.indexExists(), RawActionResponse.class, headers) // + .map(response -> response.statusCode().is2xxSuccessful()) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest) + */ + @Override + public Mono deleteIndex(HttpHeaders headers, DeleteIndexRequest request) { + + return sendRequest(request, RequestCreator.indexDelete(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest) + */ + @Override + public Mono createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) { + + return sendRequest(createIndexRequest, RequestCreator.indexCreate(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#openIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.open.OpenIndexRequest) + */ + @Override + public Mono openIndex(HttpHeaders headers, OpenIndexRequest request) { + + return sendRequest(request, RequestCreator.indexOpen(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#closeIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.close.CloseIndexRequest) + */ + @Override + public Mono closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) { + + return sendRequest(closeIndexRequest, RequestCreator.indexClose(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#refreshIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.refresh.RefreshRequest) + */ + @Override + public Mono refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) { + + return sendRequest(refreshRequest, RequestCreator.indexRefresh(), RefreshResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest) + */ + @Override + public Mono updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) { + + return sendRequest(putMappingRequest, RequestCreator.putMapping(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest) + */ + @Override + public Mono flushIndex(HttpHeaders headers, FlushRequest flushRequest) { + + return sendRequest(flushRequest, RequestCreator.flushIndex(), FlushResponse.class, headers) // + .then(); + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) @@ -630,6 +741,41 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } }; } + + // --> INDICES + + static Function indexExists() { + return RequestConverters::indexExists; + } + + static Function indexDelete() { + return RequestConverters::indexDelete; + } + + static Function indexCreate() { + return RequestConverters::indexCreate; + } + + static Function indexOpen() { + return RequestConverters::indexOpen; + } + + static Function indexClose() { + return RequestConverters::indexClose; + } + + static Function indexRefresh() { + return RequestConverters::indexRefresh; + } + + static Function putMapping() { + return RequestConverters::putMapping; + } + + static Function flushIndex() { + return RequestConverters::flushIndex; + } + } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 4ab7969cd..d1071f42f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.client.reactive; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -22,6 +23,13 @@ import java.net.ConnectException; import java.util.Collection; import java.util.function.Consumer; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -237,6 +245,13 @@ public interface ReactiveElasticsearchClient { */ Mono index(HttpHeaders headers, IndexRequest indexRequest); + /** + * Gain access to index related commands. + * + * @return + */ + Indices indices(); + /** * Execute an {@link UpdateRequest} against the {@literal update} API to alter a document. * @@ -473,4 +488,339 @@ public interface ReactiveElasticsearchClient { return hosts().stream().anyMatch(ElasticsearchHost::isOnline); } } + + /** + * Encapsulation of methods for accessing the Indices API. + * + * @see Indices + * API. + * @author Christoph Strobl + */ + interface Indices { + + /** + * Execute the given {@link GetIndexRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise. + * @see Indices + * Exists API on elastic.co + */ + default Mono existsIndex(Consumer consumer) { + + GetIndexRequest request = new GetIndexRequest(); + consumer.accept(request); + return existsIndex(request); + } + + /** + * Execute the given {@link GetIndexRequest} against the {@literal indices} API. + * + * @param getIndexRequest must not be {@literal null}. + * @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise. + * @see Indices + * Exists API on elastic.co + */ + default Mono existsIndex(GetIndexRequest getIndexRequest) { + return existsIndex(HttpHeaders.EMPTY, getIndexRequest); + } + + /** + * Execute the given {@link GetIndexRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param getIndexRequest must not be {@literal null}. + * @return the {@link Mono} emitting {@literal true} if the index exists, {@literal false} otherwise. + * @see Indices + * Exists API on elastic.co + */ + Mono existsIndex(HttpHeaders headers, GetIndexRequest getIndexRequest); + + /** + * Execute the given {@link DeleteIndexRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Delete API on elastic.co + */ + default Mono deleteIndex(Consumer consumer) { + + DeleteIndexRequest request = new DeleteIndexRequest(); + consumer.accept(request); + return deleteIndex(request); + } + + /** + * Execute the given {@link DeleteIndexRequest} against the {@literal indices} API. + * + * @param deleteIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Delete API on elastic.co + */ + default Mono deleteIndex(DeleteIndexRequest deleteIndexRequest) { + return deleteIndex(HttpHeaders.EMPTY, deleteIndexRequest); + } + + /** + * Execute the given {@link DeleteIndexRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param deleteIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Delete API on elastic.co + */ + Mono deleteIndex(HttpHeaders headers, DeleteIndexRequest deleteIndexRequest); + + /** + * Execute the given {@link CreateIndexRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * already exist. + * @see Indices + * Create API on elastic.co + */ + default Mono createIndex(Consumer consumer) { + + CreateIndexRequest request = new CreateIndexRequest(); + consumer.accept(request); + return createIndex(request); + } + + /** + * Execute the given {@link CreateIndexRequest} against the {@literal indices} API. + * + * @param createIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * already exist. + * @see Indices + * Create API on elastic.co + */ + default Mono createIndex(CreateIndexRequest createIndexRequest) { + return createIndex(HttpHeaders.EMPTY, createIndexRequest); + } + + /** + * Execute the given {@link CreateIndexRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param createIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * already exist. + * @see Indices + * Create API on elastic.co + */ + Mono createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest); + + /** + * Execute the given {@link OpenIndexRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Open API on elastic.co + */ + default Mono openIndex(Consumer consumer) { + + OpenIndexRequest request = new OpenIndexRequest(); + consumer.accept(request); + return openIndex(request); + } + + /** + * Execute the given {@link OpenIndexRequest} against the {@literal indices} API. + * + * @param openIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Open API on elastic.co + */ + default Mono openIndex(OpenIndexRequest openIndexRequest) { + return openIndex(HttpHeaders.EMPTY, openIndexRequest); + } + + /** + * Execute the given {@link OpenIndexRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param openIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Open API on elastic.co + */ + Mono openIndex(HttpHeaders headers, OpenIndexRequest openIndexRequest); + + /** + * Execute the given {@link CloseIndexRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Close API on elastic.co + */ + default Mono closeIndex(Consumer consumer) { + + CloseIndexRequest request = new CloseIndexRequest(); + consumer.accept(request); + return closeIndex(request); + } + + /** + * Execute the given {@link CloseIndexRequest} against the {@literal indices} API. + * + * @param closeIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Close API on elastic.co + */ + default Mono closeIndex(CloseIndexRequest closeIndexRequest) { + return closeIndex(HttpHeaders.EMPTY, closeIndexRequest); + } + + /** + * Execute the given {@link CloseIndexRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param closeIndexRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * CLose API on elastic.co + */ + Mono closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest); + + /** + * Execute the given {@link RefreshRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Refresh API on elastic.co + */ + default Mono refreshIndex(Consumer consumer) { + + RefreshRequest request = new RefreshRequest(); + consumer.accept(request); + return refreshIndex(request); + } + + /** + * Execute the given {@link RefreshRequest} against the {@literal indices} API. + * + * @param refreshRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Refresh API on elastic.co + */ + default Mono refreshIndex(RefreshRequest refreshRequest) { + return refreshIndex(HttpHeaders.EMPTY, refreshRequest); + } + + /** + * Execute the given {@link RefreshRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param refreshRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Refresh API on elastic.co + */ + Mono refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest); + + /** + * Execute the given {@link PutMappingRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Put Mapping API on elastic.co + */ + default Mono updateMapping(Consumer consumer) { + + PutMappingRequest request = new PutMappingRequest(); + consumer.accept(request); + return updateMapping(request); + } + + /** + * Execute the given {@link PutMappingRequest} against the {@literal indices} API. + * + * @param putMappingRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Put Mapping API on elastic.co + */ + default Mono updateMapping(PutMappingRequest putMappingRequest) { + return updateMapping(HttpHeaders.EMPTY, putMappingRequest); + } + + /** + * Execute the given {@link PutMappingRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param putMappingRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Put Mapping API on elastic.co + */ + Mono updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest); + + /** + * Execute the given {@link FlushRequest} against the {@literal indices} API. + * + * @param consumer never {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Flush API on elastic.co + */ + default Mono flushIndex(Consumer consumer) { + + FlushRequest request = new FlushRequest(); + consumer.accept(request); + return flushIndex(request); + } + + /** + * Execute the given {@link RefreshRequest} against the {@literal indices} API. + * + * @param flushRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Flush API on elastic.co + */ + default Mono flushIndex(FlushRequest flushRequest) { + return flushIndex(HttpHeaders.EMPTY, flushRequest); + } + + /** + * Execute the given {@link RefreshRequest} against the {@literal indices} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param flushRequest must not be {@literal null}. + * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index + * does not exist. + * @see Indices + * Flush API on elastic.co + */ + Mono flushIndex(HttpHeaders headers, FlushRequest flushRequest); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 01efb2480..05692231c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.util; import java.io.ByteArrayOutputStream; @@ -35,6 +34,14 @@ import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRe import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest; import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest; import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.explain.ExplainRequest; @@ -82,7 +89,8 @@ import org.springframework.http.HttpMethod; /** *

- * Original implementation source {@link org.elasticsearch.client.RequestConverters} by {@literal Elasticsearch} + * Original implementation source {@link org.elasticsearch.client.RequestConverters} and + * {@link org.elasticsearch.client.IndicesRequestConverters} by {@literal Elasticsearch} * (https://www.elastic.co) licensed under the Apache License, Version 2.0. *

* Modified for usage with {@link ReactiveElasticsearchClient}. @@ -603,6 +611,125 @@ public class RequestConverters { return request; } + // --> INDICES + + public static Request getIndex(GetIndexRequest getIndexRequest) { + String[] indices = getIndexRequest.indices() == null ? Strings.EMPTY_ARRAY : getIndexRequest.indices(); + + String endpoint = endpoint(indices); + Request request = new Request(HttpMethod.GET.name(), endpoint); + + Params params = new Params(request); + params.withIndicesOptions(getIndexRequest.indicesOptions()); + params.withLocal(getIndexRequest.local()); + params.withIncludeDefaults(getIndexRequest.includeDefaults()); + params.withHuman(getIndexRequest.humanReadable()); + params.withMasterTimeout(getIndexRequest.masterNodeTimeout()); + + return request; + } + + public static Request indexDelete(DeleteIndexRequest deleteIndexRequest) { + String endpoint = RequestConverters.endpoint(deleteIndexRequest.indices()); + Request request = new Request(HttpMethod.DELETE.name(), endpoint); + + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withTimeout(deleteIndexRequest.timeout()); + parameters.withMasterTimeout(deleteIndexRequest.masterNodeTimeout()); + parameters.withIndicesOptions(deleteIndexRequest.indicesOptions()); + return request; + } + + public static Request indexExists(GetIndexRequest getIndexRequest) { + // this can be called with no indices as argument by transport client, not via REST though + if (getIndexRequest.indices() == null || getIndexRequest.indices().length == 0) { + throw new IllegalArgumentException("indices are mandatory"); + } + String endpoint = endpoint(getIndexRequest.indices(), ""); + Request request = new Request(HttpMethod.HEAD.name(), endpoint); + + Params params = new Params(request); + params.withLocal(getIndexRequest.local()); + params.withHuman(getIndexRequest.humanReadable()); + params.withIndicesOptions(getIndexRequest.indicesOptions()); + params.withIncludeDefaults(getIndexRequest.includeDefaults()); + return request; + } + + public static Request indexOpen(OpenIndexRequest openIndexRequest) { + String endpoint = RequestConverters.endpoint(openIndexRequest.indices(), "_open"); + Request request = new Request(HttpMethod.POST.name(), endpoint); + + Params parameters = new Params(request); + parameters.withTimeout(openIndexRequest.timeout()); + parameters.withMasterTimeout(openIndexRequest.masterNodeTimeout()); + parameters.withWaitForActiveShards(openIndexRequest.waitForActiveShards(), ActiveShardCount.NONE); + parameters.withIndicesOptions(openIndexRequest.indicesOptions()); + return request; + } + + public static Request indexClose(CloseIndexRequest closeIndexRequest) { + String endpoint = RequestConverters.endpoint(closeIndexRequest.indices(), "_close"); + Request request = new Request(HttpMethod.POST.name(), endpoint); + + Params parameters = new Params(request); + parameters.withTimeout(closeIndexRequest.timeout()); + parameters.withMasterTimeout(closeIndexRequest.masterNodeTimeout()); + parameters.withIndicesOptions(closeIndexRequest.indicesOptions()); + return request; + } + + public static Request indexCreate(CreateIndexRequest createIndexRequest) { + String endpoint = RequestConverters.endpoint(createIndexRequest.indices()); + Request request = new Request(HttpMethod.PUT.name(), endpoint); + + Params parameters = new Params(request); + parameters.withTimeout(createIndexRequest.timeout()); + parameters.withMasterTimeout(createIndexRequest.masterNodeTimeout()); + parameters.withWaitForActiveShards(createIndexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT); + + request.setEntity(createEntity(createIndexRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); + return request; + } + + public static Request indexRefresh(RefreshRequest refreshRequest) { + + String[] indices = refreshRequest.indices() == null ? Strings.EMPTY_ARRAY : refreshRequest.indices(); + Request request = new Request(HttpMethod.POST.name(), RequestConverters.endpoint(indices, "_refresh")); + + Params parameters = new Params(request); + parameters.withIndicesOptions(refreshRequest.indicesOptions()); + return request; + } + + public static Request putMapping(PutMappingRequest putMappingRequest) { + // The concreteIndex is an internal concept, not applicable to requests made over the REST API. + if (putMappingRequest.getConcreteIndex() != null) { + throw new IllegalArgumentException("concreteIndex cannot be set on PutMapping requests made over the REST API"); + } + + Request request = new Request(HttpMethod.PUT.name(), + RequestConverters.endpoint(putMappingRequest.indices(), "_mapping", putMappingRequest.type())); + + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withTimeout(putMappingRequest.timeout()); + parameters.withMasterTimeout(putMappingRequest.masterNodeTimeout()); + + request.setEntity(RequestConverters.createEntity(putMappingRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); + return request; + } + + public static Request flushIndex(FlushRequest flushRequest) { + String[] indices = flushRequest.indices() == null ? Strings.EMPTY_ARRAY : flushRequest.indices(); + Request request = new Request(HttpMethod.POST.name(), RequestConverters.endpoint(indices, "_flush")); + + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withIndicesOptions(flushRequest.indicesOptions()); + parameters.putParam("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing())); + parameters.putParam("force", Boolean.toString(flushRequest.force())); + return request; + } + static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) { try { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 9f66d4dcb..6f2875e2b 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -17,7 +17,7 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; -import org.springframework.data.elasticsearch.ElasticsearchException; +import lombok.SneakyThrows; import reactor.test.StepVerifier; import java.io.IOException; @@ -31,6 +31,7 @@ import java.util.stream.IntStream; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; @@ -139,13 +140,10 @@ public class ReactiveElasticsearchClientTests { @Test // DATAES-519 public void getOnNonExistingIndexShouldThrowException() { - client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")) - .as(StepVerifier::create) - .expectError(ElasticsearchStatusException.class) - .verify(); + client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")).as(StepVerifier::create) + .expectError(ElasticsearchStatusException.class).verify(); } - @Test // DATAES-488 public void getShouldFetchDocumentById() { @@ -502,12 +500,169 @@ public class ReactiveElasticsearchClientTests { request = request.scroll(TimeValue.timeValueMinutes(1)); client.scroll(HttpHeaders.EMPTY, request) // - .take(73) - .as(StepVerifier::create) // + .take(73).as(StepVerifier::create) // .expectNextCount(73) // .verifyComplete(); } + @Test // DATAES-569 + public void indexExistsShouldReturnTrueIfSo() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().existsIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .expectNext(true) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void indexExistsShouldReturnFalseIfNot() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().existsIndex(request -> request.indices(INDEX_II)) // + .as(StepVerifier::create) // + .expectNext(false) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void createIndex() throws IOException { + + client.indices().createIndex(request -> request.index(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + + syncClient.indices().exists(new GetIndexRequest().indices(INDEX_II), RequestOptions.DEFAULT); + } + + @Test // DATAES-569 + public void createExistingIndexErrors() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().createIndex(request -> request.index(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void deleteExistingIndex() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().deleteIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(syncClient.indices().exists(new GetIndexRequest().indices(INDEX_I), RequestOptions.DEFAULT)).isFalse(); + } + + @Test // DATAES-569 + public void deleteNonExistingIndexErrors() { + + client.indices().deleteIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void openExistingIndex() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().openIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void openNonExistingIndex() { + + client.indices().openIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void closeExistingIndex() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().openIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void closeNonExistingIndex() { + + client.indices().closeIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void refreshIndex() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().refreshIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void refreshNonExistingIndex() { + + client.indices().refreshIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void updateMapping() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + Map jsonMap = Collections.singletonMap("properties", + Collections.singletonMap("message", Collections.singletonMap("type", "text"))); + + client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void updateMappingNonExistingIndex() { + + Map jsonMap = Collections.singletonMap("properties", + Collections.singletonMap("message", Collections.singletonMap("type", "text"))); + + client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + + @Test // DATAES-569 + public void flushIndex() throws IOException { + + syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); + + client.indices().flushIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAES-569 + public void flushNonExistingIndex() { + + client.indices().flushIndex(request -> request.indices(INDEX_I)) // + .as(StepVerifier::create) // + .verifyError(ElasticsearchStatusException.class); + } + AddToIndexOfType addSourceDocument() { return add(DOC_SOURCE); } @@ -525,13 +680,9 @@ public class ReactiveElasticsearchClientTests { .create(true); } + @SneakyThrows String doIndex(Map source, String index, String type) { - - try { - return syncClient.index(indexRequest(source, index, type)).getId(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return syncClient.index(indexRequest(source, index, type), RequestOptions.DEFAULT).getId(); } interface AddToIndexOfType extends AddToIndex {