diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index ef5a87891..2ef372676 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -30,6 +30,7 @@ include::{spring-data-commons-docs}/repositories.adoc[] :leveloffset: +1 include::reference/elasticsearch-clients.adoc[] include::reference/data-elasticsearch.adoc[] +include::reference/reactive-elasticsearch-operations.adoc[] include::reference/elasticsearch-misc.adoc[] :leveloffset: -1 diff --git a/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc b/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc new file mode 100644 index 000000000..b3056481b --- /dev/null +++ b/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc @@ -0,0 +1,132 @@ +[[elasticsearch.reactive.operations]] += Reactive Elasticsearch Operations + +`ReactiveElasticsearchOperations` is the gateway to executing high level commands against an Elasticsearch cluster using the `ReactiveElasticsearchClient`. + +The `ReactiveElasticsearchTemplate` is the default implementation of `ReactiveElasticsearchOperations` and offers the following set of features. + +* Read/Write mapping support for domain types. +* A rich query and criteria api. +* Resource management and Exception translation. + +[[elasticsearch.reactive.template]] +== Reactive Elasticsearch Template + +To get started the `ReactiveElasticsearchTemplate` needs to know about the actual client to work with. +Please see <> for details on the client. + +[[elasticsearch.reactive.template.configuration]] +=== Reactive Template Configuration + +The easiest way of setting up the `ReactiveElasticsearchTemplate` is via `AbstractReactiveElasticsearchConfiguration` providing +dedicated configuration method hooks for `base package`, the `initial entity set` etc. + +.The AbstractReactiveElasticsearchConfiguration +==== +[source,java] +---- +@Configuration +public class Config extends AbstractReactiveElasticsearchConfiguration { + + @Bean <1> + @Override + public ReactiveElasticsearchClient reactiveElasticsearchClient() { + // ... + } +} +---- +<1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`. +==== + +NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`. + +TIP: If needed the `ReactiveElasticsearchTemplate` can be configured with default `RefreshPolicy` and `IndicesOptions` that get applied to the related requests by overriding the defaults of `refreshPolicy()` and `indicesOptions()`. + +However one might want to be more in control over the actual components and use a more verbose approach. + +.Configure the ReactiveElasticsearchTemplate +==== +[source,java] +---- +@Configuration +public class Config { + + @Bean <1> + public ReactiveElasticsearchClient reactiveElasticsearchClient() { + // ... + } + + @Bean <2> + public ElasticsearchConverter elasticsearchConverter() { + return new MappingElasticsearchConverter(elasticsearchMappingContext()); + } + + @Bean <3> + public SimpleElasticsearchMappingContext elasticsearchMappingContext() { + return new SimpleElasticsearchMappingContext(); + } + + @Bean <4> + public ReactiveElasticsearchOperations reactiveElasticsearchOperations() { + return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter()); + } +} +---- +<1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`. +<2> Set up the `ElasticsearchConverter` used for domain type mapping utilizing metadata provided by the mapping context. +<3> The Elasticsearch specific mapping context for domain type metadata. +<4> The actual template based on the client and conversion infrastructure. +==== + +[[elasticsearch.reactive.template.usage]] +=== Reactive Template Usage + +`ReactiveElasticsearchTemplate` lets you save, find and delete your domain objects and map those objects to documents stored in Elasticsearch. + +Consider the following: + +.Use the ReactiveElasticsearchTemplate +==== +[source,java] +---- +@Document(indexName = "marvel", type = "characters") +public class Person { + + private @Id String id; + private String name; + private int age; + + // Getter/Setter omitted... +} +---- + +[source,java] +---- +template.save(new Person("Bruce Banner", 42)) <1> + .doOnNext(System.out::println) + .flatMap(person -> template.findById(person.id, Person.class)) <2> + .doOnNext(System.out::println) + .flatMap(person -> template.delete(person)) <3> + .doOnNext(System.out::println) + .flatMap(id -> template.count(Person.class)) <4> + .doOnNext(System.out::println) + .subscribe(); <5> +---- + +The above outputs the following sequence on the console. + +[source,text] +---- +> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42) +> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42) +> QjWCWWcBXiLAnp77ksfR +> 0 +---- +<1> Insert a new `Person` document into the _marvel_ index under type _characters_. The `id` is generated on server side and set into the instance returned. +<2> Lookup the `Person` with matching `id` in the _marvel_ index under type _characters_. +<3> Delete the `Person` with matching `id`, extracted from the given instance, in the _marvel_ index under type _characters_. +<4> Count the total number of documents in the _marvel_ index under type _characters_. +<5> Don't forget to _subscribe()_. +==== + + diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index b2f0cb2b9..4ed9c82e6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -56,11 +56,13 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.reactivestreams.Publisher; -import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.data.elasticsearch.ElasticsearchException; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; @@ -249,7 +251,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ @Override public Mono delete(HttpHeaders headers, DeleteRequest deleteRequest) { - return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers).publishNext(); + + return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers) // + .publishNext(); } /* @@ -264,6 +268,16 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .flatMap(Flux::fromIterable); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest) + */ + public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { + + return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) // + .publishNext(); + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) @@ -364,27 +378,22 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch try { - XContentParser contentParser = createParser(mediaType, content); + Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); + + return Mono.justOrEmpty(responseType + .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content)))); + + } catch (Exception errorParseFailure) { try { + return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content))); + } catch (Exception e) { - Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); return Mono - .justOrEmpty(responseType.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, contentParser))); - - } catch (Exception errorParseFailure) { - try { - return Mono.error(BytesRestResponse.errorFromXContent(contentParser)); - } catch (Exception e) { - // return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch. - return Mono.error(new ElasticsearchStatusException("Unable to parse response body", - RestStatus.fromCode(response.statusCode().value()))); - } + .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); } - - } catch (IOException e) { - return Mono.error(new DataAccessResourceFailureException("Error parsing XContent.", e)); } + } private static XContentParser createParser(String mediaType, String content) throws IOException { @@ -437,6 +446,18 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch static Function delete() { return RequestConverters::delete; } + + static Function deleteByQuery() { + + return request -> { + + try { + return RequestConverters.deleteByQuery(request); + } catch (IOException e) { + throw new ElasticsearchException("Could not parse request", e); + } + }; + } } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index ba9240580..a46e8ff32 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -33,6 +33,8 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; @@ -348,6 +350,44 @@ public interface ReactiveElasticsearchClient { */ Flux search(HttpHeaders headers, SearchRequest searchRequest); + /** + * Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API. + * + * @param consumer never {@literal null}. + * @see Delete By + * Query API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono deleteBy(Consumer consumer) { + + DeleteByQueryRequest request = new DeleteByQueryRequest(); + consumer.accept(request); + return deleteBy(request); + } + + /** + * Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API. + * + * @param deleteRequest must not be {@literal null}. + * @see Delete By + * Query API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono deleteBy(DeleteByQueryRequest deleteRequest) { + return deleteBy(HttpHeaders.EMPTY, deleteRequest); + } + + /** + * Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param deleteRequest must not be {@literal null}. + * @see Delete By + * Query API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest); + /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java new file mode 100644 index 000000000..90c832706 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.config; + +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.context.annotation.Bean; +import org.springframework.data.elasticsearch.core.ElasticsearchOperations; +import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; + +/** + * @author Christoph Strobl + * @since 4.0 + * @see ElasticsearchConfigurationSupport + */ +public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport { + + /** + * Return the {@link RestHighLevelClient} instance used to connect to the cluster.
+ * Annotate with {@link Bean} in case you want to expose a {@link RestHighLevelClient} instance to the + * {@link org.springframework.context.ApplicationContext}. + * + * @return never {@literal null}. + */ + public abstract RestHighLevelClient elasticsearchClient(); + + /** + * Creates {@link ElasticsearchOperations}. + * + * @return never {@literal null}. + */ + @Bean + public ElasticsearchOperations elasticsearchOperations() { + return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter()); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java new file mode 100644 index 000000000..9815c5c4a --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.config; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate; +import org.springframework.lang.Nullable; + +/** + * @author Christoph Strobl + * @since 4.0 + * @see ElasticsearchConfigurationSupport + */ +@Configuration +public abstract class AbstractReactiveElasticsearchConfiguration extends ElasticsearchConfigurationSupport { + + /** + * Return the {@link ReactiveElasticsearchClient} instance used to connect to the cluster.
+ * Annotate with {@link Bean} in case you want to expose a {@link ReactiveElasticsearchClient} instance to the + * {@link org.springframework.context.ApplicationContext}. + * + * @return never {@literal null}. + */ + public abstract ReactiveElasticsearchClient reactiveElasticsearchClient(); + + /** + * Creates {@link ReactiveElasticsearchOperations}. + * + * @return never {@literal null}. + */ + @Bean + public ReactiveElasticsearchOperations reactiveElasticsearchOperations() { + + ReactiveElasticsearchTemplate template = new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), + elasticsearchConverter()); + template.setIndicesOptions(indicesOptions()); + template.setRefreshPolicy(refreshPolicy()); + + return template; + } + + /** + * Set up the write {@link RefreshPolicy}. Default is set to {@link RefreshPolicy#IMMEDIATE}. + * + * @return {@literal null} to use the server defaults. + */ + @Nullable + protected RefreshPolicy refreshPolicy() { + return RefreshPolicy.IMMEDIATE; + } + + /** + * Set up the read {@link IndicesOptions}. Default is set to {@link IndicesOptions#strictExpandOpenAndForbidClosed()}. + * + * @return {@literal null} to use the server defaults. + */ + @Nullable + protected IndicesOptions indicesOptions() { + return IndicesOptions.strictExpandOpenAndForbidClosed(); + } + +} diff --git a/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java b/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java new file mode 100644 index 000000000..64bf4c0c4 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java @@ -0,0 +1,146 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.config; + +import lombok.SneakyThrows; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.convert.converter.Converter; +import org.springframework.core.type.filter.AnnotationTypeFilter; +import org.springframework.data.annotation.Persistent; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions; +import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; +import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; +import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; + +/** + * @author Christoph Strobl + * @since 4.0 + */ +@Configuration +public class ElasticsearchConfigurationSupport { + + @Bean + public ElasticsearchConverter elasticsearchConverter() { + return new MappingElasticsearchConverter(elasticsearchMappingContext()); + } + + /** + * Creates a {@link SimpleElasticsearchMappingContext} equipped with entity classes scanned from the mapping base + * package. + * + * @see #getMappingBasePackages() + * @return never {@literal null}. + */ + @Bean + @SneakyThrows + public SimpleElasticsearchMappingContext elasticsearchMappingContext() { + + SimpleElasticsearchMappingContext mappingContext = new SimpleElasticsearchMappingContext(); + mappingContext.setInitialEntitySet(getInitialEntitySet()); + mappingContext.setSimpleTypeHolder(customConversions().getSimpleTypeHolder()); + + return mappingContext; + } + + /** + * Register custom {@link Converter}s in a {@link ElasticsearchCustomConversions} object if required. + * + * @return never {@literal null}. + */ + @Bean + public ElasticsearchCustomConversions customConversions() { + return new ElasticsearchCustomConversions(Collections.emptyList()); + } + + /** + * Returns the base packages to scan for Elasticsearch mapped entities at startup. Will return the package name of the + * configuration class' (the concrete class, not this one here) by default. So if you have a + * {@code com.acme.AppConfig} extending {@link ElasticsearchConfigurationSupport} the base package will be considered + * {@code com.acme} unless the method is overridden to implement alternate behavior. + * + * @return the base packages to scan for mapped {@link Document} classes or an empty collection to not enable scanning + * for entities. + */ + protected Collection getMappingBasePackages() { + + Package mappingBasePackage = getClass().getPackage(); + return Collections.singleton(mappingBasePackage == null ? null : mappingBasePackage.getName()); + } + + /** + * Scans the mapping base package for classes annotated with {@link Document}. By default, it scans for entities in + * all packages returned by {@link #getMappingBasePackages()}. + * + * @see #getMappingBasePackages() + * @return never {@literal null}. + * @throws ClassNotFoundException + */ + protected Set> getInitialEntitySet() throws ClassNotFoundException { + + Set> initialEntitySet = new HashSet<>(); + + for (String basePackage : getMappingBasePackages()) { + initialEntitySet.addAll(scanForEntities(basePackage)); + } + + return initialEntitySet; + } + + /** + * Scans the given base package for entities, i.e. Elasticsearch specific types annotated with {@link Document} and + * {@link Persistent}. + * + * @param basePackage must not be {@literal null}. + * @return never {@literal null}. + * @throws ClassNotFoundException + */ + protected Set> scanForEntities(String basePackage) throws ClassNotFoundException { + + if (!StringUtils.hasText(basePackage)) { + return Collections.emptySet(); + } + + Set> initialEntitySet = new HashSet>(); + + if (StringUtils.hasText(basePackage)) { + + ClassPathScanningCandidateComponentProvider componentProvider = new ClassPathScanningCandidateComponentProvider( + false); + componentProvider.addIncludeFilter(new AnnotationTypeFilter(Document.class)); + componentProvider.addIncludeFilter(new AnnotationTypeFilter(Persistent.class)); + + for (BeanDefinition candidate : componentProvider.findCandidateComponents(basePackage)) { + + initialEntitySet.add(ClassUtils.forName(candidate.getBeanClassName(), + AbstractReactiveElasticsearchConfiguration.class.getClassLoader())); + } + } + + return initialEntitySet; + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractResultMapper.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractResultMapper.java index 213a4f096..282c23311 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractResultMapper.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractResultMapper.java @@ -15,11 +15,7 @@ */ package org.springframework.data.elasticsearch.core; -import java.io.IOException; - -import org.springframework.data.elasticsearch.ElasticsearchException; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * @author Artur Konczak @@ -29,21 +25,10 @@ public abstract class AbstractResultMapper implements ResultsMapper { private EntityMapper entityMapper; public AbstractResultMapper(EntityMapper entityMapper) { - - Assert.notNull(entityMapper, "EntityMapper must not be null!"); - - this.entityMapper = entityMapper; - } - public T mapEntity(String source, Class clazz) { - if (StringUtils.isEmpty(source)) { - return null; - } - try { - return entityMapper.mapToObject(source, clazz); - } catch (IOException e) { - throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName(), e); - } + Assert.notNull(entityMapper, "EntityMapper must not be null!"); + + this.entityMapper = entityMapper; } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java index 333f62fe5..c0fdf4c2d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java @@ -16,8 +16,11 @@ package org.springframework.data.elasticsearch.core; +import java.net.ConnectException; + import org.elasticsearch.ElasticsearchException; import org.springframework.dao.DataAccessException; +import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.dao.support.PersistenceExceptionTranslator; /** @@ -31,6 +34,12 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra if (ex instanceof ElasticsearchException) { // TODO: exception translation + ElasticsearchException elasticsearchExption = (ElasticsearchException) ex; +// elasticsearchExption.get + } + + if(ex.getCause() instanceof ConnectException) { + return new DataAccessResourceFailureException(ex.getMessage(), ex); } return null; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java new file mode 100644 index 000000000..d1339e759 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java @@ -0,0 +1,459 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.elasticsearch.index.query.QueryBuilders; +import org.reactivestreams.Publisher; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Interface that specifies a basic set of Elasticsearch operations executed in a reactive way. + *

+ * Implemented by {@link ReactiveElasticsearchTemplate}. Not often used but a useful option for extensibility and + * testability (as it can be easily mocked, stubbed, or be the target of a JDK proxy). Command execution using + * {@link ReactiveElasticsearchOperations} is deferred until a {@link org.reactivestreams.Subscriber} subscribes to the + * {@link Publisher}. + * + * @author Christoph Strobl + * @since 4.0 + */ +public interface ReactiveElasticsearchOperations { + + /** + * Execute within a {@link ClientCallback} managing resources and translating errors. + * + * @param callback must not be {@literal null}. + * @param + * @return the {@link Publisher} emitting results. + */ + Publisher execute(ClientCallback> callback); + + /** + * Index the given entity, once available, extracting index and type from entity metadata. + * + * @param entityPublisher must not be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + default Mono save(Mono entityPublisher) { + + Assert.notNull(entityPublisher, "EntityPublisher must not be null!"); + return entityPublisher.flatMap(this::save); + } + + /** + * Index the given entity extracting index and type from entity metadata. + * + * @param entity must not be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + default Mono save(T entity) { + return save(entity, null); + } + + /** + * Index the entity, once available, in the given {@literal index}. If the index is {@literal null} or empty the index + * name provided via entity metadata is used. + * + * @param entityPublisher must not be {@literal null}. + * @param index the name of the target index. Can be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + default Mono save(Mono entityPublisher, String index) { + + Assert.notNull(entityPublisher, "EntityPublisher must not be null!"); + return entityPublisher.flatMap(it -> save(it, index)); + } + + /** + * Index the entity in the given {@literal index}. If the index is {@literal null} or empty the index name provided + * via entity metadata is used. + * + * @param entity must not be {@literal null}. + * @param index the name of the target index. Can be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + default Mono save(T entity, @Nullable String index) { + return save(entity, index, null); + } + + /** + * Index the entity, once available, under the given {@literal type} in the given {@literal index}. If the + * {@literal index} is {@literal null} or empty the index name provided via entity metadata is used. Same for the + * {@literal type}. + * + * @param entityPublisher must not be {@literal null}. + * @param index the name of the target index. Can be {@literal null}. + * @param type the name of the type within the index. Can be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + default Mono save(Mono entityPublisher, @Nullable String index, @Nullable String type) { + + Assert.notNull(entityPublisher, "EntityPublisher must not be null!"); + return entityPublisher.flatMap(it -> save(it, index, type)); + } + + /** + * Index the entity under the given {@literal type} in the given {@literal index}. If the {@literal index} is + * {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}. + * + * @param entity must not be {@literal null}. + * @param index the name of the target index. Can be {@literal null}. + * @param type the name of the type within the index. Can be {@literal null}. + * @param + * @return a {@link Mono} emitting the saved entity. + */ + Mono save(T entity, @Nullable String index, @Nullable String type); + + /** + * Find the document with the given {@literal id} mapped onto the given {@literal entityType}. + * + * @param id the {@literal _id} of the document to fetch. + * @param entityType the domain type used for mapping the document. + * @param + * @return {@link Mono#empty()} if not found. + */ + default Mono findById(String id, Class entityType) { + return findById(id, entityType, null); + } + + /** + * Fetch the entity with given {@literal id}. + * + * @param id the {@literal _id} of the document to fetch. + * @param entityType the domain type used for mapping the document. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param + * @return {@link Mono#empty()} if not found. + */ + default Mono findById(String id, Class entityType, @Nullable String index) { + return findById(id, entityType, index, null); + } + + /** + * Fetch the entity with given {@literal id}. + * + * @param id must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param + * @return the {@link Mono} emitting the entity or signalling completion if none found. + */ + Mono findById(String id, Class entityType, @Nullable String index, @Nullable String type); + + /** + * Check if an entity with given {@literal id} exists. + * + * @param id the {@literal _id} of the document to look for. + * @param entityType the domain type used. + * @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise. + */ + default Mono exists(String id, Class entityType) { + return exists(id, entityType, null); + } + + /** + * Check if an entity with given {@literal id} exists. + * + * @param id the {@literal _id} of the document to look for. + * @param entityType the domain type used. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise. + */ + default Mono exists(String id, Class entityType, @Nullable String index) { + return exists(id, entityType, index, null); + } + + /** + * Check if an entity with given {@literal id} exists. + * + * @param id the {@literal _id} of the document to look for. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise. + */ + Mono exists(String id, Class entityType, @Nullable String index, @Nullable String type); + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param + * @return a {@link Flux} emitting matching entities one by one. + */ + default Flux find(Query query, Class entityType) { + return find(query, entityType, entityType); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType The entity type for mapping the query. Must not be {@literal null}. + * @param returnType The mapping target type. Must not be {@literal null}. Th + * @param + * @return a {@link Flux} emitting matching entities one by one. + */ + default Flux find(Query query, Class entityType, Class returnType) { + return find(query, entityType, null, null, returnType); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param + * @return a {@link Flux} emitting matching entities one by one. + */ + default Flux find(Query query, Class entityType, @Nullable String index) { + return find(query, entityType, index, null); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param + * @returnm a {@link Flux} emitting matching entities one by one. + */ + default Flux find(Query query, Class entityType, @Nullable String index, @Nullable String type) { + return find(query, entityType, index, type, entityType); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param resultType the projection result type. + * @param + * @return a {@link Flux} emitting matching entities one by one. + */ + Flux find(Query query, Class entityType, @Nullable String index, @Nullable String type, + Class resultType); + + /** + * Count the number of documents matching the given {@link Query}. + * + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the nr of matching documents. + */ + default Mono count(Class entityType) { + return count(new StringQuery(QueryBuilders.matchAllQuery().toString()), entityType, null); + } + + /** + * Count the number of documents matching the given {@link Query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the nr of matching documents. + */ + default Mono count(Query query, Class entityType) { + return count(query, entityType, null); + } + + /** + * Count the number of documents matching the given {@link Query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the nr of matching documents. + */ + default Mono count(Query query, Class entityType, @Nullable String index) { + return count(query, entityType, index, null); + } + + /** + * Count the number of documents matching the given {@link Query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the nr of matching documents. + */ + Mono count(Query query, Class entityType, @Nullable String index, @Nullable String type); + + /** + * Delete the given entity extracting index and type from entity metadata. + * + * @param entity must not be {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + default Mono delete(Object entity) { + return delete(entity, null); + } + + /** + * Delete the given entity extracting index and type from entity metadata. + * + * @param entity must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + default Mono delete(Object entity, @Nullable String index) { + return delete(entity, index, null); + } + + /** + * Delete the given entity extracting index and type from entity metadata. + * + * @param entity must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + Mono delete(Object entity, @Nullable String index, @Nullable String type); + + /** + * Delete the entity with given {@literal id}. + * + * @param id must not be {@literal null}. + * @param index the name of the target index. + * @param type the name of the target type. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + default Mono deleteById(String id, String index, String type) { + + Assert.notNull(index, "Index must not be null!"); + Assert.notNull(type, "Type must not be null!"); + + return deleteById(id, Object.class, index, type); + } + + /** + * Delete the entity with given {@literal id} extracting index and type from entity metadata. + * + * @param id must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + default Mono deleteById(String id, Class entityType) { + return deleteById(id, entityType, null); + } + + /** + * Delete the entity with given {@literal id} extracting index and type from entity metadata. + * + * @param id must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + default Mono deleteById(String id, Class entityType, @Nullable String index) { + return deleteById(id, entityType, index, null); + } + + /** + * Delete the entity with given {@literal id} extracting index and type from entity metadata. + * + * @param id must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the {@literal id} of the removed document. + */ + Mono deleteById(String id, Class entityType, @Nullable String index, @Nullable String type); + + /** + * Delete the documents matching the given {@link Query} extracting index and type from entity metadata. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the number of the removed documents. + */ + default Mono deleteBy(Query query, Class entityType) { + return deleteBy(query, entityType, null); + } + + /** + * Delete the documents matching the given {@link Query} extracting index and type from entity metadata. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the number of the removed documents. + */ + default Mono deleteBy(Query query, Class entityType, @Nullable String index) { + return deleteBy(query, entityType, index, null); + } + + /** + * Delete the documents matching the given {@link Query} extracting index and type from entity metadata. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not + * {@literal null}. + * @return a {@link Mono} emitting the number of the removed documents. + */ + Mono deleteBy(Query query, Class entityType, @Nullable String index, @Nullable String type); + + /** + * Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on + * {@link ReactiveElasticsearchClient}. + * + * @param + * @author Christoph Strobl + * @since 4.0 + */ + interface ClientCallback> { + + T doWithClient(ReactiveElasticsearchClient client); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 9833f7ec1..dffa36edb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.core; import static org.elasticsearch.index.VersionType.*; -import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,16 +24,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.Requests; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.WrapperQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -45,25 +53,36 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsea import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; +import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; +import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.mapping.IdentifierAccessor; +import org.springframework.data.mapping.PersistentProperty; import org.springframework.data.mapping.PersistentPropertyAccessor; +import org.springframework.data.util.ClassTypeInformation; +import org.springframework.http.HttpStatus; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; -import org.springframework.web.client.HttpClientErrorException; /** * @author Christoph Strobl * @since 4.0 */ -public class ReactiveElasticsearchTemplate { +public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations { private final ReactiveElasticsearchClient client; private final ElasticsearchConverter converter; - private final DefaultResultMapper mapper; + private final ResultsMapper resultMapper; private final ElasticsearchExceptionTranslator exceptionTranslator; + private @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.IMMEDIATE; + private @Nullable IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); + public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) { this(client, new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext())); } @@ -72,225 +91,471 @@ public class ReactiveElasticsearchTemplate { this.client = client; this.converter = converter; - this.mapper = new DefaultResultMapper(converter.getMappingContext()); + this.resultMapper = new DefaultResultMapper(converter.getMappingContext()); this.exceptionTranslator = new ElasticsearchExceptionTranslator(); } - public Mono index(T entity) { - return index(entity, null); - } - - public Mono index(T entity, String index) { - return index(entity, index, null); - } - - /** - * Add the given entity to the index. - * - * @param entity - * @param index - * @param type - * @param - * @return + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#exctute(ClientCallback) */ - public Mono index(T entity, String index, String type) { + @Override + public Publisher execute(ClientCallback> callback) { + return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException); + } - ElasticsearchPersistentEntity persistentEntity = lookupPersistentEntity(entity.getClass()); - return doIndex(entity, persistentEntity, index, type) // + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#index(Object, String, String) + */ + @Override + public Mono save(T entity, @Nullable String index, @Nullable String type) { + + Assert.notNull(entity, "Entity must not be null!"); + + AdaptableEntity adaptableEntity = ConverterAwareAdaptableEntity.of(entity, converter); + + return doIndex(entity, adaptableEntity, index, type) // .map(it -> { - - // TODO: update id if necessary! - // it.getId() - // it.getVersion() - - return entity; + return adaptableEntity.updateIdIfNecessary(it.getId()); }); } - public Mono get(String id, Class resultType) { - return get(id, resultType, null); - } + private Mono doIndex(Object value, AdaptableEntity entity, @Nullable String index, + @Nullable String type) { - public Mono get(String id, Class resultType, @Nullable String index) { - return get(id, resultType, index, null); - } + return Mono.defer(() -> { - /** - * Fetch the entity with given id. - * - * @param id must not be {@literal null}. - * @param resultType must not be {@literal null}. - * @param index - * @param type - * @param - * @return the {@link Mono} emitting the entity or signalling completion if none found. - */ - public Mono get(String id, Class resultType, @Nullable String index, @Nullable String type) { + Object id = entity.getId(); - ElasticsearchPersistentEntity persistentEntity = lookupPersistentEntity(resultType); - GetRequest request = new GetRequest(persistentEntity.getIndexName(), persistentEntity.getIndexType(), id); + String indexToUse = indexName(index, entity); + String typeToUse = typeName(type, entity); - return doGet(id, persistentEntity, index, type).map(it -> mapper.mapEntity(it.sourceAsString(), resultType)); - } + IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString()) + : new IndexRequest(indexToUse, typeToUse); - /** - * Search the index for entities matching the given {@link CriteriaQuery query}. - * - * @param query must not be {@literal null}. - * @param resultType must not be {@literal null}. - * @param - * @return - */ - public Flux query(CriteriaQuery query, Class resultType) { - - ElasticsearchPersistentEntity entity = lookupPersistentEntity(resultType); - - SearchRequest request = new SearchRequest(indices(query, entity)); - request.types(indexTypes(query, entity)); - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(mappedQuery(query, entity)); - // TODO: request.source().postFilter(elasticsearchFilter); -- filter query - - searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before - searchSourceBuilder.trackScores(query.getTrackScores()); - - if (query.getSourceFilter() != null) { - searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); - } - - if (query.getPageable().isPaged()) { - - long offset = query.getPageable().getOffset(); - if (offset > Integer.MAX_VALUE) { - throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); + try { + request.source(resultMapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE); + } catch (IOException e) { + throw new RuntimeException(e); } - searchSourceBuilder.from((int) offset); - searchSourceBuilder.size(query.getPageable().getPageSize()); - } + if (entity.isVersioned()) { - if (query.getIndicesOptions() != null) { - request.indicesOptions(query.getIndicesOptions()); - } + Object version = entity.getVersion(); + if (version != null) { + request.version(((Number) version).longValue()); + request.versionType(EXTERNAL); + } + } - sort(query, entity).forEach(searchSourceBuilder::sort); + if (entity.getPersistentEntity().getParentIdProperty() != null) { - if (query.getMinScore() > 0) { - searchSourceBuilder.minScore(query.getMinScore()); - } - request.source(searchSourceBuilder); + Object parentId = entity.getPropertyValue(entity.getPersistentEntity().getParentIdProperty()); + if (parentId != null) { + request.parent(parentId.toString()); + } + } - return Flux.from( - execute(client -> client.search(request).map(it -> mapper.mapEntity(it.getSourceAsString(), resultType)))); + request = prepareIndexRequest(value, request); + return doIndex(request); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#findById(String, Class, String, String) + */ + @Override + public Mono findById(String id, Class entityType, @Nullable String index, @Nullable String type) { + + Assert.notNull(id, "Id must not be null!"); + + return doFindById(id, BasicElasticsearchEntity.of(entityType, converter), index, type) + .map(it -> resultMapper.mapEntity(it, entityType)); + } + + private Mono doFindById(String id, ElasticsearchEntity entity, @Nullable String index, + @Nullable String type) { + + return Mono.defer(() -> { + + String indexToUse = indexName(index, entity); + String typeToUse = typeName(type, entity); + + return doFindById(new GetRequest(indexToUse, typeToUse, id)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#exists(String, Class, String, String) + */ + @Override + public Mono exists(String id, Class entityType, String index, String type) { + + Assert.notNull(id, "Id must not be null!"); + + return doExists(id, BasicElasticsearchEntity.of(entityType, converter), index, type); + } + + private Mono doExists(String id, ElasticsearchEntity entity, @Nullable String index, + @Nullable String type) { + + return Mono.defer(() -> { + + String indexToUse = indexName(index, entity); + String typeToUse = typeName(type, entity); + + return doExists(new GetRequest(indexToUse, typeToUse, id)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#find(Query, Class, String, String, Class) + */ + @Override + public Flux find(Query query, Class entityType, @Nullable String index, @Nullable String type, + Class resultType) { + + return doFind(query, BasicElasticsearchEntity.of(entityType, converter), index, type) + .map(it -> resultMapper.mapEntity(it, resultType)); + } + + private Flux doFind(Query query, ElasticsearchEntity entity, @Nullable String index, + @Nullable String type) { + + return Flux.defer(() -> { + + SearchRequest request = new SearchRequest(indices(query, () -> indexName(index, entity))); + request.types(indexTypes(query, () -> typeName(type, entity))); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(mappedQuery(query, entity.getPersistentEntity())); + + // TODO: request.source().postFilter(elasticsearchFilter); -- filter query + + searchSourceBuilder.version(entity.isVersioned()); // This has been true by default before + searchSourceBuilder.trackScores(query.getTrackScores()); + + if (query.getSourceFilter() != null) { + searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); + } + + if (query.getPageable().isPaged()) { + + long offset = query.getPageable().getOffset(); + if (offset > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); + } + + searchSourceBuilder.from((int) offset); + searchSourceBuilder.size(query.getPageable().getPageSize()); + } + + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } + + sort(query, entity.getPersistentEntity()).forEach(searchSourceBuilder::sort); + + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } + + request.source(searchSourceBuilder); + + request = prepareSearchRequest(request); + + return doFind(request); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#count(Query, Class, String, String) + */ + @Override + public Mono count(Query query, Class entityType, String index, String type) { + + // TODO: ES 7.0 has a dedicated CountRequest - use that one once available. + return find(query, entityType, index, type).count(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(Object, String, String) + */ + @Override + public Mono delete(Object entity, @Nullable String index, @Nullable String type) { + + AdaptableEntity elasticsearchEntity = ConverterAwareAdaptableEntity.of(entity, converter); + + return Mono.defer(() -> doDeleteById(entity, ObjectUtils.nullSafeToString(elasticsearchEntity.getId()), + elasticsearchEntity, index, type)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(String, Class, String, String) + */ + @Override + public Mono deleteById(String id, Class entityType, @Nullable String index, @Nullable String type) { + + Assert.notNull(id, "Id must not be null!"); + + return doDeleteById(null, id, BasicElasticsearchEntity.of(entityType, converter), index, type); + + } + + private Mono doDeleteById(@Nullable Object source, String id, ElasticsearchEntity entity, + @Nullable String index, @Nullable String type) { + + return Mono.defer(() -> { + + String indexToUse = indexName(index, entity); + String typeToUse = typeName(type, entity); + + return doDelete(prepareDeleteRequest(source, new DeleteRequest(indexToUse, typeToUse, id))); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#deleteBy(Query, Class, String, String) + */ + @Override + public Mono deleteBy(Query query, Class entityType, String index, String type) { + + Assert.notNull(query, "Query must not be null!"); + + return doDeleteBy(query, BasicElasticsearchEntity.of(entityType, converter), index, type) + .map(BulkByScrollResponse::getDeleted).publishNext(); + } + + private Flux doDeleteBy(Query query, ElasticsearchEntity entity, @Nullable String index, + @Nullable String type) { + + return Flux.defer(() -> { + + DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, () -> indexName(index, entity))); + request.types(indexTypes(query, () -> typeName(type, entity))); + request.setQuery(mappedQuery(query, entity.getPersistentEntity())); + + return doDeleteBy(prepareDeleteByRequest(request)); + }); + } + + // Property Setters / Getters + + /** + * Set the default {@link RefreshPolicy} to apply when writing to Elasticsearch. + * + * @param refreshPolicy can be {@literal null}. + */ + public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; } /** - * Execute within a {@link ClientCallback} managing resources and translating errors. + * Set the default {@link IndicesOptions} for {@link SearchRequest search requests}. * - * @param callback must not be {@literal null}. - * @param - * @return the {@link Publisher} emitting results. + * @param indicesOptions can be {@literal null}. */ - public Publisher execute(ClientCallback> callback) { - return Flux.from(callback.doWithClient(this.client)).onErrorMap(this::translateException); + public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; } // Customization Hooks - protected Mono doGet(String id, ElasticsearchPersistentEntity entity, @Nullable String index, - @Nullable String type) { - - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); - - return doGet(new GetRequest(indexToUse, typeToUse, id)); + /** + * Obtain the {@link ReactiveElasticsearchClient} to operate upon. + * + * @return never {@literal null}. + */ + protected ReactiveElasticsearchClient getClient() { + return this.client; } - protected Mono doGet(GetRequest request) { + /** + * Pre process the write request before it is sent to the server, eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param request must not be {@literal null}. + * @param + * @return the processed {@link WriteRequest}. + */ + protected > R prepareWriteRequest(R request) { - return Mono.from(execute(client -> client.get(request))) // - .onErrorResume((it) -> { + if (refreshPolicy == null) { + return request; + } - if (it instanceof HttpClientErrorException) { - return ((HttpClientErrorException) it).getRawStatusCode() == 404; - } - return false; - - }, (it) -> Mono.empty()); + return request.setRefreshPolicy(refreshPolicy); } - protected Mono doIndex(Object value, ElasticsearchPersistentEntity entity, @Nullable String index, - @Nullable String type) { - - PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(value); - Object id = propertyAccessor.getProperty(entity.getIdProperty()); - - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); - - IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString()) - : new IndexRequest(indexToUse, typeToUse); - - try { - request.source(mapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE); - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (entity.hasVersionProperty()) { - - Object version = propertyAccessor.getProperty(entity.getVersionProperty()); - if (version != null) { - request.version(((Number) version).longValue()); - request.versionType(EXTERNAL); - } - } - - if (entity.getParentIdProperty() != null) { - - Object parentId = propertyAccessor.getProperty(entity.getParentIdProperty()); - if (parentId != null) { - request.parent(parentId.toString()); - } - } - - return doIndex(request.setRefreshPolicy(RefreshPolicy.IMMEDIATE)); + /** + * Customization hook to modify a generated {@link IndexRequest} prior to its execution. Eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param source the source object the {@link IndexRequest} was derived from. + * @param request the generated {@link IndexRequest}. + * @return never {@literal null}. + */ + protected IndexRequest prepareIndexRequest(Object source, IndexRequest request) { + return prepareWriteRequest(request); } + /** + * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the + * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. + * + * @param request the generated {@link SearchRequest}. + * @return never {@literal null}. + */ + protected SearchRequest prepareSearchRequest(SearchRequest request) { + + if (indicesOptions == null) { + return request; + } + + return request.indicesOptions(indicesOptions); + } + + /** + * Customization hook to modify a generated {@link DeleteRequest} prior to its execution. Eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param source the source object the {@link DeleteRequest} was derived from. My be {@literal null} if using the + * {@literal id} directly. + * @param request the generated {@link DeleteRequest}. + * @return never {@literal null}. + */ + protected DeleteRequest prepareDeleteRequest(@Nullable Object source, DeleteRequest request) { + return prepareWriteRequest(request); + } + + /** + * Customization hook to modify a generated {@link DeleteByQueryRequest} prior to its execution. Eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param request the generated {@link DeleteByQueryRequest}. + * @return never {@literal null}. + */ + protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest request) { + + if (refreshPolicy != null && !RefreshPolicy.NONE.equals(refreshPolicy)) { + request = request.setRefresh(true); + } + + if (indicesOptions != null) { + request = request.setIndicesOptions(indicesOptions); + } + + return request; + } + + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * You know what you're doing here? Well fair enough, go ahead on your own risk. + * + * @param request the already prepared {@link IndexRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ protected Mono doIndex(IndexRequest request) { return Mono.from(execute(client -> client.index(request))); } + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link GetRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doFindById(GetRequest request) { + return Mono.from(execute(client -> client.get(request))); + } + + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link GetRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doExists(GetRequest request) { + return Mono.from(execute(client -> client.exists(request))); + } + + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link SearchRequest} ready to be executed. + * @return a {@link Flux} emitting the result of the operation. + */ + protected Flux doFind(SearchRequest request) { + return Flux.from(execute(client -> client.search(request))); + } + + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link DeleteRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doDelete(DeleteRequest request) { + + return Mono.from(execute(client -> client.delete(request))) // + + .flatMap(it -> { + + if (HttpStatus.valueOf(it.status().getStatus()).equals(HttpStatus.NOT_FOUND)) { + return Mono.empty(); + } + + return Mono.just(it.getId()); + }); + } + + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link DeleteByQueryRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doDeleteBy(DeleteByQueryRequest request) { + return Mono.from(execute(client -> client.deleteBy(request))); + } + // private helpers - private static String indexName(@Nullable String index, ElasticsearchPersistentEntity entity) { + private static String indexName(@Nullable String index, ElasticsearchEntity entity) { return StringUtils.isEmpty(index) ? entity.getIndexName() : index; } - private static String typeName(@Nullable String type, ElasticsearchPersistentEntity entity) { - return StringUtils.isEmpty(type) ? entity.getIndexType() : type; + private static String typeName(@Nullable String type, ElasticsearchEntity entity) { + return StringUtils.isEmpty(type) ? entity.getTypeName() : type; } - private static String[] indices(CriteriaQuery query, ElasticsearchPersistentEntity entity) { + private static String[] indices(Query query, Supplier index) { if (query.getIndices().isEmpty()) { - return new String[] { entity.getIndexName() }; + return new String[] { index.get() }; } return query.getIndices().toArray(new String[0]); } - private static String[] indexTypes(CriteriaQuery query, ElasticsearchPersistentEntity entity) { + private static String[] indexTypes(Query query, Supplier indexType) { if (query.getTypes().isEmpty()) { - return new String[] { entity.getIndexType() }; + return new String[] { indexType.get() }; } return query.getTypes().toArray(new String[0]); } - private List sort(Query query, ElasticsearchPersistentEntity entity) { + private static List sort(Query query, ElasticsearchPersistentEntity entity) { if (query.getSort() == null || query.getSort().isUnsorted()) { return Collections.emptyList(); @@ -317,10 +582,20 @@ public class ReactiveElasticsearchTemplate { return mappedSort; } - private QueryBuilder mappedQuery(CriteriaQuery query, ElasticsearchPersistentEntity entity) { + private QueryBuilder mappedQuery(Query query, ElasticsearchPersistentEntity entity) { // TODO: we need to actually map the fields to the according field names! - QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(query.getCriteria()); + + QueryBuilder elasticsearchQuery = null; + + if (query instanceof CriteriaQuery) { + elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(((CriteriaQuery) query).getCriteria()); + } else if (query instanceof StringQuery) { + elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource()); + } else { + throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass())); + } + return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery(); } @@ -336,17 +611,166 @@ public class ReactiveElasticsearchTemplate { private Throwable translateException(Throwable throwable) { - if (!(throwable instanceof RuntimeException)) { - return throwable; + RuntimeException exception = throwable instanceof RuntimeException ? (RuntimeException) throwable + : new RuntimeException(throwable.getMessage(), throwable); + RuntimeException potentiallyTranslatedException = exceptionTranslator.translateExceptionIfPossible(exception); + + return potentiallyTranslatedException != null ? potentiallyTranslatedException : throwable; + } + + /** + * @param + * @author Christoph Strobl + * @since 4.0 + */ + protected interface ElasticsearchEntity { + + default boolean isIdentifiable() { + return getPersistentEntity().hasVersionProperty(); } - RuntimeException ex = exceptionTranslator.translateExceptionIfPossible((RuntimeException) throwable); - return ex != null ? ex : throwable; + default boolean isVersioned() { + return getPersistentEntity().hasVersionProperty(); + } + + default ElasticsearchPersistentProperty getIdProperty() { + return getPersistentEntity().getIdProperty(); + } + + default String getIndexName() { + return getPersistentEntity().getIndexName(); + } + + default String getTypeName() { + return getPersistentEntity().getIndexType(); + } + + ElasticsearchPersistentEntity getPersistentEntity(); } - // Additional types - public interface ClientCallback> { + protected interface AdaptableEntity extends ElasticsearchEntity { + + PersistentPropertyAccessor getPropertyAccessor(); + + IdentifierAccessor getIdentifierAccessor(); + + @Nullable + default Object getId() { + return getIdentifierAccessor().getIdentifier(); + } + + default Object getVersion() { + return getPropertyAccessor().getProperty(getPersistentEntity().getRequiredVersionProperty()); + } + + @Nullable + default Object getPropertyValue(PersistentProperty property) { + return getPropertyAccessor().getProperty(property); + } + + default T getBean() { + return getPropertyAccessor().getBean(); + } + + default T updateIdIfNecessary(Object id) { + + if (id == null || !getPersistentEntity().hasIdProperty() || getId() != null) { + return getPropertyAccessor().getBean(); + } + + return updatePropertyValue(getPersistentEntity().getIdProperty(), id); + } + + default T updatePropertyValue(PersistentProperty property, @Nullable Object value) { + + getPropertyAccessor().setProperty(property, value); + return getPropertyAccessor().getBean(); + } + + } + + protected static class BasicElasticsearchEntity implements ElasticsearchEntity { + + final ElasticsearchPersistentEntity entity; + + BasicElasticsearchEntity(ElasticsearchPersistentEntity entity) { + this.entity = entity; + } + + static BasicElasticsearchEntity of(T bean, ElasticsearchConverter converter) { + return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(bean.getClass())); + } + + static BasicElasticsearchEntity of(Class type, ElasticsearchConverter converter) { + + if (Object.class.equals(type)) { + return new BasicElasticsearchEntity<>(new SimpleElasticsearchPersistentEntity<>(ClassTypeInformation.OBJECT)); + } + + return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(type)); + } + + @Override + public ElasticsearchPersistentEntity getPersistentEntity() { + return entity; + } + } + + protected static class ConverterAwareAdaptableEntity implements AdaptableEntity { + + final ElasticsearchPersistentEntity entity; + final PersistentPropertyAccessor propertyAccessor; + final IdentifierAccessor idAccessor; + final ElasticsearchConverter converter; + + ConverterAwareAdaptableEntity(ElasticsearchPersistentEntity entity, IdentifierAccessor idAccessor, + PersistentPropertyAccessor propertyAccessor, ElasticsearchConverter converter) { + + this.entity = entity; + this.propertyAccessor = propertyAccessor; + this.idAccessor = idAccessor; + this.converter = converter; + } + + static ConverterAwareAdaptableEntity of(T bean, ElasticsearchConverter converter) { + + ElasticsearchPersistentEntity entity = converter.getMappingContext() + .getRequiredPersistentEntity(bean.getClass()); + IdentifierAccessor idAccessor = entity.getIdentifierAccessor(bean); + PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(bean); + + return new ConverterAwareAdaptableEntity<>(entity, idAccessor, propertyAccessor, converter); + } + + @Override + public PersistentPropertyAccessor getPropertyAccessor() { + return propertyAccessor; + } + + @Override + public IdentifierAccessor getIdentifierAccessor() { + + if (entity.getTypeInformation().isMap()) { + + return () -> { + + Object id = idAccessor.getIdentifier(); + if (id != null) { + return id; + } + + Map source = (Map) propertyAccessor.getBean(); + return source.get("id"); + }; + } + + return idAccessor; + } + + @Override + public ElasticsearchPersistentEntity getPersistentEntity() { + return entity; + } - T doWithClient(ReactiveElasticsearchClient client); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResultsMapper.java b/src/main/java/org/springframework/data/elasticsearch/core/ResultsMapper.java index 41db0f7e7..589759502 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResultsMapper.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResultsMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2014 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,15 +15,86 @@ */ package org.springframework.data.elasticsearch.core; +import java.io.IOException; + +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.search.SearchHit; +import org.springframework.data.elasticsearch.ElasticsearchException; +import org.springframework.lang.Nullable; +import org.springframework.util.StringUtils; + /** * ResultsMapper * * @author Rizwan Idrees * @author Mohsin Husen * @author Artur Konczak + * @author Christoph Strobl */ - public interface ResultsMapper extends SearchResultMapper, GetResultMapper, MultiGetResultMapper { EntityMapper getEntityMapper(); + + @Nullable + default T mapEntity(String source, Class clazz) { + + if (StringUtils.isEmpty(source)) { + return null; + } + try { + return getEntityMapper().mapToObject(source, clazz); + } catch (IOException e) { + throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName(), e); + } + } + + /** + * Map a single {@link GetResult} to an instance of the given type. + * + * @param getResult must not be {@literal null}. + * @param type must not be {@literal null}. + * @param + * @return can be {@literal null} if the {@link GetResult#isSourceEmpty() is empty}. + * @since 4.0 + */ + @Nullable + default T mapEntity(GetResult getResult, Class type) { + + if (getResult.isSourceEmpty()) { + return null; + } + + String sourceString = getResult.sourceAsString(); + + if (sourceString.startsWith("{\"id\":null,")) { + sourceString = sourceString.replaceFirst("\"id\":null", "\"id\":\"" + getResult.getId() + "\""); + } + + return mapEntity(sourceString, type); + } + + /** + * Map a single {@link SearchHit} to an instance of the given type. + * + * @param searchHit must not be {@literal null}. + * @param type must not be {@literal null}. + * @param + * @return can be {@literal null} if the {@link SearchHit} does not have {@link SearchHit#hasSource() a source}. + * @since 4.0 + */ + @Nullable + default T mapEntity(SearchHit searchHit, Class type) { + + if (!searchHit.hasSource()) { + return null; + } + + String sourceString = searchHit.getSourceAsString(); + + if (sourceString.startsWith("{\"id\":null,")) { + sourceString = sourceString.replaceFirst("\"id\":null", "\"id\":\"" + searchHit.getId() + "\""); + } + + return mapEntity(sourceString, type); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/convert/ElasticsearchCustomConversions.java b/src/main/java/org/springframework/data/elasticsearch/core/convert/ElasticsearchCustomConversions.java new file mode 100644 index 000000000..f5a0faeb8 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/convert/ElasticsearchCustomConversions.java @@ -0,0 +1,36 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.convert; + +import java.util.Collection; + +import org.springframework.data.convert.CustomConversions; + +/** + * @author Christoph Strobl + * @since 4.0 + */ +public class ElasticsearchCustomConversions extends CustomConversions { + + /** + * Creates a new {@link CustomConversions} instance registering the given converters. + * + * @param converters must not be {@literal null}. + */ + public ElasticsearchCustomConversions(Collection converters) { + super(StoreConversions.NONE, converters); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/GetQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/GetQuery.java index c5409d5b3..322ccaf67 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/GetQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/GetQuery.java @@ -32,4 +32,11 @@ public class GetQuery { public void setId(String id) { this.id = id; } + + public static GetQuery getById(String id) { + + GetQuery query = new GetQuery(); + query.setId(id); + return query; + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersion.java b/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersion.java new file mode 100644 index 000000000..6675e6027 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersion.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author Christoph Strobl + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@Documented +public @interface ElasticsearchVersion { + + /** + * Inclusive lower bound of Elasticsearch server range. + * + * @return {@code 0.0.0} by default. + */ + String asOf() default "0.0.0"; + + /** + * Exclusive upper bound of Elasticsearch server range. + * + * @return {@code 9999.9999.9999} by default. + */ + String until() default "9999.9999.9999"; +} diff --git a/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersionRule.java b/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersionRule.java new file mode 100644 index 000000000..19c6f179e --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/ElasticsearchVersionRule.java @@ -0,0 +1,131 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.AssumptionViolatedException; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.util.Version; + +/** + * @author Christoph Strobl + */ +public class ElasticsearchVersionRule implements TestRule { + + private static final Logger logger = LoggerFactory.getLogger(ElasticsearchVersionRule.class); + + private static final Version ANY = new Version(9999, 9999, 9999); + private static final Version DEFAULT_HIGH = ANY; + private static final Version DEFAULT_LOW = new Version(0, 0, 0); + + private final static AtomicReference currentVersion = new AtomicReference<>(null); + private final Version minVersion; + private final Version maxVersion; + + public ElasticsearchVersionRule(Version min, Version max) { + + this.minVersion = min; + this.maxVersion = max; + } + + public static ElasticsearchVersionRule any() { + return new ElasticsearchVersionRule(ANY, ANY); + } + + public static ElasticsearchVersionRule atLeast(Version minVersion) { + return new ElasticsearchVersionRule(minVersion, DEFAULT_HIGH); + } + + public static ElasticsearchVersionRule atMost(Version maxVersion) { + return new ElasticsearchVersionRule(DEFAULT_LOW, maxVersion); + } + + @Override + public Statement apply(final Statement base, Description description) { + + return new Statement() { + + @Override + public void evaluate() throws Throwable { + + if (!getCurrentVersion().equals(ANY)) { + + Version minVersion = ElasticsearchVersionRule.this.minVersion.equals(ANY) ? DEFAULT_LOW + : ElasticsearchVersionRule.this.minVersion; + Version maxVersion = ElasticsearchVersionRule.this.maxVersion.equals(ANY) ? DEFAULT_HIGH + : ElasticsearchVersionRule.this.maxVersion; + + if (description.getAnnotation(ElasticsearchVersion.class) != null) { + ElasticsearchVersion version = description.getAnnotation(ElasticsearchVersion.class); + if (version != null) { + + Version expectedMinVersion = Version.parse(version.asOf()); + if (!expectedMinVersion.equals(ANY) && !expectedMinVersion.equals(DEFAULT_LOW)) { + minVersion = expectedMinVersion; + } + + Version expectedMaxVersion = Version.parse(version.until()); + if (!expectedMaxVersion.equals(ANY) && !expectedMaxVersion.equals(DEFAULT_HIGH)) { + maxVersion = expectedMaxVersion; + } + } + } + + validateVersion(minVersion, maxVersion); + } + + base.evaluate(); + } + }; + } + + private void validateVersion(Version min, Version max) { + + if (getCurrentVersion().isLessThan(min) || getCurrentVersion().isGreaterThanOrEqualTo(max)) { + + throw new AssumptionViolatedException(String + .format("Expected Elasticsearch server to be in range (%s, %s] but found %s", min, max, currentVersion)); + } + + } + + private Version getCurrentVersion() { + + if (currentVersion.get() == null) { + + Version current = fetchCurrentVersion(); + if (currentVersion.compareAndSet(null, current)) { + logger.info("Running Elasticsearch " + current); + } + } + + return currentVersion.get(); + } + + private Version fetchCurrentVersion() { + return TestUtils.serverVersion(); + } + + @Override + public String toString() { + return getCurrentVersion().toString(); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java index 804ea2f15..6790c9f7a 100644 --- a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java @@ -17,15 +17,20 @@ package org.springframework.data.elasticsearch; import lombok.SneakyThrows; +import java.io.IOException; + import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; +import org.springframework.data.util.Version; import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; /** * @author Christoph Strobl @@ -44,6 +49,18 @@ public final class TestUtils { return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200")); } + public static Version serverVersion() { + + try (RestHighLevelClient client = restHighLevelClient()) { + + org.elasticsearch.Version version = client.info(RequestOptions.DEFAULT).getVersion(); + return new Version(version.major, version.minor, version.revision); + + } catch (Exception e) { + return new Version(0, 0, 0); + } + } + @SneakyThrows public static void deleteIndex(String... indexes) { @@ -62,4 +79,48 @@ public final class TestUtils { } } } + + public static OfType documentWithId(String id) { + return new DocumentLookup(id); + } + + public interface ExistsIn { + boolean existsIn(String index); + } + + public interface OfType extends ExistsIn { + ExistsIn ofType(String type); + } + + private static class DocumentLookup implements OfType { + + private String id; + private String type; + + public DocumentLookup(String id) { + this.id = id; + } + + @Override + public boolean existsIn(String index) { + + GetRequest request = new GetRequest(index).id(id); + if (StringUtils.hasText(type)) { + request = request.type(type); + } + try { + return restHighLevelClient().get(request, RequestOptions.DEFAULT).isExists(); + } catch (IOException e) { + e.printStackTrace(); + } + + return false; + } + + @Override + public ExistsIn ofType(String type) { + this.type = type; + return this; + } + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 29f5d1d54..d7ec06536 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -17,6 +17,9 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; +import org.junit.Rule; +import org.springframework.data.elasticsearch.ElasticsearchVersion; +import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import reactor.test.StepVerifier; import java.io.IOException; @@ -37,6 +40,7 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.After; @@ -58,6 +62,8 @@ import org.springframework.test.context.junit4.SpringRunner; @ContextConfiguration("classpath:infrastructure.xml") public class ReactiveElasticsearchClientTests { + public @Rule ElasticsearchVersionRule elasticsearchVersion = ElasticsearchVersionRule.any(); + static final String INDEX_I = "idx-1-reactive-client-tests"; static final String INDEX_II = "idx-2-reactive-client-tests"; @@ -413,6 +419,43 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } + @Test // DATAES-488 + @ElasticsearchVersion(asOf = "6.5.0") + public void deleteByShouldRemoveExistingDocument() { + + String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I); + + DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX_I) // + .setDocTypes(TYPE_I) // + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", id))); + + client.deleteBy(request) // + .as(StepVerifier::create) // + .consumeNextWith(it -> { + + assertThat(it.getDeleted()).isEqualTo(1); + }) // + .verifyComplete(); + } + + @Test // DATAES-488 + @ElasticsearchVersion(asOf = "6.5.0") + public void deleteByEmitResultWhenNothingRemoved() { + + addSourceDocument().ofType(TYPE_I).to(INDEX_I); + + DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX_I) // + .setDocTypes(TYPE_I) // + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", "it-was-not-me"))); + + client.deleteBy(request) // + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it.getDeleted()).isEqualTo(0); + }) // + .verifyComplete(); + } + AddToIndexOfType addSourceDocument() { return add(DOC_SOURCE); } diff --git a/src/test/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupportUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupportUnitTests.java new file mode 100644 index 000000000..45bc18dd8 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupportUnitTests.java @@ -0,0 +1,117 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.config; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.commons.lang.ClassUtils; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; + +/** + * @author Christoph Strobl + */ +public class ElasticsearchConfigurationSupportUnitTests { + + @Test // DATAES-504 + public void usesConfigClassPackageAsBaseMappingPackage() throws ClassNotFoundException { + + ElasticsearchConfigurationSupport configuration = new StubConfig(); + assertThat(configuration.getMappingBasePackages()).contains(ClassUtils.getPackageName(StubConfig.class)); + assertThat(configuration.getInitialEntitySet()).contains(Entity.class); + } + + @Test // DATAES-504 + public void doesNotScanOnEmptyBasePackage() throws ClassNotFoundException { + + ElasticsearchConfigurationSupport configuration = new StubConfig() { + @Override + protected Collection getMappingBasePackages() { + return Collections.emptySet(); + } + }; + + assertThat(configuration.getInitialEntitySet()).isEmpty(); + } + + @Test // DATAES-504 + public void containsMappingContext() { + + AbstractApplicationContext context = new AnnotationConfigApplicationContext(StubConfig.class); + assertThat(context.getBean(SimpleElasticsearchMappingContext.class)).isNotNull(); + } + + @Test // DATAES-504 + public void containsElasticsearchConverter() { + + AbstractApplicationContext context = new AnnotationConfigApplicationContext(StubConfig.class); + assertThat(context.getBean(ElasticsearchConverter.class)).isNotNull(); + } + + @Test // DATAES-504 + public void restConfigContainsElasticsearchTemplate() { + + AbstractApplicationContext context = new AnnotationConfigApplicationContext(RestConfig.class); + assertThat(context.getBean(ElasticsearchRestTemplate.class)).isNotNull(); + } + + @Test // DATAES-504 + public void reactiveConfigContainsReactiveElasticsearchTemplate() { + + AbstractApplicationContext context = new AnnotationConfigApplicationContext(ReactiveRestConfig.class); + assertThat(context.getBean(ReactiveElasticsearchTemplate.class)).isNotNull(); + } + + @Configuration + static class StubConfig extends ElasticsearchConfigurationSupport { + + } + + @Configuration + static class ReactiveRestConfig extends AbstractReactiveElasticsearchConfiguration { + + @Override + public ReactiveElasticsearchClient reactiveElasticsearchClient() { + return mock(ReactiveElasticsearchClient.class); + } + } + + @Configuration + static class RestConfig extends AbstractElasticsearchConfiguration { + + @Override + public RestHighLevelClient elasticsearchClient() { + return mock(RestHighLevelClient.class); + } + } + + @Document(indexName = "config-support-tests") + static class Entity {} +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 493ffbb4b..ea06af16c 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -15,24 +15,42 @@ */ package org.springframework.data.elasticsearch.core; -import static org.apache.commons.lang.RandomStringUtils.*; import static org.assertj.core.api.Assertions.*; +import static org.elasticsearch.index.query.QueryBuilders.*; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.junit.Rule; +import org.springframework.data.elasticsearch.ElasticsearchVersion; +import org.springframework.data.elasticsearch.ElasticsearchVersionRule; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.ConnectException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.TestUtils; +import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.core.query.Criteria; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; +import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.entities.SampleEntity; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StringUtils; /** * @author Christoph Strobl @@ -42,16 +60,20 @@ import org.springframework.test.context.junit4.SpringRunner; @ContextConfiguration("classpath:infrastructure.xml") public class ReactiveElasticsearchTemplateTests { + public @Rule ElasticsearchVersionRule elasticsearchVersion = ElasticsearchVersionRule.any(); + + static final String DEFAULT_INDEX = "test-index-sample"; + static final String ALTERNATE_INDEX = "reactive-template-tests-alternate-index"; + private ElasticsearchRestTemplate restTemplate; private ReactiveElasticsearchTemplate template; @Before public void setUp() { + TestUtils.deleteIndex(DEFAULT_INDEX, ALTERNATE_INDEX); + restTemplate = new ElasticsearchRestTemplate(TestUtils.restHighLevelClient()); - - TestUtils.deleteIndex("test-index-sample"); - restTemplate.createIndex(SampleEntity.class); restTemplate.putMapping(SampleEntity.class); restTemplate.refresh(SampleEntity.class); @@ -59,14 +81,35 @@ public class ReactiveElasticsearchTemplateTests { template = new ReactiveElasticsearchTemplate(TestUtils.reactiveClient()); } - @Test // DATAES-488 - public void indexWithIdShouldWork() { + @Test // DATAES-504 + public void executeShouldProvideResource() { - String documentId = randomNumeric(5); - SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("foo bar") - .version(System.currentTimeMillis()).build(); + Mono.from(template.execute(client -> client.ping())) // + .as(StepVerifier::create) // + .expectNext(true) // + .verifyComplete(); + } - template.index(sampleEntity).as(StepVerifier::create).expectNextCount(1).verifyComplete(); + @Test // DATAES-504 + public void executeShouldConvertExceptions() { + + Mono.from(template.execute(client -> { + throw new RuntimeException(new ConnectException("we're doomed")); + })) // + .as(StepVerifier::create) // + .expectError(DataAccessResourceFailureException.class) // + .verify(); + } + + @Test // DATAES-504 + public void insertWithIdShouldWork() { + + SampleEntity sampleEntity = randomEntity("foo bar"); + + template.save(sampleEntity)// + .as(StepVerifier::create)// + .expectNextCount(1)// + .verifyComplete(); restTemplate.refresh(SampleEntity.class); @@ -75,78 +118,388 @@ public class ReactiveElasticsearchTemplateTests { assertThat(result).hasSize(1); } - @Test // DATAES-488 - public void getShouldReturnEntity() { + @Test // DATAES-504 + public void insertWithAutogeneratedIdShouldUpdateEntityId() { - String documentId = randomNumeric(5); - SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") - .version(System.currentTimeMillis()).build(); + SampleEntity sampleEntity = SampleEntity.builder().message("wohoo").build(); - IndexQuery indexQuery = getIndexQuery(sampleEntity); - restTemplate.index(indexQuery); - restTemplate.refresh(SampleEntity.class); + template.save(sampleEntity) // + .as(StepVerifier::create) // + .consumeNextWith(it -> { - template.get(documentId, SampleEntity.class) // + assertThat(it.getId()).isNotNull(); + + restTemplate.refresh(SampleEntity.class); + assertThat(TestUtils.documentWithId(it.getId()).existsIn(DEFAULT_INDEX)).isTrue(); + }) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void insertWithExplicitIndexNameShouldOverwriteMetadata() { + + SampleEntity sampleEntity = randomEntity("in another index"); + + template.save(sampleEntity, ALTERNATE_INDEX).as(StepVerifier::create)// + .expectNextCount(1)// + .verifyComplete(); + + restTemplate.refresh(DEFAULT_INDEX); + restTemplate.refresh(ALTERNATE_INDEX); + + assertThat(TestUtils.documentWithId(sampleEntity.getId()).existsIn(DEFAULT_INDEX)).isFalse(); + assertThat(TestUtils.documentWithId(sampleEntity.getId()).existsIn(ALTERNATE_INDEX)).isTrue(); + } + + @Test // DATAES-504 + public void insertShouldAcceptPlainMapStructureAsSource() { + + Map map = Collections.singletonMap("foo", "bar"); + + template.save(map, ALTERNATE_INDEX, "singleton-map") // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + } + + @Test(expected = IllegalArgumentException.class) // DATAES-504 + public void insertShouldErrorOnNullEntity() { + template.save(null); + } + + @Test // DATAES-504 + public void findByIdShouldReturnEntity() { + + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); + + template.findById(sampleEntity.getId(), SampleEntity.class) // .as(StepVerifier::create) // .expectNext(sampleEntity) // .verifyComplete(); } - @Test // DATAES-488 - public void getForNothing() { + @Test // DATAES-504 + public void findByIdWhenIdIsAutogeneratedShouldHaveIdSetCorrectly() { - String documentId = randomNumeric(5); - SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") - .version(System.currentTimeMillis()).build(); + SampleEntity sampleEntity = new SampleEntity(); + sampleEntity.setMessage("some message"); - IndexQuery indexQuery = getIndexQuery(sampleEntity); - restTemplate.index(indexQuery); - restTemplate.refresh(SampleEntity.class); + index(sampleEntity); - template.get("foo", SampleEntity.class) // + assertThat(sampleEntity.getId()).isNotNull(); + + template.findById(sampleEntity.getId(), SampleEntity.class) // + .as(StepVerifier::create) // + .consumeNextWith(it -> assertThat(it.getId()).isEqualTo(sampleEntity.getId())) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void findByIdShouldCompleteWhenNotingFound() { + + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); + + template.findById("foo", SampleEntity.class) // .as(StepVerifier::create) // .verifyComplete(); } - @Test // DATAES-488 - public void findShouldApplyCriteria() { + @Test(expected = IllegalArgumentException.class) // DATAES-504 + public void findByIdShouldErrorForNullId() { + template.findById(null, SampleEntity.class); + } - String documentId = randomNumeric(5); - SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") - .version(System.currentTimeMillis()).build(); + @Test // DATAES-504 + public void findByIdWithExplicitIndexNameShouldOverwriteMetadata() { + + SampleEntity sampleEntity = randomEntity("some message"); IndexQuery indexQuery = getIndexQuery(sampleEntity); + indexQuery.setIndexName(ALTERNATE_INDEX); + restTemplate.index(indexQuery); restTemplate.refresh(SampleEntity.class); + restTemplate.refresh(DEFAULT_INDEX); + restTemplate.refresh(ALTERNATE_INDEX); + + template.findById(sampleEntity.getId(), SampleEntity.class) // + .as(StepVerifier::create) // + .verifyComplete(); + + template.findById(sampleEntity.getId(), SampleEntity.class, ALTERNATE_INDEX) // + .as(StepVerifier::create)// + .expectNextCount(1) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void existsShouldReturnTrueWhenFound() { + + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); + + template.exists(sampleEntity.getId(), SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(true) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void existsShouldReturnFalseWhenNotFound() { + + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); + + template.exists("foo", SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(false) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void findShouldApplyCriteria() { + + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); + CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("some message")); - template.query(criteriaQuery, SampleEntity.class) // + template.find(criteriaQuery, SampleEntity.class) // .as(StepVerifier::create) // .expectNext(sampleEntity) // .verifyComplete(); } - @Test // DATAES-488 + @Test // DATAES-504 public void findShouldReturnEmptyFluxIfNothingFound() { - String documentId = randomNumeric(5); - SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") - .version(System.currentTimeMillis()).build(); - - IndexQuery indexQuery = getIndexQuery(sampleEntity); - restTemplate.index(indexQuery); - restTemplate.refresh(SampleEntity.class); + SampleEntity sampleEntity = randomEntity("some message"); + index(sampleEntity); CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("foo")); - template.query(criteriaQuery, SampleEntity.class) // + template.find(criteriaQuery, SampleEntity.class) // .as(StepVerifier::create) // .verifyComplete(); } + @Test // DATAES-504 + public void shouldAllowStringBasedQuery() { + + index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message")); + + template.find(new StringQuery(matchAllQuery().toString()), SampleEntity.class) // + .as(StepVerifier::create) // + .expectNextCount(3) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void shouldExecuteGivenCriteriaQuery() { + + SampleEntity shouldMatch = randomEntity("test message"); + SampleEntity shouldNotMatch = randomEntity("the dog ate my homework"); + index(shouldMatch, shouldNotMatch); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test")); + + template.find(query, SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(shouldMatch) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void shouldReturnListForGivenCriteria() { + + SampleEntity sampleEntity1 = randomEntity("test message"); + SampleEntity sampleEntity2 = randomEntity("test test"); + SampleEntity sampleEntity3 = randomEntity("some message"); + + index(sampleEntity1, sampleEntity2, sampleEntity3); + + CriteriaQuery query = new CriteriaQuery( + new Criteria("message").contains("some").and("message").contains("message")); + + template.find(query, SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(sampleEntity3) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void shouldReturnProjectedTargetEntity() { + + SampleEntity sampleEntity1 = randomEntity("test message"); + SampleEntity sampleEntity2 = randomEntity("test test"); + SampleEntity sampleEntity3 = randomEntity("some message"); + + index(sampleEntity1, sampleEntity2, sampleEntity3); + + CriteriaQuery query = new CriteriaQuery( + new Criteria("message").contains("some").and("message").contains("message")); + + template.find(query, SampleEntity.class, Message.class) // + .as(StepVerifier::create) // + .expectNext(new Message(sampleEntity3.getMessage())) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void countShouldReturnCountAllWhenGivenNoQuery() { + + index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message")); + + template.count(SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(3L) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void countShouldReturnCountMatchingDocuments() { + + index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message")); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test")); + + template.count(query, SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(2L) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void deleteByIdShouldRemoveExistingDocumentById() { + + SampleEntity sampleEntity = randomEntity("test message"); + index(sampleEntity); + + template.deleteById(sampleEntity.getId(), SampleEntity.class) // + .as(StepVerifier::create)// + .expectNext(sampleEntity.getId()) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void deleteShouldRemoveExistingDocumentByIdUsingIndexName() { + + SampleEntity sampleEntity = randomEntity("test message"); + index(sampleEntity); + + template.deleteById(sampleEntity.getId(), DEFAULT_INDEX, "test-type") // + .as(StepVerifier::create)// + .expectNext(sampleEntity.getId()) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void deleteShouldRemoveExistingDocument() { + + SampleEntity sampleEntity = randomEntity("test message"); + index(sampleEntity); + + template.delete(sampleEntity) // + .as(StepVerifier::create)// + .expectNext(sampleEntity.getId()) // + .verifyComplete(); + } + + @Test // DATAES-504 + public void deleteByIdShouldCompleteWhenNothingDeleted() { + + SampleEntity sampleEntity = randomEntity("test message"); + + template.delete(sampleEntity) // + .as(StepVerifier::create)// + .verifyComplete(); + } + + @Test // DATAES-504 + @ElasticsearchVersion(asOf = "6.5.0") + public void deleteByQueryShouldReturnNumberOfDeletedDocuments() { + + index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message")); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test")); + + template.deleteBy(query, SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(2L) // + .verifyComplete(); + } + + @Test // DATAES-504 + @ElasticsearchVersion(asOf = "6.5.0") + public void deleteByQueryShouldReturnZeroIfNothingDeleted() { + + index(randomEntity("test message")); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("luke")); + + template.deleteBy(query, SampleEntity.class) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); + } + + @Data + @Document(indexName = "marvel", type = "characters") + static class Person { + + private @Id String id; + private String name; + private int age; + + public Person() {} + + public Person(String name, int age) { + + this.name = name; + this.age = age; + } + } + + // TODO: check field mapping !!! + + // --> JUST some helpers + + private SampleEntity randomEntity(String message) { + + return SampleEntity.builder() // + .id(UUID.randomUUID().toString()) // + .message(StringUtils.hasText(message) ? message : "test message") // + .version(System.currentTimeMillis()).build(); + } + private IndexQuery getIndexQuery(SampleEntity sampleEntity) { + return new IndexQueryBuilder().withId(sampleEntity.getId()).withObject(sampleEntity) .withVersion(sampleEntity.getVersion()).build(); } + + private List getIndexQueries(SampleEntity... sampleEntities) { + return Arrays.stream(sampleEntities).map(this::getIndexQuery).collect(Collectors.toList()); + } + + private void index(SampleEntity... entities) { + + if (entities.length == 1) { + restTemplate.index(getIndexQuery(entities[0])); + } else { + restTemplate.bulkIndex(getIndexQueries(entities)); + } + + restTemplate.refresh(SampleEntity.class); + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + static class Message { + String message; + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java new file mode 100644 index 000000000..c1a31c9ef --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java @@ -0,0 +1,204 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import static org.assertj.core.api.Assertions.*; +import static org.elasticsearch.action.search.SearchRequest.*; +import static org.mockito.Mockito.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.Collections; + +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.query.Criteria; +import org.springframework.data.elasticsearch.core.query.CriteriaQuery; +import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.elasticsearch.entities.SampleEntity; + +/** + * @author Christoph Strobl + * @currentRead Fool's Fate - Robin Hobb + */ +public class ReactiveElasticsearchTemplateUnitTests { + + @Rule // + public MockitoRule rule = MockitoJUnit.rule(); + + @Mock ReactiveElasticsearchClient client; + ReactiveElasticsearchTemplate template; + + @Before + public void setUp() { + template = new ReactiveElasticsearchTemplate(client); + } + + @Test // DATAES-504 + public void insertShouldUseDefaultRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + when(client.index(captor.capture())).thenReturn(Mono.empty()); + + template.save(Collections.singletonMap("key", "value"), "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE); + } + + @Test // DATAES-504 + public void insertShouldApplyRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + when(client.index(captor.capture())).thenReturn(Mono.empty()); + + template.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); + + template.save(Collections.singletonMap("key", "value"), "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL); + } + + @Test // DATAES-504 + public void findShouldFallBackToDefaultIndexOptionsIfNotSet() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); + when(client.search(captor.capture())).thenReturn(Flux.empty()); + + template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS); + } + + @Test // DATAES-504 + public void findShouldApplyIndexOptionsIfSet() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); + when(client.search(captor.capture())).thenReturn(Flux.empty()); + + template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + + template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN); + } + + @Test // DATAES-504 + public void deleteShouldUseDefaultRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteRequest.class); + when(client.delete(captor.capture())).thenReturn(Mono.empty()); + + template.deleteById("id", "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE); + } + + @Test // DATAES-504 + public void deleteShouldApplyRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteRequest.class); + when(client.delete(captor.capture())).thenReturn(Mono.empty()); + + template.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); + + template.deleteById("id", "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL); + } + + @Test // DATAES-504 + public void deleteByShouldUseDefaultRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); + when(client.deleteBy(captor.capture())).thenReturn(Mono.empty()); + + template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().isRefresh()).isTrue(); + } + + @Test // DATAES-504 + public void deleteByShouldApplyRefreshPolicy() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); + when(client.deleteBy(captor.capture())).thenReturn(Mono.empty()); + + template.setRefreshPolicy(RefreshPolicy.NONE); + + template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().isRefresh()).isFalse(); + } + + @Test // DATAES-504 + public void deleteByShouldApplyIndicesOptions() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); + when(client.deleteBy(captor.capture())).thenReturn(Mono.empty()); + + template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS); + } + + @Test // DATAES-504 + public void deleteByShouldApplyIndicesOptionsIfSet() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class); + when(client.deleteBy(captor.capture())).thenReturn(Mono.empty()); + + template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + + template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN); + } +} diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index e03b673a5..0281205b7 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -7,8 +7,10 @@ + + - \ No newline at end of file +