From f6a5a0a4adab554bf8c19f3e7b16c2c1f0f4b6a5 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 4 May 2016 16:27:52 +0200 Subject: [PATCH] get rid of Node abstraction --- .../client/AbstractStaticConnectionPool.java | 13 +- .../org/elasticsearch/client/Connection.java | 18 +-- .../client/ElasticsearchResponse.java | 20 +-- .../ElasticsearchResponseException.java | 32 +++-- .../java/org/elasticsearch/client/Node.java | 125 ---------------- .../elasticsearch/client/RequestLogger.java | 13 +- .../org/elasticsearch/client/Sniffer.java | 54 +++---- .../client/SniffingConnectionPool.java | 29 ++-- .../client/StatefulConnection.java | 8 +- .../client/StaticConnectionPool.java | 19 +-- .../org/elasticsearch/client/Transport.java | 14 +- .../org/elasticsearch/client/NodeTests.java | 136 ------------------ .../elasticsearch/client/SnifferTests.java | 63 ++++---- .../client/SniffingConnectionPoolTests.java | 28 ++-- .../client/StaticConnectionPoolTests.java | 22 +-- 15 files changed, 171 insertions(+), 423 deletions(-) delete mode 100644 client/src/main/java/org/elasticsearch/client/Node.java delete mode 100644 client/src/test/java/org/elasticsearch/client/NodeTests.java diff --git a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java index f30a2b9c5c1..c043dd17059 100644 --- a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java @@ -21,6 +21,7 @@ package org.elasticsearch.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; import java.io.IOException; import java.util.ArrayList; @@ -74,11 +75,11 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool connection.isAlive() || connection.shouldBeRetried()); } - protected List createConnections(Node... nodes) { + protected List createConnections(HttpHost... hosts) { List connections = new ArrayList<>(); - for (Node node : nodes) { - Objects.requireNonNull(node, "node cannot be null"); - connections.add(new StatefulConnection(node)); + for (HttpHost host : hosts) { + Objects.requireNonNull(host, "host cannot be null"); + connections.add(new StatefulConnection(host)); } return Collections.unmodifiableList(connections); } @@ -94,12 +95,12 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool= 502 && statusLine.getStatusCode() <= 504; } - public Node getNode() { - return node; + /** + * Returns the {@link HttpHost} that returned the error + */ + public HttpHost getHost() { + return host; } + /** + * Returns the {@link RequestLine} that triggered the error + */ public RequestLine getRequestLine() { return requestLine; } + /** + * Returns the {@link StatusLine} that was returned by elasticsearch + */ public StatusLine getStatusLine() { return statusLine; } diff --git a/client/src/main/java/org/elasticsearch/client/Node.java b/client/src/main/java/org/elasticsearch/client/Node.java deleted file mode 100644 index ab6d3631527..00000000000 --- a/client/src/main/java/org/elasticsearch/client/Node.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.http.HttpHost; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -/** - * Represents an Elasticsearch node. - * Holds its http address as an {@link HttpHost} instance, as well as its optional set of roles and attributes. - * Roles and attributes can be populated in one of the two following ways: - * 1) using a connection pool that supports sniffing, so that all the info is retrieved from elasticsearch itself - * 2) manually passing the info through the {@link #Node(HttpHost, Set, Map)} constructor - * Roles and attributes may be taken into account as part of connection selection by the connection pool, which - * can be customized by passing in a predicate at connection pool creation. - */ -public class Node { - private final HttpHost httpHost; - private final Set roles; - private final Map attributes; - - /** - * Creates a node given its http address as an {@link HttpHost} instance. - * Roles are not provided hence all possible roles will be assumed, as that is the default in Elasticsearch. - * No attributes will be associated with the node. - * - * @param httpHost the http address of the node - */ - public Node(HttpHost httpHost) { - this(httpHost, new HashSet<>(Arrays.asList(Role.values())), Collections.emptyMap()); - } - - /** - * Creates a node given its http address as an {@link HttpHost} instance, its set or roles and attributes. - * - * @param httpHost the http address of the node - * @param roles the set of roles that the node fulfills within the cluster - * @param attributes the attributes associated with the node - */ - public Node(HttpHost httpHost, Set roles, Map attributes) { - Objects.requireNonNull(httpHost, "host cannot be null"); - Objects.requireNonNull(roles, "roles cannot be null"); - Objects.requireNonNull(attributes, "attributes cannot be null"); - this.httpHost = httpHost; - this.roles = Collections.unmodifiableSet(roles); - this.attributes = Collections.unmodifiableMap(attributes); - } - - /** - * Returns the http address of the node - */ - public HttpHost getHttpHost() { - return httpHost; - } - - /** - * Returns the set of roles associated with the node - */ - public Set getRoles() { - return roles; - } - - /** - * Returns the set of attributes associated with the node - */ - public Map getAttributes() { - return attributes; - } - - @Override - public String toString() { - return "Node{" + - "httpHost=" + httpHost + - ", roles=" + roles + - ", attributes=" + attributes + - '}'; - } - - /** - * Holds all the potential roles that a node can fulfill within a cluster - */ - public enum Role { - /** - * Data node - */ - DATA, - /** - * Master eligible node - */ - MASTER, - /** - * Ingest node - */ - INGEST; - - @Override - public String toString() { - return name().toLowerCase(Locale.ROOT); - } - } -} diff --git a/client/src/main/java/org/elasticsearch/client/RequestLogger.java b/client/src/main/java/org/elasticsearch/client/RequestLogger.java index 8423dcb8ade..99ce924c164 100644 --- a/client/src/main/java/org/elasticsearch/client/RequestLogger.java +++ b/client/src/main/java/org/elasticsearch/client/RequestLogger.java @@ -20,13 +20,14 @@ package org.elasticsearch.client; import org.apache.commons.logging.Log; +import org.apache.http.HttpHost; import org.apache.http.RequestLine; import org.apache.http.StatusLine; import java.io.IOException; /** - * Helper class that exposes static method to unify the way requests are logged + * Helper class that exposes static methods to unify the way requests are logged */ public final class RequestLogger { @@ -36,16 +37,14 @@ public final class RequestLogger { /** * Logs a request that yielded a response */ - public static void log(Log logger, String message, RequestLine requestLine, Node node, StatusLine statusLine) { - logger.debug(message + " [" + requestLine.getMethod() + " " + node.getHttpHost() + - requestLine.getUri() + "] [" + statusLine + "]"); + public static void log(Log logger, String message, RequestLine requestLine, HttpHost host, StatusLine statusLine) { + logger.debug(message + " [" + requestLine.getMethod() + " " + host + requestLine.getUri() + "] [" + statusLine + "]"); } /** * Logs a request that failed */ - public static void log(Log logger, String message, RequestLine requestLine, Node node, IOException e) { - logger.debug(message + " [" + requestLine.getMethod() + " " + node.getHttpHost() + - requestLine.getUri() + "]", e); + public static void log(Log logger, String message, RequestLine requestLine, HttpHost host, IOException e) { + logger.debug(message + " [" + requestLine.getMethod() + " " + host + requestLine.getUri() + "]", e); } } diff --git a/client/src/main/java/org/elasticsearch/client/Sniffer.java b/client/src/main/java/org/elasticsearch/client/Sniffer.java index 07d485de879..1bec4eb87e1 100644 --- a/client/src/main/java/org/elasticsearch/client/Sniffer.java +++ b/client/src/main/java/org/elasticsearch/client/Sniffer.java @@ -37,14 +37,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Objects; -import java.util.Set; +/** + * Calls nodes info api and returns a list of http hosts extracted from it + */ +//TODO this could potentially a call to _cat/nodes (although it doesn't support timeout param), but how would we handle bw comp with 2.x? final class Sniffer { private static final Log logger = LogFactory.getLog(Sniffer.class); @@ -69,34 +68,34 @@ final class Sniffer { this.jsonFactory = new JsonFactory(); } - List sniffNodes(Node node) throws IOException { + List sniffNodes(HttpHost host) throws IOException { HttpGet httpGet = new HttpGet("/_nodes/http?timeout=" + sniffRequestTimeout + "ms"); httpGet.setConfig(sniffRequestConfig); - try (CloseableHttpResponse response = client.execute(node.getHttpHost(), httpGet)) { + try (CloseableHttpResponse response = client.execute(host, httpGet)) { StatusLine statusLine = response.getStatusLine(); if (statusLine.getStatusCode() >= 300) { - RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), node, statusLine); + RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, statusLine); EntityUtils.consume(response.getEntity()); - throw new ElasticsearchResponseException(httpGet.getRequestLine(), node, statusLine); + throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, statusLine); } else { - List nodes = readNodes(response.getEntity()); - RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), node, statusLine); + List nodes = readHosts(response.getEntity()); + RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), host, statusLine); return nodes; } } catch(IOException e) { - RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), node, e); + RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, e); throw e; } } - private List readNodes(HttpEntity entity) throws IOException { + private List readHosts(HttpEntity entity) throws IOException { try (InputStream inputStream = entity.getContent()) { JsonParser parser = jsonFactory.createParser(inputStream); if (parser.nextToken() != JsonToken.START_OBJECT) { throw new IOException("expected data to start with an object"); } - List nodes = new ArrayList<>(); + List hosts = new ArrayList<>(); while (parser.nextToken() != JsonToken.END_OBJECT) { if (parser.getCurrentToken() == JsonToken.START_OBJECT) { if ("nodes".equals(parser.getCurrentName())) { @@ -104,10 +103,10 @@ final class Sniffer { JsonToken token = parser.nextToken(); assert token == JsonToken.START_OBJECT; String nodeId = parser.getCurrentName(); - Node sniffedNode = readNode(nodeId, parser, this.scheme); - if (sniffedNode != null) { + HttpHost sniffedHost = readNode(nodeId, parser, this.scheme); + if (sniffedHost != null) { logger.trace("adding node [" + nodeId + "]"); - nodes.add(sniffedNode); + hosts.add(sniffedHost); } } } else { @@ -115,31 +114,20 @@ final class Sniffer { } } } - return nodes; + return hosts; } } - private static Node readNode(String nodeId, JsonParser parser, String scheme) throws IOException { + private static HttpHost readNode(String nodeId, JsonParser parser, String scheme) throws IOException { HttpHost httpHost = null; - Set roles = new HashSet<>(); - Map attributes = new HashMap<>(); String fieldName = null; while (parser.nextToken() != JsonToken.END_OBJECT) { if (parser.getCurrentToken() == JsonToken.FIELD_NAME) { fieldName = parser.getCurrentName(); - } else if (parser.getCurrentToken() == JsonToken.START_ARRAY && "roles".equals(fieldName)) { - while (parser.nextToken() != JsonToken.END_ARRAY) { - roles.add(Node.Role.valueOf(parser.getValueAsString().toUpperCase(Locale.ROOT))); - } } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) { - if ("attributes".equals(fieldName)) { + if ("http".equals(fieldName)) { while (parser.nextToken() != JsonToken.END_OBJECT) { - attributes.put(parser.getCurrentName(), parser.getValueAsString()); - } - } else if ("http".equals(fieldName)) { - while (parser.nextToken() != JsonToken.END_OBJECT) { - if (parser.getCurrentToken() == JsonToken.VALUE_STRING && - "publish_address".equals(parser.getCurrentName())) { + if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) { URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString()); httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(), boundAddressAsURI.getScheme()); @@ -157,6 +145,6 @@ final class Sniffer { logger.debug("skipping node [" + nodeId + "] with http disabled"); return null; } - return new Node(httpHost, roles, attributes); + return httpHost; } } diff --git a/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java b/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java index 0488033c1ca..25d17b45ee5 100644 --- a/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java @@ -21,6 +21,7 @@ package org.elasticsearch.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; @@ -48,7 +49,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay, CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme, - Predicate connectionSelector, Node... nodes) { + Predicate connectionSelector, HttpHost... hosts) { super(connectionSelector); if (sniffInterval <= 0) { throw new IllegalArgumentException("sniffInterval must be greater than 0"); @@ -57,12 +58,12 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0"); } Objects.requireNonNull(scheme, "scheme cannot be null"); - if (nodes == null || nodes.length == 0) { - throw new IllegalArgumentException("no nodes provided"); + if (hosts == null || hosts.length == 0) { + throw new IllegalArgumentException("no hosts provided"); } this.sniffOnFailure = sniffOnFailure; this.sniffer = new Sniffer(client, sniffRequestConfig, sniffRequestTimeout, scheme.toString()); - this.connections = createConnections(nodes); + this.connections = createConnections(hosts); this.snifferTask = new SnifferTask(sniffInterval, sniffAfterFailureDelay); } @@ -81,7 +82,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { super.onFailure(connection); if (sniffOnFailure) { //re-sniff immediately but take out the node that failed - snifferTask.sniffOnFailure(connection.getNode()); + snifferTask.sniffOnFailure(connection.getHost()); } } @@ -119,22 +120,22 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { sniff(node -> true); } - void sniffOnFailure(Node failedNode) { + void sniffOnFailure(HttpHost failedHost) { //sync sniff straightaway on failure failure = true; - sniff(node -> node.getHttpHost().equals(failedNode.getHttpHost()) == false); + sniff(host -> host.equals(failedHost) == false); } - void sniff(Predicate nodeFilter) { + void sniff(Predicate hostFilter) { if (running.compareAndSet(false, true)) { try { Iterator connectionIterator = nextUnfilteredConnection().iterator(); if (connectionIterator.hasNext()) { - sniff(connectionIterator, nodeFilter); + sniff(connectionIterator, hostFilter); } else { StatefulConnection connection = lastResortConnection(); - logger.info("no healthy nodes available, trying " + connection.getNode()); - sniff(Stream.of(connection).iterator(), nodeFilter); + logger.info("no healthy nodes available, trying " + connection.getHost()); + sniff(Stream.of(connection).iterator(), hostFilter); } } catch (Throwable t) { logger.error("error while sniffing nodes", t); @@ -158,13 +159,13 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { } } - void sniff(Iterator connectionIterator, Predicate nodeFilter) throws IOException { + void sniff(Iterator connectionIterator, Predicate hostFilter) throws IOException { IOException lastSeenException = null; while (connectionIterator.hasNext()) { StatefulConnection connection = connectionIterator.next(); try { - List sniffedNodes = sniffer.sniffNodes(connection.getNode()); - Node[] filteredNodes = sniffedNodes.stream().filter(nodeFilter).toArray(Node[]::new); + List sniffedNodes = sniffer.sniffNodes(connection.getHost()); + HttpHost[] filteredNodes = sniffedNodes.stream().filter(hostFilter).toArray(HttpHost[]::new); logger.debug("adding " + filteredNodes.length + " nodes out of " + sniffedNodes.size() + " sniffed nodes"); connections = createConnections(filteredNodes); onSuccess(connection); diff --git a/client/src/main/java/org/elasticsearch/client/StatefulConnection.java b/client/src/main/java/org/elasticsearch/client/StatefulConnection.java index 1ac72382e19..50ac78cdde9 100644 --- a/client/src/main/java/org/elasticsearch/client/StatefulConnection.java +++ b/client/src/main/java/org/elasticsearch/client/StatefulConnection.java @@ -19,6 +19,8 @@ package org.elasticsearch.client; +import org.apache.http.HttpHost; + import java.util.concurrent.TimeUnit; /** @@ -43,10 +45,10 @@ public final class StatefulConnection extends Connection { private volatile long deadUntil = -1; /** - * Creates a new mutable connection pointing to the provided {@link Node} argument + * Creates a new mutable connection pointing to the provided {@link HttpHost} argument */ - public StatefulConnection(Node node) { - super(node); + public StatefulConnection(HttpHost host) { + super(host); } /** diff --git a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java index 5fb32341c99..539b7ad0f1f 100644 --- a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java @@ -21,6 +21,7 @@ package org.elasticsearch.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; import org.apache.http.StatusLine; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; @@ -43,17 +44,17 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool { private final List connections; public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig, - Predicate connectionSelector, Node... nodes) { + Predicate connectionSelector, HttpHost... hosts) { super(connectionSelector); Objects.requireNonNull(client, "client cannot be null"); Objects.requireNonNull(pingRequestConfig, "pingRequestConfig cannot be null"); - if (nodes == null || nodes.length == 0) { - throw new IllegalArgumentException("no nodes provided"); + if (hosts == null || hosts.length == 0) { + throw new IllegalArgumentException("no hosts provided"); } this.client = client; this.pingEnabled = pingEnabled; this.pingRequestConfig = pingRequestConfig; - this.connections = createConnections(nodes); + this.connections = createConnections(hosts); } @Override @@ -67,20 +68,20 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool { HttpHead httpHead = new HttpHead("/"); httpHead.setConfig(pingRequestConfig); StatusLine statusLine; - try(CloseableHttpResponse httpResponse = client.execute(connection.getNode().getHttpHost(), httpHead)) { + try(CloseableHttpResponse httpResponse = client.execute(connection.getHost(), httpHead)) { statusLine = httpResponse.getStatusLine(); EntityUtils.consume(httpResponse.getEntity()); } catch(IOException e) { - RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getNode(), e); + RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getHost(), e); onFailure(connection); throw e; } if (statusLine.getStatusCode() >= 300) { - RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getNode(), statusLine); + RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getHost(), statusLine); onFailure(connection); - throw new ElasticsearchResponseException(httpHead.getRequestLine(), connection.getNode(), statusLine); + throw new ElasticsearchResponseException(httpHead.getRequestLine(), connection.getHost(), statusLine); } else { - RequestLogger.log(logger, "ping succeeded", httpHead.getRequestLine(), connection.getNode(), statusLine); + RequestLogger.log(logger, "ping succeeded", httpHead.getRequestLine(), connection.getHost(), statusLine); onSuccess(connection); } } diff --git a/client/src/main/java/org/elasticsearch/client/Transport.java b/client/src/main/java/org/elasticsearch/client/Transport.java index 88bf1bf6c7a..ff0835a991c 100644 --- a/client/src/main/java/org/elasticsearch/client/Transport.java +++ b/client/src/main/java/org/elasticsearch/client/Transport.java @@ -69,7 +69,7 @@ final class Transport implements Closeable { Iterator connectionIterator = connectionPool.nextConnection().iterator(); if (connectionIterator.hasNext() == false) { C connection = connectionPool.lastResortConnection(); - logger.info("no healthy nodes available, trying " + connection.getNode()); + logger.info("no healthy nodes available, trying " + connection.getHost()); return performRequest(request, Stream.of(connection).iterator()); } return performRequest(request, connectionIterator); @@ -127,9 +127,9 @@ final class Transport implements Closeable { private ElasticsearchResponse performRequest(HttpRequestBase request, C connection) throws IOException { CloseableHttpResponse response; try { - response = client.execute(connection.getNode().getHttpHost(), request); + response = client.execute(connection.getHost(), request); } catch(IOException e) { - RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getNode(), e); + RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e); throw e; } finally { request.reset(); @@ -138,12 +138,12 @@ final class Transport implements Closeable { //TODO make ignore status code configurable. rest-spec and tests support that parameter. if (statusLine.getStatusCode() < 300 || request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) { - RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getNode(), response.getStatusLine()); - return new ElasticsearchResponse(request.getRequestLine(), connection.getNode(), response); + RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine()); + return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response); } else { EntityUtils.consume(response.getEntity()); - RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getNode(), response.getStatusLine()); - throw new ElasticsearchResponseException(request.getRequestLine(), connection.getNode(), statusLine); + RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine()); + throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine); } } diff --git a/client/src/test/java/org/elasticsearch/client/NodeTests.java b/client/src/test/java/org/elasticsearch/client/NodeTests.java deleted file mode 100644 index 73bad0f6d7b..00000000000 --- a/client/src/test/java/org/elasticsearch/client/NodeTests.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import com.carrotsearch.randomizedtesting.generators.RandomInts; -import com.carrotsearch.randomizedtesting.generators.RandomPicks; -import com.carrotsearch.randomizedtesting.generators.RandomStrings; -import org.apache.http.HttpHost; -import org.apache.lucene.util.LuceneTestCase; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.sameInstance; - -public class NodeTests extends LuceneTestCase { - - public void testSingleArgumentConstructor() { - HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme()); - Node node = new Node(httpHost); - assertThat(node.getHttpHost(), sameInstance(httpHost)); - assertThat(node.getAttributes(), notNullValue()); - assertThat(node.getAttributes().size(), equalTo(0)); - assertThat(node.getRoles(), notNullValue()); - assertThat(node.getRoles(), equalTo(new HashSet<>(Arrays.asList(Node.Role.values())))); - - try { - new Node(null); - fail("node construction should have failed"); - } catch(NullPointerException e) { - assertThat(e.getMessage(), equalTo("host cannot be null")); - } - - } - - public void testThreeArgumentsConstructor() { - HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme()); - Set roles = randomRoles(); - Map attributes = randomAttributes(); - Node node = new Node(httpHost, roles, attributes); - assertThat(node.getHttpHost(), sameInstance(httpHost)); - assertThat(node.getAttributes(), equalTo(attributes)); - assertThat(node.getRoles(), equalTo(roles)); - - try { - new Node(null, roles, attributes); - fail("node construction should have failed"); - } catch(NullPointerException e) { - assertThat(e.getMessage(), equalTo("host cannot be null")); - } - - try { - new Node(httpHost, null, attributes); - fail("node construction should have failed"); - } catch(NullPointerException e) { - assertThat(e.getMessage(), equalTo("roles cannot be null")); - } - - try { - new Node(httpHost, roles, null); - fail("node construction should have failed"); - } catch(NullPointerException e) { - assertThat(e.getMessage(), equalTo("attributes cannot be null")); - } - } - - public void testToString() { - HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme()); - Set roles = randomRoles(); - Map attributes = randomAttributes(); - Node node = new Node(httpHost, roles, attributes); - String expectedString = "Node{" + - "httpHost=" + httpHost.toString() + - ", roles=" + roles.toString() + - ", attributes=" + attributes.toString() + - '}'; - assertThat(node.toString(), equalTo(expectedString)); - } - - private static String randomHost() { - return RandomStrings.randomAsciiOfLengthBetween(random(), 5, 10); - } - - private static int randomPort() { - return random().nextInt(); - } - - private static String randomScheme() { - if (rarely()) { - return null; - } - return random().nextBoolean() ? "http" : "https"; - } - - private static Map randomAttributes() { - int numAttributes = RandomInts.randomIntBetween(random(), 0, 5); - Map attributes = new HashMap<>(numAttributes); - for (int i = 0; i < numAttributes; i++) { - String key = RandomStrings.randomAsciiOfLengthBetween(random(), 3, 10); - String value = RandomStrings.randomAsciiOfLengthBetween(random(), 3, 10); - attributes.put(key, value); - } - return attributes; - } - - private static Set randomRoles() { - int numRoles = RandomInts.randomIntBetween(random(), 0, 3); - Set roles = new HashSet<>(numRoles); - for (int j = 0; j < numRoles; j++) { - roles.add(RandomPicks.randomFrom(random(), Node.Role.values())); - } - return roles; - } -} diff --git a/client/src/test/java/org/elasticsearch/client/SnifferTests.java b/client/src/test/java/org/elasticsearch/client/SnifferTests.java index 9169afd7695..c2caa5ab4fa 100644 --- a/client/src/test/java/org/elasticsearch/client/SnifferTests.java +++ b/client/src/test/java/org/elasticsearch/client/SnifferTests.java @@ -90,24 +90,21 @@ public class SnifferTests extends LuceneTestCase { Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme.toString()); HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort()); try { - List sniffedNodes = sniffer.sniffNodes(new Node(httpHost)); + List sniffedHosts = sniffer.sniffNodes(httpHost); if (sniffResponse.isFailure) { fail("sniffNodes should have failed"); } - assertThat(sniffedNodes.size(), equalTo(sniffResponse.nodes.size())); - Iterator responseNodesIterator = sniffResponse.nodes.iterator(); - for (Node sniffedNode : sniffedNodes) { - Node responseNode = responseNodesIterator.next(); - assertThat(sniffedNode.getHttpHost(), equalTo(responseNode.getHttpHost())); - assertThat(sniffedNode.getRoles(), equalTo(responseNode.getRoles())); - assertThat(sniffedNode.getAttributes(), equalTo(responseNode.getAttributes())); + assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size())); + Iterator responseHostsIterator = sniffResponse.hosts.iterator(); + for (HttpHost sniffedHost : sniffedHosts) { + assertEquals(sniffedHost, responseHostsIterator.next()); } } catch(ElasticsearchResponseException e) { if (sniffResponse.isFailure) { assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() + "/_nodes/http?timeout=" + sniffRequestTimeout)); assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode))); - assertThat(e.getNode().getHttpHost(), equalTo(httpHost)); + assertThat(e.getHost(), equalTo(httpHost)); assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode)); assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1")); } else { @@ -141,7 +138,7 @@ public class SnifferTests extends LuceneTestCase { private static SniffResponse buildSniffResponse(SniffingConnectionPool.Scheme scheme) throws IOException { int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - List nodes = new ArrayList<>(numNodes); + List hosts = new ArrayList<>(numNodes); JsonFactory jsonFactory = new JsonFactory(); StringWriter writer = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(writer); @@ -168,25 +165,11 @@ public class SnifferTests extends LuceneTestCase { generator.writeEndArray(); } boolean isHttpEnabled = rarely() == false; - int numRoles = RandomInts.randomIntBetween(random(), 0, 3); - Set nodeRoles = new HashSet<>(numRoles); - for (int j = 0; j < numRoles; j++) { - Node.Role role; - do { - role = RandomPicks.randomFrom(random(), Node.Role.values()); - } while(nodeRoles.add(role) == false); - } - - int numAttributes = RandomInts.randomIntBetween(random(), 0, 3); - Map attributes = new HashMap<>(numAttributes); - for (int j = 0; j < numAttributes; j++) { - attributes.put("attr" + j, "value" + j); - } if (isHttpEnabled) { String host = "host" + i; int port = RandomInts.randomIntBetween(random(), 9200, 9299); HttpHost httpHost = new HttpHost(host, port, scheme.toString()); - nodes.add(new Node(httpHost, nodeRoles, attributes)); + hosts.add(httpHost); generator.writeObjectFieldStart("http"); if (random().nextBoolean()) { generator.writeArrayFieldStart("bound_address"); @@ -205,11 +188,25 @@ public class SnifferTests extends LuceneTestCase { } generator.writeEndObject(); } + String[] roles = {"master", "data", "ingest"}; + int numRoles = RandomInts.randomIntBetween(random(), 0, 3); + Set nodeRoles = new HashSet<>(numRoles); + for (int j = 0; j < numRoles; j++) { + String role; + do { + role = RandomPicks.randomFrom(random(), roles); + } while(nodeRoles.add(role) == false); + } generator.writeArrayFieldStart("roles"); - for (Node.Role nodeRole : nodeRoles) { - generator.writeString(nodeRole.toString()); + for (String nodeRole : nodeRoles) { + generator.writeString(nodeRole); } generator.writeEndArray(); + int numAttributes = RandomInts.randomIntBetween(random(), 0, 3); + Map attributes = new HashMap<>(numAttributes); + for (int j = 0; j < numAttributes; j++) { + attributes.put("attr" + j, "value" + j); + } if (numAttributes > 0) { generator.writeObjectFieldStart("attributes"); } @@ -224,18 +221,18 @@ public class SnifferTests extends LuceneTestCase { generator.writeEndObject(); generator.writeEndObject(); generator.close(); - return SniffResponse.buildResponse(writer.toString(), nodes); + return SniffResponse.buildResponse(writer.toString(), hosts); } private static class SniffResponse { private final String nodesInfoBody; private final int nodesInfoResponseCode; - private final List nodes; + private final List hosts; private final boolean isFailure; - SniffResponse(String nodesInfoBody, List nodes, boolean isFailure) { + SniffResponse(String nodesInfoBody, List hosts, boolean isFailure) { this.nodesInfoBody = nodesInfoBody; - this.nodes = nodes; + this.hosts = hosts; this.isFailure = isFailure; if (isFailure) { this.nodesInfoResponseCode = randomErrorResponseCode(); @@ -248,8 +245,8 @@ public class SnifferTests extends LuceneTestCase { return new SniffResponse("", Collections.emptyList(), true); } - static SniffResponse buildResponse(String nodesInfoBody, List nodes) { - return new SniffResponse(nodesInfoBody, nodes, false); + static SniffResponse buildResponse(String nodesInfoBody, List hosts) { + return new SniffResponse(nodesInfoBody, hosts, false); } } diff --git a/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java b/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java index 13eaadd619a..97b7d0867c6 100644 --- a/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java +++ b/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java @@ -38,16 +38,16 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { public void testConstructor() throws Exception { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - Node[] nodes = new Node[numNodes]; + HttpHost[] hosts = new HttpHost[numNodes]; for (int i = 0; i < numNodes; i++) { - nodes[i] = new Node(new HttpHost("localhost", 9200)); + hosts[i] = new HttpHost("localhost", 9200); } try (SniffingConnectionPool connectionPool = new SniffingConnectionPool( RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { @@ -58,7 +58,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0"); @@ -68,7 +68,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), null, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "client cannot be null"); @@ -78,7 +78,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "sniffRequestConfig cannot be null"); @@ -88,7 +88,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0"); @@ -98,7 +98,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, hosts)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "connection selector predicate cannot be null"); @@ -109,10 +109,10 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), - connection -> random().nextBoolean(), (Node[])null)) { + connection -> random().nextBoolean(), (HttpHost[])null)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no nodes provided"); + assertEquals(e.getMessage(), "no hosts provided"); } try (SniffingConnectionPool connectionPool = new SniffingConnectionPool( @@ -120,10 +120,10 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), - (Node)null)) { + (HttpHost) null)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { - assertEquals(e.getMessage(), "node cannot be null"); + assertEquals(e.getMessage(), "host cannot be null"); } try (SniffingConnectionPool connectionPool = new SniffingConnectionPool( @@ -133,14 +133,14 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean())) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no nodes provided"); + assertEquals(e.getMessage(), "no hosts provided"); } try (SniffingConnectionPool sniffingConnectionPool = new SniffingConnectionPool( RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { assertNotNull(sniffingConnectionPool); } } diff --git a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java b/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java index 9b5b29df1b1..82e8c60b02f 100644 --- a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java +++ b/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java @@ -37,51 +37,51 @@ public class StaticConnectionPoolTests extends LuceneTestCase { public void testConstructor() { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); int numNodes = RandomInts.randomIntBetween(random(), 1, 5); - Node[] nodes = new Node[numNodes]; + HttpHost[] hosts = new HttpHost[numNodes]; for (int i = 0; i < numNodes; i++) { - nodes[i] = new Node(new HttpHost("localhost", 9200)); + hosts[i] = new HttpHost("localhost", 9200); } try { - new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), nodes); + new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), hosts); } catch(NullPointerException e) { assertEquals(e.getMessage(), "client cannot be null"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), nodes); + new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), hosts); } catch(NullPointerException e) { assertEquals(e.getMessage(), "pingRequestConfig cannot be null"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, nodes); + new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, hosts); } catch(NullPointerException e) { assertEquals(e.getMessage(), "connection selector predicate cannot be null"); } try { new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), (Node)null); + connection -> random().nextBoolean(), (HttpHost) null); } catch(NullPointerException e) { - assertEquals(e.getMessage(), "node cannot be null"); + assertEquals(e.getMessage(), "host cannot be null"); } try { new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), (Node[])null); + connection -> random().nextBoolean(), (HttpHost[])null); } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no nodes provided"); + assertEquals(e.getMessage(), "no hosts provided"); } try { new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean()); } catch(IllegalArgumentException e) { - assertEquals(e.getMessage(), "no nodes provided"); + assertEquals(e.getMessage(), "no hosts provided"); } StaticConnectionPool staticConnectionPool = new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), nodes); + connection -> random().nextBoolean(), hosts); assertNotNull(staticConnectionPool); } }