DATAES-470 - Fixed parsing of cluster nodes in TransportClientFactoryBean.

Extracted ClusterNodes value object to capture the parsing logic and actually properly test it. Added unit tests to verify the proper rejection and the two cases outlined in the ticket.

Related tickets: DATAES-283.
This commit is contained in:
Oliver Gierke 2018-07-12 21:48:31 +02:00
parent cdbc832068
commit 469455383f
No known key found for this signature in database
GPG Key ID: 6E42B5787543F690
3 changed files with 201 additions and 25 deletions

View File

@ -0,0 +1,102 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.elasticsearch.common.transport.TransportAddress;
import org.springframework.data.util.Streamable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Value object to represent a list of cluster nodes.
*
* @author Oliver Gierke
* @since 3.1
*/
class ClusterNodes implements Streamable<TransportAddress> {
public static ClusterNodes DEFAULT = ClusterNodes.of("127.0.0.1:9300");
private static final String COLON = ":";
private static final String COMMA = ",";
private final List<TransportAddress> clusterNodes;
/**
* Creates a new {@link ClusterNodes} by parsing the given source.
*
* @param source must not be {@literal null} or empty.
*/
private ClusterNodes(String source) {
Assert.hasText(source, "Cluster nodes source must not be null or empty!");
String[] nodes = StringUtils.delimitedListToStringArray(source, COMMA);
this.clusterNodes = Arrays.stream(nodes).map(node -> {
String[] segments = StringUtils.delimitedListToStringArray(node, COLON);
Assert.isTrue(segments.length == 2,
() -> String.format("Invalid cluster node %s in %s! Must be in the format host:port!", node, source));
String host = segments[0].trim();
String port = segments[1].trim();
Assert.hasText(host, () -> String.format("No host name given cluster node %s!", node));
Assert.hasText(port, () -> String.format("No port given in cluster node %s!", node));
return new TransportAddress(toInetAddress(host), Integer.valueOf(port));
}).collect(Collectors.toList());
}
/**
* Creates a new {@link ClusterNodes} by parsing the given source. The expected format is a comma separated list of
* host-port-combinations separated by a colon: {@code host:port,host:port,}.
*
* @param source must not be {@literal null} or empty.
* @return
*/
public static ClusterNodes of(String source) {
return new ClusterNodes(source);
}
/*
* (non-Javadoc)
* @see java.lang.Iterable#iterator()
*/
@Override
public Iterator<TransportAddress> iterator() {
return clusterNodes.iterator();
}
private static InetAddress toInetAddress(String host) {
try {
return InetAddress.getByName(host);
} catch (UnknownHostException o_O) {
throw new IllegalArgumentException(o_O);
}
}
}

View File

@ -15,20 +15,16 @@
*/
package org.springframework.data.elasticsearch.client;
import java.net.InetAddress;
import java.util.Properties;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.beans.factory.InitializingBean;
/**
* TransportClientFactoryBean
@ -38,12 +34,12 @@ import org.springframework.util.StringUtils;
* @author Jakub Vavrik
* @author Piotr Betkier
* @author Ilkang Na
* @author Oliver Gierke
*/
public class TransportClientFactoryBean implements FactoryBean<TransportClient>, InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(TransportClientFactoryBean.class);
private String clusterNodes = "127.0.0.1:9300";
private ClusterNodes clusterNodes = ClusterNodes.of("127.0.0.1:9300");
private String clusterName = "elasticsearch";
private Boolean clientTransportSniff = true;
private Boolean clientIgnoreClusterName = Boolean.FALSE;
@ -51,8 +47,6 @@ public class TransportClientFactoryBean implements FactoryBean<TransportClient>,
private String clientNodesSamplerInterval = "5s";
private TransportClient client;
private Properties properties;
static final String COLON = ":";
static final String COMMA = ",";
@Override
public void destroy() throws Exception {
@ -89,21 +83,11 @@ public class TransportClientFactoryBean implements FactoryBean<TransportClient>,
protected void buildClient() throws Exception {
client = new PreBuiltTransportClient(settings());
Assert.hasText(clusterNodes, "[Assertion failed] clusterNodes settings missing.");
String[] clusterNodesArray = StringUtils.split(clusterNodes, COMMA);
if (clusterNodesArray != null) {
for (String clusterNode : clusterNodesArray) {
if (clusterNode != null) {
int colonPosition = clusterName.lastIndexOf(COLON);
String hostName = colonPosition != -1 ? clusterNode.substring(0, colonPosition) : clusterNode;
String port = colonPosition != -1 ? clusterNode.substring(colonPosition, clusterNode.length()) : "";
Assert.hasText(hostName, "[Assertion failed] missing host name in 'clusterNodes'");
Assert.hasText(port, "[Assertion failed] missing port in 'clusterNodes'");
logger.info("adding transport node : " + clusterNode);
client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port)));
}
}
}
clusterNodes.stream() //
.peek(it -> logger.info("Adding transport node : " + it.toString())) //
.forEach(client::addTransportAddress);
client.connectedNodes();
}
@ -127,7 +111,7 @@ public class TransportClientFactoryBean implements FactoryBean<TransportClient>,
}
public void setClusterNodes(String clusterNodes) {
this.clusterNodes = clusterNodes;
this.clusterNodes = ClusterNodes.of(clusterNodes);
}
public void setClusterName(String clusterName) {

View File

@ -0,0 +1,90 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.Test;
/**
* Unit tests for {@link ClusterNodes}.
*
* @author Oliver Gierke
*/
public class ClusterNodesUnitTests {
@Test // DATAES-470
public void parsesSingleClusterNode() {
ClusterNodes nodes = ClusterNodes.DEFAULT;
assertThat(nodes).hasSize(1) //
.first().satisfies(it -> {
assertThat(it.getAddress()).isEqualTo("127.0.0.1");
assertThat(it.getPort()).isEqualTo(9300);
});
}
@Test // DATAES-470
public void parsesMultiClusterNode() {
ClusterNodes nodes = ClusterNodes.of("127.0.0.1:1234,10.1.0.1:5678");
assertThat(nodes.stream()).hasSize(2); //
assertThat(nodes.stream()).element(0).satisfies(it -> {
assertThat(it.getAddress()).isEqualTo("127.0.0.1");
assertThat(it.getPort()).isEqualTo(1234);
});
assertThat(nodes.stream()).element(1).satisfies(it -> {
assertThat(it.getAddress()).isEqualTo("10.1.0.1");
assertThat(it.getPort()).isEqualTo(5678);
});
}
@Test // DATAES-470
public void rejectsEmptyHostName() {
assertThatExceptionOfType(IllegalArgumentException.class) //
.isThrownBy(() -> ClusterNodes.of(":8080")) //
.withMessageContaining("host");
}
@Test // DATAES-470
public void rejectsEmptyPort() {
assertThatExceptionOfType(IllegalArgumentException.class) //
.isThrownBy(() -> ClusterNodes.of("localhost:")) //
.withMessageContaining("port");
}
@Test // DATAES-470
public void rejectsMissingPort() {
assertThatExceptionOfType(IllegalArgumentException.class) //
.isThrownBy(() -> ClusterNodes.of("localhost")) //
.withMessageContaining("host:port");
}
@Test // DATAES-470
public void rejectsUnresolvableHost() {
assertThatExceptionOfType(IllegalArgumentException.class) //
.isThrownBy(() -> ClusterNodes.of("mylocalhost:80"));
}
}