From 2dcd1cfbadfbca804103ed220d8247f5f8d336a6 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 29 Nov 2018 14:26:34 +0100 Subject: [PATCH] DATAES-504 - Add configuration options for logging and timeouts to ReactiveElasticsearchClient. We now log HTTP requests and responses with the org.springframework.data.elasticsearch.client.WIRE logger for both, the HighLevelRestClient and our reactive client and associate a logging Id for improved traceability. Configuration of connection/socket timeouts is now available via the ClientConfiguration. Along the lines we also aligned entity handling to EntityOperations already common in other modules. EntityOperations centralizes how aspects of entities (versioning, retrieval of index name/index type) are handled. Original Pull Request: #229 --- .../client/ClientConfiguration.java | 56 +- .../client/ClientConfigurationBuilder.java | 54 +- .../elasticsearch/client/ClientLogger.java | 119 ++++ .../client/DefaultClientConfiguration.java | 34 +- .../elasticsearch/client/RestClients.java | 83 +++ .../DefaultReactiveElasticsearchClient.java | 70 +- .../elasticsearch/core/EntityOperations.java | 608 ++++++++++++++++++ .../core/ReactiveElasticsearchTemplate.java | 281 ++------ .../client/ClientConfigurationUnitTests.java | 11 +- .../ReactiveMockClientTestsUtils.java | 2 + .../ReactiveElasticsearchTemplateTests.java | 22 +- src/test/resources/log4j2.xml | 5 +- src/test/resources/logback.xml | 1 + .../reindex/plugin-descriptor.properties | 45 ++ .../modules/reindex/plugin-security.policy | 33 + 15 files changed, 1158 insertions(+), 266 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java create mode 100644 src/test/resources/test-home-dir/modules/reindex/plugin-descriptor.properties create mode 100644 src/test/resources/test-home-dir/modules/reindex/plugin-security.policy diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java index 8ec42dcd7..41910a822 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java @@ -16,6 +16,8 @@ package org.springframework.data.elasticsearch.client; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; import java.util.List; import java.util.Optional; @@ -99,6 +101,23 @@ public interface ClientConfiguration { */ Optional getSslContext(); + /** + * Returns the {@link java.time.Duration connect timeout}. + * + * @see java.net.Socket#connect(SocketAddress, int) + * @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS + */ + Duration getConnectTimeout(); + + /** + * Returns the {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout. + * + * @see java.net.Socket#setSoTimeout(int) + * @see io.netty.handler.timeout.ReadTimeoutHandler + * @see io.netty.handler.timeout.WriteTimeoutHandler + */ + Duration getSocketTimeout(); + /** * @author Christoph Strobl */ @@ -146,38 +165,59 @@ public interface ClientConfiguration { /** * @author Christoph Strobl */ - interface MaybeSecureClientConfigurationBuilder extends ClientConfigurationBuilderWithOptionalDefaultHeaders { + interface MaybeSecureClientConfigurationBuilder extends TerminalClientConfigurationBuilder { /** * Connect via {@literal https}
* NOTE You need to leave out the protocol in * {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}. * - * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}. + * @return the {@link TerminalClientConfigurationBuilder}. */ - ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(); + TerminalClientConfigurationBuilder usingSsl(); /** * Connect via {@literal https} using the given {@link SSLContext}.
* NOTE You need to leave out the protocol in * {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}. * - * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}. + * @return the {@link TerminalClientConfigurationBuilder}. */ - ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext); + TerminalClientConfigurationBuilder usingSsl(SSLContext sslContext); } /** * @author Christoph Strobl * @author Mark Paluch */ - interface ClientConfigurationBuilderWithOptionalDefaultHeaders { + interface TerminalClientConfigurationBuilder { /** * @param defaultHeaders must not be {@literal null}. - * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders} + * @return the {@link TerminalClientConfigurationBuilder} */ - ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders); + TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders); + + /** + * Configure a {@link java.time.Duration} connect timeout. + * + * @param connectTimeout the timeout to use. + * @return the {@link TerminalClientConfigurationBuilder} + * @see java.net.Socket#connect(SocketAddress, int) + * @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS + */ + TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout); + + /** + * Configure a {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout. + * + * @param soTimeout the timeout to use. + * @return the {@link TerminalClientConfigurationBuilder} + * @see java.net.Socket#setSoTimeout(int) + * @see io.netty.handler.timeout.ReadTimeoutHandler + * @see io.netty.handler.timeout.WriteTimeoutHandler + */ + TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout); /** * Build the {@link ClientConfiguration} object. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java index 42e2d57de..89b8451fc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java @@ -16,6 +16,7 @@ package org.springframework.data.elasticsearch.client; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -23,9 +24,9 @@ import java.util.stream.Collectors; import javax.net.ssl.SSLContext; -import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders; import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint; import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder; +import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -44,8 +45,10 @@ class ClientConfigurationBuilder private HttpHeaders headers = HttpHeaders.EMPTY; private boolean useSsl; private @Nullable SSLContext sslContext; + private Duration connectTimeout = Duration.ofSeconds(10); + private Duration soTimeout = Duration.ofSeconds(5); - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(java.lang.String[]) */ @@ -58,7 +61,7 @@ class ClientConfigurationBuilder return this; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(java.net.InetSocketAddress[]) */ @@ -72,23 +75,23 @@ class ClientConfigurationBuilder return this; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl() */ @Override - public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl() { + public TerminalClientConfigurationBuilder usingSsl() { this.useSsl = true; return this; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl(javax.net.ssl.SSLContext) */ @Override - public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext) { + public TerminalClientConfigurationBuilder usingSsl(SSLContext sslContext) { Assert.notNull(sslContext, "SSL Context must not be null"); @@ -97,12 +100,12 @@ class ClientConfigurationBuilder return this; } - /* + /* * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#withDefaultHeaders(org.springframework.http.HttpHeaders) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withDefaultHeaders(org.springframework.http.HttpHeaders) */ @Override - public ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders) { + public TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders) { Assert.notNull(defaultHeaders, "Default HTTP headers must not be null"); @@ -110,13 +113,40 @@ class ClientConfigurationBuilder return this; } - /* + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withConnectTimeout(java.time.Duration) + */ + @Override + public TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout) { + + Assert.notNull(connectTimeout, "I/O timeout must not be null!"); + + this.connectTimeout = connectTimeout; + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withTimeout(java.time.Duration) + */ + @Override + public TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout) { + + Assert.notNull(soTimeout, "Socket timeout must not be null!"); + + this.soTimeout = soTimeout; + return this; + } + + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#build() */ @Override public ClientConfiguration build() { - return new DefaultClientConfiguration(this.hosts, this.headers, this.useSsl, this.sslContext); + return new DefaultClientConfiguration(this.hosts, this.headers, this.useSsl, this.sslContext, this.soTimeout, + this.connectTimeout); } private static InetSocketAddress parse(String hostAndPort) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java new file mode 100644 index 000000000..ede54037e --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java @@ -0,0 +1,119 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.util.ObjectUtils; + +/** + * Logging Utility to log client requests and responses. Logs client requests and responses to Elasticsearch to a + * dedicated logger: {@code org.springframework.data.elasticsearch.client.WIRE} on {@link org.slf4j.event.Level#TRACE} + * level. + * + * @author Mark Paluch + * @since 4.0 + */ +public abstract class ClientLogger { + + private static final String lineSeparator = System.getProperty("line.separator"); + private static final Logger WIRE_LOGGER = LoggerFactory + .getLogger("org.springframework.data.elasticsearch.client.WIRE"); + + private ClientLogger() {} + + /** + * Returns {@literal true} if the logger is enabled. + * + * @return {@literal true} if the logger is enabled. + */ + public static boolean isEnabled() { + return WIRE_LOGGER.isTraceEnabled(); + } + + /** + * Log an outgoing HTTP request. + * + * @param logId the correlation Id, see {@link #newLogId()}. + * @param method HTTP method + * @param endpoint URI + * @param parameters optional parameters. + */ + public static void logRequest(String logId, String method, String endpoint, Object parameters) { + + if (WIRE_LOGGER.isTraceEnabled()) { + WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}", logId, method.toUpperCase(), endpoint, + parameters); + } + } + + /** + * Log an outgoing HTTP request with a request body. + * + * @param logId the correlation Id, see {@link #newLogId()}. + * @param method HTTP method + * @param endpoint URI + * @param parameters optional parameters. + * @param body body content supplier. + */ + public static void logRequest(String logId, String method, String endpoint, Object parameters, + Supplier body) { + + if (WIRE_LOGGER.isTraceEnabled()) { + WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}{}Request body: {}", logId, method.toUpperCase(), + endpoint, parameters, lineSeparator, body.get()); + } + } + + /** + * Log a raw HTTP response without logging the body. + * + * @param logId the correlation Id, see {@link #newLogId()}. + * @param statusCode the HTTP status code. + */ + public static void logRawResponse(String logId, HttpStatus statusCode) { + + if (WIRE_LOGGER.isTraceEnabled()) { + WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode); + } + } + + /** + * Log a raw HTTP response along with the body. + * + * @param logId the correlation Id, see {@link #newLogId()}. + * @param statusCode the HTTP status code. + * @param body body content. + */ + public static void logResponse(String logId, HttpStatus statusCode, String body) { + + if (WIRE_LOGGER.isTraceEnabled()) { + WIRE_LOGGER.trace("[{}] Received response: {}{}Response body: {}", logId, statusCode, lineSeparator, body); + } + } + + /** + * Creates a new, unique correlation Id to improve tracing across log events. + * + * @return a new, unique correlation Id. + */ + public static String newLogId() { + return ObjectUtils.getIdentityHexString(new Object()); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java index 396d325b5..79899ba51 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java @@ -16,6 +16,7 @@ package org.springframework.data.elasticsearch.client; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -38,17 +39,21 @@ class DefaultClientConfiguration implements ClientConfiguration { private final HttpHeaders headers; private final boolean useSsl; private final @Nullable SSLContext sslContext; + private final Duration soTimeout; + private final Duration connectTimeout; DefaultClientConfiguration(List hosts, HttpHeaders headers, boolean useSsl, - @Nullable SSLContext sslContext) { + @Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout) { this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts)); this.headers = new HttpHeaders(headers); this.useSsl = useSsl; this.sslContext = sslContext; + this.soTimeout = soTimeout; + this.connectTimeout = connectTimeout; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getEndpoints() */ @@ -57,7 +62,7 @@ class DefaultClientConfiguration implements ClientConfiguration { return this.hosts; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getDefaultHeaders() */ @@ -66,7 +71,7 @@ class DefaultClientConfiguration implements ClientConfiguration { return this.headers; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration#useSsl() */ @@ -75,7 +80,7 @@ class DefaultClientConfiguration implements ClientConfiguration { return this.useSsl; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSslContext() */ @@ -83,4 +88,23 @@ class DefaultClientConfiguration implements ClientConfiguration { public Optional getSslContext() { return Optional.ofNullable(this.sslContext); } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout() + */ + @Override + public Duration getConnectTimeout() { + return this.connectTimeout; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSocketTimeout() + */ + @Override + public Duration getSocketTimeout() { + return this.soTimeout; + } + } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java index fb0102913..9329eabf4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java @@ -15,9 +15,11 @@ */ package org.springframework.data.elasticsearch.client; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -25,12 +27,23 @@ import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseInterceptor; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.config.RequestConfig.Builder; +import org.apache.http.entity.ByteArrayEntity; import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HttpContext; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; import org.springframework.util.Assert; /** @@ -43,6 +56,11 @@ import org.springframework.util.Assert; */ public final class RestClients { + /** + * Name of whose value can be used to correlate log messages for this request. + */ + private static final String LOG_ID_ATTRIBUTE = RestClients.class.getName() + ".LOG_ID"; + private RestClients() {} /** @@ -70,6 +88,26 @@ public final class RestClients { Optional sslContext = clientConfiguration.getSslContext(); sslContext.ifPresent(clientBuilder::setSSLContext); + if (ClientLogger.isEnabled()) { + clientBuilder.addInterceptorLast((HttpRequestInterceptor) LoggingInterceptors.INSTANCE); + clientBuilder.addInterceptorLast((HttpResponseInterceptor) LoggingInterceptors.INSTANCE); + } + + Duration connectTimeout = clientConfiguration.getConnectTimeout(); + Duration timeout = clientConfiguration.getSocketTimeout(); + + Builder requestConfigBuilder = RequestConfig.custom(); + if (!connectTimeout.isNegative()) { + requestConfigBuilder.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis())); + requestConfigBuilder.setConnectionRequestTimeout(Math.toIntExact(connectTimeout.toMillis())); + } + + if (!timeout.isNegative()) { + requestConfigBuilder.setSocketTimeout(Math.toIntExact(timeout.toMillis())); + } + + clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); + return clientBuilder; }); @@ -108,4 +146,49 @@ public final class RestClients { rest().close(); } } + + /** + * Logging interceptors for Elasticsearch client logging. + * + * @see ClientLogger + */ + enum LoggingInterceptors implements HttpResponseInterceptor, HttpRequestInterceptor { + + INSTANCE; + + @Override + public void process(HttpRequest request, HttpContext context) throws IOException { + + String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE); + if (logId == null) { + logId = ClientLogger.newLogId(); + context.setAttribute(RestClients.LOG_ID_ATTRIBUTE, logId); + } + + if (request instanceof HttpEntityEnclosingRequest && ((HttpEntityEnclosingRequest) request).getEntity() != null) { + + HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest) request; + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + entity.writeTo(buffer); + + if (!entity.isRepeatable()) { + entityRequest.setEntity(new ByteArrayEntity(buffer.toByteArray())); + } + + ClientLogger.logRequest(logId, request.getRequestLine().getMethod(), request.getRequestLine().getUri(), "", + () -> new String(buffer.toByteArray())); + } else { + ClientLogger.logRequest(logId, request.getRequestLine().getMethod(), request.getRequestLine().getUri(), ""); + } + } + + @Override + public void process(HttpResponse response, HttpContext context) { + + String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE); + + ClientLogger.logRawResponse(logId, HttpStatus.resolve(response.getStatusLine().getStatusCode())); + } + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 4ed9c82e6..7e0e620c2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -15,19 +15,25 @@ */ package org.springframework.data.elasticsearch.client.reactive; +import io.netty.channel.ChannelOption; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.JdkSslContext; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.TcpClient; import java.io.IOException; import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -64,10 +70,12 @@ import org.elasticsearch.search.SearchHit; import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.ElasticsearchException; import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification; import org.springframework.data.elasticsearch.client.util.RequestConverters; +import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -77,6 +85,7 @@ import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; @@ -149,19 +158,37 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch WebClientProvider provider; + TcpClient tcpClient = TcpClient.create(); + Duration connectTimeout = clientConfiguration.getConnectTimeout(); + Duration timeout = clientConfiguration.getSocketTimeout(); + + if (!connectTimeout.isNegative()) { + tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); + } + + if (!timeout.isNegative()) { + tcpClient = tcpClient.doOnConnected(connection -> connection // + .addHandlerLast(new ReadTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS))); + } + + String scheme = "http"; + HttpClient httpClient = HttpClient.from(tcpClient); + if (clientConfiguration.useSsl()) { - ReactorClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.create().secure(sslConfig -> { + httpClient = httpClient.secure(sslConfig -> { Optional sslContext = clientConfiguration.getSslContext(); - sslContext.ifPresent(it -> sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE))); - })); - provider = WebClientProvider.create("https", connector); - } else { - provider = WebClientProvider.create("http"); + }); + + scheme = "https"; } + ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); + provider = WebClientProvider.create(scheme, connector); + return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()); } @@ -324,20 +351,29 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch private Flux sendRequest(Request request, Class responseType, HttpHeaders headers) { - return execute(webClient -> sendRequest(webClient, request, headers)) - .flatMapMany(response -> readResponseBody(request, response, responseType)); + String logId = ClientLogger.newLogId(); + + return execute(webClient -> sendRequest(webClient, logId, request, headers)) + .flatMapMany(response -> readResponseBody(logId, request, response, responseType)); } - private Mono sendRequest(WebClient webClient, Request request, HttpHeaders headers) { + private Mono sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) // .uri(request.getEndpoint(), request.getParameters()) // + .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) // .headers(theHeaders -> theHeaders.addAll(headers)); if (request.getEntity() != null) { + Lazy body = bodyExtractor(request); + + ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), + body::get); requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); - requestBodySpec.body(bodyExtractor(request), String.class); + requestBodySpec.body(Mono.fromSupplier(body::get), String.class); + } else { + ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } return requestBodySpec // @@ -345,9 +381,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); } - private Publisher bodyExtractor(Request request) { + private Lazy bodyExtractor(Request request) { - return Mono.fromSupplier(() -> { + return Lazy.of(() -> { try { return EntityUtils.toString(request.getEntity()); @@ -357,19 +393,25 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch }); } - private Publisher readResponseBody(Request request, ClientResponse response, Class responseType) { + private Publisher readResponseBody(String logId, Request request, ClientResponse response, + Class responseType) { if (RawActionResponse.class.equals(responseType)) { + ClientLogger.logRawResponse(logId, response.statusCode()); + return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { + ClientLogger.logRawResponse(logId, response.statusCode()); return handleServerError(request, response); } return response.body(BodyExtractors.toMono(byte[].class)) // .map(it -> new String(it, StandardCharsets.UTF_8)) // - .flatMap(content -> doDecode(response, responseType, content)); + .doOnNext(it -> { + ClientLogger.logResponse(logId, response.statusCode(), it); + }).flatMap(content -> doDecode(response, responseType, content)); } private static Mono doDecode(ClientResponse response, Class responseType, String content) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java new file mode 100644 index 000000000..8852b93cc --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java @@ -0,0 +1,608 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +import java.util.Map; + +import org.springframework.core.convert.ConversionService; +import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; +import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; +import org.springframework.data.mapping.IdentifierAccessor; +import org.springframework.data.mapping.PersistentPropertyAccessor; +import org.springframework.data.mapping.context.MappingContext; +import org.springframework.data.mapping.model.ConvertingPropertyAccessor; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Common operations performed on an entity in the context of it's mapping metadata. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 4.0 + */ +@RequiredArgsConstructor +class EntityOperations { + + public static final String ID_FIELD = "id"; + + private final @NonNull MappingContext, ElasticsearchPersistentProperty> context; + + /** + * Creates a new {@link Entity} for the given bean. + * + * @param entity must not be {@literal null}. + * @return + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Entity forEntity(T entity) { + + Assert.notNull(entity, "Bean must not be null!"); + + if (entity instanceof Map) { + return new SimpleMappedEntity((Map) entity); + } + + return MappedEntity.of(entity, context); + } + + /** + * Creates a new {@link AdaptibleEntity} for the given bean and {@link ConversionService}. + * + * @param entity must not be {@literal null}. + * @param conversionService must not be {@literal null}. + * @return + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public AdaptibleEntity forEntity(T entity, ConversionService conversionService) { + + Assert.notNull(entity, "Bean must not be null!"); + Assert.notNull(conversionService, "ConversionService must not be null!"); + + if (entity instanceof Map) { + return new SimpleMappedEntity((Map) entity); + } + + return AdaptibleMappedEntity.of(entity, context, conversionService); + } + + /** + * Determine index name and type name from {@link Entity} with {@code index} and {@code type} overrides. Allows using + * preferred values for index and type if provided, otherwise fall back to index and type defined on entity level. + * + * @param entity the entity to determine the index name. Can be {@literal null} if {@code index} and {@literal type} + * are provided. + * @param index index name override can be {@literal null}. + * @param type index type override can be {@literal null}. + * @return the {@link IndexCoordinates} containing index name and index type. + * @see ElasticsearchPersistentEntity#getIndexName() + * @see ElasticsearchPersistentEntity#getIndexType() + */ + public IndexCoordinates determineIndex(Entity entity, @Nullable String index, @Nullable String type) { + return determineIndex(entity.getPersistentEntity(), index, type); + } + + /** + * Determine index name and type name from {@link ElasticsearchPersistentEntity} with {@code index} and {@code type} + * overrides. Allows using preferred values for index and type if provided, otherwise fall back to index and type + * defined on entity level. + * + * @param persistentEntity the entity to determine the index name. Can be {@literal null} if {@code index} and + * {@literal type} are provided. + * @param index index name override can be {@literal null}. + * @param type index type override can be {@literal null}. + * @return the {@link IndexCoordinates} containing index name and index type. + * @see ElasticsearchPersistentEntity#getIndexName() + * @see ElasticsearchPersistentEntity#getIndexType() + */ + public IndexCoordinates determineIndex(ElasticsearchPersistentEntity persistentEntity, @Nullable String index, + @Nullable String type) { + return new IndexCoordinates(indexName(persistentEntity, index), typeName(persistentEntity, type)); + } + + private static String indexName(@Nullable ElasticsearchPersistentEntity entity, @Nullable String index) { + + if (StringUtils.isEmpty(index)) { + Assert.notNull(entity, "Cannot determine index name"); + return entity.getIndexName(); + } + + return index; + } + + private static String typeName(@Nullable ElasticsearchPersistentEntity entity, @Nullable String type) { + + if (StringUtils.isEmpty(type)) { + Assert.notNull(entity, "Cannot determine index type"); + return entity.getIndexType(); + } + + return type; + } + + /** + * A representation of information about an entity. + * + * @author Christoph Strobl + */ + interface Entity { + + /** + * Returns the identifier of the entity. + * + * @return the ID value, can be {@literal null}. + */ + @Nullable + Object getId(); + + /** + * Returns whether the entity is versioned, i.e. if it contains a version property. + * + * @return + */ + default boolean isVersionedEntity() { + return false; + } + + /** + * Returns the value of the version if the entity has a version property, {@literal null} otherwise. + * + * @return + */ + @Nullable + Object getVersion(); + + /** + * Returns the underlying bean. + * + * @return + */ + T getBean(); + + /** + * Returns whether the entity is considered to be new. + * + * @return + */ + boolean isNew(); + + /** + * Returns the {@link ElasticsearchPersistentEntity} associated with this entity. + * + * @return can be {@literal null} if this entity is not mapped. + */ + @Nullable + ElasticsearchPersistentEntity getPersistentEntity(); + + /** + * Returns the required {@link ElasticsearchPersistentEntity}. + * + * @return + * @throws IllegalStateException if no {@link ElasticsearchPersistentEntity} is associated with this entity. + */ + default ElasticsearchPersistentEntity getRequiredPersistentEntity() { + + ElasticsearchPersistentEntity persistentEntity = getPersistentEntity(); + if (persistentEntity == null) { + throw new IllegalStateException("No ElasticsearchPersistentEntity available for this entity!"); + } + + return persistentEntity; + } + } + + /** + * Information and commands on an entity. + * + * @author Mark Paluch + */ + interface AdaptibleEntity extends Entity { + + /** + * Returns whether the entity has a parent. + * + * @return + */ + boolean hasParent(); + + /** + * Returns the parent Id. Can be {@literal null}. + * + * @return + */ + @Nullable + Object getParentId(); + + /** + * Populates the identifier of the backing entity if it has an identifier property and there's no identifier + * currently present. + * + * @param id must not be {@literal null}. + * @return + */ + @Nullable + T populateIdIfNecessary(@Nullable Object id); + + /** + * Initializes the version property of the of the current entity if available. + * + * @return the entity with the version property updated if available. + */ + T initializeVersionProperty(); + + /** + * Increments the value of the version property if available. + * + * @return the entity with the version property incremented if available. + */ + T incrementVersion(); + + /** + * Returns the current version value if the entity has a version property. + * + * @return the current version or {@literal null} in case it's uninitialized or the entity doesn't expose a version + * property. + */ + @Nullable + Number getVersion(); + } + + /** + * Plain entity without applying further mapping. + * + * @param + */ + @RequiredArgsConstructor + private static class UnmappedEntity> implements AdaptibleEntity { + + private final T map; + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId() + */ + @Override + public Object getId() { + return map.get(ID_FIELD); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent() + */ + @Override + public boolean hasParent() { + return false; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId() + */ + @Override + public Entity getParentId() { + return null; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object) + */ + @Nullable + @Override + public T populateIdIfNecessary(@Nullable Object id) { + + map.put(ID_FIELD, id); + + return map; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty() + */ + @Override + public T initializeVersionProperty() { + return map; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getVersion() + */ + @Override + @Nullable + public Number getVersion() { + return null; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion() + */ + @Override + public T incrementVersion() { + return map; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean() + */ + @Override + public T getBean() { + return map; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew() + */ + @Override + public boolean isNew() { + return map.get(ID_FIELD) != null; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity() + */ + @Override + public ElasticsearchPersistentEntity getPersistentEntity() { + return null; + } + } + + /** + * Simple mapped entity without an associated {@link ElasticsearchPersistentEntity}. + * + * @param + */ + private static class SimpleMappedEntity> extends UnmappedEntity { + + SimpleMappedEntity(T map) { + super(map); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.UnmappedEntity#getId() + */ + @Override + public Object getId() { + return getBean().get(ID_FIELD); + } + } + + /** + * Mapped entity with an associated {@link ElasticsearchPersistentEntity}. + * + * @param + */ + @RequiredArgsConstructor(access = AccessLevel.PROTECTED) + private static class MappedEntity implements Entity { + + private final ElasticsearchPersistentEntity entity; + private final IdentifierAccessor idAccessor; + private final PersistentPropertyAccessor propertyAccessor; + + private static MappedEntity of(T bean, + MappingContext, ElasticsearchPersistentProperty> context) { + + ElasticsearchPersistentEntity entity = context.getRequiredPersistentEntity(bean.getClass()); + IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean); + PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(bean); + + return new MappedEntity<>(entity, identifierAccessor, propertyAccessor); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId() + */ + @Override + public Object getId() { + return idAccessor.getIdentifier(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isVersionedEntity() + */ + @Override + public boolean isVersionedEntity() { + return entity.hasVersionProperty(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getVersion() + */ + @Override + @Nullable + public Object getVersion() { + return propertyAccessor.getProperty(entity.getRequiredVersionProperty()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean() + */ + @Override + public T getBean() { + return propertyAccessor.getBean(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew() + */ + @Override + public boolean isNew() { + return entity.isNew(propertyAccessor.getBean()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity() + */ + @Override + public ElasticsearchPersistentEntity getPersistentEntity() { + return entity; + } + } + + private static class AdaptibleMappedEntity extends MappedEntity implements AdaptibleEntity { + + private final ElasticsearchPersistentEntity entity; + private final ConvertingPropertyAccessor propertyAccessor; + private final IdentifierAccessor identifierAccessor; + + private AdaptibleMappedEntity(ElasticsearchPersistentEntity entity, IdentifierAccessor identifierAccessor, + ConvertingPropertyAccessor propertyAccessor) { + + super(entity, identifierAccessor, propertyAccessor); + + this.entity = entity; + this.propertyAccessor = propertyAccessor; + this.identifierAccessor = identifierAccessor; + } + + static AdaptibleEntity of(T bean, + MappingContext, ElasticsearchPersistentProperty> context, + ConversionService conversionService) { + + ElasticsearchPersistentEntity entity = context.getRequiredPersistentEntity(bean.getClass()); + IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean); + PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(bean); + + return new AdaptibleMappedEntity<>(entity, identifierAccessor, + new ConvertingPropertyAccessor<>(propertyAccessor, conversionService)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent() + */ + @Override + public boolean hasParent() { + return getRequiredPersistentEntity().getParentIdProperty() != null; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId() + */ + @Override + public Object getParentId() { + + ElasticsearchPersistentProperty parentProperty = getRequiredPersistentEntity().getParentIdProperty(); + return propertyAccessor.getProperty(parentProperty); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object) + */ + @Nullable + @Override + public T populateIdIfNecessary(@Nullable Object id) { + + if (id == null) { + return null; + } + + T bean = propertyAccessor.getBean(); + ElasticsearchPersistentProperty idProperty = entity.getIdProperty(); + + if (idProperty == null) { + return bean; + } + + if (identifierAccessor.getIdentifier() != null) { + return bean; + } + + propertyAccessor.setProperty(idProperty, id); + + return propertyAccessor.getBean(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.MappedEntity#getVersion() + */ + @Override + @Nullable + public Number getVersion() { + + ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty(); + + return propertyAccessor.getProperty(versionProperty, Number.class); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty() + */ + @Override + public T initializeVersionProperty() { + + if (!entity.hasVersionProperty()) { + return propertyAccessor.getBean(); + } + + ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty(); + + propertyAccessor.setProperty(versionProperty, versionProperty.getType().isPrimitive() ? 1 : 0); + + return propertyAccessor.getBean(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion() + */ + @Override + public T incrementVersion() { + + ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty(); + Number version = getVersion(); + Number nextVersion = version == null ? 0 : version.longValue() + 1; + + propertyAccessor.setProperty(versionProperty, nextVersion); + + return propertyAccessor.getBean(); + } + } + + /** + * Value object encapsulating index name and index type. + */ + @RequiredArgsConstructor(access = AccessLevel.PROTECTED) + @Getter + static class IndexCoordinates { + + private final String indexName; + private final String typeName; + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index dffa36edb..27c0b1e9f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core; import static org.elasticsearch.index.VersionType.*; +import lombok.NonNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -24,7 +25,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.function.Supplier; import org.elasticsearch.action.delete.DeleteRequest; @@ -48,37 +48,43 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity; +import org.springframework.data.elasticsearch.core.EntityOperations.Entity; +import org.springframework.data.elasticsearch.core.EntityOperations.IndexCoordinates; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.StringQuery; -import org.springframework.data.mapping.IdentifierAccessor; -import org.springframework.data.mapping.PersistentProperty; -import org.springframework.data.mapping.PersistentPropertyAccessor; -import org.springframework.data.util.ClassTypeInformation; +import org.springframework.data.mapping.context.MappingContext; import org.springframework.http.HttpStatus; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; /** * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations { + private static final Logger QUERY_LOGGER = LoggerFactory + .getLogger("org.springframework.data.elasticsearch.core.QUERY"); + private final ReactiveElasticsearchClient client; private final ElasticsearchConverter converter; + private final @NonNull MappingContext, ElasticsearchPersistentProperty> mappingContext; private final ResultsMapper resultMapper; private final ElasticsearchExceptionTranslator exceptionTranslator; + private final EntityOperations operations; private @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.IMMEDIATE; private @Nullable IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -91,8 +97,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera this.client = client; this.converter = converter; + this.mappingContext = converter.getMappingContext(); this.resultMapper = new DefaultResultMapper(converter.getMappingContext()); this.exceptionTranslator = new ElasticsearchExceptionTranslator(); + this.operations = new EntityOperations(this.mappingContext); } /* @@ -113,26 +121,26 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(entity, "Entity must not be null!"); - AdaptableEntity adaptableEntity = ConverterAwareAdaptableEntity.of(entity, converter); + AdaptibleEntity adaptableEntity = operations.forEntity(entity, converter.getConversionService()); return doIndex(entity, adaptableEntity, index, type) // .map(it -> { - return adaptableEntity.updateIdIfNecessary(it.getId()); + return adaptableEntity.populateIdIfNecessary(it.getId()); }); } - private Mono doIndex(Object value, AdaptableEntity entity, @Nullable String index, + private Mono doIndex(Object value, AdaptibleEntity entity, @Nullable String index, @Nullable String type) { return Mono.defer(() -> { Object id = entity.getId(); - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString()) - : new IndexRequest(indexToUse, typeToUse); + IndexRequest request = id != null + ? new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id.toString()) + : new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName()); try { request.source(resultMapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE); @@ -140,7 +148,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera throw new RuntimeException(e); } - if (entity.isVersioned()) { + if (entity.isVersionedEntity()) { Object version = entity.getVersion(); if (version != null) { @@ -149,9 +157,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera } } - if (entity.getPersistentEntity().getParentIdProperty() != null) { + if (entity.hasParent()) { - Object parentId = entity.getPropertyValue(entity.getPersistentEntity().getParentIdProperty()); + Object parentId = entity.getParentId(); if (parentId != null) { request.parent(parentId.toString()); } @@ -171,19 +179,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(id, "Id must not be null!"); - return doFindById(id, BasicElasticsearchEntity.of(entityType, converter), index, type) + return doFindById(id, getPersistentEntity(entityType), index, type) .map(it -> resultMapper.mapEntity(it, entityType)); } - private Mono doFindById(String id, ElasticsearchEntity entity, @Nullable String index, + private Mono doFindById(String id, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { return Mono.defer(() -> { - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - return doFindById(new GetRequest(indexToUse, typeToUse, id)); + return doFindById(new GetRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id)); }); } @@ -196,18 +203,17 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(id, "Id must not be null!"); - return doExists(id, BasicElasticsearchEntity.of(entityType, converter), index, type); + return doExists(id, getPersistentEntity(entityType), index, type); } - private Mono doExists(String id, ElasticsearchEntity entity, @Nullable String index, + private Mono doExists(String id, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { return Mono.defer(() -> { - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - return doExists(new GetRequest(indexToUse, typeToUse, id)); + return doExists(new GetRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id)); }); } @@ -219,24 +225,25 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera public Flux find(Query query, Class entityType, @Nullable String index, @Nullable String type, Class resultType) { - return doFind(query, BasicElasticsearchEntity.of(entityType, converter), index, type) + return doFind(query, getPersistentEntity(entityType), index, type) .map(it -> resultMapper.mapEntity(it, resultType)); } - private Flux doFind(Query query, ElasticsearchEntity entity, @Nullable String index, + private Flux doFind(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { return Flux.defer(() -> { - SearchRequest request = new SearchRequest(indices(query, () -> indexName(index, entity))); - request.types(indexTypes(query, () -> typeName(type, entity))); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName)); + request.types(indexTypes(query, indexCoordinates::getTypeName)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(mappedQuery(query, entity.getPersistentEntity())); + searchSourceBuilder.query(mappedQuery(query, entity)); // TODO: request.source().postFilter(elasticsearchFilter); -- filter query - searchSourceBuilder.version(entity.isVersioned()); // This has been true by default before + searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before searchSourceBuilder.trackScores(query.getTrackScores()); if (query.getSourceFilter() != null) { @@ -258,7 +265,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera request.indicesOptions(query.getIndicesOptions()); } - sort(query, entity.getPersistentEntity()).forEach(searchSourceBuilder::sort); + sort(query, entity).forEach(searchSourceBuilder::sort); if (query.getMinScore() > 0) { searchSourceBuilder.minScore(query.getMinScore()); @@ -290,10 +297,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera @Override public Mono delete(Object entity, @Nullable String index, @Nullable String type) { - AdaptableEntity elasticsearchEntity = ConverterAwareAdaptableEntity.of(entity, converter); + Entity elasticsearchEntity = operations.forEntity(entity); return Mono.defer(() -> doDeleteById(entity, ObjectUtils.nullSafeToString(elasticsearchEntity.getId()), - elasticsearchEntity, index, type)); + elasticsearchEntity.getPersistentEntity(), index, type)); } /* @@ -305,19 +312,19 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(id, "Id must not be null!"); - return doDeleteById(null, id, BasicElasticsearchEntity.of(entityType, converter), index, type); + return doDeleteById(null, id, getPersistentEntity(entityType), index, type); } - private Mono doDeleteById(@Nullable Object source, String id, ElasticsearchEntity entity, + private Mono doDeleteById(@Nullable Object source, String id, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { return Mono.defer(() -> { - String indexToUse = indexName(index, entity); - String typeToUse = typeName(type, entity); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - return doDelete(prepareDeleteRequest(source, new DeleteRequest(indexToUse, typeToUse, id))); + return doDelete(prepareDeleteRequest(source, + new DeleteRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id))); }); } @@ -330,18 +337,20 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(query, "Query must not be null!"); - return doDeleteBy(query, BasicElasticsearchEntity.of(entityType, converter), index, type) - .map(BulkByScrollResponse::getDeleted).publishNext(); + return doDeleteBy(query, getPersistentEntity(entityType), index, type).map(BulkByScrollResponse::getDeleted) + .publishNext(); } - private Flux doDeleteBy(Query query, ElasticsearchEntity entity, @Nullable String index, - @Nullable String type) { + private Flux doDeleteBy(Query query, ElasticsearchPersistentEntity entity, + @Nullable String index, @Nullable String type) { return Flux.defer(() -> { - DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, () -> indexName(index, entity))); - request.types(indexTypes(query, () -> typeName(type, entity))); - request.setQuery(mappedQuery(query, entity.getPersistentEntity())); + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + + DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, indexCoordinates::getIndexName)); + request.types(indexTypes(query, indexCoordinates::getTypeName)); + request.setQuery(mappedQuery(query, entity)); return doDeleteBy(prepareDeleteByRequest(request)); }); @@ -494,6 +503,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera * @return a {@link Flux} emitting the result of the operation. */ protected Flux doFind(SearchRequest request) { + + if (QUERY_LOGGER.isDebugEnabled()) { + QUERY_LOGGER.debug("Executing doFind: {}", request); + } + return Flux.from(execute(client -> client.search(request))); } @@ -529,14 +543,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera // private helpers - private static String indexName(@Nullable String index, ElasticsearchEntity entity) { - return StringUtils.isEmpty(index) ? entity.getIndexName() : index; - } - - private static String typeName(@Nullable String type, ElasticsearchEntity entity) { - return StringUtils.isEmpty(type) ? entity.getTypeName() : type; - } - private static String[] indices(Query query, Supplier index) { if (query.getIndices().isEmpty()) { @@ -601,12 +607,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera private QueryBuilder mappedFilterQuery(CriteriaQuery query, ElasticsearchPersistentEntity entity) { - // TODO: this is actually strange in the RestTemplate:L378 - need to chack + // TODO: this is actually strange in the RestTemplate:L378 - need to check return null; } - private ElasticsearchPersistentEntity lookupPersistentEntity(Class type) { - return converter.getMappingContext().getPersistentEntity(type); + @Nullable + private ElasticsearchPersistentEntity getPersistentEntity(@Nullable Class type) { + return type != null ? mappingContext.getPersistentEntity(type) : null; } private Throwable translateException(Throwable throwable) { @@ -617,160 +624,4 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return potentiallyTranslatedException != null ? potentiallyTranslatedException : throwable; } - - /** - * @param - * @author Christoph Strobl - * @since 4.0 - */ - protected interface ElasticsearchEntity { - - default boolean isIdentifiable() { - return getPersistentEntity().hasVersionProperty(); - } - - default boolean isVersioned() { - return getPersistentEntity().hasVersionProperty(); - } - - default ElasticsearchPersistentProperty getIdProperty() { - return getPersistentEntity().getIdProperty(); - } - - default String getIndexName() { - return getPersistentEntity().getIndexName(); - } - - default String getTypeName() { - return getPersistentEntity().getIndexType(); - } - - ElasticsearchPersistentEntity getPersistentEntity(); - } - - protected interface AdaptableEntity extends ElasticsearchEntity { - - PersistentPropertyAccessor getPropertyAccessor(); - - IdentifierAccessor getIdentifierAccessor(); - - @Nullable - default Object getId() { - return getIdentifierAccessor().getIdentifier(); - } - - default Object getVersion() { - return getPropertyAccessor().getProperty(getPersistentEntity().getRequiredVersionProperty()); - } - - @Nullable - default Object getPropertyValue(PersistentProperty property) { - return getPropertyAccessor().getProperty(property); - } - - default T getBean() { - return getPropertyAccessor().getBean(); - } - - default T updateIdIfNecessary(Object id) { - - if (id == null || !getPersistentEntity().hasIdProperty() || getId() != null) { - return getPropertyAccessor().getBean(); - } - - return updatePropertyValue(getPersistentEntity().getIdProperty(), id); - } - - default T updatePropertyValue(PersistentProperty property, @Nullable Object value) { - - getPropertyAccessor().setProperty(property, value); - return getPropertyAccessor().getBean(); - } - - } - - protected static class BasicElasticsearchEntity implements ElasticsearchEntity { - - final ElasticsearchPersistentEntity entity; - - BasicElasticsearchEntity(ElasticsearchPersistentEntity entity) { - this.entity = entity; - } - - static BasicElasticsearchEntity of(T bean, ElasticsearchConverter converter) { - return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(bean.getClass())); - } - - static BasicElasticsearchEntity of(Class type, ElasticsearchConverter converter) { - - if (Object.class.equals(type)) { - return new BasicElasticsearchEntity<>(new SimpleElasticsearchPersistentEntity<>(ClassTypeInformation.OBJECT)); - } - - return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(type)); - } - - @Override - public ElasticsearchPersistentEntity getPersistentEntity() { - return entity; - } - } - - protected static class ConverterAwareAdaptableEntity implements AdaptableEntity { - - final ElasticsearchPersistentEntity entity; - final PersistentPropertyAccessor propertyAccessor; - final IdentifierAccessor idAccessor; - final ElasticsearchConverter converter; - - ConverterAwareAdaptableEntity(ElasticsearchPersistentEntity entity, IdentifierAccessor idAccessor, - PersistentPropertyAccessor propertyAccessor, ElasticsearchConverter converter) { - - this.entity = entity; - this.propertyAccessor = propertyAccessor; - this.idAccessor = idAccessor; - this.converter = converter; - } - - static ConverterAwareAdaptableEntity of(T bean, ElasticsearchConverter converter) { - - ElasticsearchPersistentEntity entity = converter.getMappingContext() - .getRequiredPersistentEntity(bean.getClass()); - IdentifierAccessor idAccessor = entity.getIdentifierAccessor(bean); - PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(bean); - - return new ConverterAwareAdaptableEntity<>(entity, idAccessor, propertyAccessor, converter); - } - - @Override - public PersistentPropertyAccessor getPropertyAccessor() { - return propertyAccessor; - } - - @Override - public IdentifierAccessor getIdentifierAccessor() { - - if (entity.getTypeInformation().isMap()) { - - return () -> { - - Object id = idAccessor.getIdentifier(); - if (id != null) { - return id; - } - - Map source = (Map) propertyAccessor.getBean(); - return source.get("id"); - }; - } - - return idAccessor; - } - - @Override - public ElasticsearchPersistentEntity getPersistentEntity() { - return entity; - } - - } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java index ec8d0be25..58ef16304 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import java.net.InetSocketAddress; +import java.time.Duration; import javax.net.ssl.SSLContext; @@ -40,7 +41,7 @@ public class ClientConfigurationUnitTests { assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("localhost", 9200)); } - @Test // DATAES-488 + @Test // DATAES-488, DATAES-504 public void shouldCreateCustomizedConfiguration() { HttpHeaders headers = new HttpHeaders(); @@ -50,15 +51,17 @@ public class ClientConfigurationUnitTests { .connectedTo("foo", "bar") // .usingSsl() // .withDefaultHeaders(headers) // - .build(); + .withConnectTimeout(Duration.ofDays(1)).withSocketTimeout(Duration.ofDays(2)).build(); assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("foo", 9200), InetSocketAddress.createUnresolved("bar", 9200)); assertThat(clientConfiguration.useSsl()).isTrue(); assertThat(clientConfiguration.getDefaultHeaders().get("foo")).containsOnly("bar"); + assertThat(clientConfiguration.getConnectTimeout()).isEqualTo(Duration.ofDays(1)); + assertThat(clientConfiguration.getSocketTimeout()).isEqualTo(Duration.ofDays(2)); } - @Test // DATAES-488 + @Test // DATAES-488, DATAES-504 public void shouldCreateSslConfiguration() { SSLContext sslContext = mock(SSLContext.class); @@ -72,5 +75,7 @@ public class ClientConfigurationUnitTests { InetSocketAddress.createUnresolved("bar", 9200)); assertThat(clientConfiguration.useSsl()).isTrue(); assertThat(clientConfiguration.getSslContext()).contains(sslContext); + assertThat(clientConfiguration.getConnectTimeout()).isEqualTo(Duration.ofSeconds(10)); + assertThat(clientConfiguration.getSocketTimeout()).isEqualTo(Duration.ofSeconds(5)); } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index 82a6d6f0f..c70b1d261 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -214,11 +214,13 @@ public class ReactiveMockClientTestsUtils { Mockito.when(headersUriSpec.uri(any(String.class))).thenReturn(headersUriSpec); Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec); Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec); + Mockito.when(headersUriSpec.attribute(anyString(), anyString())).thenReturn(headersUriSpec); RequestBodyUriSpec bodyUriSpec = mock(RequestBodyUriSpec.class); Mockito.when(webClient.method(any())).thenReturn(bodyUriSpec); Mockito.when(bodyUriSpec.body(any())).thenReturn(headersUriSpec); Mockito.when(bodyUriSpec.uri(any(), any(Map.class))).thenReturn(bodyUriSpec); + Mockito.when(bodyUriSpec.attribute(anyString(), anyString())).thenReturn(bodyUriSpec); Mockito.when(bodyUriSpec.headers(any(Consumer.class))).thenReturn(bodyUriSpec); ClientResponse response = mock(ClientResponse.class); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index ea06af16c..699f1c7fe 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -21,25 +21,26 @@ import static org.elasticsearch.index.query.QueryBuilders.*; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.junit.Rule; -import org.springframework.data.elasticsearch.ElasticsearchVersion; -import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.net.ConnectException; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.annotation.Id; +import org.springframework.data.elasticsearch.ElasticsearchVersion; +import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.core.query.Criteria; @@ -53,7 +54,10 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.StringUtils; /** + * Integration tests for {@link ReactiveElasticsearchTemplate}. + * * @author Christoph Strobl + * @author Mark Paluch * @currentRead Golden Fool - Robin Hobb */ @RunWith(SpringRunner.class) @@ -140,7 +144,8 @@ public class ReactiveElasticsearchTemplateTests { SampleEntity sampleEntity = randomEntity("in another index"); - template.save(sampleEntity, ALTERNATE_INDEX).as(StepVerifier::create)// + template.save(sampleEntity, ALTERNATE_INDEX) // + .as(StepVerifier::create)// .expectNextCount(1)// .verifyComplete(); @@ -154,12 +159,13 @@ public class ReactiveElasticsearchTemplateTests { @Test // DATAES-504 public void insertShouldAcceptPlainMapStructureAsSource() { - Map map = Collections.singletonMap("foo", "bar"); + Map map = new LinkedHashMap<>(Collections.singletonMap("foo", "bar")); template.save(map, ALTERNATE_INDEX, "singleton-map") // .as(StepVerifier::create) // - .expectNextCount(1) // - .verifyComplete(); + .consumeNextWith(actual -> { + assertThat(map).containsKey("id"); + }).verifyComplete(); } @Test(expected = IllegalArgumentException.class) // DATAES-504 @@ -434,7 +440,7 @@ public class ReactiveElasticsearchTemplateTests { @Test // DATAES-504 @ElasticsearchVersion(asOf = "6.5.0") - public void deleteByQueryShouldReturnZeroIfNothingDeleted() { + public void deleteByQueryShouldReturnZeroIfNothingDeleted() throws Exception { index(randomEntity("test message")); diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 5dc63878c..326e48951 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -6,8 +6,11 @@ + + + - \ No newline at end of file + diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 0281205b7..ace5ad382 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -8,6 +8,7 @@ + diff --git a/src/test/resources/test-home-dir/modules/reindex/plugin-descriptor.properties b/src/test/resources/test-home-dir/modules/reindex/plugin-descriptor.properties new file mode 100644 index 000000000..063327da1 --- /dev/null +++ b/src/test/resources/test-home-dir/modules/reindex/plugin-descriptor.properties @@ -0,0 +1,45 @@ +# Elasticsearch plugin descriptor file +# This file must exist as 'plugin-descriptor.properties' inside a plugin. +# +### example plugin for "foo" +# +# foo.zip <-- zip file for the plugin, with this structure: +# |____ .jar <-- classes, resources, dependencies +# |____ .jar <-- any number of jars +# |____ plugin-descriptor.properties <-- example contents below: +# +# classname=foo.bar.BazPlugin +# description=My cool plugin +# version=6.0 +# elasticsearch.version=6.0 +# java.version=1.8 +# +### mandatory elements for all plugins: +# +# 'description': simple summary of the plugin +description=The Reindex module adds APIs to reindex from one index to another or update documents in place. +# +# 'version': plugin's version +version=6.5.0 +# +# 'name': the plugin name +name=reindex +# +# 'classname': the name of the class to load, fully-qualified. +classname=org.elasticsearch.index.reindex.ReindexPlugin +# +# 'java.version': version of java the code is built against +# use the system property java.specification.version +# version string must be a sequence of nonnegative decimal integers +# separated by "."'s and may have leading zeros +java.version=1.8 +# +# 'elasticsearch.version': version of elasticsearch compiled against +elasticsearch.version=6.5.0 +### optional elements for plugins: +# +# 'extended.plugins': other plugins this plugin extends through SPI +extended.plugins= +# +# 'has.native.controller': whether or not the plugin has a native controller +has.native.controller=false diff --git a/src/test/resources/test-home-dir/modules/reindex/plugin-security.policy b/src/test/resources/test-home-dir/modules/reindex/plugin-security.policy new file mode 100644 index 000000000..a2482eaf4 --- /dev/null +++ b/src/test/resources/test-home-dir/modules/reindex/plugin-security.policy @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +grant { + // reindex opens socket connections using the rest client + permission java.net.SocketPermission "*", "connect"; +}; + +grant codeBase "${codebase.elasticsearch-rest-client}" { + // rest client uses system properties which gets the default proxy + permission java.net.NetPermission "getProxySelector"; +}; + +grant codeBase "${codebase.httpasyncclient}" { + // rest client uses system properties which gets the default proxy + permission java.net.NetPermission "getProxySelector"; +};