diff --git a/pom.xml b/pom.xml index fc3e71bde..372997959 100644 --- a/pom.xml +++ b/pom.xml @@ -1,285 +1,293 @@ - - 4.0.0 + + + 4.0.0 org.springframework.data spring-data-elasticsearch 3.2.0.BUILD-SNAPSHOT - - org.springframework.data.build - spring-data-parent - 2.2.0.BUILD-SNAPSHOT - + + org.springframework.data.build + spring-data-parent + 2.2.0.BUILD-SNAPSHOT + - Spring Data Elasticsearch - Spring Data Implementation for Elasticsearch - https://github.com/spring-projects/spring-data-elasticsearch + Spring Data Elasticsearch + Spring Data Implementation for Elasticsearch + https://github.com/spring-projects/spring-data-elasticsearch - - 3.2.1 - 2.6 - 6.5.0 - 2.9.1 - 2.2.0.BUILD-SNAPSHOT - spring.data.elasticsearch - + + 2.6 + 6.5.0 + 2.9.1 + 2.2.0.BUILD-SNAPSHOT + spring.data.elasticsearch + - + + + biomedcentral + BioMed Central Development Team + +0 + + + cstrobl + Christoph Strobl + cstrobl at pivotal.io + Pivotal + http://www.pivotal.io + + Developer + + +1 + + + mpaluch + Mark Paluch + mpaluch at pivotal.io + Pivotal + http://www.pivotal.io + + Developer + + +1 + + - - - org.springframework - spring-context - - - commons-logging - commons-logging - - - + + https://github.com/spring-projects/spring-data-elasticsearch + scm:git:git://github.com/spring-projects/spring-data-elasticsearch.git + scm:git:ssh://git@github.com/spring-projects/spring-data-elasticsearch.git + + - - org.springframework - spring-tx - + + Bamboo + https://build.spring.io/browse/SPRINGDATAES + - - - org.springframework.data - spring-data-commons - ${springdata.commons} - + + JIRA + https://jira.spring.io/browse/DATAES + - - org.springframework - spring-webflux - 5.1.0.RELEASE - + - - io.projectreactor.netty - reactor-netty - 0.8.0.RELEASE - - - - io.projectreactor - reactor-test - 3.2.0.RELEASE - - - - - commons-lang - commons-lang - ${commonslang} - test - - - - - joda-time - joda-time - ${jodatime} - - - - - org.elasticsearch.client - transport - ${elasticsearch} - - - commons-logging - commons-logging - - - - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - ${elasticsearch} - - - commons-logging - commons-logging - - + + + org.springframework + spring-context + + + commons-logging + commons-logging + + - - org.slf4j - log4j-over-slf4j - ${slf4j} - test - + + org.springframework + spring-tx + - - org.apache.logging.log4j - log4j-core - ${log4j} - test - + + + org.springframework.data + spring-data-commons + ${springdata.commons} + - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - + + + org.springframework + spring-webflux + true + - - - javax.enterprise - cdi-api - ${cdi} - provided - true - + + io.projectreactor.netty + reactor-netty + true + - - - org.springframework - spring-test - test - - - ch.qos.logback - logback-classic - - - + + io.projectreactor + reactor-test + test + - - org.apache.openwebbeans.test - cditest-owb - 1.2.8 - test - - - org.apache.geronimo.specs - geronimo-jcdi_1.0_spec - - - org.apache.geronimo.specs - geronimo-atinject_1.0_spec - - - + + + commons-lang + commons-lang + ${commonslang} + test + - - - org.apache.xbean - xbean-asm5-shaded - 4.5 - test - + + + joda-time + joda-time + ${jodatime} + - - javax.servlet - servlet-api - 3.0-alpha-1 - test - + + + org.elasticsearch.client + transport + ${elasticsearch} + + + commons-logging + commons-logging + + + - - - org.elasticsearch.plugin - transport-netty4-client - ${elasticsearch} - - + + + org.elasticsearch.plugin + transport-netty4-client + ${elasticsearch} + - + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch} + + + commons-logging + commons-logging + + + - - - - org.apache.maven.plugins - maven-assembly-plugin - - - org.codehaus.mojo - wagon-maven-plugin - - - org.asciidoctor - asciidoctor-maven-plugin - - - - org.apache.maven.plugins - maven-surefire-plugin - - true - false - - **/*Tests.java - - - - - + + org.slf4j + log4j-over-slf4j + ${slf4j} + test + - - - release - - - - org.jfrog.buildinfo - artifactory-maven-plugin - false - - - - - + + org.apache.logging.log4j + log4j-core + ${log4j} + test + - - - biomedcentral - BioMed Central Development Team - +0 - - + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + - - - spring-libs-snapshot - https://repo.spring.io/libs-snapshot - - + + + javax.enterprise + cdi-api + ${cdi} + provided + true + - - - spring-plugins-release - https://repo.spring.io/plugins-release - - + + + org.springframework + spring-test + test + + + ch.qos.logback + logback-classic + + + - - https://github.com/spring-projects/spring-data-elasticsearch - scm:git:git://github.com/spring-projects/spring-data-elasticsearch.git - scm:git:ssh://git@github.com/spring-projects/spring-data-elasticsearch.git - - + + org.apache.openwebbeans.test + cditest-owb + 1.2.8 + test + + + org.apache.geronimo.specs + geronimo-jcdi_1.0_spec + + + org.apache.geronimo.specs + geronimo-atinject_1.0_spec + + + - - Bamboo - https://build.spring.io/browse/SPRINGDATAES - + + + org.apache.xbean + xbean-asm5-shaded + 4.5 + test + - - JIRA - https://jira.spring.io/browse/DATAES - + + javax.servlet + servlet-api + 3.0-alpha-1 + test + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + org.codehaus.mojo + wagon-maven-plugin + + + org.asciidoctor + asciidoctor-maven-plugin + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + false + + **/*Tests.java + + + + + + + + + spring-libs-snapshot + https://repo.spring.io/libs-snapshot + + + + + + spring-plugins-release + https://repo.spring.io/plugins-release + + diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java new file mode 100644 index 000000000..8ec42dcd7 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java @@ -0,0 +1,189 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; + +import javax.net.ssl.SSLContext; + +import org.springframework.http.HttpHeaders; + +/** + * Configuration interface exposing common client configuration properties for Elasticsearch clients. + * + * @author Mark Paluch + * @since 4.0 + */ +public interface ClientConfiguration { + + /** + * Creates a new {@link ClientConfigurationBuilder} instance. + * + * @return a new {@link ClientConfigurationBuilder} instance. + */ + static ClientConfigurationBuilderWithRequiredEndpoint builder() { + return new ClientConfigurationBuilder(); + } + + /** + * Creates a new {@link ClientConfiguration} instance configured to a single host given {@code hostAndPort}. + *

+ * For example given the endpoint http://localhost:9200 + * + *

+	 * ClientConfiguration configuration = ClientConfiguration.create("localhost:9200");
+	 * 
+ * + * @return a new {@link ClientConfigurationBuilder} instance. + */ + static ClientConfiguration create(String hostAndPort) { + return new ClientConfigurationBuilder().connectedTo(hostAndPort).build(); + } + + /** + * Creates a new {@link ClientConfiguration} instance configured to a single host given {@link InetSocketAddress}. + *

+ * For example given the endpoint http://localhost:9200 + * + *

+	 * ClientConfiguration configuration = ClientConfiguration
+	 * 		.create(InetSocketAddress.createUnresolved("localhost", 9200));
+	 * 
+ * + * @return a new {@link ClientConfigurationBuilder} instance. + */ + static ClientConfiguration create(InetSocketAddress socketAddress) { + return new ClientConfigurationBuilder().connectedTo(socketAddress).build(); + } + + /** + * Returns the configured endpoints. + * + * @return the configured endpoints. + */ + List getEndpoints(); + + /** + * Obtain the {@link HttpHeaders} to be used by default. + * + * @return the {@link HttpHeaders} to be used by default. + */ + HttpHeaders getDefaultHeaders(); + + /** + * Returns {@literal true} when the client should use SSL. + * + * @return {@literal true} when the client should use SSL. + */ + boolean useSsl(); + + /** + * Returns the {@link SSLContext} to use. Can be {@link Optional#empty()} if unconfigured. + * + * @return the {@link SSLContext} to use. Can be {@link Optional#empty()} if unconfigured. + */ + Optional getSslContext(); + + /** + * @author Christoph Strobl + */ + interface ClientConfigurationBuilderWithRequiredEndpoint { + + /** + * @param hostAndPort the {@literal host} and {@literal port} formatted as String {@literal host:port}. + * @return the {@link MaybeSecureClientConfigurationBuilder}. + */ + default MaybeSecureClientConfigurationBuilder connectedTo(String hostAndPort) { + return connectedTo(new String[] { hostAndPort }); + } + + /** + * @param hostAndPorts the list of {@literal host} and {@literal port} combinations formatted as String + * {@literal host:port}. + * @return the {@link MaybeSecureClientConfigurationBuilder}. + */ + MaybeSecureClientConfigurationBuilder connectedTo(String... hostAndPorts); + + /** + * @param endpoint the {@literal host} and {@literal port}. + * @return the {@link MaybeSecureClientConfigurationBuilder}. + */ + default MaybeSecureClientConfigurationBuilder connectedTo(InetSocketAddress endpoint) { + return connectedTo(new InetSocketAddress[] { endpoint }); + } + + /** + * @param endpoints the list of {@literal host} and {@literal port} combinations. + * @return the {@link MaybeSecureClientConfigurationBuilder}. + */ + MaybeSecureClientConfigurationBuilder connectedTo(InetSocketAddress... endpoints); + + /** + * Obviously for testing. + * + * @return the {@link MaybeSecureClientConfigurationBuilder}. + */ + default MaybeSecureClientConfigurationBuilder connectedToLocalhost() { + return connectedTo("localhost:9200"); + } + } + + /** + * @author Christoph Strobl + */ + interface MaybeSecureClientConfigurationBuilder extends ClientConfigurationBuilderWithOptionalDefaultHeaders { + + /** + * Connect via {@literal https}
+ * NOTE You need to leave out the protocol in + * {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}. + * + * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}. + */ + ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(); + + /** + * Connect via {@literal https} using the given {@link SSLContext}.
+ * NOTE You need to leave out the protocol in + * {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}. + * + * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}. + */ + ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext); + } + + /** + * @author Christoph Strobl + * @author Mark Paluch + */ + interface ClientConfigurationBuilderWithOptionalDefaultHeaders { + + /** + * @param defaultHeaders must not be {@literal null}. + * @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders} + */ + ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders); + + /** + * Build the {@link ClientConfiguration} object. + * + * @return the {@link ClientConfiguration} object. + */ + ClientConfiguration build(); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java new file mode 100644 index 000000000..42e2d57de --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java @@ -0,0 +1,125 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + +import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders; +import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint; +import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder; +import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Default builder implementation for {@link ClientConfiguration}. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 4.0 + */ +class ClientConfigurationBuilder + implements ClientConfigurationBuilderWithRequiredEndpoint, MaybeSecureClientConfigurationBuilder { + + private List hosts = new ArrayList<>(); + private HttpHeaders headers = HttpHeaders.EMPTY; + private boolean useSsl; + private @Nullable SSLContext sslContext; + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(java.lang.String[]) + */ + @Override + public MaybeSecureClientConfigurationBuilder connectedTo(String... hostAndPorts) { + + Assert.notEmpty(hostAndPorts, "At least one host is required"); + + this.hosts.addAll(Arrays.stream(hostAndPorts).map(ClientConfigurationBuilder::parse).collect(Collectors.toList())); + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(java.net.InetSocketAddress[]) + */ + @Override + public MaybeSecureClientConfigurationBuilder connectedTo(InetSocketAddress... endpoints) { + + Assert.notEmpty(endpoints, "At least one endpoint is required"); + + this.hosts.addAll(Arrays.asList(endpoints)); + + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl() + */ + @Override + public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl() { + + this.useSsl = true; + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl(javax.net.ssl.SSLContext) + */ + @Override + public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext) { + + Assert.notNull(sslContext, "SSL Context must not be null"); + + this.useSsl = true; + this.sslContext = sslContext; + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#withDefaultHeaders(org.springframework.http.HttpHeaders) + */ + @Override + public ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders) { + + Assert.notNull(defaultHeaders, "Default HTTP headers must not be null"); + + this.headers = defaultHeaders; + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#build() + */ + @Override + public ClientConfiguration build() { + return new DefaultClientConfiguration(this.hosts, this.headers, this.useSsl, this.sslContext); + } + + private static InetSocketAddress parse(String hostAndPort) { + return InetSocketAddressParser.parse(hostAndPort, ElasticsearchHost.DEFAULT_PORT); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java new file mode 100644 index 000000000..396d325b5 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java @@ -0,0 +1,86 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import javax.net.ssl.SSLContext; + +import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; + +/** + * Default {@link ClientConfiguration} implementation. + * + * @author Mark Paluch + * @since 4.0 + */ +class DefaultClientConfiguration implements ClientConfiguration { + + private final List hosts; + private final HttpHeaders headers; + private final boolean useSsl; + private final @Nullable SSLContext sslContext; + + DefaultClientConfiguration(List hosts, HttpHeaders headers, boolean useSsl, + @Nullable SSLContext sslContext) { + + this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts)); + this.headers = new HttpHeaders(headers); + this.useSsl = useSsl; + this.sslContext = sslContext; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getEndpoints() + */ + @Override + public List getEndpoints() { + return this.hosts; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getDefaultHeaders() + */ + @Override + public HttpHeaders getDefaultHeaders() { + return this.headers; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#useSsl() + */ + @Override + public boolean useSsl() { + return this.useSsl; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSslContext() + */ + @Override + public Optional getSslContext() { + return Optional.ofNullable(this.sslContext); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchClients.java b/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchClients.java deleted file mode 100644 index 95bc0be72..000000000 --- a/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchClients.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.data.elasticsearch.client; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.http.Header; -import org.apache.http.HttpHost; -import org.apache.http.message.BasicHeader; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient; -import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; -import org.springframework.http.HttpHeaders; -import org.springframework.util.Assert; - -/** - * Utility class for common access to Elasticsearch clients. {@link ElasticsearchClients} consolidates set up routines - * for the various drivers into a single place. - * - * @author Christoph Strobl - * @since 4.0 - */ -public final class ElasticsearchClients { - - private ElasticsearchClients() {} - - /** - * Start here to create a new client tailored to your needs. - * - * @return new instance of {@link ClientBuilderWithRequiredHost}. - */ - public static ClientBuilderWithRequiredHost createClient() { - return new ElasticsearchClientBuilderImpl(); - } - - /** - * @author Christoph Strobl - */ - public interface ElasticsearchClientBuilder { - - /** - * Apply the configuration to create a {@link ReactiveElasticsearchClient}. - * - * @return new instance of {@link ReactiveElasticsearchClient}. - */ - ReactiveElasticsearchClient reactive(); - - /** - * Apply the configuration to create a {@link RestHighLevelClient}. - * - * @return new instance of {@link RestHighLevelClient}. - */ - RestHighLevelClient rest(); - - /** - * Apply the configuration to create a {@link RestClient}. - * - * @return new instance of {@link RestClient}. - */ - default RestClient lowLevelRest() { - return rest().getLowLevelClient(); - } - } - - /** - * @author Christoph Strobl - */ - public interface ClientBuilderWithRequiredHost { - - /** - * @param host the {@literal host} and {@literal port} formatted as String {@literal host:port}. You may leave out - * {@literal http / https} and use {@link MaybeSecureClientBuilder#viaSsl() viaSsl}. - * @return the {@link MaybeSecureClientBuilder}. - */ - default MaybeSecureClientBuilder connectedTo(String host) { - return connectedTo(new String[] { host }); - } - - /** - * @param hosts the list of {@literal host} and {@literal port} combinations formatted as String - * {@literal host:port}. You may leave out {@literal http / https} and use - * {@link MaybeSecureClientBuilder#viaSsl() viaSsl}. - * @return the {@link MaybeSecureClientBuilder}. - */ - MaybeSecureClientBuilder connectedTo(String... hosts); - - /** - * Obviously for testing. - * - * @return the {@link MaybeSecureClientBuilder}. - */ - default MaybeSecureClientBuilder connectedToLocalhost() { - return connectedTo("localhost:9200"); - } - } - - /** - * @author Christoph Strobl - */ - public interface MaybeSecureClientBuilder extends ClientBuilderWithOptionalDefaultHeaders { - - /** - * Connect via {@literal https}
- * NOTE You need to leave out the protocol in - * {@link ClientBuilderWithRequiredHost#connectedTo(String)}. - * - * @return the {@link ClientBuilderWithOptionalDefaultHeaders}. - */ - ClientBuilderWithOptionalDefaultHeaders viaSsl(); - } - - /** - * @author Christoph Strobl - */ - public interface ClientBuilderWithOptionalDefaultHeaders extends ElasticsearchClientBuilder { - - /** - * @param defaultHeaders - * @return the {@link ElasticsearchClientBuilder} - */ - ElasticsearchClientBuilder withDefaultHeaders(HttpHeaders defaultHeaders); - } - - private static class ElasticsearchClientBuilderImpl - implements ElasticsearchClientBuilder, ClientBuilderWithRequiredHost, MaybeSecureClientBuilder { - - private List hosts = new ArrayList<>(); - private HttpHeaders headers = HttpHeaders.EMPTY; - private String protocoll = "http"; - - @Override - public ReactiveElasticsearchClient reactive() { - return DefaultReactiveElasticsearchClient.create(headers, formattedHosts().toArray(new String[0])); - } - - @Override - public RestHighLevelClient rest() { - - HttpHost[] httpHosts = formattedHosts().stream().map(HttpHost::create).toArray(HttpHost[]::new); - RestClientBuilder builder = RestClient.builder(httpHosts); - - if (!headers.isEmpty()) { - - Header[] httpHeaders = headers.toSingleValueMap().entrySet().stream() - .map(it -> new BasicHeader(it.getKey(), it.getValue())).toArray(Header[]::new); - builder = builder.setDefaultHeaders(httpHeaders); - } - - return new RestHighLevelClient(builder); - } - - @Override - public MaybeSecureClientBuilder connectedTo(String... hosts) { - - Assert.notEmpty(hosts, "At least one host is required."); - this.hosts.addAll(Arrays.asList(hosts)); - return this; - } - - @Override - public ClientBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders) { - - Assert.notNull(defaultHeaders, "DefaultHeaders must not be null!"); - this.headers = defaultHeaders; - return this; - } - - List formattedHosts() { - return hosts.stream().map(it -> it.startsWith("http") ? it : protocoll + "://" + it).collect(Collectors.toList()); - } - - @Override - public ClientBuilderWithOptionalDefaultHeaders viaSsl() { - this.protocoll = "https"; - return this; - } - } -} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchHost.java b/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchHost.java index 32044b4b2..3e37fe23b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchHost.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ElasticsearchHost.java @@ -16,8 +16,11 @@ package org.springframework.data.elasticsearch.client; +import java.net.InetSocketAddress; import java.time.Instant; +import org.springframework.util.Assert; + /** * Value Object containing information about Elasticsearch cluster nodes. * @@ -26,13 +29,21 @@ import java.time.Instant; */ public class ElasticsearchHost { - private final String host; + /** + * Default HTTP port for Elasticsearch servers. + */ + public static final int DEFAULT_PORT = 9200; + + private final InetSocketAddress endpoint; private final State state; private final Instant timestamp; - public ElasticsearchHost(String host, State state) { + public ElasticsearchHost(InetSocketAddress endpoint, State state) { - this.host = host; + Assert.notNull(endpoint, "Host must not be null"); + Assert.notNull(state, "State must not be null"); + + this.endpoint = endpoint; this.state = state; this.timestamp = Instant.now(); } @@ -41,7 +52,7 @@ public class ElasticsearchHost { * @param host must not be {@literal null}. * @return new instance of {@link ElasticsearchHost}. */ - public static ElasticsearchHost online(String host) { + public static ElasticsearchHost online(InetSocketAddress host) { return new ElasticsearchHost(host, State.ONLINE); } @@ -49,10 +60,20 @@ public class ElasticsearchHost { * @param host must not be {@literal null}. * @return new instance of {@link ElasticsearchHost}. */ - public static ElasticsearchHost offline(String host) { + public static ElasticsearchHost offline(InetSocketAddress host) { return new ElasticsearchHost(host, State.OFFLINE); } + /** + * Parse a {@literal hostAndPort} string into a {@link InetSocketAddress}. + * + * @param hostAndPort the string containing host and port or IP address and port in the format {@code host:port}. + * @return the parsed {@link InetSocketAddress}. + */ + public static InetSocketAddress parse(String hostAndPort) { + return InetSocketAddressParser.parse(hostAndPort, DEFAULT_PORT); + } + /** * @return {@literal true} if the last known {@link State} was {@link State#ONLINE} */ @@ -63,8 +84,8 @@ public class ElasticsearchHost { /** * @return never {@literal null}. */ - public String getHost() { - return host; + public InetSocketAddress getEndpoint() { + return endpoint; } /** @@ -83,7 +104,7 @@ public class ElasticsearchHost { @Override public String toString() { - return "ElasticsearchHost(" + host + ", " + state.name() + ")"; + return "ElasticsearchHost(" + endpoint + ", " + state.name() + ")"; } public enum State { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/InetSocketAddressParser.java b/src/main/java/org/springframework/data/elasticsearch/client/InetSocketAddressParser.java new file mode 100644 index 000000000..d951de883 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/InetSocketAddressParser.java @@ -0,0 +1,117 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.net.InetSocketAddress; + +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Utility to parse endpoints in {@code host:port} format into {@link java.net.InetSocketAddress}. + * + * @author Mark Paluch + * @since 4.0 + */ +class InetSocketAddressParser { + + /** + * Parse a host and port string into a {@link InetSocketAddress}. + * + * @param hostPortString Hostname/IP address and port formatted as {@code host:port} or {@code host}. + * @param defaultPort default port to apply if {@code hostPostString} does not contain a port. + * @return a {@link InetSocketAddress} that is unresolved to avoid DNS lookups. + * @see InetSocketAddress#createUnresolved(String, int) + */ + static InetSocketAddress parse(String hostPortString, int defaultPort) { + + Assert.notNull(hostPortString, "HostPortString must not be null"); + String host; + String portString = null; + + if (hostPortString.startsWith("[")) { + String[] hostAndPort = getHostAndPortFromBracketedHost(hostPortString); + host = hostAndPort[0]; + portString = hostAndPort[1]; + } else { + int colonPos = hostPortString.indexOf(':'); + if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) { + // Exactly 1 colon. Split into host:port. + host = hostPortString.substring(0, colonPos); + portString = hostPortString.substring(colonPos + 1); + } else { + // 0 or 2+ colons. Bare hostname or IPv6 literal. + host = hostPortString; + } + } + + int port = defaultPort; + if (StringUtils.hasText(portString)) { + // Try to parse the whole port string as a number. + Assert.isTrue(!portString.startsWith("+"), String.format("Cannot parse port number: %s", hostPortString)); + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format("Cannot parse port number: %s", hostPortString)); + } + + Assert.isTrue(isValidPort(port), String.format("Port number out of range: %s", hostPortString)); + } + + return InetSocketAddress.createUnresolved(host, port); + } + + /** + * Parses a bracketed host-port string, throwing IllegalArgumentException if parsing fails. + * + * @param hostPortString the full bracketed host-port specification. Post might not be specified. + * @return an array with 2 strings: host and port, in that order. + * @throws IllegalArgumentException if parsing the bracketed host-port string fails. + */ + private static String[] getHostAndPortFromBracketedHost(String hostPortString) { + + Assert.isTrue(hostPortString.charAt(0) == '[', + String.format("Bracketed host-port string must start with a bracket: %s", hostPortString)); + + int colonIndex = hostPortString.indexOf(':'); + int closeBracketIndex = hostPortString.lastIndexOf(']'); + + Assert.isTrue(colonIndex > -1 && closeBracketIndex > colonIndex, + String.format("Invalid bracketed host/port: %s", hostPortString)); + + String host = hostPortString.substring(1, closeBracketIndex); + if (closeBracketIndex + 1 == hostPortString.length()) { + return new String[] { host, "" }; + } else { + + Assert.isTrue(hostPortString.charAt(closeBracketIndex + 1) == ':', + "Only a colon may follow a close bracket: " + hostPortString); + for (int i = closeBracketIndex + 2; i < hostPortString.length(); ++i) { + Assert.isTrue(Character.isDigit(hostPortString.charAt(i)), + String.format("Port must be numeric: %s", hostPortString)); + } + return new String[] { host, hostPortString.substring(closeBracketIndex + 2) }; + } + } + + /** + * @param port the port number + * @return {@literal true} for valid port numbers. + */ + private static boolean isValidPort(int port) { + return port >= 0 && port <= 65535; + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/NoReachableHostException.java b/src/main/java/org/springframework/data/elasticsearch/client/NoReachableHostException.java index 49b6cedcc..f5346b1c2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/NoReachableHostException.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/NoReachableHostException.java @@ -37,7 +37,7 @@ public class NoReachableHostException extends RuntimeException { private static String createMessage(Set hosts) { if (hosts.size() == 1) { - return String.format("Host '%s' not reachable. Cluster state is offline.", hosts.iterator().next().getHost()); + return String.format("Host '%s' not reachable. Cluster state is offline.", hosts.iterator().next().getEndpoint()); } return String.format("No active host found in cluster. (%s) of (%s) nodes offline.", hosts.size(), hosts.size()); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java new file mode 100644 index 000000000..fb0102913 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java @@ -0,0 +1,111 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.http.HttpHeaders; +import org.springframework.util.Assert; + +/** + * Utility class for common access to Elasticsearch clients. {@link RestClients} consolidates set up routines for the + * various drivers into a single place. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 4.0 + */ +public final class RestClients { + + private RestClients() {} + + /** + * Start here to create a new client tailored to your needs. + * + * @return new instance of {@link ElasticsearchRestClient}. + */ + public static ElasticsearchRestClient create(ClientConfiguration clientConfiguration) { + + Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); + + HttpHost[] httpHosts = formattedHosts(clientConfiguration.getEndpoints(), clientConfiguration.useSsl()).stream() + .map(HttpHost::create).toArray(HttpHost[]::new); + RestClientBuilder builder = RestClient.builder(httpHosts); + HttpHeaders headers = clientConfiguration.getDefaultHeaders(); + + if (!headers.isEmpty()) { + + Header[] httpHeaders = headers.toSingleValueMap().entrySet().stream() + .map(it -> new BasicHeader(it.getKey(), it.getValue())).toArray(Header[]::new); + builder.setDefaultHeaders(httpHeaders); + } + + builder.setHttpClientConfigCallback(clientBuilder -> { + Optional sslContext = clientConfiguration.getSslContext(); + sslContext.ifPresent(clientBuilder::setSSLContext); + + return clientBuilder; + }); + + RestHighLevelClient client = new RestHighLevelClient(builder); + return () -> client; + } + + private static List formattedHosts(List hosts, boolean useSsl) { + return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it).collect(Collectors.toList()); + } + + /** + * @author Christoph Strobl + */ + @FunctionalInterface + public interface ElasticsearchRestClient extends Closeable { + + /** + * Apply the configuration to create a {@link RestHighLevelClient}. + * + * @return new instance of {@link RestHighLevelClient}. + */ + RestHighLevelClient rest(); + + /** + * Apply the configuration to create a {@link RestClient}. + * + * @return new instance of {@link RestClient}. + */ + default RestClient lowLevelRest() { + return rest().getLowLevelClient(); + } + + @Override + default void close() throws IOException { + rest().close(); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index c10d57b49..ef67033a6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -13,22 +13,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; -import org.springframework.data.elasticsearch.client.ElasticsearchHost; -import org.springframework.data.elasticsearch.client.NoReachableHostException; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.JdkSslContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; import java.io.IOException; import java.lang.reflect.Method; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.Collection; -import java.util.List; +import java.util.Optional; import java.util.function.Function; +import javax.net.ssl.SSLContext; + import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionRequest; @@ -57,47 +60,48 @@ import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.reactivestreams.Publisher; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.client.ElasticsearchHost; +import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode; import org.springframework.data.elasticsearch.client.util.RequestConverters; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; -import org.springframework.http.ResponseCookie; -import org.springframework.http.ResponseEntity; -import org.springframework.http.client.reactive.ClientHttpResponse; -import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.util.Assert; -import org.springframework.util.MultiValueMap; import org.springframework.util.ReflectionUtils; -import org.springframework.util.StreamUtils; -import org.springframework.web.client.HttpClientErrorException; -import org.springframework.web.reactive.function.BodyExtractor; +import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; -import org.springframework.web.reactive.function.client.WebClientException; /** - * A {@link WebClient} based {@link ReactiveElasticsearchClient} that connects to an Elasticsearch cluster through HTTP. - * + * A {@link WebClient} based {@link ReactiveElasticsearchClient} that connects to an Elasticsearch cluster using HTTP. + * * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 + * @see ClientConfiguration + * @see ReactiveRestClients */ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient { private final HostProvider hostProvider; /** - * Create a new {@link DefaultReactiveElasticsearchClient} using the given hostProvider to obtain server connections. + * Create a new {@link DefaultReactiveElasticsearchClient} using the given {@link HostProvider} to obtain server + * connections. * * @param hostProvider must not be {@literal null}. */ public DefaultReactiveElasticsearchClient(HostProvider hostProvider) { + + Assert.notNull(hostProvider, "HostProvider must not be null"); + this.hostProvider = hostProvider; } @@ -112,11 +116,53 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ public static ReactiveElasticsearchClient create(HttpHeaders headers, String... hosts) { + Assert.notNull(headers, "HttpHeaders must not be null"); Assert.notEmpty(hosts, "Elasticsearch Cluster needs to consist of at least one host"); - HostProvider hostProvider = HostProvider.provider(hosts); - return new DefaultReactiveElasticsearchClient( - headers.isEmpty() ? hostProvider : hostProvider.withDefaultHeaders(headers)); + ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(hosts) + .withDefaultHeaders(headers).build(); + return create(clientConfiguration); + } + + /** + * Create a new {@link DefaultReactiveElasticsearchClient} given {@link ClientConfiguration}.
+ * NOTE If the cluster requires authentication be sure to provide the according {@link HttpHeaders} + * correctly. + * + * @param clientConfiguration Client configuration. Must not be {@literal null}. + * @return new instance of {@link DefaultReactiveElasticsearchClient}. + */ + public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) { + + Assert.notNull(clientConfiguration, "ClientConfiguration must not be null"); + + WebClientProvider provider = getWebClientProvider(clientConfiguration); + + HostProvider hostProvider = HostProvider.provider(provider, + clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0])); + return new DefaultReactiveElasticsearchClient(hostProvider); + } + + private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) { + + WebClientProvider provider; + + if (clientConfiguration.useSsl()) { + + ReactorClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.create().secure(sslConfig -> { + + Optional sslContext = clientConfiguration.getSslContext(); + + sslContext.ifPresent(it -> { + sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE)); + }); + })); + provider = WebClientProvider.create("https", connector); + } else { + provider = WebClientProvider.create("http"); + } + + return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()); } /* @@ -144,7 +190,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch /* * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaderss, org.elasticsearch.action.get.GetRequest) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest) */ @Override public Mono get(HttpHeaders headers, GetRequest getRequest) { @@ -177,20 +223,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch public Mono exists(HttpHeaders headers, GetRequest getRequest) { return sendRequest(getRequest, RequestCreator.exists(), RawActionResponse.class, headers) // - .map(response -> { - - if (response.statusCode().is2xxSuccessful()) { - return true; - } - - if (response.statusCode().is5xxServerError()) { - - throw new HttpClientErrorException(response.statusCode(), String.format( - "Exists request (%s) returned error code %s.", getRequest.toString(), response.statusCode().value())); - } - - return false; - }) // + .map(response -> response.statusCode().is2xxSuccessful()) // .next(); } @@ -241,13 +274,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch public Mono execute(ReactiveElasticsearchClientCallback callback) { return this.hostProvider.getActive(VerificationMode.LAZY) // - .flatMap(it -> callback.doWithClient(it)) // + .flatMap(callback::doWithClient) // .onErrorResume(throwable -> { if (throwable instanceof ConnectException) { - return hostProvider.getActive(VerificationMode.FORCE) // - .flatMap(webClient -> callback.doWithClient(webClient)); + return hostProvider.getActive(VerificationMode.ACTIVE) // + .flatMap(callback::doWithClient); } return Mono.error(throwable); @@ -315,55 +348,60 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch private Publisher readResponseBody(Request request, ClientResponse response, Class responseType) { if (RawActionResponse.class.equals(responseType)) { - return Mono.just((T) new RawActionResponse(response)); + return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { - - throw new HttpClientErrorException(response.statusCode(), - String.format("%s request to %s returned error code %s.", request.getMethod(), request.getEndpoint(), - response.statusCode().value())); + return handleServerError(request, response); } - return response.body(BodyExtractors.toDataBuffers()).flatMap(it -> { + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(it -> new String(it, StandardCharsets.UTF_8)) // + .flatMap(content -> { + return doDecode(response, responseType, content); + }); + } + + private static Mono doDecode(ClientResponse response, Class responseType, String content) { + + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); + + try { + + XContentParser contentParser = createParser(mediaType, content); + try { - String content = StreamUtils.copyToString(it.asInputStream(true), StandardCharsets.UTF_8); + Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); + return Mono + .justOrEmpty(responseType.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, contentParser))); + } catch (Exception errorParseFailure) { try { - - XContentParser contentParser = XContentType - .fromMediaTypeOrFormat( - response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType())) - .xContent() - .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); - - Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); - return Mono.just((T) ReflectionUtils.invokeMethod(fromXContent, responseType, contentParser)); - } catch (Exception parseFailure) { - - try { - - XContentParser errorParser = XContentType - .fromMediaTypeOrFormat( - response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType())) - .xContent() - .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); - - // return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch. - return Mono.error(BytesRestResponse.errorFromXContent(errorParser)); - - } catch (Exception errorParseFailure) { - - // return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch. - return Mono.error(new ElasticsearchStatusException("Unable to parse response body", - RestStatus.fromCode(response.statusCode().value()))); - } + return Mono.error(BytesRestResponse.errorFromXContent(contentParser)); + } catch (Exception e) { + // return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch. + return Mono.error(new ElasticsearchStatusException("Unable to parse response body", + RestStatus.fromCode(response.statusCode().value()))); } - } catch (IOException e) { - throw new DataAccessResourceFailureException("Error parsing XContent.", e); } - }); + + } catch (IOException e) { + return Mono.error(new DataAccessResourceFailureException("Error parsing XContent.", e)); + } + } + + private static XContentParser createParser(String mediaType, String content) throws IOException { + + return XContentType.fromMediaTypeOrFormat(mediaType) // + .xContent() // + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); + } + + private static Publisher handleServerError(Request request, ClientResponse response) { + return Mono.error( + new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.", + request.getMethod(), request.getEndpoint(), response.statusCode().value()))); } static class RequestCreator { @@ -405,97 +443,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } } - public static class RequestBodyEncodingException extends WebClientException { - - RequestBodyEncodingException(String msg, Throwable ex) { - super(msg, ex); - } - } - - static class RawActionResponse extends ActionResponse implements ClientResponse { - - final ClientResponse delegate; - - RawActionResponse(ClientResponse delegate) { - this.delegate = delegate; - } - - public HttpStatus statusCode() { - return delegate.statusCode(); - } - - public int rawStatusCode() { - return delegate.rawStatusCode(); - } - - public Headers headers() { - return delegate.headers(); - } - - public MultiValueMap cookies() { - return delegate.cookies(); - } - - public ExchangeStrategies strategies() { - return delegate.strategies(); - } - - public T body(BodyExtractor extractor) { - return delegate.body(extractor); - } - - public Mono bodyToMono(Class elementClass) { - return delegate.bodyToMono(elementClass); - } - - public Mono bodyToMono(ParameterizedTypeReference typeReference) { - return delegate.bodyToMono(typeReference); - } - - public Flux bodyToFlux(Class elementClass) { - return delegate.bodyToFlux(elementClass); - } - - public Flux bodyToFlux(ParameterizedTypeReference typeReference) { - return delegate.bodyToFlux(typeReference); - } - - public Mono> toEntity(Class bodyType) { - return delegate.toEntity(bodyType); - } - - public Mono> toEntity(ParameterizedTypeReference typeReference) { - return delegate.toEntity(typeReference); - } - - public Mono>> toEntityList(Class elementType) { - return delegate.toEntityList(elementType); - } - - public Mono>> toEntityList(ParameterizedTypeReference typeReference) { - return delegate.toEntityList(typeReference); - } - - public static Builder from(ClientResponse other) { - return ClientResponse.from(other); - } - - public static Builder create(HttpStatus statusCode) { - return ClientResponse.create(statusCode); - } - - public static Builder create(HttpStatus statusCode, ExchangeStrategies strategies) { - return ClientResponse.create(statusCode, strategies); - } - - public static Builder create(HttpStatus statusCode, List> messageReaders) { - return ClientResponse.create(statusCode, messageReaders); - } - } - /** * Reactive client {@link ReactiveElasticsearchClient.Status} implementation. - * + * * @author Christoph Strobl */ class ClientStatus implements Status { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java new file mode 100644 index 000000000..82bb08d70 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java @@ -0,0 +1,133 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.Builder; + +/** + * Default {@link WebClientProvider} that uses cached {@link WebClient} instances per {@code hostAndPort}. + * + * @author Mark Paluch + * @since 4.0 + */ +class DefaultWebClientProvider implements WebClientProvider { + + private final Map cachedClients; + + private final String scheme; + private final @Nullable ClientHttpConnector connector; + private final Consumer errorListener; + private final HttpHeaders headers; + + DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) { + this(scheme, connector, e -> {}, HttpHeaders.EMPTY); + } + + private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector, + Consumer errorListener, HttpHeaders headers) { + this(new ConcurrentHashMap<>(), scheme, connector, errorListener, headers); + } + + private DefaultWebClientProvider(Map cachedClients, String scheme, + @Nullable ClientHttpConnector connector, Consumer errorListener, HttpHeaders headers) { + + this.cachedClients = cachedClients; + this.scheme = scheme; + this.connector = connector; + this.errorListener = errorListener; + this.headers = headers; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#get(java.net.InetSocketAddress) + */ + @Override + public WebClient get(InetSocketAddress endpoint) { + + Assert.notNull(endpoint, "Endpoint must not be empty!"); + + return this.cachedClients.computeIfAbsent(endpoint, key -> { + + Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders())); + + if (connector != null) { + builder.clientConnector(connector); + } + + String baseUrl = String.format("%s://%s:%d", this.scheme, key.getHostString(), key.getPort()); + return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)) + .build(); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getDefaultHeaders() + */ + @Override + public HttpHeaders getDefaultHeaders() { + return headers; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withDefaultHeaders(org.springframework.http.HttpHeaders) + */ + @Override + public WebClientProvider withDefaultHeaders(HttpHeaders headers) { + + Assert.notNull(headers, "HttpHeaders must not be null"); + + HttpHeaders merged = new HttpHeaders(); + merged.addAll(this.headers); + merged.addAll(headers); + + return new DefaultWebClientProvider(this.scheme, this.connector, errorListener, merged); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getErrorListener() + */ + @Override + public Consumer getErrorListener() { + return this.errorListener; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withErrorListener(java.util.function.Consumer) + */ + @Override + public WebClientProvider withErrorListener(Consumer errorListener) { + + Assert.notNull(errorListener, "Error listener must not be null"); + + Consumer listener = this.errorListener.andThen(errorListener); + return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java index ae4ddbca3..8e194a9f1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java @@ -13,18 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; -import org.springframework.data.elasticsearch.client.NoReachableHostException; import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.Set; -import java.util.function.Consumer; import org.springframework.data.elasticsearch.client.ElasticsearchHost; -import org.springframework.http.HttpHeaders; +import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.util.Assert; import org.springframework.web.reactive.function.client.WebClient; @@ -33,6 +31,7 @@ import org.springframework.web.reactive.function.client.WebClient; * active ones. * * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 */ public interface HostProvider { @@ -42,7 +41,7 @@ public interface HostProvider { * * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} if none found. */ - default Mono lookupActiveHost() { + default Mono lookupActiveHost() { return lookupActiveHost(VerificationMode.LAZY); } @@ -53,7 +52,7 @@ public interface HostProvider { * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} * ({@link NoReachableHostException}) if none found. */ - Mono lookupActiveHost(VerificationMode verificationMode); + Mono lookupActiveHost(VerificationMode verificationMode); /** * Get the {@link WebClient} connecting to an active host utilizing cached {@link ElasticsearchHost}. @@ -73,31 +72,16 @@ public interface HostProvider { * found. */ default Mono getActive(VerificationMode verificationMode) { - return getActive(verificationMode, getDefaultHeaders()); + return lookupActiveHost(verificationMode).map(this::createWebClient); } /** - * Get the {@link WebClient} with default {@link HttpHeaders} connecting to an active host. + * Creates a {@link WebClient} for {@link InetSocketAddress endpoint}. * - * @param verificationMode must not be {@literal null}. - * @param headers must not be {@literal null}. - * @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none - * found. - */ - default Mono getActive(VerificationMode verificationMode, HttpHeaders headers) { - return lookupActiveHost(verificationMode).map(host -> createWebClient(host, headers)); - } - - /** - * Get the {@link WebClient} with default {@link HttpHeaders} connecting to the given host. - * - * @param host must not be {@literal null}. - * @param headers must not be {@literal null}. + * @param baseUrl * @return */ - default WebClient createWebClient(String host, HttpHeaders headers) { - return WebClient.builder().baseUrl(host).defaultHeaders(defaultHeaders -> defaultHeaders.putAll(headers)).build(); - } + WebClient createWebClient(InetSocketAddress endpoint); /** * Obtain information about known cluster nodes. @@ -107,42 +91,21 @@ public interface HostProvider { Mono clusterInfo(); /** - * Obtain the {@link HttpHeaders} to be used by default. - * - * @return never {@literal null}. {@link HttpHeaders#EMPTY} by default. - */ - HttpHeaders getDefaultHeaders(); - - /** - * Create a new instance of {@link HostProvider} applying the given headers by default. - * - * @param headers must not be {@literal null}. - * @return new instance of {@link HostProvider}. - */ - HostProvider withDefaultHeaders(HttpHeaders headers); - - /** - * Create a new instance of {@link HostProvider} calling the given {@link Consumer} on error. - * - * @param errorListener must not be {@literal null}. - * @return new instance of {@link HostProvider}. - */ - HostProvider withErrorListener(Consumer errorListener); - - /** - * Create a new {@link HostProvider} best suited for the given number of hosts. + * Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts. * + * @param clientProvider must not be {@literal null} . * @param hosts must not be {@literal null} nor empty. * @return new instance of {@link HostProvider}. */ - static HostProvider provider(String... hosts) { + static HostProvider provider(WebClientProvider clientProvider, InetSocketAddress... endpoints) { - Assert.notEmpty(hosts, "Please provide at least one host to connect to."); + Assert.notNull(clientProvider, "WebClientProvider must not be null"); + Assert.notEmpty(endpoints, "Please provide at least one endpoint to connect to."); - if (hosts.length == 1) { - return new SingleNodeHostProvider(HttpHeaders.EMPTY, (err) -> {}, hosts[0]); + if (endpoints.length == 1) { + return new SingleNodeHostProvider(clientProvider, endpoints[0]); } else { - return new MultiNodeHostProvider(HttpHeaders.EMPTY, (err) -> {}, hosts); + return new MultiNodeHostProvider(clientProvider, endpoints); } } @@ -155,7 +118,7 @@ public interface HostProvider { /** * Actively check for cluster node health. */ - FORCE, + ACTIVE, /** * Use cached data for cluster node health. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java index b9eaea53f..a5e56a821 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,56 +27,68 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; import org.springframework.data.elasticsearch.client.NoReachableHostException; -import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; /** + * {@link HostProvider} for a cluster of nodes. + * * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 */ class MultiNodeHostProvider implements HostProvider { - private final HttpHeaders headers; - private final Consumer errorListener; - private final Map hosts; + private final WebClientProvider clientProvider; + private final Map hosts; - MultiNodeHostProvider(HttpHeaders headers, Consumer errorListener, String... hosts) { + MultiNodeHostProvider(WebClientProvider clientProvider, InetSocketAddress... endpoints) { - this.headers = headers; - this.errorListener = errorListener; + this.clientProvider = clientProvider; this.hosts = new ConcurrentHashMap<>(); - for (String host : hosts) { - this.hosts.put(host, new ElasticsearchHost(host, State.UNKNOWN)); + for (InetSocketAddress endpoint : endpoints) { + this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN)); } } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#clusterInfo() + */ public Mono clusterInfo() { return nodes(null).map(this::updateNodeState).buffer(hosts.size()) .then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values())))); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#createWebClient(java.net.InetSocketAddress) + */ + @Override + public WebClient createWebClient(InetSocketAddress endpoint) { + return this.clientProvider.get(endpoint); + } + Collection getCachedHostState() { return hosts.values(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode) + */ @Override - public HttpHeaders getDefaultHeaders() { - return this.headers; - } - - @Override - public Mono lookupActiveHost(VerificationMode verificationMode) { + public Mono lookupActiveHost(VerificationMode verificationMode) { if (VerificationMode.LAZY.equals(verificationMode)) { for (ElasticsearchHost entry : hosts()) { if (entry.isOnline()) { - return Mono.just(entry.getHost()); + return Mono.just(entry.getEndpoint()); } } } @@ -87,33 +99,24 @@ class MultiNodeHostProvider implements HostProvider { .switchIfEmpty(Mono.error(() -> new NoReachableHostException(new LinkedHashSet<>(getCachedHostState())))); } - @Override - public HostProvider withDefaultHeaders(HttpHeaders headers) { - return new MultiNodeHostProvider(headers, errorListener, hosts.keySet().toArray(new String[0])); - } - - @Override - public HostProvider withErrorListener(Consumer errorListener) { - return new MultiNodeHostProvider(headers, errorListener, hosts.keySet().toArray(new String[0])); - } - - private Mono findActiveHostInKnownActives() { + private Mono findActiveHostInKnownActives() { return findActiveForSate(State.ONLINE); } - private Mono findActiveHostInUnresolved() { + private Mono findActiveHostInUnresolved() { return findActiveForSate(State.UNKNOWN); } - private Mono findActiveHostInDead() { + private Mono findActiveHostInDead() { return findActiveForSate(State.OFFLINE); } - private Mono findActiveForSate(State state) { - return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline).map(it -> it.getHost()).next(); + private Mono findActiveForSate(State state) { + return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline) + .map(ElasticsearchHost::getEndpoint).next(); } - private ElasticsearchHost updateNodeState(Tuple2 tuple2) { + private ElasticsearchHost updateNodeState(Tuple2 tuple2) { State state = tuple2.getT2().statusCode().isError() ? State.OFFLINE : State.ONLINE; ElasticsearchHost elasticsearchHost = new ElasticsearchHost(tuple2.getT1(), state); @@ -121,24 +124,24 @@ class MultiNodeHostProvider implements HostProvider { return elasticsearchHost; } - private Flux> nodes(@Nullable State state) { + private Flux> nodes(@Nullable State state) { return Flux.fromIterable(hosts()) // - .filter(entry -> state != null ? entry.getState().equals(state) : true) // - .map(ElasticsearchHost::getHost) // + .filter(entry -> state == null || entry.getState().equals(state)) // + .map(ElasticsearchHost::getEndpoint) // .flatMap(host -> { - Mono exchange = createWebClient(host, headers) // + Mono exchange = createWebClient(host) // .head().uri("/").exchange().doOnError(throwable -> { hosts.put(host, new ElasticsearchHost(host, State.OFFLINE)); - errorListener.accept(throwable); + clientProvider.getErrorListener().accept(throwable); }); return Mono.just(host).zipWith(exchange); }) // .onErrorContinue((throwable, o) -> { - errorListener.accept(throwable); + clientProvider.getErrorListener().accept(throwable); }); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java new file mode 100644 index 000000000..b4073249b --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java @@ -0,0 +1,177 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; + +import org.elasticsearch.action.ActionResponse; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseCookie; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.BodyExtractor; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeStrategies; + +/** + * Extension to {@link ActionResponse} that also implements {@link ClientResponse}. + */ +class RawActionResponse extends ActionResponse implements ClientResponse { + + private final ClientResponse delegate; + + private RawActionResponse(ClientResponse delegate) { + this.delegate = delegate; + } + + public static RawActionResponse create(ClientResponse response) { + return new RawActionResponse(response); + } + + public static Builder builder(ClientResponse other) { + return ClientResponse.from(other); + } + + public static Builder builder(HttpStatus statusCode) { + return ClientResponse.create(statusCode); + } + + public static Builder builder(HttpStatus statusCode, ExchangeStrategies strategies) { + return ClientResponse.create(statusCode, strategies); + } + + public static Builder builder(HttpStatus statusCode, List> messageReaders) { + return ClientResponse.create(statusCode, messageReaders); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#statusCode() + */ + public HttpStatus statusCode() { + return delegate.statusCode(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#rawStatusCode() + */ + public int rawStatusCode() { + return delegate.rawStatusCode(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#headers() + */ + public Headers headers() { + return delegate.headers(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#cookies() + */ + public MultiValueMap cookies() { + return delegate.cookies(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#strategies() + */ + public ExchangeStrategies strategies() { + return delegate.strategies(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#body(org.springframework.web.reactive.function.BodyExtractor) + */ + public T body(BodyExtractor extractor) { + return delegate.body(extractor); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToMono(java.lang.Class) + */ + public Mono bodyToMono(Class elementClass) { + return delegate.bodyToMono(elementClass); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToMono(org.springframework.core.ParameterizedTypeReference) + */ + public Mono bodyToMono(ParameterizedTypeReference typeReference) { + return delegate.bodyToMono(typeReference); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToFlux(java.lang.Class) + */ + public Flux bodyToFlux(Class elementClass) { + return delegate.bodyToFlux(elementClass); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToFlux(org.springframework.core.ParameterizedTypeReference) + */ + public Flux bodyToFlux(ParameterizedTypeReference typeReference) { + return delegate.bodyToFlux(typeReference); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#toEntity(java.lang.Class) + */ + public Mono> toEntity(Class bodyType) { + return delegate.toEntity(bodyType); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#toEntity(org.springframework.core.ParameterizedTypeReference) + */ + public Mono> toEntity(ParameterizedTypeReference typeReference) { + return delegate.toEntity(typeReference); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#toEntityList(java.lang.Class) + */ + public Mono>> toEntityList(Class elementType) { + return delegate.toEntityList(elementType); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#toEntityList(org.springframework.core.ParameterizedTypeReference) + */ + public Mono>> toEntityList(ParameterizedTypeReference typeReference) { + return delegate.toEntityList(typeReference); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 89c76273b..e398c384f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import reactor.core.publisher.Flux; @@ -35,6 +34,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.SearchHit; +import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.http.HttpHeaders; import org.springframework.util.CollectionUtils; @@ -42,10 +42,13 @@ import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; /** - * A reactive client to connect to Elasticsearch.
- * + * A reactive client to connect to Elasticsearch. + * * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 + * @see ClientConfiguration + * @see ReactiveRestClients */ public interface ReactiveElasticsearchClient { @@ -385,7 +388,7 @@ public interface ReactiveElasticsearchClient { /** * Get the list of known hosts and their getCachedHostState. - * + * * @return never {@literal null}. */ Collection hosts(); @@ -401,7 +404,7 @@ public interface ReactiveElasticsearchClient { return false; } - return !hosts().stream().filter(it -> !it.isOnline()).findFirst().isPresent(); + return hosts().stream().anyMatch(ElasticsearchHost::isOnline); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java new file mode 100644 index 000000000..f1d7baeac --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.client.RestClients.ElasticsearchRestClient; +import org.springframework.util.Assert; + +/** + * Utility class for common access to reactive Elasticsearch clients. {@link ReactiveRestClients} consolidates set up + * routines for the various drivers into a single place. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 4.0 + */ +public final class ReactiveRestClients { + + private ReactiveRestClients() {} + + /** + * Start here to create a new client tailored to your needs. + * + * @return new instance of {@link ElasticsearchRestClient}. + */ + public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) { + + Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); + + return DefaultReactiveElasticsearchClient.create(clientConfiguration); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestBodyEncodingException.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestBodyEncodingException.java new file mode 100644 index 000000000..c7b77af03 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestBodyEncodingException.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import org.springframework.web.reactive.function.client.WebClientException; + +/** + * Exception thrown if the request body cannot be properly encoded. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 4.0 + */ +public class RequestBodyEncodingException extends WebClientException { + + private static final long serialVersionUID = 472776714118912855L; + + /** + * Construct a new instance of {@link RequestBodyEncodingException} with the given message and exception. + * + * @param msg the message + * @param ex the exception + */ + public RequestBodyEncodingException(String msg, Throwable ex) { + super(msg, ex); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java index 57aa2ee94..4726a57e6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java @@ -13,95 +13,96 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.util.Collections; -import java.util.function.Consumer; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; import org.springframework.data.elasticsearch.client.NoReachableHostException; -import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.WebClient; /** + * {@link HostProvider} for a single host. + * * @author Christoph Strobl + * @author Mark Paluch * @since 4.0 */ class SingleNodeHostProvider implements HostProvider { - private final HttpHeaders headers; - private final Consumer errorListener; - private final String hostname; + private final WebClientProvider clientProvider; + private final InetSocketAddress endpoint; private volatile ElasticsearchHost state; - SingleNodeHostProvider(HttpHeaders headers, Consumer errorListener, String host) { + SingleNodeHostProvider(WebClientProvider clientProvider, InetSocketAddress endpoint) { - this.headers = headers; - this.errorListener = errorListener; - this.hostname = host; - this.state = new ElasticsearchHost(hostname, State.UNKNOWN); + this.clientProvider = clientProvider; + this.endpoint = endpoint; + this.state = new ElasticsearchHost(this.endpoint, State.UNKNOWN); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#clusterInfo() + */ @Override public Mono clusterInfo() { - return createWebClient(hostname, headers) // + return createWebClient(endpoint) // .head().uri("/").exchange() // .flatMap(it -> { - if(it.statusCode().isError()) { - state = ElasticsearchHost.offline(hostname); + if (it.statusCode().isError()) { + state = ElasticsearchHost.offline(endpoint); } else { - state = ElasticsearchHost.online(hostname); + state = ElasticsearchHost.online(endpoint); } return Mono.just(state); }).onErrorResume(throwable -> { - state = ElasticsearchHost.offline(hostname); - errorListener.accept(throwable); + state = ElasticsearchHost.offline(endpoint); + clientProvider.getErrorListener().accept(throwable); return Mono.just(state); }) // .flatMap(it -> Mono.just(new ClusterInformation(Collections.singleton(it)))); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#createWebClient(java.net.InetSocketAddress) + */ @Override - public Mono lookupActiveHost(VerificationMode verificationMode) { + public WebClient createWebClient(InetSocketAddress endpoint) { + return this.clientProvider.get(endpoint); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode) + */ + @Override + public Mono lookupActiveHost(VerificationMode verificationMode) { if (VerificationMode.LAZY.equals(verificationMode) && state.isOnline()) { - return Mono.just(hostname); + return Mono.just(endpoint); } return clusterInfo().flatMap(it -> { ElasticsearchHost host = it.getNodes().iterator().next(); if (host.isOnline()) { - return Mono.just(host.getHost()); + return Mono.just(host.getEndpoint()); } return Mono.error(() -> new NoReachableHostException(Collections.singleton(host))); }); } - @Override - public HttpHeaders getDefaultHeaders() { - return this.headers; - } - - @Override - public HostProvider withDefaultHeaders(HttpHeaders headers) { - return new SingleNodeHostProvider(headers, errorListener, hostname); - } - - @Override - public HostProvider withErrorListener(Consumer errorListener) { - return new SingleNodeHostProvider(headers, errorListener, hostname); - } - ElasticsearchHost getCachedHostState() { return state; } - } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java new file mode 100644 index 000000000..a2633cbb7 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java @@ -0,0 +1,104 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import java.net.InetSocketAddress; +import java.util.function.Consumer; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.util.Assert; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Provider for {@link WebClient}s using a pre-configured {@code scheme}. This class returns {@link WebClient} for a + * specific {@link InetSocketAddress endpoint} and encapsulates common configuration aspects of {@link WebClient} so + * that code using {@link WebClient} is not required to apply further configuration to the actual client. + *

+ * Client instances are typically cached allowing reuse of pooled connections if configured on the + * {@link ClientHttpConnector}. + * + * @author Christoph Strobl + * @author Mark Paluch + * @since 4.0 + */ +public interface WebClientProvider { + + /** + * Creates a new {@link WebClientProvider} using the given {@code scheme} and a default {@link ClientHttpConnector}. + * + * @param scheme protocol scheme such as {@literal http} or {@literal https}. + * @return the resulting {@link WebClientProvider}. + */ + static WebClientProvider create(String scheme) { + + Assert.hasText(scheme, "Protocol scheme must not be empty"); + + return new DefaultWebClientProvider(scheme, null); + } + + /** + * Creates a new {@link WebClientProvider} given {@code scheme} and {@link ClientHttpConnector}. + * + * @param scheme protocol scheme such as {@literal http} or {@literal https}. + * @param connector the HTTP connector to use. + * @return the resulting {@link WebClientProvider}. + */ + static WebClientProvider create(String scheme, ClientHttpConnector connector) { + + Assert.hasText(scheme, "Protocol scheme must not be empty"); + + return new DefaultWebClientProvider(scheme, connector); + } + + /** + * Obtain the {@link WebClient} configured with {@link #withDefaultHeaders(HttpHeaders) default HTTP headers} and + * {@link Consumer} error callback for a given {@link InetSocketAddress endpoint}. + * + * @return the {@link WebClient} for the given {@link InetSocketAddress endpoint}. + */ + WebClient get(InetSocketAddress endpoint); + + /** + * Obtain the {@link HttpHeaders} to be used by default. + * + * @return never {@literal null}. {@link HttpHeaders#EMPTY} by default. + */ + HttpHeaders getDefaultHeaders(); + + /** + * Create a new instance of {@link WebClientProvider} applying the given headers by default. + * + * @param headers must not be {@literal null}. + * @return new instance of {@link WebClientProvider}. + */ + WebClientProvider withDefaultHeaders(HttpHeaders headers); + + /** + * Obtain the {@link Consumer error listener} to be used; + * + * @return never {@literal null}. + */ + Consumer getErrorListener(); + + /** + * Create a new instance of {@link WebClientProvider} calling the given {@link Consumer} on error. + * + * @param errorListener must not be {@literal null}. + * @return new instance of {@link WebClientProvider}. + */ + WebClientProvider withErrorListener(Consumer errorListener); +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index cfa9de718..d7b9ba234 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.core; import static org.elasticsearch.index.VersionType.*; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -31,6 +29,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.Requests; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; @@ -130,7 +129,7 @@ public class ReactiveElasticsearchTemplate { ElasticsearchPersistentEntity persistentEntity = lookupPersistentEntity(resultType); GetRequest request = new GetRequest(persistentEntity.getIndexName(), persistentEntity.getIndexType(), id); - return goGet(id, persistentEntity, index, type).map(it -> mapper.mapEntity(it.sourceAsString(), resultType)); + return doGet(id, persistentEntity, index, type).map(it -> mapper.mapEntity(it.sourceAsString(), resultType)); } /** @@ -198,7 +197,7 @@ public class ReactiveElasticsearchTemplate { // Customization Hooks - protected Mono goGet(String id, ElasticsearchPersistentEntity entity, @Nullable String index, + protected Mono doGet(String id, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { String indexToUse = indexName(index, entity); @@ -342,7 +341,7 @@ public class ReactiveElasticsearchTemplate { } // Additional types - public interface ClientCallback { + public interface ClientCallback> { T doWithClient(ReactiveElasticsearchClient client); } diff --git a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java index 0280a510a..804ea2f15 100644 --- a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch; import lombok.SneakyThrows; @@ -22,12 +21,15 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.springframework.data.elasticsearch.client.ElasticsearchClients; +import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; import org.springframework.util.ObjectUtils; /** * @author Christoph Strobl + * @author Mark Paluch * @currentRead Fool's Fate - Robin Hobb */ public final class TestUtils { @@ -35,11 +37,11 @@ public final class TestUtils { private TestUtils() {} public static RestHighLevelClient restHighLevelClient() { - return ElasticsearchClients.createClient().connectedToLocalhost().rest(); + return RestClients.create(ClientConfiguration.create("localhost:9200")).rest(); } public static ReactiveElasticsearchClient reactiveClient() { - return ElasticsearchClients.createClient().connectedToLocalhost().reactive(); + return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200")); } @SneakyThrows diff --git a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java new file mode 100644 index 000000000..ec8d0be25 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java @@ -0,0 +1,76 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.net.InetSocketAddress; + +import javax.net.ssl.SSLContext; + +import org.junit.Test; +import org.springframework.http.HttpHeaders; + +/** + * Unit tests for {@link ClientConfiguration}. + * + * @author Mark Paluch + */ +public class ClientConfigurationUnitTests { + + @Test // DATAES-488 + public void shouldCreateSimpleConfiguration() { + + ClientConfiguration clientConfiguration = ClientConfiguration.create("localhost:9200"); + + assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("localhost", 9200)); + } + + @Test // DATAES-488 + public void shouldCreateCustomizedConfiguration() { + + HttpHeaders headers = new HttpHeaders(); + headers.set("foo", "bar"); + + ClientConfiguration clientConfiguration = ClientConfiguration.builder() // + .connectedTo("foo", "bar") // + .usingSsl() // + .withDefaultHeaders(headers) // + .build(); + + assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("foo", 9200), + InetSocketAddress.createUnresolved("bar", 9200)); + assertThat(clientConfiguration.useSsl()).isTrue(); + assertThat(clientConfiguration.getDefaultHeaders().get("foo")).containsOnly("bar"); + } + + @Test // DATAES-488 + public void shouldCreateSslConfiguration() { + + SSLContext sslContext = mock(SSLContext.class); + + ClientConfiguration clientConfiguration = ClientConfiguration.builder() // + .connectedTo("foo", "bar") // + .usingSsl(sslContext) // + .build(); + + assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("foo", 9200), + InetSocketAddress.createUnresolved("bar", 9200)); + assertThat(clientConfiguration.useSsl()).isTrue(); + assertThat(clientConfiguration.getSslContext()).contains(sslContext); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/client/InetSocketAddressParserUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/InetSocketAddressParserUnitTests.java new file mode 100644 index 000000000..5b2ae5cb3 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/client/InetSocketAddressParserUnitTests.java @@ -0,0 +1,122 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import static org.assertj.core.api.Assertions.*; + +import java.net.InetSocketAddress; + +import org.junit.Test; + +/** + * Unit tests for {@link InetSocketAddressParser}. + * + * @author Mark Paluch + */ +public class InetSocketAddressParserUnitTests { + + @Test + public void testFromStringWellFormed() { + // Well-formed inputs. + checkFromStringCase("pivotal.io", 80, "pivotal.io", 80, false); + checkFromStringCase("pivotal.io", 80, "pivotal.io", 80, false); + checkFromStringCase("192.0.2.1", 82, "192.0.2.1", 82, false); + checkFromStringCase("[2001::1]", 84, "2001::1", 84, false); + checkFromStringCase("2001::3", 86, "2001::3", 86, false); + checkFromStringCase("host:", 80, "host", 80, false); + } + + @Test + public void testFromStringBadDefaultPort() { + // Well-formed strings with bad default ports. + checkFromStringCase("gmail.com:81", -1, "gmail.com", 81, true); + checkFromStringCase("192.0.2.2:83", -1, "192.0.2.2", 83, true); + checkFromStringCase("[2001::2]:85", -1, "2001::2", 85, true); + checkFromStringCase("goo.gl:65535", 65536, "goo.gl", 65535, true); + // No port, bad default. + checkFromStringCase("pivotal.io", -1, null, -1, false); + checkFromStringCase("192.0.2.1", 65536, null, -1, false); + checkFromStringCase("[2001::1]", -1, null, -1, false); + checkFromStringCase("2001::3", 65536, null, -1, false); + } + + @Test + public void testFromStringUnusedDefaultPort() { + // Default port, but unused. + checkFromStringCase("gmail.com:81", 77, "gmail.com", 81, true); + checkFromStringCase("192.0.2.2:83", 77, "192.0.2.2", 83, true); + checkFromStringCase("[2001::2]:85", 77, "2001::2", 85, true); + } + + @Test + public void testFromStringBadPort() { + // Out-of-range ports. + checkFromStringCase("pivotal.io:65536", 1, null, 99, false); + checkFromStringCase("pivotal.io:9999999999", 1, null, 99, false); + // Invalid port parts. + checkFromStringCase("pivotal.io:port", 1, null, 99, false); + checkFromStringCase("pivotal.io:-25", 1, null, 99, false); + checkFromStringCase("pivotal.io:+25", 1, null, 99, false); + checkFromStringCase("pivotal.io:25 ", 1, null, 99, false); + checkFromStringCase("pivotal.io:25\t", 1, null, 99, false); + checkFromStringCase("pivotal.io:0x25 ", 1, null, 99, false); + } + + @Test + public void testFromStringUnparseableNonsense() { + // Some nonsense that causes parse failures. + checkFromStringCase("[goo.gl]", 1, null, 99, false); + checkFromStringCase("[goo.gl]:80", 1, null, 99, false); + checkFromStringCase("[", 1, null, 99, false); + checkFromStringCase("[]:", 1, null, 99, false); + checkFromStringCase("[]:80", 1, null, 99, false); + checkFromStringCase("[]bad", 1, null, 99, false); + } + + @Test + public void testFromStringParseableNonsense() { + // Examples of nonsense that gets through. + checkFromStringCase("[[:]]", 86, "[:]", 86, false); + checkFromStringCase("x:y:z", 87, "x:y:z", 87, false); + checkFromStringCase("", 88, "", 88, false); + checkFromStringCase(":", 99, "", 99, false); + checkFromStringCase(":123", -1, "", 123, true); + checkFromStringCase("\nOMG\t", 89, "\nOMG\t", 89, false); + } + + private static void checkFromStringCase(String hpString, int defaultPort, String expectHost, int expectPort, + boolean expectHasExplicitPort) { + InetSocketAddress hp; + + try { + hp = InetSocketAddressParser.parse(hpString, defaultPort); + } catch (IllegalArgumentException e) { + // Make sure we expected this. + assertThat(expectHost).isNull(); + return; + } + + assertThat(expectHost).isNotNull(); + + if (expectHasExplicitPort) { + assertThat(hp.getPort()).isEqualTo(expectPort); + } else { + assertThat(hp.getPort()).isEqualTo(defaultPort); + } + + assertThat(hp.getHostString()).isEqualTo(expectHost); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java index 5cba3982c..55444035f 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; @@ -24,11 +23,11 @@ import reactor.test.StepVerifier; import org.junit.Before; import org.junit.Test; -import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; +import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider; -import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive; +import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive; import org.springframework.web.reactive.function.client.ClientResponse; /** @@ -107,7 +106,7 @@ public class MultiNodeHostProviderUnitTests { provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); - provider.getActive(VerificationMode.FORCE).as(StepVerifier::create).expectNext(mock.client(HOST_2)) + provider.getActive(VerificationMode.ACTIVE).as(StepVerifier::create).expectNext(mock.client(HOST_2)) .verifyComplete(); verify(mock.client(HOST_2), times(2)).head(); diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index daf34fd3c..29f5d1d54 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; @@ -43,14 +42,20 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.springframework.data.elasticsearch.TestUtils; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; /** * @author Christoph Strobl + * @author Mark Paluch * @currentRead Fool's Fate - Robin Hobb */ +@RunWith(SpringRunner.class) +@ContextConfiguration("classpath:infrastructure.xml") public class ReactiveElasticsearchClientTests { static final String INDEX_I = "idx-1-reactive-client-tests"; diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index 0729a11f6..f7637d0d8 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive.*; +import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; import reactor.test.StepVerifier; @@ -40,7 +38,7 @@ import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider; -import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive; +import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index 96e649999..455936259 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -13,21 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.client.reactive; import static org.mockito.Mockito.*; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import java.util.function.Supplier; @@ -35,10 +36,8 @@ import java.util.function.Supplier; import org.mockito.Mockito; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.data.elasticsearch.client.ElasticsearchHost; -import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Send; +import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Send; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -52,10 +51,11 @@ import org.springframework.web.reactive.function.client.WebClient.RequestHeaders /** * @author Christoph Strobl - * @since 2018/10 */ public class ReactiveMockClientTestsUtils { + private final static Map ADDRESS_CACHE = new ConcurrentHashMap<>(); + public static MockDelegatingElasticsearchHostProvider single(String host) { return provider(host); } @@ -66,26 +66,17 @@ public class ReactiveMockClientTestsUtils { public static MockDelegatingElasticsearchHostProvider provider(String... hosts) { - WebClientProvider clientProvider = new WebClientProvider(); ErrorCollector errorCollector = new ErrorCollector(); + MockWebClientProvider clientProvider = new MockWebClientProvider(errorCollector); HostProvider delegate = null; if (hosts.length == 1) { - delegate = new SingleNodeHostProvider(HttpHeaders.EMPTY, errorCollector, hosts[0]) { - @Override // hook in there to modify result - public WebClient createWebClient(String host, HttpHeaders headers) { - return clientProvider.get(host); - } - }; + delegate = new SingleNodeHostProvider(clientProvider, getInetSocketAddress(hosts[0])) {}; } else { - delegate = new MultiNodeHostProvider(HttpHeaders.EMPTY, errorCollector, hosts) { - @Override // hook in there to modify result - public WebClient createWebClient(String host, HttpHeaders headers) { - return clientProvider.get(host); - } - }; + delegate = new MultiNodeHostProvider(clientProvider, Arrays.stream(hosts) + .map(ReactiveMockClientTestsUtils::getInetSocketAddress).toArray(InetSocketAddress[]::new)) {}; } return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate, @@ -93,6 +84,10 @@ public class ReactiveMockClientTestsUtils { } + private static InetSocketAddress getInetSocketAddress(String hostAndPort) { + return ADDRESS_CACHE.computeIfAbsent(hostAndPort, ElasticsearchHost::parse); + } + public static class ErrorCollector implements Consumer { List errors = new CopyOnWriteArrayList<>(); @@ -110,11 +105,11 @@ public class ReactiveMockClientTestsUtils { public static class MockDelegatingElasticsearchHostProvider implements HostProvider { private final T delegate; - private final WebClientProvider clientProvider; + private final MockWebClientProvider clientProvider; private final ErrorCollector errorCollector; private @Nullable String activeDefaultHost; - public MockDelegatingElasticsearchHostProvider(HttpHeaders httpHeaders, WebClientProvider clientProvider, + public MockDelegatingElasticsearchHostProvider(HttpHeaders httpHeaders, MockWebClientProvider clientProvider, ErrorCollector errorCollector, T delegate, String activeDefaultHost) { this.errorCollector = errorCollector; @@ -123,14 +118,14 @@ public class ReactiveMockClientTestsUtils { this.activeDefaultHost = activeDefaultHost; } - public Mono lookupActiveHost() { + public Mono lookupActiveHost() { return delegate.lookupActiveHost(); } - public Mono lookupActiveHost(VerificationMode verificationMode) { + public Mono lookupActiveHost(VerificationMode verificationMode) { if (StringUtils.hasText(activeDefaultHost)) { - return Mono.just(activeDefaultHost); + return Mono.just(getInetSocketAddress(activeDefaultHost)); } return delegate.lookupActiveHost(verificationMode); @@ -144,34 +139,21 @@ public class ReactiveMockClientTestsUtils { return delegate.getActive(verificationMode); } - public Mono getActive(VerificationMode verificationMode, HttpHeaders headers) { - return delegate.getActive(verificationMode, headers); - } - - public WebClient createWebClient(String host, HttpHeaders headers) { - return delegate.createWebClient(host, headers); + public WebClient createWebClient(InetSocketAddress endpoint) { + return delegate.createWebClient(endpoint); } @Override public Mono clusterInfo() { if (StringUtils.hasText(activeDefaultHost)) { - return Mono.just(new ClusterInformation(Collections.singleton(ElasticsearchHost.online(activeDefaultHost)))); + return Mono.just(new ClusterInformation( + Collections.singleton(ElasticsearchHost.online(getInetSocketAddress(activeDefaultHost))))); } return delegate.clusterInfo(); } - @Override - public HttpHeaders getDefaultHeaders() { - return delegate.getDefaultHeaders(); - } - - @Override - public HostProvider withDefaultHeaders(HttpHeaders headers) { - throw new UnsupportedOperationException(); - } - public Send when(String host) { return clientProvider.when(host); } @@ -188,28 +170,25 @@ public class ReactiveMockClientTestsUtils { return delegate; } - @Override - public HostProvider withErrorListener(Consumer errorListener) { - throw new UnsupportedOperationException(); - } - public MockDelegatingElasticsearchHostProvider withActiveDefaultHost(String host) { return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate, host); } } - public static class WebClientProvider { + public static class MockWebClientProvider implements WebClientProvider { private final Object lock = new Object(); + private final Consumer errorListener; - private Map clientMap; - private Map headersUriSpecMap; - private Map bodyUriSpecMap; - private Map responseMap; + private Map clientMap; + private Map headersUriSpecMap; + private Map bodyUriSpecMap; + private Map responseMap; - public WebClientProvider() { + public MockWebClientProvider(Consumer errorListener) { + this.errorListener = errorListener; this.clientMap = new LinkedHashMap<>(); this.headersUriSpecMap = new LinkedHashMap<>(); this.bodyUriSpecMap = new LinkedHashMap<>(); @@ -217,10 +196,14 @@ public class ReactiveMockClientTestsUtils { } public WebClient get(String host) { + return get(getInetSocketAddress(host)); + } + + public WebClient get(InetSocketAddress endpoint) { synchronized (lock) { - return clientMap.computeIfAbsent(host, key -> { + return clientMap.computeIfAbsent(endpoint, key -> { WebClient webClient = mock(WebClient.class); @@ -243,17 +226,39 @@ public class ReactiveMockClientTestsUtils { Mockito.when(bodyUriSpec.exchange()).thenReturn(Mono.just(response)); Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED); - headersUriSpecMap.putIfAbsent(host, headersUriSpec); - bodyUriSpecMap.putIfAbsent(host, bodyUriSpec); - responseMap.putIfAbsent(host, response); + headersUriSpecMap.putIfAbsent(key, headersUriSpec); + bodyUriSpecMap.putIfAbsent(key, bodyUriSpec); + responseMap.putIfAbsent(key, response); return webClient; }); } } + @Override + public HttpHeaders getDefaultHeaders() { + return HttpHeaders.EMPTY; + } + + @Override + public WebClientProvider withDefaultHeaders(HttpHeaders headers) { + throw new UnsupportedOperationException(); + } + + @Override + public Consumer getErrorListener() { + return errorListener; + } + + @Override + public WebClientProvider withErrorListener(Consumer errorListener) { + throw new UnsupportedOperationException(); + } + public Send when(String host) { - return new CallbackImpl(get(host), headersUriSpecMap.get(host), bodyUriSpecMap.get(host), responseMap.get(host)); + InetSocketAddress inetSocketAddress = getInetSocketAddress(host); + return new CallbackImpl(get(host), headersUriSpecMap.get(inetSocketAddress), + bodyUriSpecMap.get(inetSocketAddress), responseMap.get(inetSocketAddress)); } public interface Client { @@ -342,7 +347,7 @@ public class ReactiveMockClientTestsUtils { } default Receive body(Supplier json) { - return body(new DefaultDataBufferFactory().wrap(json.get())); + return body(json.get()); } default Receive body(Resource resource) { @@ -356,8 +361,8 @@ public class ReactiveMockClientTestsUtils { }); } - default Receive body(DataBuffer dataBuffer) { - return receive(response -> Mockito.when(response.body(any())).thenReturn(Flux.just(dataBuffer))); + default Receive body(byte[] bytes) { + return receive(response -> Mockito.when(response.body(any())).thenReturn(Mono.just(bytes))); } static void ok(ClientResponse response) { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java index 81f6c4488..8532ed770 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java @@ -16,17 +16,17 @@ package org.springframework.data.elasticsearch.client.reactive; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; -import org.springframework.data.elasticsearch.client.NoReachableHostException; import reactor.test.StepVerifier; import org.junit.Before; import org.junit.Test; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; +import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider; -import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive; +import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive; /** * @author Christoph Strobl diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index f5109d788..493ffbb4b 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.elasticsearch.core; import static org.apache.commons.lang.RandomStringUtils.*; @@ -25,17 +24,22 @@ import java.util.List; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.core.query.Criteria; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; import org.springframework.data.elasticsearch.entities.SampleEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; /** * @author Christoph Strobl * @currentRead Golden Fool - Robin Hobb */ +@RunWith(SpringRunner.class) +@ContextConfiguration("classpath:infrastructure.xml") public class ReactiveElasticsearchTemplateTests { private ElasticsearchRestTemplate restTemplate;