DATAES-504 - Add ReactiveElasticsearchOperations & ReactiveElasticsearchTemplate

ReactiveElasticsearchOperations is the gateway to executing high level commands against an Elasticsearch cluster using the ReactiveElasticsearchClient.

The ReactiveElasticsearchTemplate is the default implementation of ReactiveElasticsearchOperations and offers the following set of features.

* Read/Write mapping support for domain types.
* A rich query and criteria api.
* Resource management and Exception translation.

To get started the ReactiveElasticsearchTemplate needs to know about the actual client to work with.
The easiest way of setting up the ReactiveElasticsearchTemplate is via AbstractReactiveElasticsearchConfiguration providing
dedicated configuration method hooks for base package, the initial entity set etc.

@Configuration
public class Config extends AbstractReactiveElasticsearchConfiguration {

    @Bean
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        // ...
    }
}

NOTE: If applicable set default HttpHeaders via the ClientConfiguration of the ReactiveElasticsearchClient.

TIP: If needed the ReactiveElasticsearchTemplate can be configured with default RefreshPolicy and IndicesOptions that get applied to the related requests by overriding the defaults of refreshPolicy() and indicesOptions().

The ReactiveElasticsearchTemplate lets you save, find and delete your domain objects and map those objects to documents stored in Elasticsearch.

@Document(indexName = "marvel", type = "characters")
public class Person {

    private @Id String id;
    private String name;
    private int age;

    // Getter/Setter omitted...
}

template.save(new Person("Bruce Banner", 42)) // save a new document
    .doOnNext(System.out::println)
    .flatMap(person -> template.findById(person.id, Person.class)) // then go find it
    .doOnNext(System.out::println)
    .flatMap(person -> template.delete(person)) // just to remove remove it again
    .doOnNext(System.out::println)
    .flatMap(id -> template.count(Person.class)) // so we've got nothing at the end
    .doOnNext(System.out::println)
    .subscribe(); // yeah :)

The above outputs the following sequence on the console.

> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42)
> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42)
> QjWCWWcBXiLAnp77ksfR
> 0

Original Pull Request: #229
This commit is contained in:
Christoph Strobl 2018-11-23 15:00:59 +01:00
parent a39c34058b
commit ba890cb7eb
22 changed files with 2666 additions and 251 deletions

View File

@ -30,6 +30,7 @@ include::{spring-data-commons-docs}/repositories.adoc[]
:leveloffset: +1 :leveloffset: +1
include::reference/elasticsearch-clients.adoc[] include::reference/elasticsearch-clients.adoc[]
include::reference/data-elasticsearch.adoc[] include::reference/data-elasticsearch.adoc[]
include::reference/reactive-elasticsearch-operations.adoc[]
include::reference/elasticsearch-misc.adoc[] include::reference/elasticsearch-misc.adoc[]
:leveloffset: -1 :leveloffset: -1

View File

@ -0,0 +1,132 @@
[[elasticsearch.reactive.operations]]
= Reactive Elasticsearch Operations
`ReactiveElasticsearchOperations` is the gateway to executing high level commands against an Elasticsearch cluster using the `ReactiveElasticsearchClient`.
The `ReactiveElasticsearchTemplate` is the default implementation of `ReactiveElasticsearchOperations` and offers the following set of features.
* Read/Write mapping support for domain types.
* A rich query and criteria api.
* Resource management and Exception translation.
[[elasticsearch.reactive.template]]
== Reactive Elasticsearch Template
To get started the `ReactiveElasticsearchTemplate` needs to know about the actual client to work with.
Please see <<elasticsearch.clients.reactive>> for details on the client.
[[elasticsearch.reactive.template.configuration]]
=== Reactive Template Configuration
The easiest way of setting up the `ReactiveElasticsearchTemplate` is via `AbstractReactiveElasticsearchConfiguration` providing
dedicated configuration method hooks for `base package`, the `initial entity set` etc.
.The AbstractReactiveElasticsearchConfiguration
====
[source,java]
----
@Configuration
public class Config extends AbstractReactiveElasticsearchConfiguration {
@Bean <1>
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
// ...
}
}
----
<1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`.
====
NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`.
TIP: If needed the `ReactiveElasticsearchTemplate` can be configured with default `RefreshPolicy` and `IndicesOptions` that get applied to the related requests by overriding the defaults of `refreshPolicy()` and `indicesOptions()`.
However one might want to be more in control over the actual components and use a more verbose approach.
.Configure the ReactiveElasticsearchTemplate
====
[source,java]
----
@Configuration
public class Config {
@Bean <1>
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
// ...
}
@Bean <2>
public ElasticsearchConverter elasticsearchConverter() {
return new MappingElasticsearchConverter(elasticsearchMappingContext());
}
@Bean <3>
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
return new SimpleElasticsearchMappingContext();
}
@Bean <4>
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
}
}
----
<1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`.
<2> Set up the `ElasticsearchConverter` used for domain type mapping utilizing metadata provided by the mapping context.
<3> The Elasticsearch specific mapping context for domain type metadata.
<4> The actual template based on the client and conversion infrastructure.
====
[[elasticsearch.reactive.template.usage]]
=== Reactive Template Usage
`ReactiveElasticsearchTemplate` lets you save, find and delete your domain objects and map those objects to documents stored in Elasticsearch.
Consider the following:
.Use the ReactiveElasticsearchTemplate
====
[source,java]
----
@Document(indexName = "marvel", type = "characters")
public class Person {
private @Id String id;
private String name;
private int age;
// Getter/Setter omitted...
}
----
[source,java]
----
template.save(new Person("Bruce Banner", 42)) <1>
.doOnNext(System.out::println)
.flatMap(person -> template.findById(person.id, Person.class)) <2>
.doOnNext(System.out::println)
.flatMap(person -> template.delete(person)) <3>
.doOnNext(System.out::println)
.flatMap(id -> template.count(Person.class)) <4>
.doOnNext(System.out::println)
.subscribe(); <5>
----
The above outputs the following sequence on the console.
[source,text]
----
> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42)
> Person(id=QjWCWWcBXiLAnp77ksfR, name=Bruce Banner, age=42)
> QjWCWWcBXiLAnp77ksfR
> 0
----
<1> Insert a new `Person` document into the _marvel_ index under type _characters_. The `id` is generated on server side and set into the instance returned.
<2> Lookup the `Person` with matching `id` in the _marvel_ index under type _characters_.
<3> Delete the `Person` with matching `id`, extracted from the given instance, in the _marvel_ index under type _characters_.
<4> Count the total number of documents in the _marvel_ index under type _characters_.
<5> Don't forget to _subscribe()_.
====

View File

@ -56,11 +56,13 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.NoReachableHostException;
@ -249,7 +251,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
*/ */
@Override @Override
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) { public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers).publishNext();
return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers) //
.publishNext();
} }
/* /*
@ -264,6 +268,16 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(Flux::fromIterable); .flatMap(Flux::fromIterable);
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
*/
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
.publishNext();
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
@ -362,29 +376,24 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
try {
XContentParser contentParser = createParser(mediaType, content);
try { try {
Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
return Mono
.justOrEmpty(responseType.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, contentParser))); return Mono.justOrEmpty(responseType
.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
} catch (Exception errorParseFailure) { } catch (Exception errorParseFailure) {
try { try {
return Mono.error(BytesRestResponse.errorFromXContent(contentParser)); return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content)));
} catch (Exception e) { } catch (Exception e) {
// return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch.
return Mono.error(new ElasticsearchStatusException("Unable to parse response body", return Mono
RestStatus.fromCode(response.statusCode().value()))); .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
} }
} }
} catch (IOException e) {
return Mono.error(new DataAccessResourceFailureException("Error parsing XContent.", e));
}
} }
private static XContentParser createParser(String mediaType, String content) throws IOException { private static XContentParser createParser(String mediaType, String content) throws IOException {
@ -437,6 +446,18 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
static Function<DeleteRequest, Request> delete() { static Function<DeleteRequest, Request> delete() {
return RequestConverters::delete; return RequestConverters::delete;
} }
static Function<DeleteByQueryRequest, Request> deleteByQuery() {
return request -> {
try {
return RequestConverters.deleteByQuery(request);
} catch (IOException e) {
throw new ElasticsearchException("Could not parse request", e);
}
};
}
} }
/** /**

View File

@ -33,6 +33,8 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@ -348,6 +350,44 @@ public interface ReactiveElasticsearchClient {
*/ */
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest); Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
/**
* Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">Delete By
* Query API on elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response.
*/
default Mono<BulkByScrollResponse> deleteBy(Consumer<DeleteByQueryRequest> consumer) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
consumer.accept(request);
return deleteBy(request);
}
/**
* Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
*
* @param deleteRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">Delete By
* Query API on elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response.
*/
default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest) {
return deleteBy(HttpHeaders.EMPTY, deleteRequest);
}
/**
* Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param deleteRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">Delete By
* Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
/** /**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

View File

@ -0,0 +1,48 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.config;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
/**
* @author Christoph Strobl
* @since 4.0
* @see ElasticsearchConfigurationSupport
*/
public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {
/**
* Return the {@link RestHighLevelClient} instance used to connect to the cluster. <br />
* Annotate with {@link Bean} in case you want to expose a {@link RestHighLevelClient} instance to the
* {@link org.springframework.context.ApplicationContext}.
*
* @return never {@literal null}.
*/
public abstract RestHighLevelClient elasticsearchClient();
/**
* Creates {@link ElasticsearchOperations}.
*
* @return never {@literal null}.
*/
@Bean
public ElasticsearchOperations elasticsearchOperations() {
return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter());
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.config;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.lang.Nullable;
/**
* @author Christoph Strobl
* @since 4.0
* @see ElasticsearchConfigurationSupport
*/
@Configuration
public abstract class AbstractReactiveElasticsearchConfiguration extends ElasticsearchConfigurationSupport {
/**
* Return the {@link ReactiveElasticsearchClient} instance used to connect to the cluster. <br />
* Annotate with {@link Bean} in case you want to expose a {@link ReactiveElasticsearchClient} instance to the
* {@link org.springframework.context.ApplicationContext}.
*
* @return never {@literal null}.
*/
public abstract ReactiveElasticsearchClient reactiveElasticsearchClient();
/**
* Creates {@link ReactiveElasticsearchOperations}.
*
* @return never {@literal null}.
*/
@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
ReactiveElasticsearchTemplate template = new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(),
elasticsearchConverter());
template.setIndicesOptions(indicesOptions());
template.setRefreshPolicy(refreshPolicy());
return template;
}
/**
* Set up the write {@link RefreshPolicy}. Default is set to {@link RefreshPolicy#IMMEDIATE}.
*
* @return {@literal null} to use the server defaults.
*/
@Nullable
protected RefreshPolicy refreshPolicy() {
return RefreshPolicy.IMMEDIATE;
}
/**
* Set up the read {@link IndicesOptions}. Default is set to {@link IndicesOptions#strictExpandOpenAndForbidClosed()}.
*
* @return {@literal null} to use the server defaults.
*/
@Nullable
protected IndicesOptions indicesOptions() {
return IndicesOptions.strictExpandOpenAndForbidClosed();
}
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.config;
import lombok.SneakyThrows;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.data.annotation.Persistent;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
/**
* @author Christoph Strobl
* @since 4.0
*/
@Configuration
public class ElasticsearchConfigurationSupport {
@Bean
public ElasticsearchConverter elasticsearchConverter() {
return new MappingElasticsearchConverter(elasticsearchMappingContext());
}
/**
* Creates a {@link SimpleElasticsearchMappingContext} equipped with entity classes scanned from the mapping base
* package.
*
* @see #getMappingBasePackages()
* @return never {@literal null}.
*/
@Bean
@SneakyThrows
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
SimpleElasticsearchMappingContext mappingContext = new SimpleElasticsearchMappingContext();
mappingContext.setInitialEntitySet(getInitialEntitySet());
mappingContext.setSimpleTypeHolder(customConversions().getSimpleTypeHolder());
return mappingContext;
}
/**
* Register custom {@link Converter}s in a {@link ElasticsearchCustomConversions} object if required.
*
* @return never {@literal null}.
*/
@Bean
public ElasticsearchCustomConversions customConversions() {
return new ElasticsearchCustomConversions(Collections.emptyList());
}
/**
* Returns the base packages to scan for Elasticsearch mapped entities at startup. Will return the package name of the
* configuration class' (the concrete class, not this one here) by default. So if you have a
* {@code com.acme.AppConfig} extending {@link ElasticsearchConfigurationSupport} the base package will be considered
* {@code com.acme} unless the method is overridden to implement alternate behavior.
*
* @return the base packages to scan for mapped {@link Document} classes or an empty collection to not enable scanning
* for entities.
*/
protected Collection<String> getMappingBasePackages() {
Package mappingBasePackage = getClass().getPackage();
return Collections.singleton(mappingBasePackage == null ? null : mappingBasePackage.getName());
}
/**
* Scans the mapping base package for classes annotated with {@link Document}. By default, it scans for entities in
* all packages returned by {@link #getMappingBasePackages()}.
*
* @see #getMappingBasePackages()
* @return never {@literal null}.
* @throws ClassNotFoundException
*/
protected Set<Class<?>> getInitialEntitySet() throws ClassNotFoundException {
Set<Class<?>> initialEntitySet = new HashSet<>();
for (String basePackage : getMappingBasePackages()) {
initialEntitySet.addAll(scanForEntities(basePackage));
}
return initialEntitySet;
}
/**
* Scans the given base package for entities, i.e. Elasticsearch specific types annotated with {@link Document} and
* {@link Persistent}.
*
* @param basePackage must not be {@literal null}.
* @return never {@literal null}.
* @throws ClassNotFoundException
*/
protected Set<Class<?>> scanForEntities(String basePackage) throws ClassNotFoundException {
if (!StringUtils.hasText(basePackage)) {
return Collections.emptySet();
}
Set<Class<?>> initialEntitySet = new HashSet<Class<?>>();
if (StringUtils.hasText(basePackage)) {
ClassPathScanningCandidateComponentProvider componentProvider = new ClassPathScanningCandidateComponentProvider(
false);
componentProvider.addIncludeFilter(new AnnotationTypeFilter(Document.class));
componentProvider.addIncludeFilter(new AnnotationTypeFilter(Persistent.class));
for (BeanDefinition candidate : componentProvider.findCandidateComponents(basePackage)) {
initialEntitySet.add(ClassUtils.forName(candidate.getBeanClassName(),
AbstractReactiveElasticsearchConfiguration.class.getClassLoader()));
}
}
return initialEntitySet;
}
}

View File

@ -15,11 +15,7 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import java.io.IOException;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/** /**
* @author Artur Konczak * @author Artur Konczak
@ -35,17 +31,6 @@ public abstract class AbstractResultMapper implements ResultsMapper {
this.entityMapper = entityMapper; this.entityMapper = entityMapper;
} }
public <T> T mapEntity(String source, Class<T> clazz) {
if (StringUtils.isEmpty(source)) {
return null;
}
try {
return entityMapper.mapToObject(source, clazz);
} catch (IOException e) {
throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName(), e);
}
}
@Override @Override
public EntityMapper getEntityMapper() { public EntityMapper getEntityMapper() {
return this.entityMapper; return this.entityMapper;

View File

@ -16,8 +16,11 @@
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import java.net.ConnectException;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.dao.support.PersistenceExceptionTranslator;
/** /**
@ -31,6 +34,12 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
if (ex instanceof ElasticsearchException) { if (ex instanceof ElasticsearchException) {
// TODO: exception translation // TODO: exception translation
ElasticsearchException elasticsearchExption = (ElasticsearchException) ex;
// elasticsearchExption.get
}
if(ex.getCause() instanceof ConnectException) {
return new DataAccessResourceFailureException(ex.getMessage(), ex);
} }
return null; return null;

View File

@ -0,0 +1,459 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Interface that specifies a basic set of Elasticsearch operations executed in a reactive way.
* <p>
* Implemented by {@link ReactiveElasticsearchTemplate}. Not often used but a useful option for extensibility and
* testability (as it can be easily mocked, stubbed, or be the target of a JDK proxy). Command execution using
* {@link ReactiveElasticsearchOperations} is deferred until a {@link org.reactivestreams.Subscriber} subscribes to the
* {@link Publisher}.
*
* @author Christoph Strobl
* @since 4.0
*/
public interface ReactiveElasticsearchOperations {
/**
* Execute within a {@link ClientCallback} managing resources and translating errors.
*
* @param callback must not be {@literal null}.
* @param <T>
* @return the {@link Publisher} emitting results.
*/
<T> Publisher<T> execute(ClientCallback<Publisher<T>> callback);
/**
* Index the given entity, once available, extracting index and type from entity metadata.
*
* @param entityPublisher must not be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
default <T> Mono<T> save(Mono<? extends T> entityPublisher) {
Assert.notNull(entityPublisher, "EntityPublisher must not be null!");
return entityPublisher.flatMap(this::save);
}
/**
* Index the given entity extracting index and type from entity metadata.
*
* @param entity must not be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
default <T> Mono<T> save(T entity) {
return save(entity, null);
}
/**
* Index the entity, once available, in the given {@literal index}. If the index is {@literal null} or empty the index
* name provided via entity metadata is used.
*
* @param entityPublisher must not be {@literal null}.
* @param index the name of the target index. Can be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
default <T> Mono<T> save(Mono<? extends T> entityPublisher, String index) {
Assert.notNull(entityPublisher, "EntityPublisher must not be null!");
return entityPublisher.flatMap(it -> save(it, index));
}
/**
* Index the entity in the given {@literal index}. If the index is {@literal null} or empty the index name provided
* via entity metadata is used.
*
* @param entity must not be {@literal null}.
* @param index the name of the target index. Can be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
default <T> Mono<T> save(T entity, @Nullable String index) {
return save(entity, index, null);
}
/**
* Index the entity, once available, under the given {@literal type} in the given {@literal index}. If the
* {@literal index} is {@literal null} or empty the index name provided via entity metadata is used. Same for the
* {@literal type}.
*
* @param entityPublisher must not be {@literal null}.
* @param index the name of the target index. Can be {@literal null}.
* @param type the name of the type within the index. Can be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
default <T> Mono<T> save(Mono<? extends T> entityPublisher, @Nullable String index, @Nullable String type) {
Assert.notNull(entityPublisher, "EntityPublisher must not be null!");
return entityPublisher.flatMap(it -> save(it, index, type));
}
/**
* Index the entity under the given {@literal type} in the given {@literal index}. If the {@literal index} is
* {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}.
*
* @param entity must not be {@literal null}.
* @param index the name of the target index. Can be {@literal null}.
* @param type the name of the type within the index. Can be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting the saved entity.
*/
<T> Mono<T> save(T entity, @Nullable String index, @Nullable String type);
/**
* Find the document with the given {@literal id} mapped onto the given {@literal entityType}.
*
* @param id the {@literal _id} of the document to fetch.
* @param entityType the domain type used for mapping the document.
* @param <T>
* @return {@link Mono#empty()} if not found.
*/
default <T> Mono<T> findById(String id, Class<T> entityType) {
return findById(id, entityType, null);
}
/**
* Fetch the entity with given {@literal id}.
*
* @param id the {@literal _id} of the document to fetch.
* @param entityType the domain type used for mapping the document.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param <T>
* @return {@link Mono#empty()} if not found.
*/
default <T> Mono<T> findById(String id, Class<T> entityType, @Nullable String index) {
return findById(id, entityType, index, null);
}
/**
* Fetch the entity with given {@literal id}.
*
* @param id must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param <T>
* @return the {@link Mono} emitting the entity or signalling completion if none found.
*/
<T> Mono<T> findById(String id, Class<T> entityType, @Nullable String index, @Nullable String type);
/**
* Check if an entity with given {@literal id} exists.
*
* @param id the {@literal _id} of the document to look for.
* @param entityType the domain type used.
* @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise.
*/
default Mono<Boolean> exists(String id, Class<?> entityType) {
return exists(id, entityType, null);
}
/**
* Check if an entity with given {@literal id} exists.
*
* @param id the {@literal _id} of the document to look for.
* @param entityType the domain type used.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise.
*/
default Mono<Boolean> exists(String id, Class<?> entityType, @Nullable String index) {
return exists(id, entityType, index, null);
}
/**
* Check if an entity with given {@literal id} exists.
*
* @param id the {@literal _id} of the document to look for.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting {@literal true} if a matching document exists, {@literal false} otherwise.
*/
Mono<Boolean> exists(String id, Class<?> entityType, @Nullable String index, @Nullable String type);
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param <T>
* @return a {@link Flux} emitting matching entities one by one.
*/
default <T> Flux<T> find(Query query, Class<T> entityType) {
return find(query, entityType, entityType);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType The entity type for mapping the query. Must not be {@literal null}.
* @param returnType The mapping target type. Must not be {@literal null}. Th
* @param <T>
* @return a {@link Flux} emitting matching entities one by one.
*/
default <T> Flux<T> find(Query query, Class<?> entityType, Class<T> returnType) {
return find(query, entityType, null, null, returnType);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param <T>
* @return a {@link Flux} emitting matching entities one by one.
*/
default <T> Flux<T> find(Query query, Class<T> entityType, @Nullable String index) {
return find(query, entityType, index, null);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param <T>
* @returnm a {@link Flux} emitting matching entities one by one.
*/
default <T> Flux<T> find(Query query, Class<T> entityType, @Nullable String index, @Nullable String type) {
return find(query, entityType, index, type, entityType);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param resultType the projection result type.
* @param <T>
* @return a {@link Flux} emitting matching entities one by one.
*/
<T> Flux<T> find(Query query, Class<?> entityType, @Nullable String index, @Nullable String type,
Class<T> resultType);
/**
* Count the number of documents matching the given {@link Query}.
*
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the nr of matching documents.
*/
default Mono<Long> count(Class<?> entityType) {
return count(new StringQuery(QueryBuilders.matchAllQuery().toString()), entityType, null);
}
/**
* Count the number of documents matching the given {@link Query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the nr of matching documents.
*/
default Mono<Long> count(Query query, Class<?> entityType) {
return count(query, entityType, null);
}
/**
* Count the number of documents matching the given {@link Query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the nr of matching documents.
*/
default Mono<Long> count(Query query, Class<?> entityType, @Nullable String index) {
return count(query, entityType, index, null);
}
/**
* Count the number of documents matching the given {@link Query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the nr of matching documents.
*/
Mono<Long> count(Query query, Class<?> entityType, @Nullable String index, @Nullable String type);
/**
* Delete the given entity extracting index and type from entity metadata.
*
* @param entity must not be {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
default Mono<String> delete(Object entity) {
return delete(entity, null);
}
/**
* Delete the given entity extracting index and type from entity metadata.
*
* @param entity must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
default Mono<String> delete(Object entity, @Nullable String index) {
return delete(entity, index, null);
}
/**
* Delete the given entity extracting index and type from entity metadata.
*
* @param entity must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
Mono<String> delete(Object entity, @Nullable String index, @Nullable String type);
/**
* Delete the entity with given {@literal id}.
*
* @param id must not be {@literal null}.
* @param index the name of the target index.
* @param type the name of the target type.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
default Mono<String> deleteById(String id, String index, String type) {
Assert.notNull(index, "Index must not be null!");
Assert.notNull(type, "Type must not be null!");
return deleteById(id, Object.class, index, type);
}
/**
* Delete the entity with given {@literal id} extracting index and type from entity metadata.
*
* @param id must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
default Mono<String> deleteById(String id, Class<?> entityType) {
return deleteById(id, entityType, null);
}
/**
* Delete the entity with given {@literal id} extracting index and type from entity metadata.
*
* @param id must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
default Mono<String> deleteById(String id, Class<?> entityType, @Nullable String index) {
return deleteById(id, entityType, index, null);
}
/**
* Delete the entity with given {@literal id} extracting index and type from entity metadata.
*
* @param id must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
Mono<String> deleteById(String id, Class<?> entityType, @Nullable String index, @Nullable String type);
/**
* Delete the documents matching the given {@link Query} extracting index and type from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
*/
default Mono<Long> deleteBy(Query query, Class<?> entityType) {
return deleteBy(query, entityType, null);
}
/**
* Delete the documents matching the given {@link Query} extracting index and type from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
*/
default Mono<Long> deleteBy(Query query, Class<?> entityType, @Nullable String index) {
return deleteBy(query, entityType, index, null);
}
/**
* Delete the documents matching the given {@link Query} extracting index and type from entity metadata.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the name of the target index. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @param type the name of the target type. Overwrites document metadata from {@literal entityType} if not
* {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
*/
Mono<Long> deleteBy(Query query, Class<?> entityType, @Nullable String index, @Nullable String type);
/**
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on
* {@link ReactiveElasticsearchClient}.
*
* @param <T>
* @author Christoph Strobl
* @since 4.0
*/
interface ClientCallback<T extends Publisher<?>> {
T doWithClient(ReactiveElasticsearchClient client);
}
}

View File

@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*; import static org.elasticsearch.index.VersionType.*;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -25,16 +24,25 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.WrapperQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
@ -45,25 +53,36 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsea
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.PersistentProperty;
import org.springframework.data.mapping.PersistentPropertyAccessor; import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
/** /**
* @author Christoph Strobl * @author Christoph Strobl
* @since 4.0 * @since 4.0
*/ */
public class ReactiveElasticsearchTemplate { public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations {
private final ReactiveElasticsearchClient client; private final ReactiveElasticsearchClient client;
private final ElasticsearchConverter converter; private final ElasticsearchConverter converter;
private final DefaultResultMapper mapper; private final ResultsMapper resultMapper;
private final ElasticsearchExceptionTranslator exceptionTranslator; private final ElasticsearchExceptionTranslator exceptionTranslator;
private @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.IMMEDIATE;
private @Nullable IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) { public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) {
this(client, new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext())); this(client, new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext()));
} }
@ -72,87 +91,152 @@ public class ReactiveElasticsearchTemplate {
this.client = client; this.client = client;
this.converter = converter; this.converter = converter;
this.mapper = new DefaultResultMapper(converter.getMappingContext()); this.resultMapper = new DefaultResultMapper(converter.getMappingContext());
this.exceptionTranslator = new ElasticsearchExceptionTranslator(); this.exceptionTranslator = new ElasticsearchExceptionTranslator();
} }
public <T> Mono<T> index(T entity) { /*
return index(entity, null); * (non-Javadoc)
} * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#exctute(ClientCallback)
public <T> Mono<T> index(T entity, String index) {
return index(entity, index, null);
}
/**
* Add the given entity to the index.
*
* @param entity
* @param index
* @param type
* @param <T>
* @return
*/ */
public <T> Mono<T> index(T entity, String index, String type) { @Override
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException);
}
ElasticsearchPersistentEntity<?> persistentEntity = lookupPersistentEntity(entity.getClass()); /*
return doIndex(entity, persistentEntity, index, type) // * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#index(Object, String, String)
*/
@Override
public <T> Mono<T> save(T entity, @Nullable String index, @Nullable String type) {
Assert.notNull(entity, "Entity must not be null!");
AdaptableEntity<T> adaptableEntity = ConverterAwareAdaptableEntity.of(entity, converter);
return doIndex(entity, adaptableEntity, index, type) //
.map(it -> { .map(it -> {
return adaptableEntity.updateIdIfNecessary(it.getId());
// TODO: update id if necessary!
// it.getId()
// it.getVersion()
return entity;
}); });
} }
public <T> Mono<T> get(String id, Class<T> resultType) { private Mono<IndexResponse> doIndex(Object value, AdaptableEntity<?> entity, @Nullable String index,
return get(id, resultType, null); @Nullable String type) {
return Mono.defer(() -> {
Object id = entity.getId();
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString())
: new IndexRequest(indexToUse, typeToUse);
try {
request.source(resultMapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE);
} catch (IOException e) {
throw new RuntimeException(e);
} }
public <T> Mono<T> get(String id, Class<T> resultType, @Nullable String index) { if (entity.isVersioned()) {
return get(id, resultType, index, null);
Object version = entity.getVersion();
if (version != null) {
request.version(((Number) version).longValue());
request.versionType(EXTERNAL);
}
} }
/** if (entity.getPersistentEntity().getParentIdProperty() != null) {
* Fetch the entity with given id.
* Object parentId = entity.getPropertyValue(entity.getPersistentEntity().getParentIdProperty());
* @param id must not be {@literal null}. if (parentId != null) {
* @param resultType must not be {@literal null}. request.parent(parentId.toString());
* @param index }
* @param type }
* @param <T>
* @return the {@link Mono} emitting the entity or signalling completion if none found. request = prepareIndexRequest(value, request);
return doIndex(request);
});
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#findById(String, Class, String, String)
*/ */
public <T> Mono<T> get(String id, Class<T> resultType, @Nullable String index, @Nullable String type) { @Override
public <T> Mono<T> findById(String id, Class<T> entityType, @Nullable String index, @Nullable String type) {
ElasticsearchPersistentEntity<?> persistentEntity = lookupPersistentEntity(resultType); Assert.notNull(id, "Id must not be null!");
GetRequest request = new GetRequest(persistentEntity.getIndexName(), persistentEntity.getIndexType(), id);
return doGet(id, persistentEntity, index, type).map(it -> mapper.mapEntity(it.sourceAsString(), resultType)); return doFindById(id, BasicElasticsearchEntity.of(entityType, converter), index, type)
.map(it -> resultMapper.mapEntity(it, entityType));
} }
/** private Mono<GetResult> doFindById(String id, ElasticsearchEntity<?> entity, @Nullable String index,
* Search the index for entities matching the given {@link CriteriaQuery query}. @Nullable String type) {
*
* @param query must not be {@literal null}. return Mono.defer(() -> {
* @param resultType must not be {@literal null}.
* @param <T> String indexToUse = indexName(index, entity);
* @return String typeToUse = typeName(type, entity);
return doFindById(new GetRequest(indexToUse, typeToUse, id));
});
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#exists(String, Class, String, String)
*/ */
public <T> Flux<T> query(CriteriaQuery query, Class<T> resultType) { @Override
public Mono<Boolean> exists(String id, Class<?> entityType, String index, String type) {
ElasticsearchPersistentEntity<?> entity = lookupPersistentEntity(resultType); Assert.notNull(id, "Id must not be null!");
SearchRequest request = new SearchRequest(indices(query, entity)); return doExists(id, BasicElasticsearchEntity.of(entityType, converter), index, type);
request.types(indexTypes(query, entity)); }
private Mono<Boolean> doExists(String id, ElasticsearchEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Mono.defer(() -> {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
return doExists(new GetRequest(indexToUse, typeToUse, id));
});
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#find(Query, Class, String, String, Class)
*/
@Override
public <T> Flux<T> find(Query query, Class<?> entityType, @Nullable String index, @Nullable String type,
Class<T> resultType) {
return doFind(query, BasicElasticsearchEntity.of(entityType, converter), index, type)
.map(it -> resultMapper.mapEntity(it, resultType));
}
private Flux<SearchHit> doFind(Query query, ElasticsearchEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Flux.defer(() -> {
SearchRequest request = new SearchRequest(indices(query, () -> indexName(index, entity)));
request.types(indexTypes(query, () -> typeName(type, entity)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity)); searchSourceBuilder.query(mappedQuery(query, entity.getPersistentEntity()));
// TODO: request.source().postFilter(elasticsearchFilter); -- filter query // TODO: request.source().postFilter(elasticsearchFilter); -- filter query
searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before searchSourceBuilder.version(entity.isVersioned()); // This has been true by default before
searchSourceBuilder.trackScores(query.getTrackScores()); searchSourceBuilder.trackScores(query.getTrackScores());
if (query.getSourceFilter() != null) { if (query.getSourceFilter() != null) {
@ -174,123 +258,304 @@ public class ReactiveElasticsearchTemplate {
request.indicesOptions(query.getIndicesOptions()); request.indicesOptions(query.getIndicesOptions());
} }
sort(query, entity).forEach(searchSourceBuilder::sort); sort(query, entity.getPersistentEntity()).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) { if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore()); searchSourceBuilder.minScore(query.getMinScore());
} }
request.source(searchSourceBuilder); request.source(searchSourceBuilder);
return Flux.from( request = prepareSearchRequest(request);
execute(client -> client.search(request).map(it -> mapper.mapEntity(it.getSourceAsString(), resultType))));
return doFind(request);
});
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#count(Query, Class, String, String)
*/
@Override
public Mono<Long> count(Query query, Class<?> entityType, String index, String type) {
// TODO: ES 7.0 has a dedicated CountRequest - use that one once available.
return find(query, entityType, index, type).count();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(Object, String, String)
*/
@Override
public Mono<String> delete(Object entity, @Nullable String index, @Nullable String type) {
AdaptableEntity<?> elasticsearchEntity = ConverterAwareAdaptableEntity.of(entity, converter);
return Mono.defer(() -> doDeleteById(entity, ObjectUtils.nullSafeToString(elasticsearchEntity.getId()),
elasticsearchEntity, index, type));
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(String, Class, String, String)
*/
@Override
public Mono<String> deleteById(String id, Class<?> entityType, @Nullable String index, @Nullable String type) {
Assert.notNull(id, "Id must not be null!");
return doDeleteById(null, id, BasicElasticsearchEntity.of(entityType, converter), index, type);
}
private Mono<String> doDeleteById(@Nullable Object source, String id, ElasticsearchEntity<?> entity,
@Nullable String index, @Nullable String type) {
return Mono.defer(() -> {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
return doDelete(prepareDeleteRequest(source, new DeleteRequest(indexToUse, typeToUse, id)));
});
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#deleteBy(Query, Class, String, String)
*/
@Override
public Mono<Long> deleteBy(Query query, Class<?> entityType, String index, String type) {
Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, BasicElasticsearchEntity.of(entityType, converter), index, type)
.map(BulkByScrollResponse::getDeleted).publishNext();
}
private Flux<BulkByScrollResponse> doDeleteBy(Query query, ElasticsearchEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Flux.defer(() -> {
DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, () -> indexName(index, entity)));
request.types(indexTypes(query, () -> typeName(type, entity)));
request.setQuery(mappedQuery(query, entity.getPersistentEntity()));
return doDeleteBy(prepareDeleteByRequest(request));
});
}
// Property Setters / Getters
/**
* Set the default {@link RefreshPolicy} to apply when writing to Elasticsearch.
*
* @param refreshPolicy can be {@literal null}.
*/
public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
} }
/** /**
* Execute within a {@link ClientCallback} managing resources and translating errors. * Set the default {@link IndicesOptions} for {@link SearchRequest search requests}.
* *
* @param callback must not be {@literal null}. * @param indicesOptions can be {@literal null}.
* @param <T>
* @return the {@link Publisher} emitting results.
*/ */
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) { public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) {
return Flux.from(callback.doWithClient(this.client)).onErrorMap(this::translateException); this.indicesOptions = indicesOptions;
} }
// Customization Hooks // Customization Hooks
protected Mono<GetResult> doGet(String id, ElasticsearchPersistentEntity<?> entity, @Nullable String index, /**
@Nullable String type) { * Obtain the {@link ReactiveElasticsearchClient} to operate upon.
*
String indexToUse = indexName(index, entity); * @return never {@literal null}.
String typeToUse = typeName(type, entity); */
protected ReactiveElasticsearchClient getClient() {
return doGet(new GetRequest(indexToUse, typeToUse, id)); return this.client;
} }
protected Mono<GetResult> doGet(GetRequest request) { /**
* Pre process the write request before it is sent to the server, eg. by setting the
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
*
* @param request must not be {@literal null}.
* @param <R>
* @return the processed {@link WriteRequest}.
*/
protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {
return Mono.from(execute(client -> client.get(request))) // if (refreshPolicy == null) {
.onErrorResume((it) -> { return request;
if (it instanceof HttpClientErrorException) {
return ((HttpClientErrorException) it).getRawStatusCode() == 404;
}
return false;
}, (it) -> Mono.empty());
} }
protected Mono<IndexResponse> doIndex(Object value, ElasticsearchPersistentEntity<?> entity, @Nullable String index, return request.setRefreshPolicy(refreshPolicy);
@Nullable String type) {
PersistentPropertyAccessor<?> propertyAccessor = entity.getPropertyAccessor(value);
Object id = propertyAccessor.getProperty(entity.getIdProperty());
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString())
: new IndexRequest(indexToUse, typeToUse);
try {
request.source(mapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE);
} catch (IOException e) {
throw new RuntimeException(e);
} }
if (entity.hasVersionProperty()) { /**
* Customization hook to modify a generated {@link IndexRequest} prior to its execution. Eg. by setting the
Object version = propertyAccessor.getProperty(entity.getVersionProperty()); * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
if (version != null) { *
request.version(((Number) version).longValue()); * @param source the source object the {@link IndexRequest} was derived from.
request.versionType(EXTERNAL); * @param request the generated {@link IndexRequest}.
} * @return never {@literal null}.
*/
protected IndexRequest prepareIndexRequest(Object source, IndexRequest request) {
return prepareWriteRequest(request);
} }
if (entity.getParentIdProperty() != null) { /**
* Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the
* {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable.
*
* @param request the generated {@link SearchRequest}.
* @return never {@literal null}.
*/
protected SearchRequest prepareSearchRequest(SearchRequest request) {
Object parentId = propertyAccessor.getProperty(entity.getParentIdProperty()); if (indicesOptions == null) {
if (parentId != null) { return request;
request.parent(parentId.toString());
}
} }
return doIndex(request.setRefreshPolicy(RefreshPolicy.IMMEDIATE)); return request.indicesOptions(indicesOptions);
} }
/**
* Customization hook to modify a generated {@link DeleteRequest} prior to its execution. Eg. by setting the
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
*
* @param source the source object the {@link DeleteRequest} was derived from. My be {@literal null} if using the
* {@literal id} directly.
* @param request the generated {@link DeleteRequest}.
* @return never {@literal null}.
*/
protected DeleteRequest prepareDeleteRequest(@Nullable Object source, DeleteRequest request) {
return prepareWriteRequest(request);
}
/**
* Customization hook to modify a generated {@link DeleteByQueryRequest} prior to its execution. Eg. by setting the
* {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable.
*
* @param request the generated {@link DeleteByQueryRequest}.
* @return never {@literal null}.
*/
protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest request) {
if (refreshPolicy != null && !RefreshPolicy.NONE.equals(refreshPolicy)) {
request = request.setRefresh(true);
}
if (indicesOptions != null) {
request = request.setIndicesOptions(indicesOptions);
}
return request;
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
* You know what you're doing here? Well fair enough, go ahead on your own risk.
*
* @param request the already prepared {@link IndexRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<IndexResponse> doIndex(IndexRequest request) { protected Mono<IndexResponse> doIndex(IndexRequest request) {
return Mono.from(execute(client -> client.index(request))); return Mono.from(execute(client -> client.index(request)));
} }
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link GetRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<GetResult> doFindById(GetRequest request) {
return Mono.from(execute(client -> client.get(request)));
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link GetRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<Boolean> doExists(GetRequest request) {
return Mono.from(execute(client -> client.exists(request)));
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<SearchHit> doFind(SearchRequest request) {
return Flux.from(execute(client -> client.search(request)));
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link DeleteRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<String> doDelete(DeleteRequest request) {
return Mono.from(execute(client -> client.delete(request))) //
.flatMap(it -> {
if (HttpStatus.valueOf(it.status().getStatus()).equals(HttpStatus.NOT_FOUND)) {
return Mono.empty();
}
return Mono.just(it.getId());
});
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link DeleteByQueryRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<BulkByScrollResponse> doDeleteBy(DeleteByQueryRequest request) {
return Mono.from(execute(client -> client.deleteBy(request)));
}
// private helpers // private helpers
private static String indexName(@Nullable String index, ElasticsearchPersistentEntity<?> entity) { private static String indexName(@Nullable String index, ElasticsearchEntity<?> entity) {
return StringUtils.isEmpty(index) ? entity.getIndexName() : index; return StringUtils.isEmpty(index) ? entity.getIndexName() : index;
} }
private static String typeName(@Nullable String type, ElasticsearchPersistentEntity<?> entity) { private static String typeName(@Nullable String type, ElasticsearchEntity<?> entity) {
return StringUtils.isEmpty(type) ? entity.getIndexType() : type; return StringUtils.isEmpty(type) ? entity.getTypeName() : type;
} }
private static String[] indices(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) { private static String[] indices(Query query, Supplier<String> index) {
if (query.getIndices().isEmpty()) { if (query.getIndices().isEmpty()) {
return new String[] { entity.getIndexName() }; return new String[] { index.get() };
} }
return query.getIndices().toArray(new String[0]); return query.getIndices().toArray(new String[0]);
} }
private static String[] indexTypes(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) { private static String[] indexTypes(Query query, Supplier<String> indexType) {
if (query.getTypes().isEmpty()) { if (query.getTypes().isEmpty()) {
return new String[] { entity.getIndexType() }; return new String[] { indexType.get() };
} }
return query.getTypes().toArray(new String[0]); return query.getTypes().toArray(new String[0]);
} }
private List<FieldSortBuilder> sort(Query query, ElasticsearchPersistentEntity<?> entity) { private static List<FieldSortBuilder> sort(Query query, ElasticsearchPersistentEntity<?> entity) {
if (query.getSort() == null || query.getSort().isUnsorted()) { if (query.getSort() == null || query.getSort().isUnsorted()) {
return Collections.emptyList(); return Collections.emptyList();
@ -317,10 +582,20 @@ public class ReactiveElasticsearchTemplate {
return mappedSort; return mappedSort;
} }
private QueryBuilder mappedQuery(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) { private QueryBuilder mappedQuery(Query query, ElasticsearchPersistentEntity<?> entity) {
// TODO: we need to actually map the fields to the according field names! // TODO: we need to actually map the fields to the according field names!
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(query.getCriteria());
QueryBuilder elasticsearchQuery = null;
if (query instanceof CriteriaQuery) {
elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(((CriteriaQuery) query).getCriteria());
} else if (query instanceof StringQuery) {
elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource());
} else {
throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass()));
}
return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery(); return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery();
} }
@ -336,17 +611,166 @@ public class ReactiveElasticsearchTemplate {
private Throwable translateException(Throwable throwable) { private Throwable translateException(Throwable throwable) {
if (!(throwable instanceof RuntimeException)) { RuntimeException exception = throwable instanceof RuntimeException ? (RuntimeException) throwable
return throwable; : new RuntimeException(throwable.getMessage(), throwable);
RuntimeException potentiallyTranslatedException = exceptionTranslator.translateExceptionIfPossible(exception);
return potentiallyTranslatedException != null ? potentiallyTranslatedException : throwable;
} }
RuntimeException ex = exceptionTranslator.translateExceptionIfPossible((RuntimeException) throwable); /**
return ex != null ? ex : throwable; * @param <T>
* @author Christoph Strobl
* @since 4.0
*/
protected interface ElasticsearchEntity<T> {
default boolean isIdentifiable() {
return getPersistentEntity().hasVersionProperty();
} }
// Additional types default boolean isVersioned() {
public interface ClientCallback<T extends Publisher<?>> { return getPersistentEntity().hasVersionProperty();
}
default ElasticsearchPersistentProperty getIdProperty() {
return getPersistentEntity().getIdProperty();
}
default String getIndexName() {
return getPersistentEntity().getIndexName();
}
default String getTypeName() {
return getPersistentEntity().getIndexType();
}
ElasticsearchPersistentEntity<?> getPersistentEntity();
}
protected interface AdaptableEntity<T> extends ElasticsearchEntity<T> {
PersistentPropertyAccessor<T> getPropertyAccessor();
IdentifierAccessor getIdentifierAccessor();
@Nullable
default Object getId() {
return getIdentifierAccessor().getIdentifier();
}
default Object getVersion() {
return getPropertyAccessor().getProperty(getPersistentEntity().getRequiredVersionProperty());
}
@Nullable
default Object getPropertyValue(PersistentProperty<?> property) {
return getPropertyAccessor().getProperty(property);
}
default T getBean() {
return getPropertyAccessor().getBean();
}
default T updateIdIfNecessary(Object id) {
if (id == null || !getPersistentEntity().hasIdProperty() || getId() != null) {
return getPropertyAccessor().getBean();
}
return updatePropertyValue(getPersistentEntity().getIdProperty(), id);
}
default T updatePropertyValue(PersistentProperty<?> property, @Nullable Object value) {
getPropertyAccessor().setProperty(property, value);
return getPropertyAccessor().getBean();
}
}
protected static class BasicElasticsearchEntity<T> implements ElasticsearchEntity<T> {
final ElasticsearchPersistentEntity<?> entity;
BasicElasticsearchEntity(ElasticsearchPersistentEntity<?> entity) {
this.entity = entity;
}
static <T> BasicElasticsearchEntity<T> of(T bean, ElasticsearchConverter converter) {
return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(bean.getClass()));
}
static <T> BasicElasticsearchEntity<T> of(Class<T> type, ElasticsearchConverter converter) {
if (Object.class.equals(type)) {
return new BasicElasticsearchEntity<>(new SimpleElasticsearchPersistentEntity<>(ClassTypeInformation.OBJECT));
}
return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(type));
}
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity;
}
}
protected static class ConverterAwareAdaptableEntity<T> implements AdaptableEntity<T> {
final ElasticsearchPersistentEntity<?> entity;
final PersistentPropertyAccessor<T> propertyAccessor;
final IdentifierAccessor idAccessor;
final ElasticsearchConverter converter;
ConverterAwareAdaptableEntity(ElasticsearchPersistentEntity<?> entity, IdentifierAccessor idAccessor,
PersistentPropertyAccessor<T> propertyAccessor, ElasticsearchConverter converter) {
this.entity = entity;
this.propertyAccessor = propertyAccessor;
this.idAccessor = idAccessor;
this.converter = converter;
}
static <T> ConverterAwareAdaptableEntity<T> of(T bean, ElasticsearchConverter converter) {
ElasticsearchPersistentEntity<?> entity = converter.getMappingContext()
.getRequiredPersistentEntity(bean.getClass());
IdentifierAccessor idAccessor = entity.getIdentifierAccessor(bean);
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);
return new ConverterAwareAdaptableEntity<>(entity, idAccessor, propertyAccessor, converter);
}
@Override
public PersistentPropertyAccessor<T> getPropertyAccessor() {
return propertyAccessor;
}
@Override
public IdentifierAccessor getIdentifierAccessor() {
if (entity.getTypeInformation().isMap()) {
return () -> {
Object id = idAccessor.getIdentifier();
if (id != null) {
return id;
}
Map<?, ?> source = (Map<?, ?>) propertyAccessor.getBean();
return source.get("id");
};
}
return idAccessor;
}
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity;
}
T doWithClient(ReactiveElasticsearchClient client);
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2013-2014 the original author or authors. * Copyright 2013-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,15 +15,86 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import java.io.IOException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.search.SearchHit;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
/** /**
* ResultsMapper * ResultsMapper
* *
* @author Rizwan Idrees * @author Rizwan Idrees
* @author Mohsin Husen * @author Mohsin Husen
* @author Artur Konczak * @author Artur Konczak
* @author Christoph Strobl
*/ */
public interface ResultsMapper extends SearchResultMapper, GetResultMapper, MultiGetResultMapper { public interface ResultsMapper extends SearchResultMapper, GetResultMapper, MultiGetResultMapper {
EntityMapper getEntityMapper(); EntityMapper getEntityMapper();
@Nullable
default <T> T mapEntity(String source, Class<T> clazz) {
if (StringUtils.isEmpty(source)) {
return null;
}
try {
return getEntityMapper().mapToObject(source, clazz);
} catch (IOException e) {
throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName(), e);
}
}
/**
* Map a single {@link GetResult} to an instance of the given type.
*
* @param getResult must not be {@literal null}.
* @param type must not be {@literal null}.
* @param <T>
* @return can be {@literal null} if the {@link GetResult#isSourceEmpty() is empty}.
* @since 4.0
*/
@Nullable
default <T> T mapEntity(GetResult getResult, Class<T> type) {
if (getResult.isSourceEmpty()) {
return null;
}
String sourceString = getResult.sourceAsString();
if (sourceString.startsWith("{\"id\":null,")) {
sourceString = sourceString.replaceFirst("\"id\":null", "\"id\":\"" + getResult.getId() + "\"");
}
return mapEntity(sourceString, type);
}
/**
* Map a single {@link SearchHit} to an instance of the given type.
*
* @param searchHit must not be {@literal null}.
* @param type must not be {@literal null}.
* @param <T>
* @return can be {@literal null} if the {@link SearchHit} does not have {@link SearchHit#hasSource() a source}.
* @since 4.0
*/
@Nullable
default <T> T mapEntity(SearchHit searchHit, Class<T> type) {
if (!searchHit.hasSource()) {
return null;
}
String sourceString = searchHit.getSourceAsString();
if (sourceString.startsWith("{\"id\":null,")) {
sourceString = sourceString.replaceFirst("\"id\":null", "\"id\":\"" + searchHit.getId() + "\"");
}
return mapEntity(sourceString, type);
}
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.convert;
import java.util.Collection;
import org.springframework.data.convert.CustomConversions;
/**
* @author Christoph Strobl
* @since 4.0
*/
public class ElasticsearchCustomConversions extends CustomConversions {
/**
* Creates a new {@link CustomConversions} instance registering the given converters.
*
* @param converters must not be {@literal null}.
*/
public ElasticsearchCustomConversions(Collection<?> converters) {
super(StoreConversions.NONE, converters);
}
}

View File

@ -32,4 +32,11 @@ public class GetQuery {
public void setId(String id) { public void setId(String id) {
this.id = id; this.id = id;
} }
public static GetQuery getById(String id) {
GetQuery query = new GetQuery();
query.setId(id);
return query;
}
} }

View File

@ -0,0 +1,45 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Christoph Strobl
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface ElasticsearchVersion {
/**
* Inclusive lower bound of Elasticsearch server range.
*
* @return {@code 0.0.0} by default.
*/
String asOf() default "0.0.0";
/**
* Exclusive upper bound of Elasticsearch server range.
*
* @return {@code 9999.9999.9999} by default.
*/
String until() default "9999.9999.9999";
}

View File

@ -0,0 +1,131 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.AssumptionViolatedException;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.util.Version;
/**
* @author Christoph Strobl
*/
public class ElasticsearchVersionRule implements TestRule {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchVersionRule.class);
private static final Version ANY = new Version(9999, 9999, 9999);
private static final Version DEFAULT_HIGH = ANY;
private static final Version DEFAULT_LOW = new Version(0, 0, 0);
private final static AtomicReference<Version> currentVersion = new AtomicReference<>(null);
private final Version minVersion;
private final Version maxVersion;
public ElasticsearchVersionRule(Version min, Version max) {
this.minVersion = min;
this.maxVersion = max;
}
public static ElasticsearchVersionRule any() {
return new ElasticsearchVersionRule(ANY, ANY);
}
public static ElasticsearchVersionRule atLeast(Version minVersion) {
return new ElasticsearchVersionRule(minVersion, DEFAULT_HIGH);
}
public static ElasticsearchVersionRule atMost(Version maxVersion) {
return new ElasticsearchVersionRule(DEFAULT_LOW, maxVersion);
}
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
if (!getCurrentVersion().equals(ANY)) {
Version minVersion = ElasticsearchVersionRule.this.minVersion.equals(ANY) ? DEFAULT_LOW
: ElasticsearchVersionRule.this.minVersion;
Version maxVersion = ElasticsearchVersionRule.this.maxVersion.equals(ANY) ? DEFAULT_HIGH
: ElasticsearchVersionRule.this.maxVersion;
if (description.getAnnotation(ElasticsearchVersion.class) != null) {
ElasticsearchVersion version = description.getAnnotation(ElasticsearchVersion.class);
if (version != null) {
Version expectedMinVersion = Version.parse(version.asOf());
if (!expectedMinVersion.equals(ANY) && !expectedMinVersion.equals(DEFAULT_LOW)) {
minVersion = expectedMinVersion;
}
Version expectedMaxVersion = Version.parse(version.until());
if (!expectedMaxVersion.equals(ANY) && !expectedMaxVersion.equals(DEFAULT_HIGH)) {
maxVersion = expectedMaxVersion;
}
}
}
validateVersion(minVersion, maxVersion);
}
base.evaluate();
}
};
}
private void validateVersion(Version min, Version max) {
if (getCurrentVersion().isLessThan(min) || getCurrentVersion().isGreaterThanOrEqualTo(max)) {
throw new AssumptionViolatedException(String
.format("Expected Elasticsearch server to be in range (%s, %s] but found %s", min, max, currentVersion));
}
}
private Version getCurrentVersion() {
if (currentVersion.get() == null) {
Version current = fetchCurrentVersion();
if (currentVersion.compareAndSet(null, current)) {
logger.info("Running Elasticsearch " + current);
}
}
return currentVersion.get();
}
private Version fetchCurrentVersion() {
return TestUtils.serverVersion();
}
@Override
public String toString() {
return getCurrentVersion().toString();
}
}

View File

@ -17,15 +17,20 @@ package org.springframework.data.elasticsearch;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import java.io.IOException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.util.Version;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/** /**
* @author Christoph Strobl * @author Christoph Strobl
@ -44,6 +49,18 @@ public final class TestUtils {
return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200")); return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200"));
} }
public static Version serverVersion() {
try (RestHighLevelClient client = restHighLevelClient()) {
org.elasticsearch.Version version = client.info(RequestOptions.DEFAULT).getVersion();
return new Version(version.major, version.minor, version.revision);
} catch (Exception e) {
return new Version(0, 0, 0);
}
}
@SneakyThrows @SneakyThrows
public static void deleteIndex(String... indexes) { public static void deleteIndex(String... indexes) {
@ -62,4 +79,48 @@ public final class TestUtils {
} }
} }
} }
public static OfType documentWithId(String id) {
return new DocumentLookup(id);
}
public interface ExistsIn {
boolean existsIn(String index);
}
public interface OfType extends ExistsIn {
ExistsIn ofType(String type);
}
private static class DocumentLookup implements OfType {
private String id;
private String type;
public DocumentLookup(String id) {
this.id = id;
}
@Override
public boolean existsIn(String index) {
GetRequest request = new GetRequest(index).id(id);
if (StringUtils.hasText(type)) {
request = request.type(type);
}
try {
return restHighLevelClient().get(request, RequestOptions.DEFAULT).isExists();
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
@Override
public ExistsIn ofType(String type) {
this.type = type;
return this;
}
}
} }

View File

@ -17,6 +17,9 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import org.junit.Rule;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.io.IOException; import java.io.IOException;
@ -37,6 +40,7 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After; import org.junit.After;
@ -58,6 +62,8 @@ import org.springframework.test.context.junit4.SpringRunner;
@ContextConfiguration("classpath:infrastructure.xml") @ContextConfiguration("classpath:infrastructure.xml")
public class ReactiveElasticsearchClientTests { public class ReactiveElasticsearchClientTests {
public @Rule ElasticsearchVersionRule elasticsearchVersion = ElasticsearchVersionRule.any();
static final String INDEX_I = "idx-1-reactive-client-tests"; static final String INDEX_I = "idx-1-reactive-client-tests";
static final String INDEX_II = "idx-2-reactive-client-tests"; static final String INDEX_II = "idx-2-reactive-client-tests";
@ -413,6 +419,43 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete(); .verifyComplete();
} }
@Test // DATAES-488
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByShouldRemoveExistingDocument() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX_I) //
.setDocTypes(TYPE_I) //
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", id)));
client.deleteBy(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getDeleted()).isEqualTo(1);
}) //
.verifyComplete();
}
@Test // DATAES-488
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByEmitResultWhenNothingRemoved() {
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX_I) //
.setDocTypes(TYPE_I) //
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("_id", "it-was-not-me")));
client.deleteBy(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getDeleted()).isEqualTo(0);
}) //
.verifyComplete();
}
AddToIndexOfType addSourceDocument() { AddToIndexOfType addSourceDocument() {
return add(DOC_SOURCE); return add(DOC_SOURCE);
} }

View File

@ -0,0 +1,117 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.config;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.util.Collection;
import java.util.Collections;
import org.apache.commons.lang.ClassUtils;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
/**
* @author Christoph Strobl
*/
public class ElasticsearchConfigurationSupportUnitTests {
@Test // DATAES-504
public void usesConfigClassPackageAsBaseMappingPackage() throws ClassNotFoundException {
ElasticsearchConfigurationSupport configuration = new StubConfig();
assertThat(configuration.getMappingBasePackages()).contains(ClassUtils.getPackageName(StubConfig.class));
assertThat(configuration.getInitialEntitySet()).contains(Entity.class);
}
@Test // DATAES-504
public void doesNotScanOnEmptyBasePackage() throws ClassNotFoundException {
ElasticsearchConfigurationSupport configuration = new StubConfig() {
@Override
protected Collection<String> getMappingBasePackages() {
return Collections.emptySet();
}
};
assertThat(configuration.getInitialEntitySet()).isEmpty();
}
@Test // DATAES-504
public void containsMappingContext() {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(StubConfig.class);
assertThat(context.getBean(SimpleElasticsearchMappingContext.class)).isNotNull();
}
@Test // DATAES-504
public void containsElasticsearchConverter() {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(StubConfig.class);
assertThat(context.getBean(ElasticsearchConverter.class)).isNotNull();
}
@Test // DATAES-504
public void restConfigContainsElasticsearchTemplate() {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(RestConfig.class);
assertThat(context.getBean(ElasticsearchRestTemplate.class)).isNotNull();
}
@Test // DATAES-504
public void reactiveConfigContainsReactiveElasticsearchTemplate() {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(ReactiveRestConfig.class);
assertThat(context.getBean(ReactiveElasticsearchTemplate.class)).isNotNull();
}
@Configuration
static class StubConfig extends ElasticsearchConfigurationSupport {
}
@Configuration
static class ReactiveRestConfig extends AbstractReactiveElasticsearchConfiguration {
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
return mock(ReactiveElasticsearchClient.class);
}
}
@Configuration
static class RestConfig extends AbstractElasticsearchConfiguration {
@Override
public RestHighLevelClient elasticsearchClient() {
return mock(RestHighLevelClient.class);
}
}
@Document(indexName = "config-support-tests")
static class Entity {}
}

View File

@ -15,24 +15,42 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import static org.apache.commons.lang.RandomStringUtils.*;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.Rule;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.query.Criteria; import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.entities.SampleEntity; import org.springframework.data.elasticsearch.entities.SampleEntity;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
/** /**
* @author Christoph Strobl * @author Christoph Strobl
@ -42,16 +60,20 @@ import org.springframework.test.context.junit4.SpringRunner;
@ContextConfiguration("classpath:infrastructure.xml") @ContextConfiguration("classpath:infrastructure.xml")
public class ReactiveElasticsearchTemplateTests { public class ReactiveElasticsearchTemplateTests {
public @Rule ElasticsearchVersionRule elasticsearchVersion = ElasticsearchVersionRule.any();
static final String DEFAULT_INDEX = "test-index-sample";
static final String ALTERNATE_INDEX = "reactive-template-tests-alternate-index";
private ElasticsearchRestTemplate restTemplate; private ElasticsearchRestTemplate restTemplate;
private ReactiveElasticsearchTemplate template; private ReactiveElasticsearchTemplate template;
@Before @Before
public void setUp() { public void setUp() {
TestUtils.deleteIndex(DEFAULT_INDEX, ALTERNATE_INDEX);
restTemplate = new ElasticsearchRestTemplate(TestUtils.restHighLevelClient()); restTemplate = new ElasticsearchRestTemplate(TestUtils.restHighLevelClient());
TestUtils.deleteIndex("test-index-sample");
restTemplate.createIndex(SampleEntity.class); restTemplate.createIndex(SampleEntity.class);
restTemplate.putMapping(SampleEntity.class); restTemplate.putMapping(SampleEntity.class);
restTemplate.refresh(SampleEntity.class); restTemplate.refresh(SampleEntity.class);
@ -59,14 +81,35 @@ public class ReactiveElasticsearchTemplateTests {
template = new ReactiveElasticsearchTemplate(TestUtils.reactiveClient()); template = new ReactiveElasticsearchTemplate(TestUtils.reactiveClient());
} }
@Test // DATAES-488 @Test // DATAES-504
public void indexWithIdShouldWork() { public void executeShouldProvideResource() {
String documentId = randomNumeric(5); Mono.from(template.execute(client -> client.ping())) //
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("foo bar") .as(StepVerifier::create) //
.version(System.currentTimeMillis()).build(); .expectNext(true) //
.verifyComplete();
}
template.index(sampleEntity).as(StepVerifier::create).expectNextCount(1).verifyComplete(); @Test // DATAES-504
public void executeShouldConvertExceptions() {
Mono.from(template.execute(client -> {
throw new RuntimeException(new ConnectException("we're doomed"));
})) //
.as(StepVerifier::create) //
.expectError(DataAccessResourceFailureException.class) //
.verify();
}
@Test // DATAES-504
public void insertWithIdShouldWork() {
SampleEntity sampleEntity = randomEntity("foo bar");
template.save(sampleEntity)//
.as(StepVerifier::create)//
.expectNextCount(1)//
.verifyComplete();
restTemplate.refresh(SampleEntity.class); restTemplate.refresh(SampleEntity.class);
@ -75,78 +118,388 @@ public class ReactiveElasticsearchTemplateTests {
assertThat(result).hasSize(1); assertThat(result).hasSize(1);
} }
@Test // DATAES-488 @Test // DATAES-504
public void getShouldReturnEntity() { public void insertWithAutogeneratedIdShouldUpdateEntityId() {
String documentId = randomNumeric(5); SampleEntity sampleEntity = SampleEntity.builder().message("wohoo").build();
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build(); template.save(sampleEntity) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isNotNull();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class); restTemplate.refresh(SampleEntity.class);
assertThat(TestUtils.documentWithId(it.getId()).existsIn(DEFAULT_INDEX)).isTrue();
}) //
.verifyComplete();
}
template.get(documentId, SampleEntity.class) // @Test // DATAES-504
public void insertWithExplicitIndexNameShouldOverwriteMetadata() {
SampleEntity sampleEntity = randomEntity("in another index");
template.save(sampleEntity, ALTERNATE_INDEX).as(StepVerifier::create)//
.expectNextCount(1)//
.verifyComplete();
restTemplate.refresh(DEFAULT_INDEX);
restTemplate.refresh(ALTERNATE_INDEX);
assertThat(TestUtils.documentWithId(sampleEntity.getId()).existsIn(DEFAULT_INDEX)).isFalse();
assertThat(TestUtils.documentWithId(sampleEntity.getId()).existsIn(ALTERNATE_INDEX)).isTrue();
}
@Test // DATAES-504
public void insertShouldAcceptPlainMapStructureAsSource() {
Map<String, Object> map = Collections.singletonMap("foo", "bar");
template.save(map, ALTERNATE_INDEX, "singleton-map") //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
}
@Test(expected = IllegalArgumentException.class) // DATAES-504
public void insertShouldErrorOnNullEntity() {
template.save(null);
}
@Test // DATAES-504
public void findByIdShouldReturnEntity() {
SampleEntity sampleEntity = randomEntity("some message");
index(sampleEntity);
template.findById(sampleEntity.getId(), SampleEntity.class) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.expectNext(sampleEntity) // .expectNext(sampleEntity) //
.verifyComplete(); .verifyComplete();
} }
@Test // DATAES-488 @Test // DATAES-504
public void getForNothing() { public void findByIdWhenIdIsAutogeneratedShouldHaveIdSetCorrectly() {
String documentId = randomNumeric(5); SampleEntity sampleEntity = new SampleEntity();
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") sampleEntity.setMessage("some message");
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity); index(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
template.get("foo", SampleEntity.class) // assertThat(sampleEntity.getId()).isNotNull();
template.findById(sampleEntity.getId(), SampleEntity.class) //
.as(StepVerifier::create) //
.consumeNextWith(it -> assertThat(it.getId()).isEqualTo(sampleEntity.getId())) //
.verifyComplete();
}
@Test // DATAES-504
public void findByIdShouldCompleteWhenNotingFound() {
SampleEntity sampleEntity = randomEntity("some message");
index(sampleEntity);
template.findById("foo", SampleEntity.class) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.verifyComplete(); .verifyComplete();
} }
@Test // DATAES-488 @Test(expected = IllegalArgumentException.class) // DATAES-504
public void findShouldApplyCriteria() { public void findByIdShouldErrorForNullId() {
template.findById(null, SampleEntity.class);
}
String documentId = randomNumeric(5); @Test // DATAES-504
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") public void findByIdWithExplicitIndexNameShouldOverwriteMetadata() {
.version(System.currentTimeMillis()).build();
SampleEntity sampleEntity = randomEntity("some message");
IndexQuery indexQuery = getIndexQuery(sampleEntity); IndexQuery indexQuery = getIndexQuery(sampleEntity);
indexQuery.setIndexName(ALTERNATE_INDEX);
restTemplate.index(indexQuery); restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class); restTemplate.refresh(SampleEntity.class);
restTemplate.refresh(DEFAULT_INDEX);
restTemplate.refresh(ALTERNATE_INDEX);
template.findById(sampleEntity.getId(), SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
template.findById(sampleEntity.getId(), SampleEntity.class, ALTERNATE_INDEX) //
.as(StepVerifier::create)//
.expectNextCount(1) //
.verifyComplete();
}
@Test // DATAES-504
public void existsShouldReturnTrueWhenFound() {
SampleEntity sampleEntity = randomEntity("some message");
index(sampleEntity);
template.exists(sampleEntity.getId(), SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-504
public void existsShouldReturnFalseWhenNotFound() {
SampleEntity sampleEntity = randomEntity("some message");
index(sampleEntity);
template.exists("foo", SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-504
public void findShouldApplyCriteria() {
SampleEntity sampleEntity = randomEntity("some message");
index(sampleEntity);
CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("some message")); CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("some message"));
template.query(criteriaQuery, SampleEntity.class) // template.find(criteriaQuery, SampleEntity.class) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.expectNext(sampleEntity) // .expectNext(sampleEntity) //
.verifyComplete(); .verifyComplete();
} }
@Test // DATAES-488 @Test // DATAES-504
public void findShouldReturnEmptyFluxIfNothingFound() { public void findShouldReturnEmptyFluxIfNothingFound() {
String documentId = randomNumeric(5); SampleEntity sampleEntity = randomEntity("some message");
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") index(sampleEntity);
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("foo")); CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("foo"));
template.query(criteriaQuery, SampleEntity.class) // template.find(criteriaQuery, SampleEntity.class) //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.verifyComplete(); .verifyComplete();
} }
@Test // DATAES-504
public void shouldAllowStringBasedQuery() {
index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message"));
template.find(new StringQuery(matchAllQuery().toString()), SampleEntity.class) //
.as(StepVerifier::create) //
.expectNextCount(3) //
.verifyComplete();
}
@Test // DATAES-504
public void shouldExecuteGivenCriteriaQuery() {
SampleEntity shouldMatch = randomEntity("test message");
SampleEntity shouldNotMatch = randomEntity("the dog ate my homework");
index(shouldMatch, shouldNotMatch);
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test"));
template.find(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(shouldMatch) //
.verifyComplete();
}
@Test // DATAES-504
public void shouldReturnListForGivenCriteria() {
SampleEntity sampleEntity1 = randomEntity("test message");
SampleEntity sampleEntity2 = randomEntity("test test");
SampleEntity sampleEntity3 = randomEntity("some message");
index(sampleEntity1, sampleEntity2, sampleEntity3);
CriteriaQuery query = new CriteriaQuery(
new Criteria("message").contains("some").and("message").contains("message"));
template.find(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(sampleEntity3) //
.verifyComplete();
}
@Test // DATAES-504
public void shouldReturnProjectedTargetEntity() {
SampleEntity sampleEntity1 = randomEntity("test message");
SampleEntity sampleEntity2 = randomEntity("test test");
SampleEntity sampleEntity3 = randomEntity("some message");
index(sampleEntity1, sampleEntity2, sampleEntity3);
CriteriaQuery query = new CriteriaQuery(
new Criteria("message").contains("some").and("message").contains("message"));
template.find(query, SampleEntity.class, Message.class) //
.as(StepVerifier::create) //
.expectNext(new Message(sampleEntity3.getMessage())) //
.verifyComplete();
}
@Test // DATAES-504
public void countShouldReturnCountAllWhenGivenNoQuery() {
index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message"));
template.count(SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(3L) //
.verifyComplete();
}
@Test // DATAES-504
public void countShouldReturnCountMatchingDocuments() {
index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message"));
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test"));
template.count(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
}
@Test // DATAES-504
public void deleteByIdShouldRemoveExistingDocumentById() {
SampleEntity sampleEntity = randomEntity("test message");
index(sampleEntity);
template.deleteById(sampleEntity.getId(), SampleEntity.class) //
.as(StepVerifier::create)//
.expectNext(sampleEntity.getId()) //
.verifyComplete();
}
@Test // DATAES-504
public void deleteShouldRemoveExistingDocumentByIdUsingIndexName() {
SampleEntity sampleEntity = randomEntity("test message");
index(sampleEntity);
template.deleteById(sampleEntity.getId(), DEFAULT_INDEX, "test-type") //
.as(StepVerifier::create)//
.expectNext(sampleEntity.getId()) //
.verifyComplete();
}
@Test // DATAES-504
public void deleteShouldRemoveExistingDocument() {
SampleEntity sampleEntity = randomEntity("test message");
index(sampleEntity);
template.delete(sampleEntity) //
.as(StepVerifier::create)//
.expectNext(sampleEntity.getId()) //
.verifyComplete();
}
@Test // DATAES-504
public void deleteByIdShouldCompleteWhenNothingDeleted() {
SampleEntity sampleEntity = randomEntity("test message");
template.delete(sampleEntity) //
.as(StepVerifier::create)//
.verifyComplete();
}
@Test // DATAES-504
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnNumberOfDeletedDocuments() {
index(randomEntity("test message"), randomEntity("test test"), randomEntity("some message"));
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test"));
template.deleteBy(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
}
@Test // DATAES-504
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnZeroIfNothingDeleted() {
index(randomEntity("test message"));
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("luke"));
template.deleteBy(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
}
@Data
@Document(indexName = "marvel", type = "characters")
static class Person {
private @Id String id;
private String name;
private int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
// TODO: check field mapping !!!
// --> JUST some helpers
private SampleEntity randomEntity(String message) {
return SampleEntity.builder() //
.id(UUID.randomUUID().toString()) //
.message(StringUtils.hasText(message) ? message : "test message") //
.version(System.currentTimeMillis()).build();
}
private IndexQuery getIndexQuery(SampleEntity sampleEntity) { private IndexQuery getIndexQuery(SampleEntity sampleEntity) {
return new IndexQueryBuilder().withId(sampleEntity.getId()).withObject(sampleEntity) return new IndexQueryBuilder().withId(sampleEntity.getId()).withObject(sampleEntity)
.withVersion(sampleEntity.getVersion()).build(); .withVersion(sampleEntity.getVersion()).build();
} }
private List<IndexQuery> getIndexQueries(SampleEntity... sampleEntities) {
return Arrays.stream(sampleEntities).map(this::getIndexQuery).collect(Collectors.toList());
}
private void index(SampleEntity... entities) {
if (entities.length == 1) {
restTemplate.index(getIndexQuery(entities[0]));
} else {
restTemplate.bulkIndex(getIndexQueries(entities));
}
restTemplate.refresh(SampleEntity.class);
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class Message {
String message;
}
} }

View File

@ -0,0 +1,204 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.action.search.SearchRequest.*;
import static org.mockito.Mockito.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Collections;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.entities.SampleEntity;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
public class ReactiveElasticsearchTemplateUnitTests {
@Rule //
public MockitoRule rule = MockitoJUnit.rule();
@Mock ReactiveElasticsearchClient client;
ReactiveElasticsearchTemplate template;
@Before
public void setUp() {
template = new ReactiveElasticsearchTemplate(client);
}
@Test // DATAES-504
public void insertShouldUseDefaultRefreshPolicy() {
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
when(client.index(captor.capture())).thenReturn(Mono.empty());
template.save(Collections.singletonMap("key", "value"), "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE);
}
@Test // DATAES-504
public void insertShouldApplyRefreshPolicy() {
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
when(client.index(captor.capture())).thenReturn(Mono.empty());
template.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
template.save(Collections.singletonMap("key", "value"), "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL);
}
@Test // DATAES-504
public void findShouldFallBackToDefaultIndexOptionsIfNotSet() {
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
when(client.search(captor.capture())).thenReturn(Flux.empty());
template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS);
}
@Test // DATAES-504
public void findShouldApplyIndexOptionsIfSet() {
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
when(client.search(captor.capture())).thenReturn(Flux.empty());
template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN);
}
@Test // DATAES-504
public void deleteShouldUseDefaultRefreshPolicy() {
ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
when(client.delete(captor.capture())).thenReturn(Mono.empty());
template.deleteById("id", "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE);
}
@Test // DATAES-504
public void deleteShouldApplyRefreshPolicy() {
ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
when(client.delete(captor.capture())).thenReturn(Mono.empty());
template.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
template.deleteById("id", "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL);
}
@Test // DATAES-504
public void deleteByShouldUseDefaultRefreshPolicy() {
ArgumentCaptor<DeleteByQueryRequest> captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
when(client.deleteBy(captor.capture())).thenReturn(Mono.empty());
template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().isRefresh()).isTrue();
}
@Test // DATAES-504
public void deleteByShouldApplyRefreshPolicy() {
ArgumentCaptor<DeleteByQueryRequest> captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
when(client.deleteBy(captor.capture())).thenReturn(Mono.empty());
template.setRefreshPolicy(RefreshPolicy.NONE);
template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().isRefresh()).isFalse();
}
@Test // DATAES-504
public void deleteByShouldApplyIndicesOptions() {
ArgumentCaptor<DeleteByQueryRequest> captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
when(client.deleteBy(captor.capture())).thenReturn(Mono.empty());
template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS);
}
@Test // DATAES-504
public void deleteByShouldApplyIndicesOptionsIfSet() {
ArgumentCaptor<DeleteByQueryRequest> captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
when(client.deleteBy(captor.capture())).thenReturn(Mono.empty());
template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
template.deleteBy(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, "index", "type") //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN);
}
}

View File

@ -7,6 +7,8 @@
</encoder> </encoder>
</appender> </appender>
<logger name="org.springframework.data.elasticsearch.ElasticsearchVersionRule" level="info" />
<root level="error"> <root level="error">
<appender-ref ref="console"/> <appender-ref ref="console"/>
</root> </root>