get rid of Node abstraction

This commit is contained in:
javanna 2016-05-04 16:27:52 +02:00 committed by Luca Cavanna
parent 94cf8437d0
commit f6a5a0a4ad
15 changed files with 171 additions and 423 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import java.io.IOException;
import java.util.ArrayList;
@ -74,11 +75,11 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool<Sta
return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried());
}
protected List<StatefulConnection> createConnections(Node... nodes) {
protected List<StatefulConnection> createConnections(HttpHost... hosts) {
List<StatefulConnection> connections = new ArrayList<>();
for (Node node : nodes) {
Objects.requireNonNull(node, "node cannot be null");
connections.add(new StatefulConnection(node));
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
connections.add(new StatefulConnection(host));
}
return Collections.unmodifiableList(connections);
}
@ -94,12 +95,12 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool<Sta
@Override
public void onSuccess(StatefulConnection connection) {
connection.markAlive();
logger.trace("marked connection alive for " + connection.getNode());
logger.trace("marked connection alive for " + connection.getHost());
}
@Override
public void onFailure(StatefulConnection connection) throws IOException {
connection.markDead();
logger.debug("marked connection dead for " + connection.getNode());
logger.debug("marked connection dead for " + connection.getHost());
}
}

View File

@ -19,27 +19,29 @@
package org.elasticsearch.client;
import org.apache.http.HttpHost;
/**
* Simplest representation of a connection to an elasticsearch node.
* It doesn't have any mutable state. It holds the node that the connection points to.
* It doesn't have any mutable state. It holds the host that the connection points to.
* Allows the transport to deal with very simple connection objects that are immutable.
* Any change to the state of connections should be made through the connection pool
* which is aware of the connection object that it supports.
*/
public class Connection {
private final Node node;
private final HttpHost host;
/**
* Creates a new connection pointing to the provided {@link Node} argument
* Creates a new connection pointing to the provided {@link HttpHost} argument
*/
public Connection(Node node) {
this.node = node;
public Connection(HttpHost host) {
this.host = host;
}
/**
* Returns the {@link Node} that the connection points to
* Returns the {@link HttpHost} that the connection points to
*/
public Node getNode() {
return node;
public HttpHost getHost() {
return host;
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -31,20 +32,20 @@ import java.util.Objects;
/**
* Holds an elasticsearch response. It wraps the {@link CloseableHttpResponse} response and associates it with
* its corresponding {@link RequestLine} and {@link Node}
* its corresponding {@link RequestLine} and {@link HttpHost}
*/
public class ElasticsearchResponse implements Closeable {
private final RequestLine requestLine;
private final Node node;
private final HttpHost host;
private final CloseableHttpResponse response;
ElasticsearchResponse(RequestLine requestLine, Node node, CloseableHttpResponse response) {
ElasticsearchResponse(RequestLine requestLine, HttpHost host, CloseableHttpResponse response) {
Objects.requireNonNull(requestLine, "requestLine cannot be null");
Objects.requireNonNull(node, "node cannot be null");
Objects.requireNonNull(host, "node cannot be null");
Objects.requireNonNull(response, "response cannot be null");
this.requestLine = requestLine;
this.node = node;
this.host = host;
this.response = response;
}
@ -58,8 +59,8 @@ public class ElasticsearchResponse implements Closeable {
/**
* Returns the node that returned this response
*/
public Node getNode() {
return node;
public HttpHost getHost() {
return host;
}
/**
@ -77,7 +78,8 @@ public class ElasticsearchResponse implements Closeable {
}
/**
* Returns the response bodyi available, null otherwise
* Returns the response body available, null otherwise
* @see HttpEntity
*/
public HttpEntity getEntity() {
return response.getEntity();
@ -87,7 +89,7 @@ public class ElasticsearchResponse implements Closeable {
public String toString() {
return "ElasticsearchResponse{" +
"requestLine=" + requestLine +
", node=" + node +
", host=" + host +
", response=" + response.getStatusLine() +
'}';
}

View File

@ -19,41 +19,57 @@
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import java.io.IOException;
/**
* Exception thrown when an elasticsearch node responds to a request with a status code that indicates an error
*/
public class ElasticsearchResponseException extends IOException {
private final Node node;
private final HttpHost host;
private final RequestLine requestLine;
private final StatusLine statusLine;
ElasticsearchResponseException(RequestLine requestLine, Node node, StatusLine statusLine) {
super(buildMessage(requestLine, node, statusLine));
this.node = node;
ElasticsearchResponseException(RequestLine requestLine, HttpHost host, StatusLine statusLine) {
super(buildMessage(requestLine, host, statusLine));
this.host = host;
this.requestLine = requestLine;
this.statusLine = statusLine;
}
private static String buildMessage(RequestLine requestLine, Node node, StatusLine statusLine) {
return requestLine.getMethod() + " " + node.getHttpHost() + requestLine.getUri() + ": " + statusLine.toString();
private static String buildMessage(RequestLine requestLine, HttpHost host, StatusLine statusLine) {
return requestLine.getMethod() + " " + host + requestLine.getUri() + ": " + statusLine.toString();
}
/**
* Returns whether the error is recoverable or not, hence whether the same request should be retried on other nodes or not
*/
public boolean isRecoverable() {
//clients don't retry on 500 because elasticsearch still misuses it instead of 400 in some places
return statusLine.getStatusCode() >= 502 && statusLine.getStatusCode() <= 504;
}
public Node getNode() {
return node;
/**
* Returns the {@link HttpHost} that returned the error
*/
public HttpHost getHost() {
return host;
}
/**
* Returns the {@link RequestLine} that triggered the error
*/
public RequestLine getRequestLine() {
return requestLine;
}
/**
* Returns the {@link StatusLine} that was returned by elasticsearch
*/
public StatusLine getStatusLine() {
return statusLine;
}

View File

@ -1,125 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Represents an Elasticsearch node.
* Holds its http address as an {@link HttpHost} instance, as well as its optional set of roles and attributes.
* Roles and attributes can be populated in one of the two following ways:
* 1) using a connection pool that supports sniffing, so that all the info is retrieved from elasticsearch itself
* 2) manually passing the info through the {@link #Node(HttpHost, Set, Map)} constructor
* Roles and attributes may be taken into account as part of connection selection by the connection pool, which
* can be customized by passing in a predicate at connection pool creation.
*/
public class Node {
private final HttpHost httpHost;
private final Set<Role> roles;
private final Map<String, String> attributes;
/**
* Creates a node given its http address as an {@link HttpHost} instance.
* Roles are not provided hence all possible roles will be assumed, as that is the default in Elasticsearch.
* No attributes will be associated with the node.
*
* @param httpHost the http address of the node
*/
public Node(HttpHost httpHost) {
this(httpHost, new HashSet<>(Arrays.asList(Role.values())), Collections.emptyMap());
}
/**
* Creates a node given its http address as an {@link HttpHost} instance, its set or roles and attributes.
*
* @param httpHost the http address of the node
* @param roles the set of roles that the node fulfills within the cluster
* @param attributes the attributes associated with the node
*/
public Node(HttpHost httpHost, Set<Role> roles, Map<String, String> attributes) {
Objects.requireNonNull(httpHost, "host cannot be null");
Objects.requireNonNull(roles, "roles cannot be null");
Objects.requireNonNull(attributes, "attributes cannot be null");
this.httpHost = httpHost;
this.roles = Collections.unmodifiableSet(roles);
this.attributes = Collections.unmodifiableMap(attributes);
}
/**
* Returns the http address of the node
*/
public HttpHost getHttpHost() {
return httpHost;
}
/**
* Returns the set of roles associated with the node
*/
public Set<Role> getRoles() {
return roles;
}
/**
* Returns the set of attributes associated with the node
*/
public Map<String, String> getAttributes() {
return attributes;
}
@Override
public String toString() {
return "Node{" +
"httpHost=" + httpHost +
", roles=" + roles +
", attributes=" + attributes +
'}';
}
/**
* Holds all the potential roles that a node can fulfill within a cluster
*/
public enum Role {
/**
* Data node
*/
DATA,
/**
* Master eligible node
*/
MASTER,
/**
* Ingest node
*/
INGEST;
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -20,13 +20,14 @@
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.http.HttpHost;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import java.io.IOException;
/**
* Helper class that exposes static method to unify the way requests are logged
* Helper class that exposes static methods to unify the way requests are logged
*/
public final class RequestLogger {
@ -36,16 +37,14 @@ public final class RequestLogger {
/**
* Logs a request that yielded a response
*/
public static void log(Log logger, String message, RequestLine requestLine, Node node, StatusLine statusLine) {
logger.debug(message + " [" + requestLine.getMethod() + " " + node.getHttpHost() +
requestLine.getUri() + "] [" + statusLine + "]");
public static void log(Log logger, String message, RequestLine requestLine, HttpHost host, StatusLine statusLine) {
logger.debug(message + " [" + requestLine.getMethod() + " " + host + requestLine.getUri() + "] [" + statusLine + "]");
}
/**
* Logs a request that failed
*/
public static void log(Log logger, String message, RequestLine requestLine, Node node, IOException e) {
logger.debug(message + " [" + requestLine.getMethod() + " " + node.getHttpHost() +
requestLine.getUri() + "]", e);
public static void log(Log logger, String message, RequestLine requestLine, HttpHost host, IOException e) {
logger.debug(message + " [" + requestLine.getMethod() + " " + host + requestLine.getUri() + "]", e);
}
}

View File

@ -37,14 +37,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Calls nodes info api and returns a list of http hosts extracted from it
*/
//TODO this could potentially a call to _cat/nodes (although it doesn't support timeout param), but how would we handle bw comp with 2.x?
final class Sniffer {
private static final Log logger = LogFactory.getLog(Sniffer.class);
@ -69,34 +68,34 @@ final class Sniffer {
this.jsonFactory = new JsonFactory();
}
List<Node> sniffNodes(Node node) throws IOException {
List<HttpHost> sniffNodes(HttpHost host) throws IOException {
HttpGet httpGet = new HttpGet("/_nodes/http?timeout=" + sniffRequestTimeout + "ms");
httpGet.setConfig(sniffRequestConfig);
try (CloseableHttpResponse response = client.execute(node.getHttpHost(), httpGet)) {
try (CloseableHttpResponse response = client.execute(host, httpGet)) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() >= 300) {
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), node, statusLine);
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, statusLine);
EntityUtils.consume(response.getEntity());
throw new ElasticsearchResponseException(httpGet.getRequestLine(), node, statusLine);
throw new ElasticsearchResponseException(httpGet.getRequestLine(), host, statusLine);
} else {
List<Node> nodes = readNodes(response.getEntity());
RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), node, statusLine);
List<HttpHost> nodes = readHosts(response.getEntity());
RequestLogger.log(logger, "sniff succeeded", httpGet.getRequestLine(), host, statusLine);
return nodes;
}
} catch(IOException e) {
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), node, e);
RequestLogger.log(logger, "sniff failed", httpGet.getRequestLine(), host, e);
throw e;
}
}
private List<Node> readNodes(HttpEntity entity) throws IOException {
private List<HttpHost> readHosts(HttpEntity entity) throws IOException {
try (InputStream inputStream = entity.getContent()) {
JsonParser parser = jsonFactory.createParser(inputStream);
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("expected data to start with an object");
}
List<Node> nodes = new ArrayList<>();
List<HttpHost> hosts = new ArrayList<>();
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("nodes".equals(parser.getCurrentName())) {
@ -104,10 +103,10 @@ final class Sniffer {
JsonToken token = parser.nextToken();
assert token == JsonToken.START_OBJECT;
String nodeId = parser.getCurrentName();
Node sniffedNode = readNode(nodeId, parser, this.scheme);
if (sniffedNode != null) {
HttpHost sniffedHost = readNode(nodeId, parser, this.scheme);
if (sniffedHost != null) {
logger.trace("adding node [" + nodeId + "]");
nodes.add(sniffedNode);
hosts.add(sniffedHost);
}
}
} else {
@ -115,31 +114,20 @@ final class Sniffer {
}
}
}
return nodes;
return hosts;
}
}
private static Node readNode(String nodeId, JsonParser parser, String scheme) throws IOException {
private static HttpHost readNode(String nodeId, JsonParser parser, String scheme) throws IOException {
HttpHost httpHost = null;
Set<Node.Role> roles = new HashSet<>();
Map<String, String> attributes = new HashMap<>();
String fieldName = null;
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
fieldName = parser.getCurrentName();
} else if (parser.getCurrentToken() == JsonToken.START_ARRAY && "roles".equals(fieldName)) {
while (parser.nextToken() != JsonToken.END_ARRAY) {
roles.add(Node.Role.valueOf(parser.getValueAsString().toUpperCase(Locale.ROOT)));
}
} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("attributes".equals(fieldName)) {
if ("http".equals(fieldName)) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
attributes.put(parser.getCurrentName(), parser.getValueAsString());
}
} else if ("http".equals(fieldName)) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.VALUE_STRING &&
"publish_address".equals(parser.getCurrentName())) {
if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
httpHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
boundAddressAsURI.getScheme());
@ -157,6 +145,6 @@ final class Sniffer {
logger.debug("skipping node [" + nodeId + "] with http disabled");
return null;
}
return new Node(httpHost, roles, attributes);
return httpHost;
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
@ -48,7 +49,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay,
CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme,
Predicate<Connection> connectionSelector, Node... nodes) {
Predicate<Connection> connectionSelector, HttpHost... hosts) {
super(connectionSelector);
if (sniffInterval <= 0) {
throw new IllegalArgumentException("sniffInterval must be greater than 0");
@ -57,12 +58,12 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
throw new IllegalArgumentException("sniffAfterFailureDelay must be greater than 0");
}
Objects.requireNonNull(scheme, "scheme cannot be null");
if (nodes == null || nodes.length == 0) {
throw new IllegalArgumentException("no nodes provided");
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("no hosts provided");
}
this.sniffOnFailure = sniffOnFailure;
this.sniffer = new Sniffer(client, sniffRequestConfig, sniffRequestTimeout, scheme.toString());
this.connections = createConnections(nodes);
this.connections = createConnections(hosts);
this.snifferTask = new SnifferTask(sniffInterval, sniffAfterFailureDelay);
}
@ -81,7 +82,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
super.onFailure(connection);
if (sniffOnFailure) {
//re-sniff immediately but take out the node that failed
snifferTask.sniffOnFailure(connection.getNode());
snifferTask.sniffOnFailure(connection.getHost());
}
}
@ -119,22 +120,22 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
sniff(node -> true);
}
void sniffOnFailure(Node failedNode) {
void sniffOnFailure(HttpHost failedHost) {
//sync sniff straightaway on failure
failure = true;
sniff(node -> node.getHttpHost().equals(failedNode.getHttpHost()) == false);
sniff(host -> host.equals(failedHost) == false);
}
void sniff(Predicate<Node> nodeFilter) {
void sniff(Predicate<HttpHost> hostFilter) {
if (running.compareAndSet(false, true)) {
try {
Iterator<StatefulConnection> connectionIterator = nextUnfilteredConnection().iterator();
if (connectionIterator.hasNext()) {
sniff(connectionIterator, nodeFilter);
sniff(connectionIterator, hostFilter);
} else {
StatefulConnection connection = lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getNode());
sniff(Stream.of(connection).iterator(), nodeFilter);
logger.info("no healthy nodes available, trying " + connection.getHost());
sniff(Stream.of(connection).iterator(), hostFilter);
}
} catch (Throwable t) {
logger.error("error while sniffing nodes", t);
@ -158,13 +159,13 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
}
}
void sniff(Iterator<StatefulConnection> connectionIterator, Predicate<Node> nodeFilter) throws IOException {
void sniff(Iterator<StatefulConnection> connectionIterator, Predicate<HttpHost> hostFilter) throws IOException {
IOException lastSeenException = null;
while (connectionIterator.hasNext()) {
StatefulConnection connection = connectionIterator.next();
try {
List<Node> sniffedNodes = sniffer.sniffNodes(connection.getNode());
Node[] filteredNodes = sniffedNodes.stream().filter(nodeFilter).toArray(Node[]::new);
List<HttpHost> sniffedNodes = sniffer.sniffNodes(connection.getHost());
HttpHost[] filteredNodes = sniffedNodes.stream().filter(hostFilter).toArray(HttpHost[]::new);
logger.debug("adding " + filteredNodes.length + " nodes out of " + sniffedNodes.size() + " sniffed nodes");
connections = createConnections(filteredNodes);
onSuccess(connection);

View File

@ -19,6 +19,8 @@
package org.elasticsearch.client;
import org.apache.http.HttpHost;
import java.util.concurrent.TimeUnit;
/**
@ -43,10 +45,10 @@ public final class StatefulConnection extends Connection {
private volatile long deadUntil = -1;
/**
* Creates a new mutable connection pointing to the provided {@link Node} argument
* Creates a new mutable connection pointing to the provided {@link HttpHost} argument
*/
public StatefulConnection(Node node) {
super(node);
public StatefulConnection(HttpHost host) {
super(host);
}
/**

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -43,17 +44,17 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool {
private final List<StatefulConnection> connections;
public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig,
Predicate<Connection> connectionSelector, Node... nodes) {
Predicate<Connection> connectionSelector, HttpHost... hosts) {
super(connectionSelector);
Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(pingRequestConfig, "pingRequestConfig cannot be null");
if (nodes == null || nodes.length == 0) {
throw new IllegalArgumentException("no nodes provided");
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("no hosts provided");
}
this.client = client;
this.pingEnabled = pingEnabled;
this.pingRequestConfig = pingRequestConfig;
this.connections = createConnections(nodes);
this.connections = createConnections(hosts);
}
@Override
@ -67,20 +68,20 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool {
HttpHead httpHead = new HttpHead("/");
httpHead.setConfig(pingRequestConfig);
StatusLine statusLine;
try(CloseableHttpResponse httpResponse = client.execute(connection.getNode().getHttpHost(), httpHead)) {
try(CloseableHttpResponse httpResponse = client.execute(connection.getHost(), httpHead)) {
statusLine = httpResponse.getStatusLine();
EntityUtils.consume(httpResponse.getEntity());
} catch(IOException e) {
RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getNode(), e);
RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getHost(), e);
onFailure(connection);
throw e;
}
if (statusLine.getStatusCode() >= 300) {
RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getNode(), statusLine);
RequestLogger.log(logger, "ping failed", httpHead.getRequestLine(), connection.getHost(), statusLine);
onFailure(connection);
throw new ElasticsearchResponseException(httpHead.getRequestLine(), connection.getNode(), statusLine);
throw new ElasticsearchResponseException(httpHead.getRequestLine(), connection.getHost(), statusLine);
} else {
RequestLogger.log(logger, "ping succeeded", httpHead.getRequestLine(), connection.getNode(), statusLine);
RequestLogger.log(logger, "ping succeeded", httpHead.getRequestLine(), connection.getHost(), statusLine);
onSuccess(connection);
}
}

View File

@ -69,7 +69,7 @@ final class Transport<C extends Connection> implements Closeable {
Iterator<C> connectionIterator = connectionPool.nextConnection().iterator();
if (connectionIterator.hasNext() == false) {
C connection = connectionPool.lastResortConnection();
logger.info("no healthy nodes available, trying " + connection.getNode());
logger.info("no healthy nodes available, trying " + connection.getHost());
return performRequest(request, Stream.of(connection).iterator());
}
return performRequest(request, connectionIterator);
@ -127,9 +127,9 @@ final class Transport<C extends Connection> implements Closeable {
private ElasticsearchResponse performRequest(HttpRequestBase request, C connection) throws IOException {
CloseableHttpResponse response;
try {
response = client.execute(connection.getNode().getHttpHost(), request);
response = client.execute(connection.getHost(), request);
} catch(IOException e) {
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getNode(), e);
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), e);
throw e;
} finally {
request.reset();
@ -138,12 +138,12 @@ final class Transport<C extends Connection> implements Closeable {
//TODO make ignore status code configurable. rest-spec and tests support that parameter.
if (statusLine.getStatusCode() < 300 ||
request.getMethod().equals(HttpHead.METHOD_NAME) && statusLine.getStatusCode() == 404) {
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getNode(), response.getStatusLine());
return new ElasticsearchResponse(request.getRequestLine(), connection.getNode(), response);
RequestLogger.log(logger, "request succeeded", request.getRequestLine(), connection.getHost(), response.getStatusLine());
return new ElasticsearchResponse(request.getRequestLine(), connection.getHost(), response);
} else {
EntityUtils.consume(response.getEntity());
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getNode(), response.getStatusLine());
throw new ElasticsearchResponseException(request.getRequestLine(), connection.getNode(), statusLine);
RequestLogger.log(logger, "request failed", request.getRequestLine(), connection.getHost(), response.getStatusLine());
throw new ElasticsearchResponseException(request.getRequestLine(), connection.getHost(), statusLine);
}
}

View File

@ -1,136 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.http.HttpHost;
import org.apache.lucene.util.LuceneTestCase;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
public class NodeTests extends LuceneTestCase {
public void testSingleArgumentConstructor() {
HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme());
Node node = new Node(httpHost);
assertThat(node.getHttpHost(), sameInstance(httpHost));
assertThat(node.getAttributes(), notNullValue());
assertThat(node.getAttributes().size(), equalTo(0));
assertThat(node.getRoles(), notNullValue());
assertThat(node.getRoles(), equalTo(new HashSet<>(Arrays.asList(Node.Role.values()))));
try {
new Node(null);
fail("node construction should have failed");
} catch(NullPointerException e) {
assertThat(e.getMessage(), equalTo("host cannot be null"));
}
}
public void testThreeArgumentsConstructor() {
HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme());
Set<Node.Role> roles = randomRoles();
Map<String, String> attributes = randomAttributes();
Node node = new Node(httpHost, roles, attributes);
assertThat(node.getHttpHost(), sameInstance(httpHost));
assertThat(node.getAttributes(), equalTo(attributes));
assertThat(node.getRoles(), equalTo(roles));
try {
new Node(null, roles, attributes);
fail("node construction should have failed");
} catch(NullPointerException e) {
assertThat(e.getMessage(), equalTo("host cannot be null"));
}
try {
new Node(httpHost, null, attributes);
fail("node construction should have failed");
} catch(NullPointerException e) {
assertThat(e.getMessage(), equalTo("roles cannot be null"));
}
try {
new Node(httpHost, roles, null);
fail("node construction should have failed");
} catch(NullPointerException e) {
assertThat(e.getMessage(), equalTo("attributes cannot be null"));
}
}
public void testToString() {
HttpHost httpHost = new HttpHost(randomHost(), randomPort(), randomScheme());
Set<Node.Role> roles = randomRoles();
Map<String, String> attributes = randomAttributes();
Node node = new Node(httpHost, roles, attributes);
String expectedString = "Node{" +
"httpHost=" + httpHost.toString() +
", roles=" + roles.toString() +
", attributes=" + attributes.toString() +
'}';
assertThat(node.toString(), equalTo(expectedString));
}
private static String randomHost() {
return RandomStrings.randomAsciiOfLengthBetween(random(), 5, 10);
}
private static int randomPort() {
return random().nextInt();
}
private static String randomScheme() {
if (rarely()) {
return null;
}
return random().nextBoolean() ? "http" : "https";
}
private static Map<String, String> randomAttributes() {
int numAttributes = RandomInts.randomIntBetween(random(), 0, 5);
Map<String, String> attributes = new HashMap<>(numAttributes);
for (int i = 0; i < numAttributes; i++) {
String key = RandomStrings.randomAsciiOfLengthBetween(random(), 3, 10);
String value = RandomStrings.randomAsciiOfLengthBetween(random(), 3, 10);
attributes.put(key, value);
}
return attributes;
}
private static Set<Node.Role> randomRoles() {
int numRoles = RandomInts.randomIntBetween(random(), 0, 3);
Set<Node.Role> roles = new HashSet<>(numRoles);
for (int j = 0; j < numRoles; j++) {
roles.add(RandomPicks.randomFrom(random(), Node.Role.values()));
}
return roles;
}
}

View File

@ -90,24 +90,21 @@ public class SnifferTests extends LuceneTestCase {
Sniffer sniffer = new Sniffer(client, RequestConfig.DEFAULT, sniffRequestTimeout, scheme.toString());
HttpHost httpHost = new HttpHost(server.getHostName(), server.getPort());
try {
List<Node> sniffedNodes = sniffer.sniffNodes(new Node(httpHost));
List<HttpHost> sniffedHosts = sniffer.sniffNodes(httpHost);
if (sniffResponse.isFailure) {
fail("sniffNodes should have failed");
}
assertThat(sniffedNodes.size(), equalTo(sniffResponse.nodes.size()));
Iterator<Node> responseNodesIterator = sniffResponse.nodes.iterator();
for (Node sniffedNode : sniffedNodes) {
Node responseNode = responseNodesIterator.next();
assertThat(sniffedNode.getHttpHost(), equalTo(responseNode.getHttpHost()));
assertThat(sniffedNode.getRoles(), equalTo(responseNode.getRoles()));
assertThat(sniffedNode.getAttributes(), equalTo(responseNode.getAttributes()));
assertThat(sniffedHosts.size(), equalTo(sniffResponse.hosts.size()));
Iterator<HttpHost> responseHostsIterator = sniffResponse.hosts.iterator();
for (HttpHost sniffedHost : sniffedHosts) {
assertEquals(sniffedHost, responseHostsIterator.next());
}
} catch(ElasticsearchResponseException e) {
if (sniffResponse.isFailure) {
assertThat(e.getMessage(), containsString("GET http://localhost:" + server.getPort() +
"/_nodes/http?timeout=" + sniffRequestTimeout));
assertThat(e.getMessage(), containsString(Integer.toString(sniffResponse.nodesInfoResponseCode)));
assertThat(e.getNode().getHttpHost(), equalTo(httpHost));
assertThat(e.getHost(), equalTo(httpHost));
assertThat(e.getStatusLine().getStatusCode(), equalTo(sniffResponse.nodesInfoResponseCode));
assertThat(e.getRequestLine().toString(), equalTo("GET /_nodes/http?timeout=" + sniffRequestTimeout + "ms HTTP/1.1"));
} else {
@ -141,7 +138,7 @@ public class SnifferTests extends LuceneTestCase {
private static SniffResponse buildSniffResponse(SniffingConnectionPool.Scheme scheme) throws IOException {
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
List<Node> nodes = new ArrayList<>(numNodes);
List<HttpHost> hosts = new ArrayList<>(numNodes);
JsonFactory jsonFactory = new JsonFactory();
StringWriter writer = new StringWriter();
JsonGenerator generator = jsonFactory.createGenerator(writer);
@ -168,25 +165,11 @@ public class SnifferTests extends LuceneTestCase {
generator.writeEndArray();
}
boolean isHttpEnabled = rarely() == false;
int numRoles = RandomInts.randomIntBetween(random(), 0, 3);
Set<Node.Role> nodeRoles = new HashSet<>(numRoles);
for (int j = 0; j < numRoles; j++) {
Node.Role role;
do {
role = RandomPicks.randomFrom(random(), Node.Role.values());
} while(nodeRoles.add(role) == false);
}
int numAttributes = RandomInts.randomIntBetween(random(), 0, 3);
Map<String, String> attributes = new HashMap<>(numAttributes);
for (int j = 0; j < numAttributes; j++) {
attributes.put("attr" + j, "value" + j);
}
if (isHttpEnabled) {
String host = "host" + i;
int port = RandomInts.randomIntBetween(random(), 9200, 9299);
HttpHost httpHost = new HttpHost(host, port, scheme.toString());
nodes.add(new Node(httpHost, nodeRoles, attributes));
hosts.add(httpHost);
generator.writeObjectFieldStart("http");
if (random().nextBoolean()) {
generator.writeArrayFieldStart("bound_address");
@ -205,11 +188,25 @@ public class SnifferTests extends LuceneTestCase {
}
generator.writeEndObject();
}
String[] roles = {"master", "data", "ingest"};
int numRoles = RandomInts.randomIntBetween(random(), 0, 3);
Set<String> nodeRoles = new HashSet<>(numRoles);
for (int j = 0; j < numRoles; j++) {
String role;
do {
role = RandomPicks.randomFrom(random(), roles);
} while(nodeRoles.add(role) == false);
}
generator.writeArrayFieldStart("roles");
for (Node.Role nodeRole : nodeRoles) {
generator.writeString(nodeRole.toString());
for (String nodeRole : nodeRoles) {
generator.writeString(nodeRole);
}
generator.writeEndArray();
int numAttributes = RandomInts.randomIntBetween(random(), 0, 3);
Map<String, String> attributes = new HashMap<>(numAttributes);
for (int j = 0; j < numAttributes; j++) {
attributes.put("attr" + j, "value" + j);
}
if (numAttributes > 0) {
generator.writeObjectFieldStart("attributes");
}
@ -224,18 +221,18 @@ public class SnifferTests extends LuceneTestCase {
generator.writeEndObject();
generator.writeEndObject();
generator.close();
return SniffResponse.buildResponse(writer.toString(), nodes);
return SniffResponse.buildResponse(writer.toString(), hosts);
}
private static class SniffResponse {
private final String nodesInfoBody;
private final int nodesInfoResponseCode;
private final List<Node> nodes;
private final List<HttpHost> hosts;
private final boolean isFailure;
SniffResponse(String nodesInfoBody, List<Node> nodes, boolean isFailure) {
SniffResponse(String nodesInfoBody, List<HttpHost> hosts, boolean isFailure) {
this.nodesInfoBody = nodesInfoBody;
this.nodes = nodes;
this.hosts = hosts;
this.isFailure = isFailure;
if (isFailure) {
this.nodesInfoResponseCode = randomErrorResponseCode();
@ -248,8 +245,8 @@ public class SnifferTests extends LuceneTestCase {
return new SniffResponse("", Collections.emptyList(), true);
}
static SniffResponse buildResponse(String nodesInfoBody, List<Node> nodes) {
return new SniffResponse(nodesInfoBody, nodes, false);
static SniffResponse buildResponse(String nodesInfoBody, List<HttpHost> hosts) {
return new SniffResponse(nodesInfoBody, hosts, false);
}
}

View File

@ -38,16 +38,16 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
public void testConstructor() throws Exception {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
Node[] nodes = new Node[numNodes];
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
nodes[i] = new Node(new HttpHost("localhost", 9200));
hosts[i] = new HttpHost("localhost", 9200);
}
try (SniffingConnectionPool connectionPool = new SniffingConnectionPool(
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(IllegalArgumentException e) {
@ -58,7 +58,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0");
@ -68,7 +68,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), null, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "client cannot be null");
@ -78,7 +78,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, null,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "sniffRequestConfig cannot be null");
@ -88,7 +88,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
@ -98,7 +98,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, hosts)) {
fail("pool creation should have failed " + connectionPool);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "connection selector predicate cannot be null");
@ -109,10 +109,10 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()),
connection -> random().nextBoolean(), (Node[])null)) {
connection -> random().nextBoolean(), (HttpHost[])null)) {
fail("pool creation should have failed " + connectionPool);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no nodes provided");
assertEquals(e.getMessage(), "no hosts provided");
}
try (SniffingConnectionPool connectionPool = new SniffingConnectionPool(
@ -120,10 +120,10 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(),
(Node)null)) {
(HttpHost) null)) {
fail("pool creation should have failed " + connectionPool);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "node cannot be null");
assertEquals(e.getMessage(), "host cannot be null");
}
try (SniffingConnectionPool connectionPool = new SniffingConnectionPool(
@ -133,14 +133,14 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean())) {
fail("pool creation should have failed " + connectionPool);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no nodes provided");
assertEquals(e.getMessage(), "no hosts provided");
}
try (SniffingConnectionPool sniffingConnectionPool = new SniffingConnectionPool(
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), nodes)) {
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
assertNotNull(sniffingConnectionPool);
}
}

View File

@ -37,51 +37,51 @@ public class StaticConnectionPoolTests extends LuceneTestCase {
public void testConstructor() {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
Node[] nodes = new Node[numNodes];
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
nodes[i] = new Node(new HttpHost("localhost", 9200));
hosts[i] = new HttpHost("localhost", 9200);
}
try {
new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), nodes);
new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), hosts);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "client cannot be null");
}
try {
new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), nodes);
new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), hosts);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "pingRequestConfig cannot be null");
}
try {
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, nodes);
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, hosts);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "connection selector predicate cannot be null");
}
try {
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
connection -> random().nextBoolean(), (Node)null);
connection -> random().nextBoolean(), (HttpHost) null);
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "node cannot be null");
assertEquals(e.getMessage(), "host cannot be null");
}
try {
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
connection -> random().nextBoolean(), (Node[])null);
connection -> random().nextBoolean(), (HttpHost[])null);
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no nodes provided");
assertEquals(e.getMessage(), "no hosts provided");
}
try {
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean());
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "no nodes provided");
assertEquals(e.getMessage(), "no hosts provided");
}
StaticConnectionPool staticConnectionPool = new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
connection -> random().nextBoolean(), nodes);
connection -> random().nextBoolean(), hosts);
assertNotNull(staticConnectionPool);
}
}