DATAES-504 - Update documentation.

Update documentation to cover newly added configuration options for the ReactiveElasticsearchClient.
Make sure to apply postFilter correctly and set a default limit for unpaged search requests.

Also fix some code format issues.
This commit is contained in:
Christoph Strobl 2018-12-05 09:56:46 +01:00
parent 2dcd1cfbad
commit 1ea73d2fb9
14 changed files with 221 additions and 74 deletions

View File

@ -81,3 +81,43 @@ Mono<IndexResponse> response = client.index(request ->
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL. <1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL.
==== ====
NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
[[elasticsearch.clients.configuration]]
== Client Configuration
Client behaviour can be changed via the `ClientConfiguration` that allows to set options for SSL, connect and socket timeouts.
.Client Configuration
====
[source,java]
----
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("localhost:9200", "localhost:9291") <1>
.withConnectTimeout(Duration.ofSeconds(5)) <2>
.withSocketTimeout(Duration.ofSeconds(3)) <3>
.useSsl()
. // ... other options
.build();
----
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL.
<2> Set the connection timeout. Default is 10 sec.
<3> Set the socket timeout. Default is 5 sec.
====
[[elasticsearch.clients.logging]]
== Client Logging
To see what is actually sent to and received from the server `Request` / `Response` logging on the transport level needs
to be turned on as outlined in the snippet below.
.Enable transport layer logging
[source,xml]
----
<logger name="org.springframework.data.elasticsearch.client.WIRE" level="trace"/>
----
NOTE: The above applies to both the `RestHighLevelClient` and `ReactiveElasticsearchClient` when obtained via `RestClients`
respectively `ReactiveRestClients`.

View File

@ -38,7 +38,7 @@ public class Config extends AbstractReactiveElasticsearchConfiguration {
<1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`. <1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`.
==== ====
NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`. NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`. See <<elasticsearch.clients.configuration>>.
TIP: If needed the `ReactiveElasticsearchTemplate` can be configured with default `RefreshPolicy` and `IndicesOptions` that get applied to the related requests by overriding the defaults of `refreshPolicy()` and `indicesOptions()`. TIP: If needed the `ReactiveElasticsearchTemplate` can be configured with default `RefreshPolicy` and `IndicesOptions` that get applied to the related requests by overriding the defaults of `refreshPolicy()` and `indicesOptions()`.

View File

@ -198,26 +198,48 @@ public interface ClientConfiguration {
*/ */
TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders); TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders);
/**
* Configure the {@literal milliseconds} for the connect timeout.
*
* @param millis the timeout to use.
* @return the {@link TerminalClientConfigurationBuilder}
* @see #withConnectTimeout(Duration)
*/
default TerminalClientConfigurationBuilder withConnectTimeout(long millis) {
return withConnectTimeout(Duration.ofMillis(millis));
}
/** /**
* Configure a {@link java.time.Duration} connect timeout. * Configure a {@link java.time.Duration} connect timeout.
* *
* @param connectTimeout the timeout to use. * @param timeout the timeout to use. Must not be {@literal null}.
* @return the {@link TerminalClientConfigurationBuilder} * @return the {@link TerminalClientConfigurationBuilder}
* @see java.net.Socket#connect(SocketAddress, int) * @see java.net.Socket#connect(SocketAddress, int)
* @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS * @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS
*/ */
TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout); TerminalClientConfigurationBuilder withConnectTimeout(Duration timeout);
/**
* Configure the {@literal milliseconds} for the socket timeout.
*
* @param millis the timeout to use.
* @return the {@link TerminalClientConfigurationBuilder}
* @see #withSocketTimeout(Duration)
*/
default TerminalClientConfigurationBuilder withSocketTimeout(long millis) {
return withSocketTimeout(Duration.ofMillis(millis));
}
/** /**
* Configure a {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout. * Configure a {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout.
* *
* @param soTimeout the timeout to use. * @param timeout the timeout to use. Must not be {@literal null}.
* @return the {@link TerminalClientConfigurationBuilder} * @return the {@link TerminalClientConfigurationBuilder}
* @see java.net.Socket#setSoTimeout(int) * @see java.net.Socket#setSoTimeout(int)
* @see io.netty.handler.timeout.ReadTimeoutHandler * @see io.netty.handler.timeout.ReadTimeoutHandler
* @see io.netty.handler.timeout.WriteTimeoutHandler * @see io.netty.handler.timeout.WriteTimeoutHandler
*/ */
TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout); TerminalClientConfigurationBuilder withSocketTimeout(Duration timeout);
/** /**
* Build the {@link ClientConfiguration} object. * Build the {@link ClientConfiguration} object.

View File

@ -118,11 +118,11 @@ class ClientConfigurationBuilder
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withConnectTimeout(java.time.Duration) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withConnectTimeout(java.time.Duration)
*/ */
@Override @Override
public TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout) { public TerminalClientConfigurationBuilder withConnectTimeout(Duration timeout) {
Assert.notNull(connectTimeout, "I/O timeout must not be null!"); Assert.notNull(timeout, "I/O timeout must not be null!");
this.connectTimeout = connectTimeout; this.connectTimeout = timeout;
return this; return this;
} }
@ -131,11 +131,11 @@ class ClientConfigurationBuilder
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withTimeout(java.time.Duration) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withTimeout(java.time.Duration)
*/ */
@Override @Override
public TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout) { public TerminalClientConfigurationBuilder withSocketTimeout(Duration timeout) {
Assert.notNull(soTimeout, "Socket timeout must not be null!"); Assert.notNull(timeout, "Socket timeout must not be null!");
this.soTimeout = soTimeout; this.soTimeout = timeout;
return this; return this;
} }

View File

@ -28,6 +28,7 @@ import org.springframework.util.ObjectUtils;
* level. * level.
* *
* @author Mark Paluch * @author Mark Paluch
* @author Christoph Strobl
* @since 4.0 * @since 4.0
*/ */
public abstract class ClientLogger { public abstract class ClientLogger {
@ -57,7 +58,8 @@ public abstract class ClientLogger {
*/ */
public static void logRequest(String logId, String method, String endpoint, Object parameters) { public static void logRequest(String logId, String method, String endpoint, Object parameters) {
if (WIRE_LOGGER.isTraceEnabled()) { if (isEnabled()) {
WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}", logId, method.toUpperCase(), endpoint, WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}", logId, method.toUpperCase(), endpoint,
parameters); parameters);
} }
@ -75,7 +77,8 @@ public abstract class ClientLogger {
public static void logRequest(String logId, String method, String endpoint, Object parameters, public static void logRequest(String logId, String method, String endpoint, Object parameters,
Supplier<Object> body) { Supplier<Object> body) {
if (WIRE_LOGGER.isTraceEnabled()) { if (isEnabled()) {
WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}{}Request body: {}", logId, method.toUpperCase(), WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}{}Request body: {}", logId, method.toUpperCase(),
endpoint, parameters, lineSeparator, body.get()); endpoint, parameters, lineSeparator, body.get());
} }
@ -89,7 +92,7 @@ public abstract class ClientLogger {
*/ */
public static void logRawResponse(String logId, HttpStatus statusCode) { public static void logRawResponse(String logId, HttpStatus statusCode) {
if (WIRE_LOGGER.isTraceEnabled()) { if (isEnabled()) {
WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode); WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode);
} }
} }
@ -103,7 +106,7 @@ public abstract class ClientLogger {
*/ */
public static void logResponse(String logId, HttpStatus statusCode, String body) { public static void logResponse(String logId, HttpStatus statusCode, String body) {
if (WIRE_LOGGER.isTraceEnabled()) { if (isEnabled()) {
WIRE_LOGGER.trace("[{}] Received response: {}{}Response body: {}", logId, statusCode, lineSeparator, body); WIRE_LOGGER.trace("[{}] Received response: {}{}Response body: {}", logId, statusCode, lineSeparator, body);
} }
} }
@ -114,6 +117,11 @@ public abstract class ClientLogger {
* @return a new, unique correlation Id. * @return a new, unique correlation Id.
*/ */
public static String newLogId() { public static String newLogId() {
if (!isEnabled()) {
return "-";
}
return ObjectUtils.getIdentityHexString(new Object()); return ObjectUtils.getIdentityHexString(new Object());
} }
} }

View File

@ -31,6 +31,7 @@ import org.springframework.lang.Nullable;
* Default {@link ClientConfiguration} implementation. * Default {@link ClientConfiguration} implementation.
* *
* @author Mark Paluch * @author Mark Paluch
* @author Christoph Strobl
* @since 4.0 * @since 4.0
*/ */
class DefaultClientConfiguration implements ClientConfiguration { class DefaultClientConfiguration implements ClientConfiguration {
@ -90,9 +91,9 @@ class DefaultClientConfiguration implements ClientConfiguration {
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout() * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout()
*/ */
@Override @Override
public Duration getConnectTimeout() { public Duration getConnectTimeout() {
return this.connectTimeout; return this.connectTimeout;

View File

@ -85,19 +85,25 @@ public final class RestClients {
} }
builder.setHttpClientConfigCallback(clientBuilder -> { builder.setHttpClientConfigCallback(clientBuilder -> {
Optional<SSLContext> sslContext = clientConfiguration.getSslContext(); Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
sslContext.ifPresent(clientBuilder::setSSLContext); sslContext.ifPresent(clientBuilder::setSSLContext);
if (ClientLogger.isEnabled()) { if (ClientLogger.isEnabled()) {
clientBuilder.addInterceptorLast((HttpRequestInterceptor) LoggingInterceptors.INSTANCE);
clientBuilder.addInterceptorLast((HttpResponseInterceptor) LoggingInterceptors.INSTANCE); HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
clientBuilder.addInterceptorLast((HttpRequestInterceptor) interceptor);
clientBuilder.addInterceptorLast((HttpResponseInterceptor) interceptor);
} }
Duration connectTimeout = clientConfiguration.getConnectTimeout(); Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration timeout = clientConfiguration.getSocketTimeout(); Duration timeout = clientConfiguration.getSocketTimeout();
Builder requestConfigBuilder = RequestConfig.custom(); Builder requestConfigBuilder = RequestConfig.custom();
if (!connectTimeout.isNegative()) { if (!connectTimeout.isNegative()) {
requestConfigBuilder.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis())); requestConfigBuilder.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis()));
requestConfigBuilder.setConnectionRequestTimeout(Math.toIntExact(connectTimeout.toMillis())); requestConfigBuilder.setConnectionRequestTimeout(Math.toIntExact(connectTimeout.toMillis()));
} }
@ -151,16 +157,17 @@ public final class RestClients {
* Logging interceptors for Elasticsearch client logging. * Logging interceptors for Elasticsearch client logging.
* *
* @see ClientLogger * @see ClientLogger
* @since 4.0
*/ */
enum LoggingInterceptors implements HttpResponseInterceptor, HttpRequestInterceptor { private static class HttpLoggingInterceptor implements HttpResponseInterceptor, HttpRequestInterceptor {
INSTANCE;
@Override @Override
public void process(HttpRequest request, HttpContext context) throws IOException { public void process(HttpRequest request, HttpContext context) throws IOException {
String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE); String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE);
if (logId == null) { if (logId == null) {
logId = ClientLogger.newLogId(); logId = ClientLogger.newLogId();
context.setAttribute(RestClients.LOG_ID_ATTRIBUTE, logId); context.setAttribute(RestClients.LOG_ID_ATTRIBUTE, logId);
} }

View File

@ -156,20 +156,20 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) { private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
WebClientProvider provider; Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration soTimeout = clientConfiguration.getSocketTimeout();
TcpClient tcpClient = TcpClient.create(); TcpClient tcpClient = TcpClient.create();
Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration timeout = clientConfiguration.getSocketTimeout();
if (!connectTimeout.isNegative()) { if (!connectTimeout.isNegative()) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
} }
if (!timeout.isNegative()) { if (!soTimeout.isNegative()) {
tcpClient = tcpClient.doOnConnected(connection -> connection // tcpClient = tcpClient.doOnConnected(connection -> connection //
.addHandlerLast(new ReadTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS)) .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS))); .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
} }
String scheme = "http"; String scheme = "http";
@ -187,7 +187,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
} }
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
provider = WebClientProvider.create(scheme, connector); WebClientProvider provider = WebClientProvider.create(scheme, connector);
return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()); return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders());
} }
@ -370,6 +370,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(),
body::get); body::get);
requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
requestBodySpec.body(Mono.fromSupplier(body::get), String.class); requestBodySpec.body(Mono.fromSupplier(body::get), String.class);
} else { } else {
@ -397,21 +398,21 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
Class<T> responseType) { Class<T> responseType) {
if (RawActionResponse.class.equals(responseType)) { if (RawActionResponse.class.equals(responseType)) {
ClientLogger.logRawResponse(logId, response.statusCode());
ClientLogger.logRawResponse(logId, response.statusCode());
return Mono.just(responseType.cast(RawActionResponse.create(response))); return Mono.just(responseType.cast(RawActionResponse.create(response)));
} }
if (response.statusCode().is5xxServerError()) { if (response.statusCode().is5xxServerError()) {
ClientLogger.logRawResponse(logId, response.statusCode()); ClientLogger.logRawResponse(logId, response.statusCode());
return handleServerError(request, response); return handleServerError(request, response);
} }
return response.body(BodyExtractors.toMono(byte[].class)) // return response.body(BodyExtractors.toMono(byte[].class)) //
.map(it -> new String(it, StandardCharsets.UTF_8)) // .map(it -> new String(it, StandardCharsets.UTF_8)) //
.doOnNext(it -> { .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
ClientLogger.logResponse(logId, response.statusCode(), it); .flatMap(content -> doDecode(response, responseType, content));
}).flatMap(content -> doDecode(response, responseType, content));
} }
private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) { private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {
@ -435,7 +436,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
} }
} }
} }
private static XContentParser createParser(String mediaType, String content) throws IOException { private static XContentParser createParser(String mediaType, String content) throws IOException {
@ -446,6 +446,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
} }
private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) { private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
return Mono.error( return Mono.error(
new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.", new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.",
request.getMethod(), request.getEndpoint(), response.statusCode().value()))); request.getMethod(), request.getEndpoint(), response.statusCode().value())));

View File

@ -43,7 +43,7 @@ import org.springframework.util.StringUtils;
@RequiredArgsConstructor @RequiredArgsConstructor
class EntityOperations { class EntityOperations {
public static final String ID_FIELD = "id"; private static final String ID_FIELD = "id";
private final @NonNull MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> context; private final @NonNull MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> context;
@ -54,7 +54,7 @@ class EntityOperations {
* @return * @return
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Entity<T> forEntity(T entity) { <T> Entity<T> forEntity(T entity) {
Assert.notNull(entity, "Bean must not be null!"); Assert.notNull(entity, "Bean must not be null!");
@ -73,7 +73,7 @@ class EntityOperations {
* @return * @return
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public <T> AdaptibleEntity<T> forEntity(T entity, ConversionService conversionService) { <T> AdaptibleEntity<T> forEntity(T entity, ConversionService conversionService) {
Assert.notNull(entity, "Bean must not be null!"); Assert.notNull(entity, "Bean must not be null!");
Assert.notNull(conversionService, "ConversionService must not be null!"); Assert.notNull(conversionService, "ConversionService must not be null!");
@ -97,7 +97,7 @@ class EntityOperations {
* @see ElasticsearchPersistentEntity#getIndexName() * @see ElasticsearchPersistentEntity#getIndexName()
* @see ElasticsearchPersistentEntity#getIndexType() * @see ElasticsearchPersistentEntity#getIndexType()
*/ */
public IndexCoordinates determineIndex(Entity<?> entity, @Nullable String index, @Nullable String type) { IndexCoordinates determineIndex(Entity<?> entity, @Nullable String index, @Nullable String type) {
return determineIndex(entity.getPersistentEntity(), index, type); return determineIndex(entity.getPersistentEntity(), index, type);
} }
@ -114,7 +114,7 @@ class EntityOperations {
* @see ElasticsearchPersistentEntity#getIndexName() * @see ElasticsearchPersistentEntity#getIndexName()
* @see ElasticsearchPersistentEntity#getIndexType() * @see ElasticsearchPersistentEntity#getIndexType()
*/ */
public IndexCoordinates determineIndex(ElasticsearchPersistentEntity<?> persistentEntity, @Nullable String index, IndexCoordinates determineIndex(ElasticsearchPersistentEntity<?> persistentEntity, @Nullable String index,
@Nullable String type) { @Nullable String type) {
return new IndexCoordinates(indexName(persistentEntity, index), typeName(persistentEntity, type)); return new IndexCoordinates(indexName(persistentEntity, index), typeName(persistentEntity, type));
} }
@ -214,20 +214,21 @@ class EntityOperations {
* Information and commands on an entity. * Information and commands on an entity.
* *
* @author Mark Paluch * @author Mark Paluch
* @author Christoph Strobl
*/ */
interface AdaptibleEntity<T> extends Entity<T> { interface AdaptibleEntity<T> extends Entity<T> {
/** /**
* Returns whether the entity has a parent. * Returns whether the entity has a parent.
* *
* @return * @return {@literal true} if the entity has a parent that has an {@literal id}.
*/ */
boolean hasParent(); boolean hasParent();
/** /**
* Returns the parent Id. Can be {@literal null}. * Returns the parent Id. Can be {@literal null}.
* *
* @return * @return can be {@literal null}.
*/ */
@Nullable @Nullable
Object getParentId(); Object getParentId();
@ -236,8 +237,8 @@ class EntityOperations {
* Populates the identifier of the backing entity if it has an identifier property and there's no identifier * Populates the identifier of the backing entity if it has an identifier property and there's no identifier
* currently present. * currently present.
* *
* @param id must not be {@literal null}. * @param id can be {@literal null}.
* @return * @return can be {@literal null}.
*/ */
@Nullable @Nullable
T populateIdIfNecessary(@Nullable Object id); T populateIdIfNecessary(@Nullable Object id);
@ -267,16 +268,16 @@ class EntityOperations {
} }
/** /**
* Plain entity without applying further mapping.
*
* @param <T> * @param <T>
* @author Christoph Strobl
* @since 4.0
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
private static class UnmappedEntity<T extends Map<String, Object>> implements AdaptibleEntity<T> { private static class MapBackedEntity<T extends Map<String, Object>> implements AdaptibleEntity<T> {
private final T map; private final T map;
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId() * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId()
*/ */
@ -285,7 +286,7 @@ class EntityOperations {
return map.get(ID_FIELD); return map.get(ID_FIELD);
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent() * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent()
*/ */
@ -294,7 +295,7 @@ class EntityOperations {
return false; return false;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId() * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId()
*/ */
@ -303,7 +304,7 @@ class EntityOperations {
return null; return null;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object)
*/ */
@ -316,7 +317,7 @@ class EntityOperations {
return map; return map;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty() * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()
*/ */
@ -325,7 +326,7 @@ class EntityOperations {
return map; return map;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getVersion() * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getVersion()
*/ */
@ -335,7 +336,7 @@ class EntityOperations {
return null; return null;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion() * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
*/ */
@ -344,7 +345,7 @@ class EntityOperations {
return map; return map;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean() * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean()
*/ */
@ -353,7 +354,7 @@ class EntityOperations {
return map; return map;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew() * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew()
*/ */
@ -362,7 +363,7 @@ class EntityOperations {
return map.get(ID_FIELD) != null; return map.get(ID_FIELD) != null;
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity() * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity()
*/ */
@ -372,12 +373,26 @@ class EntityOperations {
} }
} }
/**
* Plain entity without applying further mapping.
*
* @param <T>
* @since 4.0
*/
private static class UnmappedEntity<T extends Map<String, Object>> extends MapBackedEntity<T> {
UnmappedEntity(T map) {
super(map);
}
}
/** /**
* Simple mapped entity without an associated {@link ElasticsearchPersistentEntity}. * Simple mapped entity without an associated {@link ElasticsearchPersistentEntity}.
* *
* @param <T> * @param <T>
* @since 4.0
*/ */
private static class SimpleMappedEntity<T extends Map<String, Object>> extends UnmappedEntity<T> { private static class SimpleMappedEntity<T extends Map<String, Object>> extends MapBackedEntity<T> {
SimpleMappedEntity(T map) { SimpleMappedEntity(T map) {
super(map); super(map);
@ -471,6 +486,10 @@ class EntityOperations {
} }
} }
/**
* @param <T>
* @since 4.0
*/
private static class AdaptibleMappedEntity<T> extends MappedEntity<T> implements AdaptibleEntity<T> { private static class AdaptibleMappedEntity<T> extends MappedEntity<T> implements AdaptibleEntity<T> {
private final ElasticsearchPersistentEntity<?> entity; private final ElasticsearchPersistentEntity<?> entity;

View File

@ -62,6 +62,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
@ -240,12 +241,14 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity)); searchSourceBuilder.query(mappedQuery(query, entity));
// TODO: request.source().postFilter(elasticsearchFilter); -- filter query
searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before
searchSourceBuilder.trackScores(query.getTrackScores()); searchSourceBuilder.trackScores(query.getTrackScores());
QueryBuilder postFilterQuery = mappedFilterQuery(query, entity);
if (postFilterQuery != null) {
searchSourceBuilder.postFilter(postFilterQuery);
}
if (query.getSourceFilter() != null) { if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
} }
@ -259,6 +262,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
searchSourceBuilder.from((int) offset); searchSourceBuilder.from((int) offset);
searchSourceBuilder.size(query.getPageable().getPageSize()); searchSourceBuilder.size(query.getPageable().getPageSize());
} else {
searchSourceBuilder.from(0);
searchSourceBuilder.size(10000); // this is the index.max_result_window default value
} }
if (query.getIndicesOptions() != null) { if (query.getIndicesOptions() != null) {
@ -273,9 +280,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
request.source(searchSourceBuilder); request.source(searchSourceBuilder);
request = prepareSearchRequest(request); return doFind(prepareSearchRequest(request));
return doFind(request);
}); });
} }
@ -605,9 +610,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery(); return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery();
} }
private QueryBuilder mappedFilterQuery(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) { @Nullable
private QueryBuilder mappedFilterQuery(Query query, ElasticsearchPersistentEntity<?> entity) {
if (query instanceof SearchQuery) {
return ((SearchQuery) query).getFilter();
}
// TODO: this is actually strange in the RestTemplate:L378 - need to check
return null; return null;
} }

View File

@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -41,12 +42,15 @@ public final class TestUtils {
private TestUtils() {} private TestUtils() {}
private static final ClientConfiguration CONFIG = ClientConfiguration.builder().connectedToLocalhost()
.withConnectTimeout(Duration.ofSeconds(5)).withSocketTimeout(Duration.ofSeconds(3)).build();
public static RestHighLevelClient restHighLevelClient() { public static RestHighLevelClient restHighLevelClient() {
return RestClients.create(ClientConfiguration.create("localhost:9200")).rest(); return RestClients.create(CONFIG).rest();
} }
public static ReactiveElasticsearchClient reactiveClient() { public static ReactiveElasticsearchClient reactiveClient() {
return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200")); return ReactiveRestClients.create(CONFIG);
} }
public static Version serverVersion() { public static Version serverVersion() {

View File

@ -17,12 +17,10 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import org.junit.Rule;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -45,10 +43,13 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.http.HttpHeaders; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
@ -109,7 +110,10 @@ public class ReactiveElasticsearchClientTests {
@Test // DATAES-488 @Test // DATAES-488
public void pingForUnknownHostShouldReturnFalse() { public void pingForUnknownHostShouldReturnFalse() {
DefaultReactiveElasticsearchClient.create(HttpHeaders.EMPTY, "http://localhost:4711").ping() // DefaultReactiveElasticsearchClient
.create(ClientConfiguration.builder().connectedTo("localhost:4711")
.withConnectTimeout(Duration.ofSeconds(2)).build())
.ping() //
.as(StepVerifier::create) // .as(StepVerifier::create) //
.expectNext(false) // .expectNext(false) //
.verifyComplete(); .verifyComplete();

View File

@ -440,7 +440,7 @@ public class ReactiveElasticsearchTemplateTests {
@Test // DATAES-504 @Test // DATAES-504
@ElasticsearchVersion(asOf = "6.5.0") @ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnZeroIfNothingDeleted() throws Exception { public void deleteByQueryShouldReturnZeroIfNothingDeleted() {
index(randomEntity("test message")); index(randomEntity("test message"));

View File

@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.action.search.SearchRequest.*; import static org.elasticsearch.action.search.SearchRequest.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -118,6 +120,36 @@ public class ReactiveElasticsearchTemplateUnitTests {
assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN); assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN);
} }
@Test // DATAES-504
public void findShouldApplyPaginationIfSet() {
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
when(client.search(captor.capture())).thenReturn(Flux.empty());
template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(2, 50)), SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().source().from()).isEqualTo(100);
assertThat(captor.getValue().source().size()).isEqualTo(50);
}
@Test // DATAES-504
public void findShouldApplyDefaultMaxIfPaginationNotSet() {
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
when(client.search(captor.capture())).thenReturn(Flux.empty());
template.find(new CriteriaQuery(new Criteria("*")).setPageable(Pageable.unpaged()), SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().source().from()).isEqualTo(0);
assertThat(captor.getValue().source().size()).isEqualTo(10000);
}
@Test // DATAES-504 @Test // DATAES-504
public void deleteShouldUseDefaultRefreshPolicy() { public void deleteShouldUseDefaultRefreshPolicy() {