From 469455383feae8ec7ec6bdabd79558269b8b9438 Mon Sep 17 00:00:00 2001 From: Oliver Gierke Date: Thu, 12 Jul 2018 21:48:31 +0200 Subject: [PATCH] DATAES-470 - Fixed parsing of cluster nodes in TransportClientFactoryBean. Extracted ClusterNodes value object to capture the parsing logic and actually properly test it. Added unit tests to verify the proper rejection and the two cases outlined in the ticket. Related tickets: DATAES-283. --- .../elasticsearch/client/ClusterNodes.java | 102 ++++++++++++++++++ .../client/TransportClientFactoryBean.java | 34 ++---- .../client/ClusterNodesUnitTests.java | 90 ++++++++++++++++ 3 files changed, 201 insertions(+), 25 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/client/ClusterNodes.java create mode 100644 src/test/java/org/springframework/data/elasticsearch/client/ClusterNodesUnitTests.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClusterNodes.java b/src/main/java/org/springframework/data/elasticsearch/client/ClusterNodes.java new file mode 100644 index 000000000..43ac652bd --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClusterNodes.java @@ -0,0 +1,102 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.elasticsearch.common.transport.TransportAddress; +import org.springframework.data.util.Streamable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Value object to represent a list of cluster nodes. + * + * @author Oliver Gierke + * @since 3.1 + */ +class ClusterNodes implements Streamable { + + public static ClusterNodes DEFAULT = ClusterNodes.of("127.0.0.1:9300"); + + private static final String COLON = ":"; + private static final String COMMA = ","; + + private final List clusterNodes; + + /** + * Creates a new {@link ClusterNodes} by parsing the given source. + * + * @param source must not be {@literal null} or empty. + */ + private ClusterNodes(String source) { + + Assert.hasText(source, "Cluster nodes source must not be null or empty!"); + + String[] nodes = StringUtils.delimitedListToStringArray(source, COMMA); + + this.clusterNodes = Arrays.stream(nodes).map(node -> { + + String[] segments = StringUtils.delimitedListToStringArray(node, COLON); + + Assert.isTrue(segments.length == 2, + () -> String.format("Invalid cluster node %s in %s! Must be in the format host:port!", node, source)); + + String host = segments[0].trim(); + String port = segments[1].trim(); + + Assert.hasText(host, () -> String.format("No host name given cluster node %s!", node)); + Assert.hasText(port, () -> String.format("No port given in cluster node %s!", node)); + + return new TransportAddress(toInetAddress(host), Integer.valueOf(port)); + + }).collect(Collectors.toList()); + } + + /** + * Creates a new {@link ClusterNodes} by parsing the given source. The expected format is a comma separated list of + * host-port-combinations separated by a colon: {@code host:port,host:port,…}. + * + * @param source must not be {@literal null} or empty. + * @return + */ + public static ClusterNodes of(String source) { + return new ClusterNodes(source); + } + + /* + * (non-Javadoc) + * @see java.lang.Iterable#iterator() + */ + @Override + public Iterator iterator() { + return clusterNodes.iterator(); + } + + private static InetAddress toInetAddress(String host) { + + try { + return InetAddress.getByName(host); + } catch (UnknownHostException o_O) { + throw new IllegalArgumentException(o_O); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java b/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java index 44d952e72..43b8fd373 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/TransportClientFactoryBean.java @@ -15,20 +15,16 @@ */ package org.springframework.data.elasticsearch.client; -import java.net.InetAddress; import java.util.Properties; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.FactoryBean; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; +import org.springframework.beans.factory.InitializingBean; /** * TransportClientFactoryBean @@ -38,12 +34,12 @@ import org.springframework.util.StringUtils; * @author Jakub Vavrik * @author Piotr Betkier * @author Ilkang Na + * @author Oliver Gierke */ - public class TransportClientFactoryBean implements FactoryBean, InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(TransportClientFactoryBean.class); - private String clusterNodes = "127.0.0.1:9300"; + private ClusterNodes clusterNodes = ClusterNodes.of("127.0.0.1:9300"); private String clusterName = "elasticsearch"; private Boolean clientTransportSniff = true; private Boolean clientIgnoreClusterName = Boolean.FALSE; @@ -51,8 +47,6 @@ public class TransportClientFactoryBean implements FactoryBean, private String clientNodesSamplerInterval = "5s"; private TransportClient client; private Properties properties; - static final String COLON = ":"; - static final String COMMA = ","; @Override public void destroy() throws Exception { @@ -89,21 +83,11 @@ public class TransportClientFactoryBean implements FactoryBean, protected void buildClient() throws Exception { client = new PreBuiltTransportClient(settings()); - Assert.hasText(clusterNodes, "[Assertion failed] clusterNodes settings missing."); - String[] clusterNodesArray = StringUtils.split(clusterNodes, COMMA); - if (clusterNodesArray != null) { - for (String clusterNode : clusterNodesArray) { - if (clusterNode != null) { - int colonPosition = clusterName.lastIndexOf(COLON); - String hostName = colonPosition != -1 ? clusterNode.substring(0, colonPosition) : clusterNode; - String port = colonPosition != -1 ? clusterNode.substring(colonPosition, clusterNode.length()) : ""; - Assert.hasText(hostName, "[Assertion failed] missing host name in 'clusterNodes'"); - Assert.hasText(port, "[Assertion failed] missing port in 'clusterNodes'"); - logger.info("adding transport node : " + clusterNode); - client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port))); - } - } - } + + clusterNodes.stream() // + .peek(it -> logger.info("Adding transport node : " + it.toString())) // + .forEach(client::addTransportAddress); + client.connectedNodes(); } @@ -127,7 +111,7 @@ public class TransportClientFactoryBean implements FactoryBean, } public void setClusterNodes(String clusterNodes) { - this.clusterNodes = clusterNodes; + this.clusterNodes = ClusterNodes.of(clusterNodes); } public void setClusterName(String clusterName) { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/ClusterNodesUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/ClusterNodesUnitTests.java new file mode 100644 index 000000000..6159516c5 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/client/ClusterNodesUnitTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +import static org.assertj.core.api.Assertions.*; + +import java.util.List; + +import org.elasticsearch.common.transport.TransportAddress; +import org.junit.Test; + +/** + * Unit tests for {@link ClusterNodes}. + * + * @author Oliver Gierke + */ +public class ClusterNodesUnitTests { + + @Test // DATAES-470 + public void parsesSingleClusterNode() { + + ClusterNodes nodes = ClusterNodes.DEFAULT; + + assertThat(nodes).hasSize(1) // + .first().satisfies(it -> { + assertThat(it.getAddress()).isEqualTo("127.0.0.1"); + assertThat(it.getPort()).isEqualTo(9300); + }); + } + + @Test // DATAES-470 + public void parsesMultiClusterNode() { + + ClusterNodes nodes = ClusterNodes.of("127.0.0.1:1234,10.1.0.1:5678"); + + assertThat(nodes.stream()).hasSize(2); // + assertThat(nodes.stream()).element(0).satisfies(it -> { + assertThat(it.getAddress()).isEqualTo("127.0.0.1"); + assertThat(it.getPort()).isEqualTo(1234); + }); + assertThat(nodes.stream()).element(1).satisfies(it -> { + assertThat(it.getAddress()).isEqualTo("10.1.0.1"); + assertThat(it.getPort()).isEqualTo(5678); + }); + } + + @Test // DATAES-470 + public void rejectsEmptyHostName() { + + assertThatExceptionOfType(IllegalArgumentException.class) // + .isThrownBy(() -> ClusterNodes.of(":8080")) // + .withMessageContaining("host"); + } + + @Test // DATAES-470 + public void rejectsEmptyPort() { + + assertThatExceptionOfType(IllegalArgumentException.class) // + .isThrownBy(() -> ClusterNodes.of("localhost:")) // + .withMessageContaining("port"); + } + + @Test // DATAES-470 + public void rejectsMissingPort() { + + assertThatExceptionOfType(IllegalArgumentException.class) // + .isThrownBy(() -> ClusterNodes.of("localhost")) // + .withMessageContaining("host:port"); + } + + @Test // DATAES-470 + public void rejectsUnresolvableHost() { + + assertThatExceptionOfType(IllegalArgumentException.class) // + .isThrownBy(() -> ClusterNodes.of("mylocalhost:80")); + } +}