DATAES-519 - Add reactive repository support.

Reactive Elasticsearch repository support builds on the core repository support utilizing
operations provided via ReactiveElasticsearchOperations executed by a ReactiveElasticsearchClient.

Spring Data Elasticsearchs reactive repository support uses Project Reactor as its reactive
composition library of choice.

There are 3 main interfaces to be used:

* ReactiveRepository
* ReactiveCrudRepository
* ReactiveSortingRepository

For Java configuration, use the @EnableReactiveElasticsearchRepositories annotation.
The following listing shows how to use Java configuration for a repository:

@Configuration
@EnableReactiveElasticsearchRepositories
public class Config extends AbstractReactiveElasticsearchConfiguration {

  @Override
  public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    return ReactiveRestClients.create(ClientConfiguration.localhost());
  }
}

Using a repository that extends ReactiveSortingRepository makes all CRUD operations available
as well as methods for sorted access to the entities. Working with the repository instance is a matter of dependency
injecting it into a client.

The repository itself allows defining additional methods backed by the inferred proxy.

public interface ReactivePersonRepository extends ReactiveSortingRepository<Person, String> {

  Flux<Person> findByFirstname(String firstname);

  Flux<Person> findByFirstname(Publisher<String> firstname);

  Flux<Person> findByFirstnameOrderByLastname(String firstname);

  Flux<Person> findByFirstname(String firstname, Sort sort);

  Flux<Person> findByFirstname(String firstname, Pageable page);

  Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

  Mono<Person> findFirstByLastname(String lastname);

  @Query("{ \"bool\" : { \"must\" : { \"term\" : { \"lastname\" : \"?0\" } } } }")
  Flux<Person> findByLastname(String lastname);

  Mono<Long> countByFirstname(String firstname)

  Mono<Boolean> existsByFirstname(String firstname)

  Mono<Long> deleteByFirstname(String firstname)
}

Original Pull Request: #235
This commit is contained in:
Christoph Strobl 2018-12-18 15:14:54 +01:00
parent 21a010c65a
commit 69dc36c6c3
44 changed files with 3133 additions and 33 deletions

View File

@ -270,6 +270,9 @@
<includes>
<include>**/*Tests.java</include>
</includes>
<systemPropertyVariables>
<es.set.netty.runtime.available.processors>false</es.set.netty.runtime.available.processors>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>

View File

@ -31,6 +31,7 @@ include::{spring-data-commons-docs}/repositories.adoc[]
include::reference/elasticsearch-clients.adoc[]
include::reference/data-elasticsearch.adoc[]
include::reference/reactive-elasticsearch-operations.adoc[]
include::reference/reactive-elasticsearch-repositories.adoc[]
include::reference/elasticsearch-misc.adoc[]
:leveloffset: -1

View File

@ -0,0 +1,130 @@
[[elasticsearch.reactive.repositories]]
= Reactive Elasticsearch Repositories
Reactive Elasticsearch repository support builds on the core repository support explained in <<repositories>> utilizing
operations provided via <<elasticsearch.reactive.operations>> executed by a <<elasticsearch.clients.reactive>>.
Spring Data Elasticsearchs reactive repository support uses https://projectreactor.io/[Project Reactor] as its reactive
composition library of choice.
There are 3 main interfaces to be used:
* `ReactiveRepository`
* `ReactiveCrudRepository`
* `ReactiveSortingRepository`
[[elasticsearch.reactive.repositories.usage]]
== Usage
To access domain objects stored in a Elasticsearch using a `Repository`, just create an interface for it.
Before you can actually go on and do that you will need an entity.
.Sample `Person` entity
====
[source,java]
----
public class Person {
@Id
private String id;
private String firstname;
private String lastname;
private Address address;
// … getters and setters omitted
}
----
====
NOTE: Please note that the `id` property needs to be of type `String`.
.Basic repository interface to persist Person entities
====
[source]
----
public interface ReactivePersonRepository extends ReactiveSortingRepository<Person, String> {
Flux<Person> findByFirstname(String firstname); <1>
Flux<Person> findByFirstname(Publisher<String> firstname); <2>
Flux<Person> findByFirstnameOrderByLastname(String firstname); <3>
Flux<Person> findByFirstname(String firstname, Sort sort); <4>
Flux<Person> findByFirstname(String firstname, Pageable page); <5>
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname); <6>
Mono<Person> findFirstByLastname(String lastname); <7>
@Query("{ \"bool\" : { \"must\" : { \"term\" : { \"lastname\" : \"?0\" } } } }")
Flux<Person> findByLastname(String lastname); <8>
Mono<Long> countByFirstname(String firstname) <9>
Mono<Boolean> existsByFirstname(String firstname) <10>
Mono<Long> deleteByFirstname(String firstname) <11>
}
----
<1> The method shows a query for all people with the given `lastname`.
<2> Finder method awaiting input from `Publisher` to bind parameter value for `firstname`.
<3> Finder method ordering matching documents by `lastname`.
<4> Finder method ordering matching documents by the expression defined via the `Sort` parameter.
<5> Use `Pageable` to pass offset and sorting parameters to the database.
<6> Finder method concating criteria using `And` / `Or` keywords.
<7> Find the first matching entity.
<8> The method shows a query for all people with the given `lastname` looked up by running the annotated `@Query` with given
parameters.
<9> Count all entities with matching `firstname`.
<10> Check if at least one entity with matching `firstname` exists.
<11> Delete all entites with matching `firstname`.
====
[[elasticsearch.reactive.repositories.configuration]]
== Configuration
For Java configuration, use the `@EnableReactiveElasticsearchRepositories` annotation. If no base package is configured,
the infrastructure scans the package of the annotated configuration class.
The following listing shows how to use Java configuration for a repository:
.Java configuration for repositories
====
[source,java]
----
@Configuration
@EnableReactiveElasticsearchRepositories
public class Config extends AbstractReactiveElasticsearchConfiguration {
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
return ReactiveRestClients.create(ClientConfiguration.localhost());
}
}
----
====
Because the repository from the previous example extends `ReactiveSortingRepository`, all CRUD operations are available
as well as methods for sorted access to the entities. Working with the repository instance is a matter of dependency
injecting it into a client, as the following example shows:
.Sorted access to Person entities
====
[source,java]
----
public class PersonRepositoryTests {
@Autowired ReactivePersonRepository repository;
@Test
public void sortsElementsCorrectly() {
Flux<Person> persons = repository.findAll(Sort.by(new Order(ASC, "lastname")));
// ...
}
}
----
====

View File

@ -0,0 +1,36 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import org.springframework.dao.NonTransientDataAccessResourceException;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class NoSuchIndexException extends NonTransientDataAccessResourceException {
private final String index;
public NoSuchIndexException(String index, Throwable cause) {
super(String.format("Index %s not found.", index), cause);
this.index = index;
}
public String getIndex() {
return index;
}
}

View File

@ -42,6 +42,22 @@ public interface ClientConfiguration {
return new ClientConfigurationBuilder();
}
/**
* Creates a new {@link ClientConfiguration} instance configured to localhost.
* <p/>
*
* <pre class="code">
* // "localhost:9200"
* ClientConfiguration configuration = ClientConfiguration.localhost();
* </pre>
*
* @return a new {@link ClientConfiguration} instance
* @see ClientConfigurationBuilder#connectedToLocalhost()
*/
static ClientConfiguration localhost() {
return new ClientConfigurationBuilder().connectedToLocalhost().build();
}
/**
* Creates a new {@link ClientConfiguration} instance configured to a single host given {@code hostAndPort}.
* <p/>

View File

@ -545,7 +545,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return Mono.justOrEmpty(responseType
.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
} catch (Exception errorParseFailure) {
} catch (Throwable errorParseFailure) { // cause elasticsearch also uses AssertionError
try {
return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content)));

View File

@ -47,7 +47,7 @@ public abstract class AbstractReactiveElasticsearchConfiguration extends Elastic
* @return never {@literal null}.
*/
@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
public ReactiveElasticsearchOperations reactiveElasticsearchTemplate() {
ReactiveElasticsearchTemplate template = new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(),
elasticsearchConverter());

View File

@ -22,6 +22,8 @@ import org.elasticsearch.ElasticsearchException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.util.CollectionUtils;
/**
* @author Christoph Strobl
@ -33,15 +35,22 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
if (ex instanceof ElasticsearchException) {
// TODO: exception translation
ElasticsearchException elasticsearchExption = (ElasticsearchException) ex;
// elasticsearchExption.get
ElasticsearchException elasticsearchException = (ElasticsearchException) ex;
if (!indexAvailable(elasticsearchException)) {
return new NoSuchIndexException(elasticsearchException.getMetadata("es.index").toString(), ex);
}
}
if(ex.getCause() instanceof ConnectException) {
if (ex.getCause() instanceof ConnectException) {
return new DataAccessResourceFailureException(ex.getMessage(), ex);
}
return null;
}
private boolean indexAvailable(ElasticsearchException ex) {
return !CollectionUtils.contains(ex.getMetadata("es.index_uuid").iterator(), "_na_");
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.lang.Nullable;
@ -451,6 +452,13 @@ public interface ReactiveElasticsearchOperations {
*/
Mono<Long> deleteBy(Query query, Class<?> entityType, @Nullable String index, @Nullable String type);
/**
* Get the {@link ElasticsearchConverter} used.
*
* @return never {@literal null}
*/
ElasticsearchConverter getElasticsearchConverter();
/**
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on
* {@link ReactiveElasticsearchClient}.

View File

@ -52,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
@ -62,6 +63,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.StringQuery;
@ -69,7 +71,6 @@ import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* @author Christoph Strobl
@ -141,7 +142,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
IndexRequest request = id != null
? new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id.toString())
? new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), converter.convertId(id))
: new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName());
try {
@ -163,7 +164,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Object parentId = entity.getParentId();
if (parentId != null) {
request.parent(parentId.toString());
request.parent(converter.convertId(parentId));
}
}
@ -307,7 +308,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Entity<?> elasticsearchEntity = operations.forEntity(entity);
return Mono.defer(() -> doDeleteById(entity, ObjectUtils.nullSafeToString(elasticsearchEntity.getId()),
return Mono.defer(() -> doDeleteById(entity, converter.convertId(elasticsearchEntity.getId()),
elasticsearchEntity.getPersistentEntity(), index, type));
}
@ -384,6 +385,15 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
this.indicesOptions = indicesOptions;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#getElasticsearchConverter()
*/
@Override
public ElasticsearchConverter getElasticsearchConverter() {
return converter;
}
// Customization Hooks
/**
@ -491,7 +501,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<GetResult> doFindById(GetRequest request) {
return Mono.from(execute(client -> client.get(request)));
return Mono.from(execute(client -> client.get(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
@ -501,7 +513,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<Boolean> doExists(GetRequest request) {
return Mono.from(execute(client -> client.exists(request)));
return Mono.from(execute(client -> client.exists(request))) //
.onErrorReturn(NoSuchIndexException.class, false);
}
/**
@ -516,7 +530,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
QUERY_LOGGER.debug("Executing doFind: {}", request);
}
return Flux.from(execute(client -> client.search(request)));
return Flux.from(execute(client -> client.search(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
@ -531,7 +546,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
QUERY_LOGGER.debug("Executing doScan: {}", request);
}
return Flux.from(execute(client -> client.scroll(request)));
return Flux.from(execute(client -> client.scroll(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
@ -551,7 +567,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
return Mono.just(it.getId());
});
}) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
@ -561,7 +578,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<BulkByScrollResponse> doDeleteBy(DeleteByQueryRequest request) {
return Mono.from(execute(client -> client.deleteBy(request)));
return Mono.from(execute(client -> client.deleteBy(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
// private helpers
@ -621,7 +640,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(((CriteriaQuery) query).getCriteria());
} else if (query instanceof StringQuery) {
elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource());
} else {
} else if (query instanceof NativeSearchQuery) {
elasticsearchQuery = ((NativeSearchQuery) query).getQuery();
}
else {
throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass()));
}

View File

@ -19,14 +19,15 @@ import org.springframework.core.convert.ConversionService;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.util.Assert;
/**
* ElasticsearchConverter
*
* @author Rizwan Idrees
* @author Mohsin Husen
* @author Christoph Strobl
*/
public interface ElasticsearchConverter {
/**
@ -42,4 +43,22 @@ public interface ElasticsearchConverter {
* @return never {@literal null}.
*/
ConversionService getConversionService();
/**
* Convert a given {@literal idValue} to its {@link String} representation taking potentially registered
* {@link org.springframework.core.convert.converter.Converter Converters} into account.
*
* @param idValue must not be {@literal null}.
* @return never {@literal null}.
* @since 3.2
*/
default String convertId(Object idValue) {
Assert.notNull(idValue, "idValue must not be null!");
if (!getConversionService().canConvert(idValue.getClass(), String.class)) {
return idValue.toString();
}
return getConversionService().convert(idValue, String.class);
}
}

View File

@ -100,16 +100,26 @@ public class SimpleElasticsearchPersistentEntity<T> extends BasicPersistentEntit
@Override
public String getIndexName() {
if(indexName != null) {
Expression expression = parser.parseExpression(indexName, ParserContext.TEMPLATE_EXPRESSION);
return expression.getValue(context, String.class);
}
return getTypeInformation().getType().getSimpleName();
}
@Override
public String getIndexType() {
if(indexType != null) {
Expression expression = parser.parseExpression(indexType, ParserContext.TEMPLATE_EXPRESSION);
return expression.getValue(context, String.class);
}
return "";
}
@Override
public String getIndexStoreType() {
return indexStoreType;

View File

@ -15,10 +15,13 @@
*/
package org.springframework.data.elasticsearch.core.query;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
@ -31,12 +34,24 @@ import org.springframework.data.domain.Sort;
* @author Mark Paluch
* @author Alen Turkovic
* @author Sascha Woo
* @author Christoph Strobl
*/
public interface Query {
int DEFAULT_PAGE_SIZE = 10;
Pageable DEFAULT_PAGE = PageRequest.of(0, DEFAULT_PAGE_SIZE);
/**
* Get get a {@link Query} that matches all documents in the index.
*
* @return new instance of {@link Query}.
* @since 3.2
* @see QueryBuilders#matchAllQuery()
*/
static Query findAll() {
return new StringQuery(QueryBuilders.matchAllQuery().toString());
}
/**
* restrict result to entries on given page. Corresponds to the 'start' and 'rows' parameter in elasticsearch
*

View File

@ -0,0 +1,30 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
/**
* Elasticsearch specific {@link org.springframework.data.repository.Repository} interface with reactive support.
*
* @author Christoph Strobl
* @since 3.2
*/
@NoRepositoryBean
public interface ReactiveElasticsearchRepository<T, ID> extends ReactiveSortingRepository<T, ID> {
}

View File

@ -29,6 +29,7 @@ import org.springframework.data.elasticsearch.repository.support.ElasticsearchRe
import org.springframework.data.repository.config.AnnotationRepositoryConfigurationSource;
import org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport;
import org.springframework.data.repository.config.XmlRepositoryConfigurationSource;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.w3c.dom.Element;
/**
@ -39,6 +40,7 @@ import org.w3c.dom.Element;
* @author Rizwan Idrees
* @author Mohsin Husen
* @author Mark Paluch
* @author Christoph Strobl
*/
public class ElasticsearchRepositoryConfigExtension extends RepositoryConfigurationExtensionSupport {
@ -88,7 +90,7 @@ public class ElasticsearchRepositoryConfigExtension extends RepositoryConfigurat
*/
@Override
protected Collection<Class<? extends Annotation>> getIdentifyingAnnotations() {
return Collections.<Class<? extends Annotation>> singleton(Document.class);
return Collections.singleton(Document.class);
}
/*
@ -97,6 +99,15 @@ public class ElasticsearchRepositoryConfigExtension extends RepositoryConfigurat
*/
@Override
protected Collection<Class<?>> getIdentifyingTypes() {
return Arrays.<Class<?>> asList(ElasticsearchRepository.class, ElasticsearchCrudRepository.class);
return Arrays.asList(ElasticsearchRepository.class, ElasticsearchCrudRepository.class);
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#useRepositoryConfiguration(org.springframework.data.repository.core.RepositoryMetadata)
*/
@Override
protected boolean useRepositoryConfiguration(RepositoryMetadata metadata) {
return !metadata.isReactiveRepository();
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.config;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean;
import org.springframework.data.repository.config.DefaultRepositoryBaseClass;
import org.springframework.data.repository.query.QueryLookupStrategy;
import org.springframework.data.repository.query.QueryLookupStrategy.Key;
/**
* Annotation to activate reactive Elasticsearch repositories. If no base package is configured through either
* {@link #value()}, {@link #basePackages()} or {@link #basePackageClasses()} it will trigger scanning of the package of
* annotated class.
*
* @author Christoph Strobl
* @since 3.2
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ReactiveElasticsearchRepositoriesRegistrar.class)
public @interface EnableReactiveElasticsearchRepositories {
/**
* Alias for the {@link #basePackages()} attribute.
*/
String[] value() default {};
/**
* Base packages to scan for annotated components. {@link #value()} is an alias for (and mutually exclusive with) this
* attribute. Use {@link #basePackageClasses()} for a type-safe alternative to String-based package names.
*/
String[] basePackages() default {};
/**
* Type-safe alternative to {@link #basePackages()} for specifying the packages to scan for annotated components. The
* package of each class specified will be scanned. Consider creating a special no-op marker class or interface in
* each package that serves no purpose other than being referenced by this attribute.
*/
Class<?>[] basePackageClasses() default {};
/**
* Specifies which types are eligible for component scanning. Further narrows the set of candidate components from
* everything in {@link #basePackages()} to everything in the base packages that matches the given filter or filters.
*/
Filter[] includeFilters() default {};
/**
* Specifies which types are not eligible for component scanning.
*/
Filter[] excludeFilters() default {};
/**
* Returns the postfix to be used when looking up custom repository implementations. Defaults to {@literal Impl}. So
* for a repository named {@code PersonRepository} the corresponding implementation class will be looked up scanning
* for {@code PersonRepositoryImpl}.
*
* @return
*/
String repositoryImplementationPostfix() default "Impl";
/**
* Configures the location of where to find the Spring Data named queries properties file. Will default to
* {@code META-INF/elasticsearch-named-queries.properties}.
*
* @return
*/
String namedQueriesLocation() default "";
/**
* Returns the key of the {@link QueryLookupStrategy} to be used for lookup queries for query methods. Defaults to
* {@link Key#CREATE_IF_NOT_FOUND}.
*
* @return
*/
Key queryLookupStrategy() default Key.CREATE_IF_NOT_FOUND;
/**
* Returns the {@link FactoryBean} class to be used for each repository instance. Defaults to
* {@link ReactiveElasticsearchRepositoryFactoryBean}.
*
* @return
*/
Class<?> repositoryFactoryBeanClass() default ReactiveElasticsearchRepositoryFactoryBean.class;
/**
* Configure the repository base class to be used to create repository proxies for this particular configuration.
*
* @return
*/
Class<?> repositoryBaseClass() default DefaultRepositoryBaseClass.class;
/**
* Configures the name of the {@link org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations} bean
* to be used with the repositories detected.
*
* @return
*/
String reactiveElasticsearchTemplateRef() default "reactiveElasticsearchTemplate";
/**
* Configures whether nested repository-interfaces (e.g. defined as inner classes) should be discovered by the
* repositories infrastructure.
*/
boolean considerNestedRepositories() default false;
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.config;
import java.lang.annotation.Annotation;
import org.springframework.data.repository.config.RepositoryBeanDefinitionRegistrarSupport;
import org.springframework.data.repository.config.RepositoryConfigurationExtension;
/**
* @author Christoph Strobl
* @since 3.2
*/
class ReactiveElasticsearchRepositoriesRegistrar extends RepositoryBeanDefinitionRegistrarSupport {
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryBeanDefinitionRegistrarSupport#getAnnotation()
*/
@Override
protected Class<? extends Annotation> getAnnotation() {
return EnableReactiveElasticsearchRepositories.class;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryBeanDefinitionRegistrarSupport#getExtension()
*/
@Override
protected RepositoryConfigurationExtension getExtension() {
return new ReactiveElasticsearchRepositoryConfigurationExtension();
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.config;
import java.util.Collection;
import java.util.Collections;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.data.config.ParsingUtils;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean;
import org.springframework.data.repository.config.AnnotationRepositoryConfigurationSource;
import org.springframework.data.repository.config.XmlRepositoryConfigurationSource;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.w3c.dom.Element;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ReactiveElasticsearchRepositoryConfigurationExtension extends ElasticsearchRepositoryConfigExtension {
private static final String ELASTICSEARCH_TEMPLATE_REF = "reactive-elasticsearch-template-ref";
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#getModuleName()
*/
@Override
public String getModuleName() {
return "Reactive Elasticsearch";
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtension#getRepositoryFactoryClassName()
*/
public String getRepositoryFactoryClassName() {
return ReactiveElasticsearchRepositoryFactoryBean.class.getName();
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#getIdentifyingTypes()
*/
@Override
protected Collection<Class<?>> getIdentifyingTypes() {
return Collections.singleton(ReactiveElasticsearchRepository.class);
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#postProcess(org.springframework.beans.factory.support.BeanDefinitionBuilder, org.springframework.data.repository.config.XmlRepositoryConfigurationSource)
*/
@Override
public void postProcess(BeanDefinitionBuilder builder, XmlRepositoryConfigurationSource config) {
Element element = config.getElement();
ParsingUtils.setPropertyReference(builder, element, ELASTICSEARCH_TEMPLATE_REF, "reactiveElasticsearchOperations");
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#postProcess(org.springframework.beans.factory.support.BeanDefinitionBuilder, org.springframework.data.repository.config.AnnotationRepositoryConfigurationSource)
*/
@Override
public void postProcess(BeanDefinitionBuilder builder, AnnotationRepositoryConfigurationSource config) {
AnnotationAttributes attributes = config.getAttributes();
builder.addPropertyReference("reactiveElasticsearchOperations",
attributes.getString("reactiveElasticsearchTemplateRef"));
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.config.RepositoryConfigurationExtensionSupport#useRepositoryConfiguration(org.springframework.data.repository.core.RepositoryMetadata)
*/
@Override
protected boolean useRepositoryConfiguration(RepositoryMetadata metadata) {
return metadata.isReactiveRepository();
}
}

View File

@ -0,0 +1,141 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingConverter;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingExecution;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.QueryMethod;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor;
/**
* AbstractElasticsearchRepositoryQuery
*
* @author Christoph Strobl
* @since 3.2
*/
abstract class AbstractReactiveElasticsearchRepositoryQuery implements RepositoryQuery {
private final ReactiveElasticsearchQueryMethod queryMethod;
private final ReactiveElasticsearchOperations elasticsearchOperations;
AbstractReactiveElasticsearchRepositoryQuery(ReactiveElasticsearchQueryMethod queryMethod,
ReactiveElasticsearchOperations elasticsearchOperations) {
this.queryMethod = queryMethod;
this.elasticsearchOperations = elasticsearchOperations;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.RepositoryQuery#execute(java.lang.Object[])
*/
public Object execute(Object[] parameters) {
return queryMethod.hasReactiveWrapperParameter() ? executeDeferred(parameters)
: execute(new ReactiveElasticsearchParametersParameterAccessor(queryMethod, parameters));
}
private Object executeDeferred(Object[] parameters) {
ReactiveElasticsearchParametersParameterAccessor parameterAccessor = new ReactiveElasticsearchParametersParameterAccessor(
queryMethod, parameters);
if (getQueryMethod().isCollectionQuery()) {
return Flux.defer(() -> (Publisher<Object>) execute(parameterAccessor));
}
return Mono.defer(() -> (Mono<Object>) execute(parameterAccessor));
}
private Object execute(ElasticsearchParameterAccessor parameterAccessor) {
ResultProcessor processor = queryMethod.getResultProcessor().withDynamicProjection(parameterAccessor);
Query query = createQuery(
new ConvertingParameterAccessor(elasticsearchOperations.getElasticsearchConverter(), parameterAccessor));
Class<?> typeToRead = processor.getReturnedType().getTypeToRead();
String indexName = queryMethod.getEntityInformation().getIndexName();
String indexTypeName = queryMethod.getEntityInformation().getIndexTypeName();
ReactiveElasticsearchQueryExecution execution = getExecution(parameterAccessor,
new ResultProcessingConverter(processor, elasticsearchOperations));
return execution.execute(query, processor.getReturnedType().getDomainType(), indexName, indexTypeName, typeToRead);
}
private ReactiveElasticsearchQueryExecution getExecution(ElasticsearchParameterAccessor accessor,
Converter<Object, Object> resultProcessing) {
return new ResultProcessingExecution(getExecutionToWrap(accessor, elasticsearchOperations), resultProcessing);
}
/**
* Creates a {@link Query} instance using the given {@link ParameterAccessor}
*
* @param accessor must not be {@literal null}.
* @return
*/
protected abstract Query createQuery(ElasticsearchParameterAccessor accessor);
private ReactiveElasticsearchQueryExecution getExecutionToWrap(ElasticsearchParameterAccessor accessor,
ReactiveElasticsearchOperations operations) {
if (isDeleteQuery()) {
return (q, t, i, it, tt) -> operations.deleteBy(q, t, i, it);
} else if (isCountQuery()) {
return (q, t, i, it, tt) -> operations.count(q, t, i, it);
} else if (isExistsQuery()) {
return (q, t, i, it, tt) -> operations.count(q, t, i, it).map(count -> count > 0);
} else if (queryMethod.isCollectionQuery()) {
return (q, t, i, it, tt) -> operations.find(q.setPageable(accessor.getPageable()), t, i, it, tt);
} else {
return (q, t, i, it, tt) -> operations.find(q, t, i, it, tt);
}
}
abstract boolean isDeleteQuery();
abstract boolean isCountQuery();
abstract boolean isExistsQuery();
abstract boolean isLimiting();
@Override
public QueryMethod getQueryMethod() {
return queryMethod;
}
protected ReactiveElasticsearchOperations getElasticsearchOperations() {
return elasticsearchOperations;
}
protected MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> getMappingContext() {
return elasticsearchOperations.getElasticsearchConverter().getMappingContext();
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import java.util.Iterator;
import java.util.Optional;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.lang.Nullable;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ConvertingParameterAccessor implements ElasticsearchParameterAccessor {
private final ElasticsearchConverter converter;
private final ElasticsearchParameterAccessor delegate;
public ConvertingParameterAccessor(ElasticsearchConverter converter, ElasticsearchParameterAccessor delegate) {
this.converter = converter;
this.delegate = delegate;
}
@Override
public Object[] getValues() {
return delegate.getValues();
}
@Override
public Pageable getPageable() {
return delegate.getPageable();
}
@Override
public Sort getSort() {
return delegate.getSort();
}
@Override
public Optional<Class<?>> getDynamicProjection() {
return delegate.getDynamicProjection();
}
@Override
public Object getBindableValue(int index) {
return getConvertedValue(delegate.getBindableValue(index));
}
@Override
public boolean hasBindableNullValue() {
return delegate.hasBindableNullValue();
}
@Override
public Iterator<Object> iterator() {
return delegate.iterator();
}
@Nullable
private Object getConvertedValue(Object value) {
if (value == null) {
return "null";
}
if (converter.getConversionService().canConvert(value.getClass(), String.class)) {
return converter.getConversionService().convert(value, String.class);
}
return value.toString();
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import org.springframework.data.repository.core.EntityMetadata;
/**
* @author Christoph Strobl
* @since 3.2
*/
public interface ElasticsearchEntityMetadata<T> extends EntityMetadata<T> {
String getIndexName();
String getIndexTypeName();
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import org.springframework.data.repository.query.ParameterAccessor;
/**
* @author Christoph Strobl
* @since 3.2
*/
public interface ElasticsearchParameterAccessor extends ParameterAccessor {
/**
* Returns the raw parameter values of the underlying query method.
*
* @return
*/
Object[] getValues();
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import java.lang.reflect.Method;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.data.elasticsearch.repository.query.ElasticsearchParameters.ElasticsearchParameter;
import org.springframework.data.geo.Distance;
import org.springframework.data.repository.query.Parameter;
import org.springframework.data.repository.query.Parameters;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ElasticsearchParameters extends Parameters<ElasticsearchParameters, ElasticsearchParameter> {
public ElasticsearchParameters(Method method) {
super(method);
}
private ElasticsearchParameters(List<ElasticsearchParameter> parameters) {
super(parameters);
}
@Override
protected ElasticsearchParameter createParameter(MethodParameter parameter) {
return new ElasticsearchParameter(parameter);
}
@Override
protected ElasticsearchParameters createFrom(List<ElasticsearchParameter> parameters) {
return new ElasticsearchParameters(parameters);
}
/**
* Custom {@link Parameter} implementation adding parameters of type {@link Distance} to the special ones.
*
* @author Christoph Strobl
*/
class ElasticsearchParameter extends Parameter {
private final MethodParameter parameter;
/**
* Creates a new {@link ElasticsearchParameter}.
*
* @param parameter must not be {@literal null}.
*/
ElasticsearchParameter(MethodParameter parameter) {
super(parameter);
this.parameter = parameter;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.Parameter#isSpecialParameter()
*/
@Override
public boolean isSpecialParameter() {
return super.isSpecialParameter();
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import java.util.Arrays;
import java.util.List;
import org.springframework.data.repository.query.ParametersParameterAccessor;
/**
* @author Christoph Strobl
* @since 3.2
*/
class ElasticsearchParametersParameterAccessor extends ParametersParameterAccessor
implements ElasticsearchParameterAccessor {
private final List<Object> values;
/**
* Creates a new {@link ElasticsearchParametersParameterAccessor}.
*
* @param method must not be {@literal null}.
* @param values must not be {@literal null}.
*/
ElasticsearchParametersParameterAccessor(ElasticsearchQueryMethod method, Object[] values) {
super(method.getParameters(), values);
this.values = Arrays.asList(values);
}
@Override
public Object[] getValues() {
return values.toArray();
}
}

View File

@ -19,9 +19,15 @@ import java.lang.reflect.Method;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.query.QueryMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* ElasticsearchQueryMethod
@ -30,14 +36,23 @@ import org.springframework.data.repository.query.QueryMethod;
* @author Mohsin Husen
* @author Oliver Gierke
* @author Mark Paluch
* @author Christoph Strobl
*/
public class ElasticsearchQueryMethod extends QueryMethod {
private final Query queryAnnotation;
private final MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext;
private @Nullable ElasticsearchEntityMetadata<?> metadata;
public ElasticsearchQueryMethod(Method method, RepositoryMetadata metadata, ProjectionFactory factory,
MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext) {
public ElasticsearchQueryMethod(Method method, RepositoryMetadata metadata, ProjectionFactory factory) {
super(method, metadata, factory);
Assert.notNull(mappingContext, "MappingContext must not be null!");
this.queryAnnotation = method.getAnnotation(Query.class);
this.mappingContext = mappingContext;
}
public boolean hasAnnotatedQuery() {
@ -47,4 +62,42 @@ public class ElasticsearchQueryMethod extends QueryMethod {
public String getAnnotatedQuery() {
return (String) AnnotationUtils.getValue(queryAnnotation, "value");
}
/**
* @return the {@link ElasticsearchEntityMetadata} for the query methods {@link #getReturnedObjectType() return type}.
* @since 3.2
*/
public ElasticsearchEntityMetadata<?> getEntityInformation() {
if (metadata == null) {
Class<?> returnedObjectType = getReturnedObjectType();
Class<?> domainClass = getDomainClass();
if (ClassUtils.isPrimitiveOrWrapper(returnedObjectType)) {
this.metadata = new SimpleElasticsearchEntityMetadata<>((Class<Object>) domainClass,
mappingContext.getRequiredPersistentEntity(domainClass));
} else {
ElasticsearchPersistentEntity<?> returnedEntity = mappingContext.getPersistentEntity(returnedObjectType);
ElasticsearchPersistentEntity<?> managedEntity = mappingContext.getRequiredPersistentEntity(domainClass);
returnedEntity = returnedEntity == null || returnedEntity.getType().isInterface() ? managedEntity
: returnedEntity;
ElasticsearchPersistentEntity<?> collectionEntity = domainClass.isAssignableFrom(returnedObjectType)
? returnedEntity
: managedEntity;
this.metadata = new SimpleElasticsearchEntityMetadata<>((Class<Object>) returnedEntity.getType(),
collectionEntity);
}
}
return this.metadata;
}
protected MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> getMappingContext() {
return mappingContext;
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import java.util.ArrayList;
import java.util.List;
import org.springframework.data.repository.util.ReactiveWrapperConverters;
import org.springframework.data.repository.util.ReactiveWrappers;
/**
* @author Christoph Strobl
* @since 3.2
*/
class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchParametersParameterAccessor {
private final List<MonoProcessor<?>> subscriptions;
/**
* Creates a new {@link ElasticsearchParametersParameterAccessor}.
*
* @param method must not be {@literal null}.
* @param values must not be {@literal null}.
*/
ReactiveElasticsearchParametersParameterAccessor(ReactiveElasticsearchQueryMethod method, Object[] values) {
super(method, values);
this.subscriptions = new ArrayList<>(values.length);
for (int i = 0; i < values.length; i++) {
Object value = values[i];
if (value == null || !ReactiveWrappers.supports(value.getClass())) {
subscriptions.add(null);
continue;
}
if (ReactiveWrappers.isSingleValueType(value.getClass())) {
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).toProcessor());
} else {
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().toProcessor());
}
}
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParametersParameterAccessor#getValue(int)
*/
@SuppressWarnings("unchecked")
@Override
protected <T> T getValue(int index) {
if (subscriptions.get(index) != null) {
return (T) subscriptions.get(index).block();
}
return super.getValue(index);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.repository.query.ElasticsearchParametersParameterAccessor#getValues(int)
*/
@Override
public Object[] getValues() {
Object[] result = new Object[getValues().length];
for (int i = 0; i < result.length; i++) {
result[i] = getValue(i);
}
return result;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParametersParameterAccessor#getBindableValue(int)
*/
public Object getBindableValue(int index) {
return getValue(getParameters().getBindableParameter(index).getIndex());
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
/**
* @author Christoph Strobl
* @since 3.2
*/
public interface ReactiveElasticsearchQueryExecution {
Object execute(Query query, Class<?> type, String indexName, String indexType, @Nullable Class<?> targetType);
/**
* An {@link ReactiveElasticsearchQueryExecution} that wraps the results of the given delegate with the given result
* processing.
*/
@RequiredArgsConstructor
final class ResultProcessingExecution implements ReactiveElasticsearchQueryExecution {
private final @NonNull ReactiveElasticsearchQueryExecution delegate;
private final @NonNull Converter<Object, Object> converter;
@Override
public Object execute(Query query, Class<?> type, String indexName, String indexType,
@Nullable Class<?> targetType) {
return converter.convert(delegate.execute(query, type, indexName, indexType, targetType));
}
}
/**
* A {@link Converter} to post-process all source objects using the given {@link ResultProcessor}.
*
* @author Mark Paluch
*/
@RequiredArgsConstructor
final class ResultProcessingConverter implements Converter<Object, Object> {
private final @NonNull ResultProcessor processor;
private final @NonNull ReactiveElasticsearchOperations operations;
/*
* (non-Javadoc)
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
*/
@Override
public Object convert(Object source) {
ReturnedType returnedType = processor.getReturnedType();
if (ClassUtils.isPrimitiveOrWrapper(returnedType.getReturnedType())) {
return source;
}
return processor.processResult(source, it -> it);
}
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import static org.springframework.data.repository.util.ClassUtils.*;
import java.lang.reflect.Method;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.repository.query.ElasticsearchParameters.ElasticsearchParameter;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.util.ReactiveWrapperConverters;
import org.springframework.data.repository.util.ReactiveWrappers;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.ClassUtils;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ReactiveElasticsearchQueryMethod extends ElasticsearchQueryMethod {
private static final ClassTypeInformation<Page> PAGE_TYPE = ClassTypeInformation.from(Page.class);
private static final ClassTypeInformation<Slice> SLICE_TYPE = ClassTypeInformation.from(Slice.class);
private final Method method;
public ReactiveElasticsearchQueryMethod(Method method, RepositoryMetadata metadata, ProjectionFactory factory,
MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext) {
super(method, metadata, factory, mappingContext);
this.method = method;
if (hasParameterOfType(method, Pageable.class)) {
TypeInformation<?> returnType = ClassTypeInformation.fromReturnTypeOf(method);
boolean multiWrapper = ReactiveWrappers.isMultiValueType(returnType.getType());
boolean singleWrapperWithWrappedPageableResult = ReactiveWrappers.isSingleValueType(returnType.getType())
&& (PAGE_TYPE.isAssignableFrom(returnType.getRequiredComponentType())
|| SLICE_TYPE.isAssignableFrom(returnType.getRequiredComponentType()));
if (singleWrapperWithWrappedPageableResult) {
throw new InvalidDataAccessApiUsageException(
String.format("'%s.%s' must not use sliced or paged execution. Please use Flux.buffer(size, skip).",
ClassUtils.getShortName(method.getDeclaringClass()), method.getName()));
}
if (!multiWrapper) {
throw new IllegalStateException(String.format(
"Method has to use a either multi-item reactive wrapper return type or a wrapped Page/Slice type. Offending method: %s",
method.toString()));
}
if (hasParameterOfType(method, Sort.class)) {
throw new IllegalStateException(String.format("Method must not have Pageable *and* Sort parameter. "
+ "Use sorting capabilities on Pageble instead! Offending method: %s", method.toString()));
}
}
}
@Override
protected ElasticsearchParameters createParameters(Method method) {
return new ElasticsearchParameters(method);
}
/**
* Check if the given {@link org.springframework.data.repository.query.QueryMethod} receives a reactive parameter
* wrapper as one of its parameters.
*
* @return
*/
public boolean hasReactiveWrapperParameter() {
for (ElasticsearchParameter param : getParameters()) {
if (ReactiveWrapperConverters.supports(param.getType())) {
return true;
}
}
return false;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.QueryMethod#isStreamQuery()
*/
@Override
public boolean isStreamQuery() {
// All reactive query methods are streaming.
return true;
}
@Override
public ElasticsearchParameters getParameters() {
return (ElasticsearchParameters) super.getParameters();
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.util.ObjectUtils;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ReactiveElasticsearchStringQuery extends AbstractReactiveElasticsearchRepositoryQuery {
private static final Pattern PARAMETER_PLACEHOLDER = Pattern.compile("\\?(\\d+)");
private final String query;
public ReactiveElasticsearchStringQuery(ReactiveElasticsearchQueryMethod queryMethod,
ReactiveElasticsearchOperations operations, SpelExpressionParser expressionParser,
QueryMethodEvaluationContextProvider evaluationContextProvider) {
this(queryMethod.getAnnotatedQuery(), queryMethod, operations, expressionParser, evaluationContextProvider);
}
public ReactiveElasticsearchStringQuery(String query, ReactiveElasticsearchQueryMethod queryMethod,
ReactiveElasticsearchOperations operations, SpelExpressionParser expressionParser,
QueryMethodEvaluationContextProvider evaluationContextProvider) {
super(queryMethod, operations);
this.query = query;
}
@Override
protected StringQuery createQuery(ElasticsearchParameterAccessor parameterAccessor) {
String queryString = replacePlaceholders(this.query, parameterAccessor);
return new StringQuery(queryString);
}
private String replacePlaceholders(String input, ElasticsearchParameterAccessor accessor) {
Matcher matcher = PARAMETER_PLACEHOLDER.matcher(input);
String result = input;
while (matcher.find()) {
String group = matcher.group();
int index = Integer.parseInt(matcher.group(1));
result = result.replace(group, getParameterWithIndex(accessor, index));
}
return result;
}
private String getParameterWithIndex(ElasticsearchParameterAccessor accessor, int index) {
return ObjectUtils.nullSafeToString(accessor.getBindableValue(index));
}
@Override
boolean isCountQuery() {
return false;
}
@Override
boolean isDeleteQuery() {
return false;
}
@Override
boolean isExistsQuery() {
return false;
}
@Override
boolean isLimiting() {
return false;
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.query.parser.ElasticsearchQueryCreator;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.parser.PartTree;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class ReactivePartTreeElasticsearchQuery extends AbstractReactiveElasticsearchRepositoryQuery {
private final PartTree tree;
private final ResultProcessor processor;
public ReactivePartTreeElasticsearchQuery(ReactiveElasticsearchQueryMethod queryMethod,
ReactiveElasticsearchOperations elasticsearchOperations) {
super(queryMethod, elasticsearchOperations);
this.processor = queryMethod.getResultProcessor();
this.tree = new PartTree(queryMethod.getName(), processor.getReturnedType().getDomainType());
}
@Override
protected Query createQuery(ElasticsearchParameterAccessor accessor) {
return new ElasticsearchQueryCreator(tree, accessor, getMappingContext()).createQuery();
}
@Override
boolean isLimiting() {
return tree.isLimiting();
}
@Override
boolean isExistsQuery() {
return tree.isExistsProjection();
}
@Override
boolean isDeleteQuery() {
return tree.isDelete();
}
@Override
boolean isCountQuery() {
return tree.isCountProjection();
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.util.Assert;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class SimpleElasticsearchEntityMetadata<T> implements ElasticsearchEntityMetadata<T> {
private final Class<T> type;
private final ElasticsearchPersistentEntity<?> entity;
public SimpleElasticsearchEntityMetadata(Class<T> type, ElasticsearchPersistentEntity<?> entity) {
Assert.notNull(type, "Type must not be null!");
Assert.notNull(entity, "Entity must not be null!");
this.type = type;
this.entity = entity;
}
@Override
public String getIndexName() {
return entity.getIndexName();
}
@Override
public String getIndexTypeName() {
return entity.getIndexType();
}
@Override
public Class<T> getJavaType() {
return type;
}
}

View File

@ -102,7 +102,8 @@ public class ElasticsearchRepositoryFactory extends RepositoryFactorySupport {
public RepositoryQuery resolveQuery(Method method, RepositoryMetadata metadata, ProjectionFactory factory,
NamedQueries namedQueries) {
ElasticsearchQueryMethod queryMethod = new ElasticsearchQueryMethod(method, metadata, factory);
ElasticsearchQueryMethod queryMethod = new ElasticsearchQueryMethod(method, metadata, factory,
elasticsearchOperations.getElasticsearchConverter().getMappingContext());
String namedQueryName = queryMethod.getNamedQueryName();
if (namedQueries.hasQuery(namedQueryName)) {

View File

@ -0,0 +1,157 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Optional;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryMethod;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchStringQuery;
import org.springframework.data.elasticsearch.repository.query.ReactivePartTreeElasticsearchQuery;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.NamedQueries;
import org.springframework.data.repository.core.RepositoryInformation;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.core.support.ReactiveRepositoryFactorySupport;
import org.springframework.data.repository.query.QueryLookupStrategy;
import org.springframework.data.repository.query.QueryLookupStrategy.Key;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Factory to create {@link org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository}
* instances.
*
* @author Christoph Strobl
* @since 3.2
*/
public class ReactiveElasticsearchRepositoryFactory extends ReactiveRepositoryFactorySupport {
private static final SpelExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
private final ReactiveElasticsearchOperations operations;
private final MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext;
/**
* Creates a new {@link ReactiveElasticsearchRepositoryFactory} with the given
* {@link ReactiveElasticsearchOperations}.
*
* @param elasticsearchOperations must not be {@literal null}.
*/
public ReactiveElasticsearchRepositoryFactory(ReactiveElasticsearchOperations elasticsearchOperations) {
Assert.notNull(elasticsearchOperations, "ReactiveElasticsearchOperations must not be null!");
this.operations = elasticsearchOperations;
this.mappingContext = elasticsearchOperations.getElasticsearchConverter().getMappingContext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactorySupport#getRepositoryBaseClass(org.springframework.data.repository.core.RepositoryMetadata)
*/
@Override
protected Class<?> getRepositoryBaseClass(RepositoryMetadata metadata) {
return SimpleReactiveElasticsearchRepository.class;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactorySupport#getTargetRepository(org.springframework.data.repository.core.RepositoryInformation)
*/
@Override
protected Object getTargetRepository(RepositoryInformation information) {
ElasticsearchEntityInformation<?, Serializable> entityInformation = getEntityInformation(
information.getDomainType(), information);
return getTargetRepositoryViaReflection(information, entityInformation, operations);
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactorySupport#getQueryLookupStrategy(org.springframework.data.repository.query.QueryLookupStrategy.Key, org.springframework.data.repository.query.EvaluationContextProvider)
*/
@Override
protected Optional<QueryLookupStrategy> getQueryLookupStrategy(@Nullable Key key,
QueryMethodEvaluationContextProvider evaluationContextProvider) {
return Optional.of(new ElasticsearchQueryLookupStrategy(operations, evaluationContextProvider, mappingContext));
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactorySupport#getEntityInformation(java.lang.Class)
*/
public <T, ID> ElasticsearchEntityInformation<T, ID> getEntityInformation(Class<T> domainClass) {
return getEntityInformation(domainClass, null);
}
@SuppressWarnings("unchecked")
private <T, ID> ElasticsearchEntityInformation<T, ID> getEntityInformation(Class<T> domainClass,
@Nullable RepositoryInformation information) {
ElasticsearchPersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(domainClass);
return new MappingElasticsearchEntityInformation<>((ElasticsearchPersistentEntity<T>) entity, entity.getIndexName(),
entity.getIndexType());
}
/**
* @author Christoph Strobl
*/
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
private static class ElasticsearchQueryLookupStrategy implements QueryLookupStrategy {
private final ReactiveElasticsearchOperations operations;
private final QueryMethodEvaluationContextProvider evaluationContextProvider;
private final MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext;
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.QueryLookupStrategy#resolveQuery(java.lang.reflect.Method, org.springframework.data.repository.core.RepositoryMetadata, org.springframework.data.projection.ProjectionFactory, org.springframework.data.repository.core.NamedQueries)
*/
@Override
public RepositoryQuery resolveQuery(Method method, RepositoryMetadata metadata, ProjectionFactory factory,
NamedQueries namedQueries) {
ReactiveElasticsearchQueryMethod queryMethod = new ReactiveElasticsearchQueryMethod(method, metadata, factory,
mappingContext);
String namedQueryName = queryMethod.getNamedQueryName();
if (namedQueries.hasQuery(namedQueryName)) {
String namedQuery = namedQueries.getQuery(namedQueryName);
return new ReactiveElasticsearchStringQuery(namedQuery, queryMethod, operations, EXPRESSION_PARSER,
evaluationContextProvider);
} else if (queryMethod.hasAnnotatedQuery()) {
return new ReactiveElasticsearchStringQuery(queryMethod, operations, EXPRESSION_PARSER,
evaluationContextProvider);
} else {
return new ReactivePartTreeElasticsearchQuery(queryMethod, operations);
}
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport;
import org.springframework.data.repository.core.support.RepositoryFactorySupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link org.springframework.beans.factory.FactoryBean} to create
* {@link org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository} instances.
*
* @author Christoph Strobl
* @since 3.2
* @see org.springframework.data.repository.reactive.ReactiveSortingRepository
*/
public class ReactiveElasticsearchRepositoryFactoryBean<T extends Repository<S, ID>, S, ID>
extends RepositoryFactoryBeanSupport<T, S, ID> {
private @Nullable ReactiveElasticsearchOperations operations;
private boolean mappingContextConfigured = false;
/**
* Creates a new {@link ReactiveElasticsearchRepositoryFactoryBean} for the given repository interface.
*
* @param repositoryInterface must not be {@literal null}.
*/
public ReactiveElasticsearchRepositoryFactoryBean(Class<? extends T> repositoryInterface) {
super(repositoryInterface);
}
/**
* Configures the {@link ReactiveElasticsearchOperations} to be used.
*
* @param operations the operations to set
*/
public void setReactiveElasticsearchOperations(@Nullable ReactiveElasticsearchOperations operations) {
this.operations = operations;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport#setMappingContext(org.springframework.data.mapping.context.MappingContext)
*/
@Override
protected void setMappingContext(MappingContext<?, ?> mappingContext) {
super.setMappingContext(mappingContext);
this.mappingContextConfigured = true;
}
/*
* (non-Javadoc)
*
* @see
* org.springframework.data.repository.support.RepositoryFactoryBeanSupport
* #createRepositoryFactory()
*/
@Override
protected final RepositoryFactorySupport createRepositoryFactory() {
return getFactoryInstance(operations);
}
/**
* Creates and initializes a {@link RepositoryFactorySupport} instance.
*
* @param operations
* @return
*/
protected RepositoryFactorySupport getFactoryInstance(ReactiveElasticsearchOperations operations) {
return new ReactiveElasticsearchRepositoryFactory(operations);
}
/*
* (non-Javadoc)
*
* @see
* org.springframework.data.repository.support.RepositoryFactoryBeanSupport
* #afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
Assert.state(operations != null, "ReactiveElasticsearchOperations must not be null!");
if (!mappingContextConfigured) {
setMappingContext(operations.getElasticsearchConverter().getMappingContext());
}
}
}

View File

@ -0,0 +1,185 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.util.Assert;
/**
* @author Christoph Strobl
* @since 3.2
*/
public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveElasticsearchRepository<T, ID> {
private final ElasticsearchEntityInformation<T, ID> entityInformation;
private final ReactiveElasticsearchOperations elasticsearchOperations;
public SimpleReactiveElasticsearchRepository(ElasticsearchEntityInformation<T, ID> entityInformation,
ReactiveElasticsearchOperations elasticsearchOperations) {
Assert.notNull(entityInformation, "EntityInformation must not be null!");
Assert.notNull(elasticsearchOperations, "ElasticsearchOperations must not be null!");
this.entityInformation = entityInformation;
this.elasticsearchOperations = elasticsearchOperations;
}
@Override
public Flux<T> findAll(Sort sort) {
return elasticsearchOperations.find(Query.findAll().addSort(sort), entityInformation.getJavaType(),
entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public <S extends T> Mono<S> save(S entity) {
Assert.notNull(entity, "Entity must not be null!");
return elasticsearchOperations.save(entity, entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
Assert.notNull(entities, "Entities must not be null!");
return saveAll(Flux.fromIterable(entities));
}
@Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
return Flux.from(entityStream).flatMap(this::save);
}
@Override
public Mono<T> findById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations.findById(convertId(id), entityInformation.getJavaType(),
entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public Mono<T> findById(Publisher<ID> id) {
Assert.notNull(id, "Id must not be null!");
return Mono.from(id).flatMap(this::findById);
}
@Override
public Mono<Boolean> existsById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations.exists(convertId(id), entityInformation.getJavaType(),
entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public Mono<Boolean> existsById(Publisher<ID> id) {
Assert.notNull(id, "Id must not be null!");
return Mono.from(id).flatMap(this::existsById);
}
@Override
public Flux<T> findAll() {
return elasticsearchOperations.find(Query.findAll(), entityInformation.getJavaType(),
entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public Flux<T> findAllById(Iterable<ID> ids) {
Assert.notNull(ids, "Ids must not be null!");
return Flux.fromIterable(ids).flatMap(this::findById);
}
@Override
public Flux<T> findAllById(Publisher<ID> idStream) {
Assert.notNull(idStream, "IdStream must not be null!");
return Flux.from(idStream).buffer().flatMap(this::findAllById);
}
@Override
public Mono<Long> count() {
return elasticsearchOperations.count(Query.findAll(), entityInformation.getJavaType(),
entityInformation.getIndexName(), entityInformation.getType());
}
@Override
public Mono<Void> deleteById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations
.deleteById(convertId(id), entityInformation.getJavaType(), entityInformation.getIndexName(),
entityInformation.getType()) //
.then();
}
@Override
public Mono<Void> deleteById(Publisher<ID> id) {
Assert.notNull(id, "Id must not be null!");
return Mono.from(id).flatMap(this::deleteById);
}
@Override
public Mono<Void> delete(T entity) {
Assert.notNull(entity, "Entity must not be null!");
return elasticsearchOperations.delete(entity, entityInformation.getIndexName(), entityInformation.getType()) //
.then();
}
@Override
public Mono<Void> deleteAll(Iterable<? extends T> entities) {
Assert.notNull(entities, "Entities must not be null!");
return deleteAll(Flux.fromIterable(entities));
}
@Override
public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
return Flux.from(entityStream).flatMap(this::delete).then();
}
@Override
public Mono<Void> deleteAll() {
return elasticsearchOperations
.deleteBy(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexName(),
entityInformation.getType()) //
.then();
}
private String convertId(Object id) {
return elasticsearchOperations.getElasticsearchConverter().convertId(id);
}
}

View File

@ -17,14 +17,16 @@ package org.springframework.data.elasticsearch;
import lombok.SneakyThrows;
import java.io.IOException;
import java.time.Duration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
@ -84,6 +86,18 @@ public final class TestUtils {
}
}
@SneakyThrows
public static boolean isEmptyIndex(String indexName) {
try (RestHighLevelClient client = restHighLevelClient()) {
return 0L == client
.search(new SearchRequest(indexName)
.source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())), RequestOptions.DEFAULT)
.getHits().getTotalHits();
}
}
public static OfType documentWithId(String id) {
return new DocumentLookup(id);
}
@ -106,19 +120,16 @@ public final class TestUtils {
}
@Override
@SneakyThrows
public boolean existsIn(String index) {
GetRequest request = new GetRequest(index).id(id);
if (StringUtils.hasText(type)) {
request = request.type(type);
}
try {
return restHighLevelClient().get(request, RequestOptions.DEFAULT).isExists();
} catch (IOException e) {
e.printStackTrace();
try (RestHighLevelClient client = restHighLevelClient()) {
return client.get(request, RequestOptions.DEFAULT).isExists();
}
return false;
}
@Override

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import org.springframework.data.elasticsearch.ElasticsearchException;
import reactor.test.StepVerifier;
import java.io.IOException;
@ -135,6 +136,16 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-519
public void getOnNonExistingIndexShouldThrowException() {
client.get(new GetRequest(INDEX_I, TYPE_I, "nonono"))
.as(StepVerifier::create)
.expectError(ElasticsearchStatusException.class)
.verify();
}
@Test // DATAES-488
public void getShouldFetchDocumentById() {

View File

@ -177,6 +177,14 @@ public class ReactiveElasticsearchTemplateTests {
template.save(null);
}
@Test // DATAES-519
public void findByIdShouldCompleteWhenIndexDoesNotExist() {
template.findById("foo", SampleEntity.class, "no-such-index") //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-504
public void findByIdShouldReturnEntity() {
@ -245,6 +253,15 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void existsShouldReturnFalseWhenIndexDoesNotExist() {
template.exists("foo", SampleEntity.class, "no-such-index") //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-504
public void existsShouldReturnTrueWhenFound() {
@ -269,6 +286,14 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void findShouldCompleteWhenIndexDoesNotExist() {
template.find(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class, "no-such-index") //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-504
public void findShouldApplyCriteria() {
@ -392,6 +417,15 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void countShouldReturnZeroWhenIndexDoesNotExist() {
template.count(SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
}
@Test // DATAES-504
public void countShouldReturnCountAllWhenGivenNoQuery() {
@ -416,6 +450,14 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldCompleteWhenIndexDoesNotExist() {
template.deleteById("does-not-exists", SampleEntity.class, "no-such-index") //
.as(StepVerifier::create)//
.verifyComplete();
}
@Test // DATAES-504
public void deleteByIdShouldRemoveExistingDocumentById() {
@ -462,6 +504,18 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnZeroWhenIndexDoesNotExist() {
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test"));
template.deleteBy(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
}
@Test // DATAES-504
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnNumberOfDeletedDocuments() {

View File

@ -0,0 +1,63 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.config;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.entities.SampleEntity;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class ReactiveElasticsearchRepositoriesRegistrarTests {
@Configuration
@EnableReactiveElasticsearchRepositories(considerNestedRepositories = true)
static class Config {
@Bean
public ReactiveElasticsearchTemplate reactiveElasticsearchTemplate() {
return new ReactiveElasticsearchTemplate(TestUtils.reactiveClient());
}
}
@Autowired ReactiveSampleEntityRepository repository;
@Autowired ApplicationContext context;
@Test // DATAES-519
public void testConfiguration() {
Assertions.assertThat(context).isNotNull();
Assertions.assertThat(repository).isNotNull();
}
interface ReactiveSampleEntityRepository extends ReactiveElasticsearchRepository<SampleEntity, String> {}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.config;
import static org.junit.Assert.*;
import java.util.Collection;
import org.junit.Test;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.data.repository.config.AnnotationRepositoryConfigurationSource;
import org.springframework.data.repository.config.RepositoryConfiguration;
import org.springframework.data.repository.config.RepositoryConfigurationSource;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
public class ReactiveElasticsearchRepositoryConfigurationExtensionUnitTests {
StandardAnnotationMetadata metadata = new StandardAnnotationMetadata(Config.class, true);
ResourceLoader loader = new PathMatchingResourcePatternResolver();
Environment environment = new StandardEnvironment();
BeanDefinitionRegistry registry = new DefaultListableBeanFactory();
RepositoryConfigurationSource configurationSource = new AnnotationRepositoryConfigurationSource(metadata,
EnableReactiveElasticsearchRepositories.class, loader, environment, registry);
@Test // DATAES-519
public void isStrictMatchIfDomainTypeIsAnnotatedWithDocument() {
ReactiveElasticsearchRepositoryConfigurationExtension extension = new ReactiveElasticsearchRepositoryConfigurationExtension();
assertHasRepo(CrudRepositoryForAnnotatedType.class,
extension.getRepositoryConfigurations(configurationSource, loader, true));
}
@Test // DATAES-519
public void isStrictMatchIfRepositoryExtendsStoreSpecificBase() {
ReactiveElasticsearchRepositoryConfigurationExtension extension = new ReactiveElasticsearchRepositoryConfigurationExtension();
assertHasRepo(EsRepositoryForUnAnnotatedType.class,
extension.getRepositoryConfigurations(configurationSource, loader, true));
}
@Test // DATAES-519
public void isNotStrictMatchIfDomainTypeIsNotAnnotatedWithDocument() {
ReactiveElasticsearchRepositoryConfigurationExtension extension = new ReactiveElasticsearchRepositoryConfigurationExtension();
assertDoesNotHaveRepo(CrudRepositoryForUnAnnotatedType.class,
extension.getRepositoryConfigurations(configurationSource, loader, true));
}
private static void assertHasRepo(Class<?> repositoryInterface,
Collection<RepositoryConfiguration<RepositoryConfigurationSource>> configs) {
for (RepositoryConfiguration<?> config : configs) {
if (config.getRepositoryInterface().equals(repositoryInterface.getName())) {
return;
}
}
fail("Expected to find config for repository interface ".concat(repositoryInterface.getName()).concat(" but got ")
.concat(configs.toString()));
}
private static void assertDoesNotHaveRepo(Class<?> repositoryInterface,
Collection<RepositoryConfiguration<RepositoryConfigurationSource>> configs) {
for (RepositoryConfiguration<?> config : configs) {
if (config.getRepositoryInterface().equals(repositoryInterface.getName())) {
fail("Expected not to find config for repository interface ".concat(repositoryInterface.getName()));
}
}
}
@EnableReactiveElasticsearchRepositories(considerNestedRepositories = true)
static class Config {
}
@Document(indexName = "star-wars", type = "character")
static class SwCharacter {}
static class Store {}
interface CrudRepositoryForAnnotatedType extends ReactiveCrudRepository<SwCharacter, String> {}
interface CrudRepositoryForUnAnnotatedType extends ReactiveCrudRepository<Store, String> {}
interface EsRepositoryForUnAnnotatedType extends ReactiveElasticsearchRepository<Store, String> {}
}

View File

@ -0,0 +1,129 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.reflect.Method;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.entities.Person;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.core.support.DefaultRepositoryMetadata;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
public class ReactiveElasticsearchQueryMethodUnitTests {
SimpleElasticsearchMappingContext mappingContext;
@Before
public void setUp() {
mappingContext = new SimpleElasticsearchMappingContext();
}
@Test // DATAES-519
public void detectsCollectionFromRepoTypeIfReturnTypeNotAssignable() throws Exception {
ReactiveElasticsearchQueryMethod queryMethod = queryMethod(NonReactiveRepository.class, "method");
ElasticsearchEntityMetadata<?> metadata = queryMethod.getEntityInformation();
assertThat(metadata.getJavaType()).isAssignableFrom(Person.class);
assertThat(metadata.getIndexName()).isEqualTo("test-index-person");
assertThat(metadata.getIndexTypeName()).isEqualTo("user");
}
@Test(expected = IllegalArgumentException.class) // DATAES-519
public void rejectsNullMappingContext() throws Exception {
Method method = PersonRepository.class.getMethod("findByName", String.class);
new ReactiveElasticsearchQueryMethod(method, new DefaultRepositoryMetadata(PersonRepository.class),
new SpelAwareProxyProjectionFactory(), null);
}
@Test(expected = IllegalStateException.class) // DATAES-519
public void rejectsMonoPageableResult() throws Exception {
queryMethod(PersonRepository.class, "findMonoByName", String.class, Pageable.class);
}
@Test(expected = InvalidDataAccessApiUsageException.class) // DATAES-519
public void throwsExceptionOnWrappedPage() throws Exception {
queryMethod(PersonRepository.class, "findMonoPageByName", String.class, Pageable.class);
}
@Test(expected = InvalidDataAccessApiUsageException.class) // DATAES-519
public void throwsExceptionOnWrappedSlice() throws Exception {
queryMethod(PersonRepository.class, "findMonoSliceByName", String.class, Pageable.class);
}
@Test // DATAES-519
public void allowsPageableOnFlux() throws Exception {
queryMethod(PersonRepository.class, "findByName", String.class, Pageable.class);
}
@Test // DATAES-519
public void fallsBackToRepositoryDomainTypeIfMethodDoesNotReturnADomainType() throws Exception {
ReactiveElasticsearchQueryMethod method = queryMethod(PersonRepository.class, "deleteByName", String.class);
assertThat(method.getEntityInformation().getJavaType()).isAssignableFrom(Person.class);
}
private ReactiveElasticsearchQueryMethod queryMethod(Class<?> repository, String name, Class<?>... parameters)
throws Exception {
Method method = repository.getMethod(name, parameters);
ProjectionFactory factory = new SpelAwareProxyProjectionFactory();
return new ReactiveElasticsearchQueryMethod(method, new DefaultRepositoryMetadata(repository), factory,
mappingContext);
}
interface PersonRepository extends Repository<Person, String> {
Mono<Person> findMonoByName(String name, Pageable pageRequest);
Mono<Page<Person>> findMonoPageByName(String name, Pageable pageRequest);
Mono<Slice<Person>> findMonoSliceByName(String name, Pageable pageRequest);
Flux<Person> findByName(String name);
Flux<Person> findByName(String name, Pageable pageRequest);
void deleteByName(String name);
}
interface NonReactiveRepository extends Repository<Person, Long> {
List<Person> method();
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.reflect.Method;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.entities.Person;
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.core.support.DefaultRepositoryMetadata;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.expression.spel.standard.SpelExpressionParser;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
@RunWith(MockitoJUnitRunner.class)
public class ReactiveElasticsearchStringQueryUnitTests {
SpelExpressionParser PARSER = new SpelExpressionParser();
ElasticsearchConverter converter;
@Mock ReactiveElasticsearchOperations operations;
@Before
public void setUp() {
converter = new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext());
}
@Test // DATAES-519
public void bindsSimplePropertyCorrectly() throws Exception {
ReactiveElasticsearchStringQuery elasticsearchStringQuery = createQueryForMethod("findByName", String.class);
StubParameterAccessor accesor = new StubParameterAccessor("Luke");
org.springframework.data.elasticsearch.core.query.Query query = elasticsearchStringQuery.createQuery(accesor);
StringQuery reference = new StringQuery("{ 'bool' : { 'must' : { 'term' : { 'name' : 'Luke' } } } }");
assertThat(query).isInstanceOf(StringQuery.class);
assertThat(((StringQuery) query).getSource()).isEqualTo(reference.getSource());
}
@Test // DATAES-519
@Ignore("TODO: fix spel query integration")
public void bindsExpressionPropertyCorrectly() throws Exception {
ReactiveElasticsearchStringQuery elasticsearchStringQuery = createQueryForMethod("findByNameWithExpression",
String.class);
StubParameterAccessor accesor = new StubParameterAccessor("Luke");
org.springframework.data.elasticsearch.core.query.Query query = elasticsearchStringQuery.createQuery(accesor);
StringQuery reference = new StringQuery("{ 'bool' : { 'must' : { 'term' : { 'name' : 'Luke' } } } }");
assertThat(query).isInstanceOf(StringQuery.class);
assertThat(((StringQuery) query).getSource()).isEqualTo(reference.getSource());
}
private ReactiveElasticsearchStringQuery createQueryForMethod(String name, Class<?>... parameters) throws Exception {
Method method = SampleRepository.class.getMethod(name, parameters);
ProjectionFactory factory = new SpelAwareProxyProjectionFactory();
ReactiveElasticsearchQueryMethod queryMethod = new ReactiveElasticsearchQueryMethod(method,
new DefaultRepositoryMetadata(SampleRepository.class), factory, converter.getMappingContext());
return new ReactiveElasticsearchStringQuery(queryMethod, operations, PARSER,
QueryMethodEvaluationContextProvider.DEFAULT);
}
private interface SampleRepository extends Repository<Person, String> {
@Query("{ 'bool' : { 'must' : { 'term' : { 'name' : '?0' } } } }")
Mono<Person> findByName(String name);
@Query("{ 'bool' : { 'must' : { 'term' : { 'name' : '?#{[0]}' } } } }")
Flux<Person> findByNameWithExpression(String param0);
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.query;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.query.ParameterAccessor;
/**
* Simple {@link ParameterAccessor} that returns the given parameters unfiltered.
*
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
class StubParameterAccessor implements ElasticsearchParameterAccessor {
private final Object[] values;
@SuppressWarnings("unchecked")
public StubParameterAccessor(Object... values) {
this.values = values;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#getPageable()
*/
public Pageable getPageable() {
return null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#getBindableValue(int)
*/
public Object getBindableValue(int index) {
return values[index];
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#hasBindableNullValue()
*/
public boolean hasBindableNullValue() {
return false;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#getSort()
*/
public Sort getSort() {
return Sort.unsorted();
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#iterator()
*/
public Iterator<Object> iterator() {
return Arrays.asList(values).iterator();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.repository.query.ElasticsearchParameterAccessor#getValues()
*/
@Override
public Object[] getValues() {
return this.values;
}
/*
* (non-Javadoc)
* @see org.springframework.data.repository.query.ParameterAccessor#getDynamicProjection()
*/
@Override
public Optional<Class<?>> getDynamicProjection() {
return Optional.empty();
}
}

View File

@ -0,0 +1,492 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration;
import org.springframework.data.elasticsearch.entities.SampleEntity;
import org.springframework.data.elasticsearch.repository.config.EnableReactiveElasticsearchRepositories;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.StringUtils;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SimpleReactiveElasticsearchRepositoryTests {
@Configuration
@EnableReactiveElasticsearchRepositories(considerNestedRepositories = true)
static class Config extends AbstractReactiveElasticsearchConfiguration {
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
return TestUtils.reactiveClient();
}
}
static final String INDEX = "test-index-sample";
static final String TYPE = "test-type";
@Autowired ReactiveSampleEntityRepository repository;
@After
public void tearDown() {
TestUtils.deleteIndex(INDEX);
}
@Test // DATAES-519
public void saveShouldSaveSingleEntity() {
repository.save(SampleEntity.builder().build()) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(TestUtils.documentWithId(it.getId()).ofType(TYPE).existsIn(INDEX)).isTrue();
}) //
.verifyComplete();
}
@Test // DATAES-519
public void saveShouldComputeMultipleEntities() {
repository
.saveAll(Arrays.asList(SampleEntity.builder().build(), SampleEntity.builder().build(),
SampleEntity.builder().build())) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(TestUtils.documentWithId(it.getId()).ofType(TYPE).existsIn(INDEX)).isTrue();
}) //
.consumeNextWith(it -> {
assertThat(TestUtils.documentWithId(it.getId()).ofType(TYPE).existsIn(INDEX)).isTrue();
}) //
.consumeNextWith(it -> {
assertThat(TestUtils.documentWithId(it.getId()).ofType(TYPE).existsIn(INDEX)).isTrue();
}) //
.verifyComplete();
}
@Test // DATAES-519
public void findByIdShouldCompleteIfIndexDoesNotExist() {
repository.findById("id-two").as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519
public void findShouldRetrieveSingleEntityById() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build(), //
SampleEntity.builder().id("id-three").build());
repository.findById("id-two").as(StepVerifier::create)//
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo("id-two");
}) //
.verifyComplete();
}
@Test // DATAES-519
public void findByIdShouldCompleteIfNothingFound() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build(), //
SampleEntity.builder().id("id-three").build());
repository.findById("does-not-exist").as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-519
public void findAllByIdByIdShouldCompleteIfIndexDoesNotExist() {
repository.findAllById(Arrays.asList("id-two", "id-two")).as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519
public void findAllByIdShouldRetrieveMatchingDocuments() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build(), //
SampleEntity.builder().id("id-three").build());
repository.findAllById(Arrays.asList("id-one", "id-two")) //
.as(StepVerifier::create)//
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-519
public void findAllByIdShouldCompleteWhenNothingFound() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build(), //
SampleEntity.builder().id("id-three").build());
repository.findAllById(Arrays.asList("can't", "touch", "this")) //
.as(StepVerifier::create)//
.verifyComplete();
}
@Test // DATAES-519
public void countShouldReturnZeroWhenIndexDoesNotExist() {
repository.count().as(StepVerifier::create).expectNext(0L).verifyComplete();
}
@Test // DATAES-519
public void countShouldCountDocuments() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build());
repository.count().as(StepVerifier::create).expectNext(2L).verifyComplete();
}
@Test // DATAES-519
public void existsByIdShouldReturnTrueIfExists() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.existsById("id-two") //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-519
public void existsByIdShouldReturnFalseIfNotExists() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.existsById("wrecking ball") //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-519
public void countShouldCountMatchingDocuments() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.countAllByMessage("test") //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
}
@Test // DATAES-519
public void existsShouldReturnTrueIfExists() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.existsAllByMessage("message") //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-519
public void existsShouldReturnFalseIfNotExists() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.existsAllByMessage("these days") //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldCompleteIfNothingDeleted() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build());
repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldCompleteWhenIndexDoesNotExist() {
repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldDeleteEntry() {
SampleEntity toBeDeleted = SampleEntity.builder().id("id-two").build();
bulkIndex(SampleEntity.builder().id("id-one").build(), toBeDeleted);
repository.deleteById(toBeDeleted.getId()).as(StepVerifier::create).verifyComplete();
assertThat(TestUtils.documentWithId(toBeDeleted.getId()).ofType(TYPE).existsIn(INDEX)).isFalse();
}
@Test // DATAES-519
public void deleteShouldDeleteEntry() {
SampleEntity toBeDeleted = SampleEntity.builder().id("id-two").build();
bulkIndex(SampleEntity.builder().id("id-one").build(), toBeDeleted);
repository.delete(toBeDeleted).as(StepVerifier::create).verifyComplete();
assertThat(TestUtils.documentWithId(toBeDeleted.getId()).ofType(TYPE).existsIn(INDEX)).isFalse();
}
@Test // DATAES-519
public void deleteAllShouldDeleteGivenEntries() {
SampleEntity toBeDeleted = SampleEntity.builder().id("id-one").build();
SampleEntity hangInThere = SampleEntity.builder().id("id-two").build();
SampleEntity toBeDeleted2 = SampleEntity.builder().id("id-three").build();
bulkIndex(toBeDeleted, hangInThere, toBeDeleted2);
repository.deleteAll(Arrays.asList(toBeDeleted, toBeDeleted2)).as(StepVerifier::create).verifyComplete();
assertThat(TestUtils.documentWithId(toBeDeleted.getId()).ofType(TYPE).existsIn(INDEX)).isFalse();
assertThat(TestUtils.documentWithId(toBeDeleted2.getId()).ofType(TYPE).existsIn(INDEX)).isFalse();
assertThat(TestUtils.documentWithId(hangInThere.getId()).ofType(TYPE).existsIn(INDEX)).isTrue();
}
@Test // DATAES-519
public void deleteAllShouldDeleteAllEntries() {
bulkIndex(SampleEntity.builder().id("id-one").build(), //
SampleEntity.builder().id("id-two").build(), //
SampleEntity.builder().id("id-three").build());
repository.deleteAll().as(StepVerifier::create).verifyComplete();
assertThat(TestUtils.isEmptyIndex(INDEX)).isTrue();
}
@Test // DATAES-519
public void derivedFinderMethodShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.findAllByMessageLike("test") //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedFinderMethodShouldBeExecutedCorrectlyWhenGivenPublisher() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.findAllByMessage(Mono.just("test")) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedFinderWithDerivedSortMethodShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("test").rate(3).build(), //
SampleEntity.builder().id("id-two").message("test test").rate(1).build(), //
SampleEntity.builder().id("id-three").message("test test").rate(2).build());
repository.findAllByMessageLikeOrderByRate("test") //
.as(StepVerifier::create) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-two")) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-three")) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-one")) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedFinderMethodWithSortParameterShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("test").rate(3).build(), //
SampleEntity.builder().id("id-two").message("test test").rate(1).build(), //
SampleEntity.builder().id("id-three").message("test test").rate(2).build());
repository.findAllByMessage("test", Sort.by(Order.asc("rate"))) //
.as(StepVerifier::create) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-two")) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-three")) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-one")) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedFinderMethodWithPageableParameterShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("test").rate(3).build(), //
SampleEntity.builder().id("id-two").message("test test").rate(1).build(), //
SampleEntity.builder().id("id-three").message("test test").rate(2).build());
repository.findAllByMessage("test", PageRequest.of(0, 2, Sort.by(Order.asc("rate")))) //
.as(StepVerifier::create) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-two")) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo("id-three")) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedFinderMethodReturningMonoShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.findFirstByMessageLike("test") //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
}
@Test // DATAES-519
public void annotatedFinderMethodShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.findAllViaAnnotatedQueryByMessageLike("test") //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-519
public void derivedDeleteMethodShouldBeExecutedCorrectly() {
bulkIndex(SampleEntity.builder().id("id-one").message("message").build(), //
SampleEntity.builder().id("id-two").message("test message").build(), //
SampleEntity.builder().id("id-three").message("test test").build());
repository.deleteAllByMessage("message") //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
assertThat(TestUtils.documentWithId("id-one").ofType(TYPE).existsIn(INDEX)).isFalse();
assertThat(TestUtils.documentWithId("id-two").ofType(TYPE).existsIn(INDEX)).isFalse();
assertThat(TestUtils.documentWithId("id-three").ofType(TYPE).existsIn(INDEX)).isTrue();
}
IndexRequest indexRequest(Map source, String index, String type) {
return new IndexRequest(index, type) //
.id(source.containsKey("id") ? source.get("id").toString() : UUID.randomUUID().toString()) //
.source(source) //
.create(true);
}
IndexRequest indexRequestFrom(SampleEntity entity) {
Map<String, Object> target = new LinkedHashMap<>();
if (StringUtils.hasText(entity.getId())) {
target.put("id", entity.getId());
}
if (StringUtils.hasText(entity.getType())) {
target.put("type", entity.getType());
}
if (StringUtils.hasText(entity.getMessage())) {
target.put("message", entity.getMessage());
}
target.put("rate", entity.getRate());
target.put("available", entity.isAvailable());
return indexRequest(target, INDEX, TYPE);
}
void bulkIndex(SampleEntity... entities) {
BulkRequest request = new BulkRequest();
Arrays.stream(entities).forEach(it -> request.add(indexRequestFrom(it)));
try (RestHighLevelClient client = TestUtils.restHighLevelClient()) {
client.bulk(request.setRefreshPolicy(RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT);
} catch (Exception e) {}
}
interface ReactiveSampleEntityRepository extends ReactiveCrudRepository<SampleEntity, String> {
Flux<SampleEntity> findAllByMessageLike(String message);
Flux<SampleEntity> findAllByMessageLikeOrderByRate(String message);
Flux<SampleEntity> findAllByMessage(String message, Sort sort);
Flux<SampleEntity> findAllByMessage(String message, Pageable pageable);
Flux<SampleEntity> findAllByMessage(Publisher<String> message);
@Query("{ \"bool\" : { \"must\" : { \"term\" : { \"message\" : \"?0\" } } } }")
Flux<SampleEntity> findAllViaAnnotatedQueryByMessageLike(String message);
Mono<SampleEntity> findFirstByMessageLike(String message);
Mono<Long> countAllByMessage(String message);
Mono<Boolean> existsAllByMessage(String message);
Mono<Long> deleteAllByMessage(String message);
}
}